This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 6f30b29662 [Hotfix]Fix mongodb cdc e2e instability (#5128)
6f30b29662 is described below

commit 6f30b296627c9213026e8d502d9052646910e542
Author: monster <[email protected]>
AuthorDate: Fri Jul 21 15:00:13 2023 +0800

    [Hotfix]Fix mongodb cdc e2e instability (#5128)
    
    Co-authored-by: chenzy15 <[email protected]>
---
 .../seatunnel/cdc/mongodb/config/MongodbSourceOptions.java   |  5 +++--
 .../src/test/java/mongodb/MongodbCDCIT.java                  | 12 ++++++++++--
 .../src/test/resources/mongodbcdc_to_mysql.conf              |  6 +-----
 3 files changed, 14 insertions(+), 9 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/config/MongodbSourceOptions.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/config/MongodbSourceOptions.java
index df73772e07..dac939777f 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/config/MongodbSourceOptions.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/config/MongodbSourceOptions.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config;
 
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.configuration.SingleChoiceOption;
 import org.apache.seatunnel.connectors.cdc.base.option.SourceOptions;
 import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;
 import org.apache.seatunnel.connectors.cdc.base.option.StopMode;
@@ -234,7 +235,7 @@ public class MongodbSourceOptions extends SourceOptions {
                     .withDescription(
                             "Decides if the table options contains Debezium client properties that start with prefix 'debezium'.");
 
-    public static final Option<StartupMode> STARTUP_MODE =
+    public static final SingleChoiceOption<StartupMode> STARTUP_MODE =
             Options.key(SourceOptions.STARTUP_MODE_KEY)
                     .singleChoice(
                             StartupMode.class,
@@ -245,7 +246,7 @@ public class MongodbSourceOptions extends SourceOptions {
                             "Optional startup mode for CDC source, valid enumerations are "
                                     + "\"initial\", \"earliest\", \"latest\", \"timestamp\"\n or \"specific\"");
 
-    public static final Option<StopMode> STOP_MODE =
+    public static final SingleChoiceOption<StopMode> STOP_MODE =
             Options.key(SourceOptions.STOP_MODE_KEY)
                     .singleChoice(StopMode.class, Collections.singletonList(StopMode.NEVER))
                     .defaultValue(StopMode.NEVER)
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java
index dd7f985f17..c01b36ef18 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java
@@ -34,7 +34,9 @@ import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.TestTemplate;
 import org.testcontainers.containers.Container;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerLoggerFactory;
 
 import com.mongodb.client.MongoClient;
 import com.mongodb.client.MongoClients;
@@ -78,7 +80,7 @@ public class MongodbCDCIT extends TestSuiteBase implements TestResource {
 
     // ----------------------------------------------------------------------------
     // mysql
-    private static final String MYSQL_HOST = "mysql_cdc_e2e";
+    private static final String MYSQL_HOST = "mysql_e2e";
 
     private static final String MYSQL_USER_NAME = "st_user";
 
@@ -104,8 +106,10 @@ public class MongodbCDCIT extends TestSuiteBase implements TestResource {
         mySqlContainer.withDatabaseName(MYSQL_DATABASE);
         mySqlContainer.withUsername(MYSQL_USER_NAME);
         mySqlContainer.withPassword(MYSQL_USER_PASSWORD);
+        mySqlContainer.withLogConsumer(
+                new Slf4jLogConsumer(DockerLoggerFactory.getLogger("Mysql-Docker-Image")));
         // For local test use
-        // mySqlContainer.setPortBindings(Collections.singletonList("3308:3306"));
+        mySqlContainer.setPortBindings(Collections.singletonList("3310:3306"));
         return mySqlContainer;
     }
 
@@ -134,6 +138,9 @@ public class MongodbCDCIT extends TestSuiteBase implements TestResource {
         mongodbContainer = new MongoDBContainer(NETWORK);
         // For local test use
         mongodbContainer.setPortBindings(Collections.singletonList("27017:27017"));
+        mongodbContainer.withLogConsumer(
+                new Slf4jLogConsumer(DockerLoggerFactory.getLogger("Mongodb-Docker-Image")));
+
         Startables.deepStart(Stream.of(mongodbContainer)).join();
         mongodbContainer.executeCommandFileInSeparateDatabase(MONGODB_DATABASE);
         initConnection();
@@ -213,6 +220,7 @@ public class MongodbCDCIT extends TestSuiteBase implements TestResource {
                 for (int i = 1; i <= columnCount; i++) {
                     objects.add(resultSet.getObject(i));
                 }
+                log.info("Print mysql sink data:" + objects);
                 result.add(objects);
             }
             return result;
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/mongodbcdc_to_mysql.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/mongodbcdc_to_mysql.conf
index 7e4a492390..12846c6a0c 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/mongodbcdc_to_mysql.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/mongodbcdc_to_mysql.conf
@@ -14,9 +14,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-######
-###### This config file is a demonstration of streaming processing in seatunnel config
-######
 
 env {
   # You can set engine configuration here
@@ -45,11 +42,10 @@ source {
 
 sink {
   jdbc {
-    url = "jdbc:mysql://mysql_cdc_e2e:3306?useSSL=false&useUnicode=true&characterEncoding=UTF-8&allowPublicKeyRetrieval=false&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=GMT%2B8"
+    url = "jdbc:mysql://mysql_e2e:3306/mongodb_cdc"
     driver = "com.mysql.cj.jdbc.Driver"
     user = "st_user"
     password = "seatunnel"
-
     generate_sink_sql = true
     # You need to configure both database and table
     database = mongodb_cdc

Reply via email to