PatrickRen commented on code in PR #3124:
URL: https://github.com/apache/flink-cdc/pull/3124#discussion_r1557171918


##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/pom.xml:
##########
@@ -24,8 +24,9 @@ limitations under the License.
         <version>${revision}</version>
     </parent>
     <properties>
+        <oblogclient.version>1.1.1</oblogclient.version>
         <!-- Because of oceanbase docker image can not expose port quickly, so we need to specify testcontainers version to 1.15.3 -->
-        <jdbc.version>1.15.3</jdbc.version>
+        <testcontainers.version>1.15.3</testcontainers.version>

Review Comment:
   What about reusing the property defined in the parent pom?



##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseSource.java:
##########
@@ -186,23 +186,44 @@ public Builder<T> obcdcProperties(Properties obcdcProperties) {
             return this;
         }
 
-        public Builder<T> deserializer(OceanBaseDeserializationSchema<T> deserializer) {
+        public Builder<T> debeziumProperties(Properties debeziumProperties) {
+            this.debeziumProperties = debeziumProperties;
+            return this;
+        }
+
+        public Builder<T> deserializer(DebeziumDeserializationSchema<T> deserializer) {
             this.deserializer = deserializer;
             return this;
         }
 
         public SourceFunction<T> build() {
-            switch (startupMode) {
-                case INITIAL:
-                    checkNotNull(hostname, "hostname shouldn't be null on startup mode 'initial'");
-                    checkNotNull(port, "port shouldn't be null on startup mode 'initial'");
-                    checkNotNull(
-                            compatibleMode,
-                            "compatibleMode shouldn't be null on startup mode 'initial'");
-                    checkNotNull(
-                            jdbcDriver, "jdbcDriver shouldn't be null on startup mode 'initial'");
-                    startupTimestamp = 0L;
+            checkNotNull(username, "username shouldn't be null");
+            checkNotNull(password, "password shouldn't be null");
+            checkNotNull(hostname, "hostname shouldn't be null");
+            checkNotNull(port, "port shouldn't be null");
+
+            if (startupOptions == null) {
+                startupOptions = StartupOptions.initial();
+            }
+            if (compatibleMode == null) {
+                compatibleMode = "mysql";
+            }
+            if (jdbcDriver == null) {
+                jdbcDriver = "com.mysql.cj.jdbc.Driver";
+            }
+
+            if (connectTimeout == null) {
+                connectTimeout = Duration.ofSeconds(30);
+            }
+
+            if (serverTimeZone == null) {
+                serverTimeZone = "+00:00";

Review Comment:
   Is it possible to use the system time zone as the default?
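
   A minimal sketch of what such a fallback could look like, assuming the
   value is only replaced when the caller left it unset. The class and
   method names here are hypothetical and not part of the PR, and whether
   the downstream code accepts a region ID such as `Europe/Berlin` in place
   of an offset such as `+00:00` is also an assumption that would need
   checking:

```java
import java.time.ZoneId;

public class ServerTimeZoneDefault {

    /**
     * Returns the configured time zone, or the JVM's default zone ID when
     * the caller did not set one. Hypothetical helper, not code from the PR.
     */
    static String resolveServerTimeZone(String configured) {
        return configured != null ? configured : ZoneId.systemDefault().getId();
    }

    public static void main(String[] args) {
        // Unset: falls back to whatever zone the JVM is running in.
        System.out.println(resolveServerTimeZone(null));
        // Explicitly set: the configured value is kept as-is.
        System.out.println(resolveServerTimeZone("+00:00"));
    }
}
```

   One trade-off to keep in mind: the JVM's zone is that of the machine
   running the job, which may differ from the database server's zone.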



##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseSource.java:
##########
@@ -151,7 +152,7 @@ public Builder<T> logProxyHost(String logProxyHost) {
             return this;
         }
 
-        public Builder<T> logProxyPort(int logProxyPort) {
+        public Builder<T> logProxyPort(Integer logProxyPort) {

Review Comment:
   If `logProxyPort` is not nullable, what about using `int` instead of 
`Integer`?
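
   To illustrate the trade-off behind this question, here is a small
   sketch with two hypothetical builders (neither is the PR's actual
   `Builder<T>`). It uses `java.util.Objects.requireNonNull` as a stand-in
   for the `checkNotNull` calls in the diff so that it stays self-contained:

```java
import java.util.Objects;

public class LogProxyPortSketch {

    /** Boxed variant: null means "the caller never set a port". */
    static final class BoxedBuilder {
        private Integer logProxyPort;

        BoxedBuilder logProxyPort(Integer logProxyPort) {
            this.logProxyPort = logProxyPort;
            return this;
        }

        int build() {
            // Fails fast with a clear message when the port was never set.
            return Objects.requireNonNull(logProxyPort, "logProxyPort shouldn't be null");
        }
    }

    /** Primitive variant: the field can never be null, but an unset port reads as 0. */
    static final class PrimitiveBuilder {
        private int logProxyPort;

        PrimitiveBuilder logProxyPort(int logProxyPort) {
            this.logProxyPort = logProxyPort;
            return this;
        }

        int build() {
            return logProxyPort;
        }
    }

    public static void main(String[] args) {
        // Unset primitive silently yields the default value 0.
        System.out.println(new PrimitiveBuilder().build());
        // Unset boxed value is detected at build time.
        try {
            new BoxedBuilder().build();
        } catch (NullPointerException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

   If the port is always required, `int` avoids the null case entirely; if
   "not set" must be distinguishable from a real value, `Integer` plus a
   null check is the usual way to express that.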



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
