This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 6f30b29662 [Hotfix]Fix mongodb cdc e2e instability (#5128)
6f30b29662 is described below
commit 6f30b296627c9213026e8d502d9052646910e542
Author: monster <[email protected]>
AuthorDate: Fri Jul 21 15:00:13 2023 +0800
[Hotfix]Fix mongodb cdc e2e instability (#5128)
Co-authored-by: chenzy15 <[email protected]>
---
.../seatunnel/cdc/mongodb/config/MongodbSourceOptions.java | 5 +++--
.../src/test/java/mongodb/MongodbCDCIT.java | 12 ++++++++++--
.../src/test/resources/mongodbcdc_to_mysql.conf | 6 +-----
3 files changed, 14 insertions(+), 9 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/config/MongodbSourceOptions.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/config/MongodbSourceOptions.java
index df73772e07..dac939777f 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/config/MongodbSourceOptions.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/config/MongodbSourceOptions.java
@@ -19,6 +19,7 @@ package
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.configuration.SingleChoiceOption;
import org.apache.seatunnel.connectors.cdc.base.option.SourceOptions;
import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;
import org.apache.seatunnel.connectors.cdc.base.option.StopMode;
@@ -234,7 +235,7 @@ public class MongodbSourceOptions extends SourceOptions {
.withDescription(
"Decides if the table options contains Debezium
client properties that start with prefix 'debezium'.");
- public static final Option<StartupMode> STARTUP_MODE =
+ public static final SingleChoiceOption<StartupMode> STARTUP_MODE =
Options.key(SourceOptions.STARTUP_MODE_KEY)
.singleChoice(
StartupMode.class,
@@ -245,7 +246,7 @@ public class MongodbSourceOptions extends SourceOptions {
"Optional startup mode for CDC source, valid
enumerations are "
+ "\"initial\", \"earliest\", \"latest\",
\"timestamp\"\n or \"specific\"");
- public static final Option<StopMode> STOP_MODE =
+ public static final SingleChoiceOption<StopMode> STOP_MODE =
Options.key(SourceOptions.STOP_MODE_KEY)
.singleChoice(StopMode.class,
Collections.singletonList(StopMode.NEVER))
.defaultValue(StopMode.NEVER)
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java
index dd7f985f17..c01b36ef18 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java
@@ -34,7 +34,9 @@ import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestTemplate;
import org.testcontainers.containers.Container;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerLoggerFactory;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
@@ -78,7 +80,7 @@ public class MongodbCDCIT extends TestSuiteBase implements
TestResource {
//
----------------------------------------------------------------------------
// mysql
- private static final String MYSQL_HOST = "mysql_cdc_e2e";
+ private static final String MYSQL_HOST = "mysql_e2e";
private static final String MYSQL_USER_NAME = "st_user";
@@ -104,8 +106,10 @@ public class MongodbCDCIT extends TestSuiteBase implements
TestResource {
mySqlContainer.withDatabaseName(MYSQL_DATABASE);
mySqlContainer.withUsername(MYSQL_USER_NAME);
mySqlContainer.withPassword(MYSQL_USER_PASSWORD);
+ mySqlContainer.withLogConsumer(
+ new
Slf4jLogConsumer(DockerLoggerFactory.getLogger("Mysql-Docker-Image")));
// For local test use
- //
mySqlContainer.setPortBindings(Collections.singletonList("3308:3306"));
+ mySqlContainer.setPortBindings(Collections.singletonList("3310:3306"));
return mySqlContainer;
}
@@ -134,6 +138,9 @@ public class MongodbCDCIT extends TestSuiteBase implements
TestResource {
mongodbContainer = new MongoDBContainer(NETWORK);
// For local test use
mongodbContainer.setPortBindings(Collections.singletonList("27017:27017"));
+ mongodbContainer.withLogConsumer(
+ new
Slf4jLogConsumer(DockerLoggerFactory.getLogger("Mongodb-Docker-Image")));
+
Startables.deepStart(Stream.of(mongodbContainer)).join();
mongodbContainer.executeCommandFileInSeparateDatabase(MONGODB_DATABASE);
initConnection();
@@ -213,6 +220,7 @@ public class MongodbCDCIT extends TestSuiteBase implements
TestResource {
for (int i = 1; i <= columnCount; i++) {
objects.add(resultSet.getObject(i));
}
+ log.info("Print mysql sink data:" + objects);
result.add(objects);
}
return result;
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/mongodbcdc_to_mysql.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/mongodbcdc_to_mysql.conf
index 7e4a492390..12846c6a0c 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/mongodbcdc_to_mysql.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/resources/mongodbcdc_to_mysql.conf
@@ -14,9 +14,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-######
-###### This config file is a demonstration of streaming processing in
seatunnel config
-######
env {
# You can set engine configuration here
@@ -45,11 +42,10 @@ source {
sink {
jdbc {
- url =
"jdbc:mysql://mysql_cdc_e2e:3306?useSSL=false&useUnicode=true&characterEncoding=UTF-8&allowPublicKeyRetrieval=false&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=GMT%2B8"
+ url = "jdbc:mysql://mysql_e2e:3306/mongodb_cdc"
driver = "com.mysql.cj.jdbc.Driver"
user = "st_user"
password = "seatunnel"
-
generate_sink_sql = true
# You need to configure both database and table
database = mongodb_cdc