This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new e0d8519a9 [Hotfix][Connector-v2][kafka] Fix the short interval of pull data settings and revise the format (#4875)
e0d8519a9 is described below

commit e0d8519a9d7b6dfac29c1daf4199b6fe87781d0a
Author: monster <[email protected]>
AuthorDate: Fri Jun 2 16:40:06 2023 +0800

    [Hotfix][Connector-v2][kafka] Fix the short interval of pull data settings and revise the format (#4875)
---
 .../e2e/connector/kafka/CanalToKafkaIT.java        | 55 ++++++++++++----------
 .../seatunnel/e2e/connector/kafka/KafkaIT.java     | 11 +++--
 .../kafka_source_canal_cdc_to_pgsql.conf}          |  4 --
 .../kafka_source_canal_to_kafka.conf}              |  3 --
 .../kafka_source_json_to_console.conf}             | 10 ----
 .../kafka_default_sink_fake_to_kafka.conf          |  3 --
 ...to_kafka.conf => kafka_sink_fake_to_kafka.conf} |  3 --
 .../fake_source_to_text_sink_kafka.conf}           |  3 --
 .../kafka_source_text_to_console.conf}             | 11 -----
 9 files changed, 36 insertions(+), 67 deletions(-)

diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/CanalToKafkaIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/CanalToKafkaIT.java
index f63381d19..0d8bb567a 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/CanalToKafkaIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/CanalToKafkaIT.java
@@ -65,11 +65,13 @@ import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
 
+import static org.awaitility.Awaitility.await;
 import static org.awaitility.Awaitility.given;
 
 @DisabledOnContainer(
         value = {},
-        type = {EngineType.SPARK})
+        type = {EngineType.SPARK},
+        disabledReason = "Spark engine will lose the row kind of record")
 public class CanalToKafkaIT extends TestSuiteBase implements TestResource {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(CanalToKafkaIT.class);
@@ -80,8 +82,6 @@ public class CanalToKafkaIT extends TestSuiteBase implements 
TestResource {
 
     private static final String CANAL_HOST = "canal_e2e";
 
-    private static final int CANAL_PORT = 11111;
-
     // 
----------------------------------------------------------------------------
     // kafka
     private static final String KAFKA_IMAGE_NAME = 
"confluentinc/cp-kafka:7.0.9";
@@ -98,7 +98,7 @@ public class CanalToKafkaIT extends TestSuiteBase implements 
TestResource {
     // mysql
     private static final String MYSQL_HOST = "mysql_e2e";
 
-    private static final MySqlContainer MYSQL_CONTAINER = 
createMySqlContainer(MySqlVersion.V8_0);
+    private static final MySqlContainer MYSQL_CONTAINER = 
createMySqlContainer();
 
     private final UniqueDatabase inventoryDatabase =
             new UniqueDatabase(MYSQL_CONTAINER, "canal", "mysqluser", 
"mysqlpw");
@@ -124,18 +124,16 @@ public class CanalToKafkaIT extends TestSuiteBase 
implements TestResource {
                 Assertions.assertEquals(0, extraCommands.getExitCode());
             };
 
-    private static MySqlContainer createMySqlContainer(MySqlVersion version) {
-        MySqlContainer mySqlContainer =
-                new MySqlContainer(version)
-                        
.withConfigurationOverride("docker/server-gtids/my.cnf")
-                        .withSetupSQL("docker/setup.sql")
-                        .withNetwork(NETWORK)
-                        .withNetworkAliases(MYSQL_HOST)
-                        .withDatabaseName("canal")
-                        .withUsername("st_user")
-                        .withPassword("seatunnel")
-                        .withLogConsumer(new Slf4jLogConsumer(LOG));
-        return mySqlContainer;
+    private static MySqlContainer createMySqlContainer() {
+        return new MySqlContainer(MySqlVersion.V8_0)
+                .withConfigurationOverride("docker/server-gtids/my.cnf")
+                .withSetupSQL("docker/setup.sql")
+                .withNetwork(NETWORK)
+                .withNetworkAliases(MYSQL_HOST)
+                .withDatabaseName("canal")
+                .withUsername("st_user")
+                .withPassword("seatunnel")
+                .withLogConsumer(new Slf4jLogConsumer(LOG));
     }
 
     private void createCanalContainer() {
@@ -165,7 +163,7 @@ public class CanalToKafkaIT extends TestSuiteBase 
implements TestResource {
                                         
DockerLoggerFactory.getLogger(KAFKA_IMAGE_NAME)));
     }
 
-    private void createPostgreSQLContainer() throws ClassNotFoundException {
+    private void createPostgreSQLContainer() {
         POSTGRESQL_CONTAINER =
                 new PostgreSQLContainer<>(DockerImageName.parse(PG_IMAGE))
                         .withNetwork(NETWORK)
@@ -219,9 +217,9 @@ public class CanalToKafkaIT extends TestSuiteBase 
implements TestResource {
     @TestTemplate
     public void testKafkaSinkCanalFormat(TestContainer container)
             throws IOException, InterruptedException {
-        Container.ExecResult execResult = 
container.executeJob("/kafkasource_canal_to_kafka.conf");
+        Container.ExecResult execResult =
+                
container.executeJob("/canalFormatIT/kafka_source_canal_to_kafka.conf");
         Assertions.assertEquals(0, execResult.getExitCode(), 
execResult.getStderr());
-        ArrayList<Object> result = new ArrayList<>();
         List<String> expectedResult =
                 Arrays.asList(
                         
"{\"data\":{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel 
scooter\",\"weight\":\"3.14\"},\"type\":\"INSERT\"}",
@@ -239,22 +237,27 @@ public class CanalToKafkaIT extends TestSuiteBase 
implements TestResource {
                         
"{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted 
rocks\",\"weight\":\"7.88\"},\"type\":\"INSERT\"}",
                         "{\"data\":{\"id\":109,\"name\":\"spare 
tire\",\"description\":\"24 inch spare 
tire\",\"weight\":\"22.2\"},\"type\":\"DELETE\"}");
 
+        ArrayList<String> result = new ArrayList<>();
         ArrayList<String> topics = new ArrayList<>();
         topics.add(KAFKA_TOPIC);
         kafkaConsumer.subscribe(topics);
-        ConsumerRecords<String, String> consumerRecords =
-                kafkaConsumer.poll(Duration.ofSeconds(10000));
-        for (ConsumerRecord<String, String> record : consumerRecords) {
-            result.add(record.value());
-        }
-        Assertions.assertEquals(expectedResult, result);
+        await().atMost(60000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () -> {
+                            ConsumerRecords<String, String> consumerRecords =
+                                    
kafkaConsumer.poll(Duration.ofMillis(1000));
+                            for (ConsumerRecord<String, String> record : 
consumerRecords) {
+                                result.add(record.value());
+                            }
+                            Assertions.assertEquals(expectedResult, result);
+                        });
     }
 
     @TestTemplate
     public void testCanalFormatKafkaCdcToPgsql(TestContainer container)
             throws IOException, InterruptedException, SQLException {
         Container.ExecResult execResult =
-                container.executeJob("/kafkasource_canal_cdc_to_pgsql.conf");
+                
container.executeJob("/canalFormatIT/kafka_source_canal_cdc_to_pgsql.conf");
         Assertions.assertEquals(0, execResult.getExitCode(), 
execResult.getStderr());
         List<Object> actual = new ArrayList<>();
         try (Connection connection =
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
index 1055adea5..922798c3d 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
@@ -137,7 +137,7 @@ public class KafkaIT extends TestSuiteBase implements 
TestResource {
 
     @TestTemplate
     public void testSinkKafka(TestContainer container) throws IOException, 
InterruptedException {
-        Container.ExecResult execResult = 
container.executeJob("/kafkasink_fake_to_kafka.conf");
+        Container.ExecResult execResult = 
container.executeJob("/kafka_sink_fake_to_kafka.conf");
         Assertions.assertEquals(0, execResult.getExitCode(), 
execResult.getStderr());
 
         String topicName = "test_topic";
@@ -153,7 +153,8 @@ public class KafkaIT extends TestSuiteBase implements 
TestResource {
     @TestTemplate
     public void testTextFormatSinkKafka(TestContainer container)
             throws IOException, InterruptedException {
-        Container.ExecResult execResult = 
container.executeJob("/kafkaTextsink_fake_to_kafka.conf");
+        Container.ExecResult execResult =
+                
container.executeJob("/textFormatIT/fake_source_to_text_sink_kafka.conf");
         Assertions.assertEquals(0, execResult.getExitCode(), 
execResult.getStderr());
 
         String topicName = "test_text_topic";
@@ -201,7 +202,8 @@ public class KafkaIT extends TestSuiteBase implements 
TestResource {
                 row -> new ProducerRecord<>("test_topic_text", null, 
serializer.serialize(row)),
                 0,
                 100);
-        Container.ExecResult execResult = 
container.executeJob("/kafkasource_text_to_console.conf");
+        Container.ExecResult execResult =
+                
container.executeJob("/textFormatIT/kafka_source_text_to_console.conf");
         Assertions.assertEquals(0, execResult.getExitCode(), 
execResult.getStderr());
     }
 
@@ -215,7 +217,8 @@ public class KafkaIT extends TestSuiteBase implements 
TestResource {
                         DEFAULT_FORMAT,
                         DEFAULT_FIELD_DELIMITER);
         generateTestData(row -> serializer.serializeRow(row), 0, 100);
-        Container.ExecResult execResult = 
container.executeJob("/kafkasource_json_to_console.conf");
+        Container.ExecResult execResult =
+                
container.executeJob("/jsonFormatIT/kafka_source_json_to_console.conf");
         Assertions.assertEquals(0, execResult.getExitCode(), 
execResult.getStderr());
     }
 
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_canal_cdc_to_pgsql.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/canalFormatIT/kafka_source_canal_cdc_to_pgsql.conf
similarity index 93%
rename from 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_canal_cdc_to_pgsql.conf
rename to 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/canalFormatIT/kafka_source_canal_cdc_to_pgsql.conf
index cd393455f..9ce69a234 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_canal_cdc_to_pgsql.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/canalFormatIT/kafka_source_canal_cdc_to_pgsql.conf
@@ -14,9 +14,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-######
-###### This config file is a demonstration of streaming processing in 
seatunnel config
-######
 
 env {
   execution.parallelism = 1
@@ -48,7 +45,6 @@ source {
   }
 }
 
-
 sink {
   Jdbc {
     driver = org.postgresql.Driver
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_canal_to_kafka.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/canalFormatIT/kafka_source_canal_to_kafka.conf
similarity index 93%
rename from 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_canal_to_kafka.conf
rename to 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/canalFormatIT/kafka_source_canal_to_kafka.conf
index c229c88f9..c51238c70 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_canal_to_kafka.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/canalFormatIT/kafka_source_canal_to_kafka.conf
@@ -14,9 +14,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-######
-###### This config file is a demonstration of streaming processing in 
seatunnel config
-######
 
 env {
   execution.parallelism = 1
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_json_to_console.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/jsonFormatIT/kafka_source_json_to_console.conf
similarity index 87%
rename from 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_json_to_console.conf
rename to 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/jsonFormatIT/kafka_source_json_to_console.conf
index 02209f261..90b1a3ad5 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_json_to_console.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/jsonFormatIT/kafka_source_json_to_console.conf
@@ -14,9 +14,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-######
-###### This config file is a demonstration of streaming processing in 
seatunnel config
-######
 
 env {
   execution.parallelism = 1
@@ -57,12 +54,6 @@ source {
       }
     }
   }
-
-  # If you would like to get more information about how to configure seatunnel 
and see full list of source plugins,
-  # please go to 
https://seatunnel.apache.org/docs/connector-v2/source/KafkaSource
-}
-
-transform {
 }
 
 sink {
@@ -93,6 +84,5 @@ sink {
           }
         ]
       }
-
   }
 }
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka_default_sink_fake_to_kafka.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka_default_sink_fake_to_kafka.conf
index a63214874..d47dd9e35 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka_default_sink_fake_to_kafka.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka_default_sink_fake_to_kafka.conf
@@ -56,9 +56,6 @@ source {
   }
 }
 
-transform {
-}
-
 sink {
   Kafka {
     bootstrap.servers = "kafkaCluster:9092"
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasink_fake_to_kafka.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka_sink_fake_to_kafka.conf
similarity index 99%
rename from 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasink_fake_to_kafka.conf
rename to 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka_sink_fake_to_kafka.conf
index fe344a71d..83d931e6c 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasink_fake_to_kafka.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka_sink_fake_to_kafka.conf
@@ -56,9 +56,6 @@ source {
   }
 }
 
-transform {
-}
-
 sink {
   Kafka {
     bootstrap.servers = "kafkaCluster:9092"
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkaTextsink_fake_to_kafka.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/fake_source_to_text_sink_kafka.conf
similarity index 99%
rename from 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkaTextsink_fake_to_kafka.conf
rename to 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/fake_source_to_text_sink_kafka.conf
index 17ec327b3..ecc38bf4d 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkaTextsink_fake_to_kafka.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/fake_source_to_text_sink_kafka.conf
@@ -56,9 +56,6 @@ source {
   }
 }
 
-transform {
-}
-
 sink {
   Kafka {
     bootstrap.servers = "kafkaCluster:9092"
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_text_to_console.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_to_console.conf
similarity index 86%
rename from 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_text_to_console.conf
rename to 
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_to_console.conf
index d794a0e6a..4c2783cd3 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_text_to_console.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_to_console.conf
@@ -14,9 +14,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-######
-###### This config file is a demonstration of streaming processing in 
seatunnel config
-######
 
 env {
   execution.parallelism = 1
@@ -57,15 +54,8 @@ source {
       }
     }
     format = text
-    # The default field delimiter is ","
     field_delimiter = ","
   }
-
-  # If you would like to get more information about how to configure seatunnel 
and see full list of source plugins,
-  # please go to 
https://seatunnel.apache.org/docs/connector-v2/source/KafkaSource
-}
-
-transform {
 }
 
 sink {
@@ -96,6 +86,5 @@ sink {
           }
         ]
       }
-
   }
 }
\ No newline at end of file

Reply via email to