This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new e0d8519a9 [Hotfix][Connector-v2][kafka] Fix the overly short pull-data
interval setting and revise the formatting (#4875)
e0d8519a9 is described below
commit e0d8519a9d7b6dfac29c1daf4199b6fe87781d0a
Author: monster <[email protected]>
AuthorDate: Fri Jun 2 16:40:06 2023 +0800
[Hotfix][Connector-v2][kafka] Fix the overly short pull-data interval setting
and revise the formatting (#4875)
---
.../e2e/connector/kafka/CanalToKafkaIT.java | 55 ++++++++++++----------
.../seatunnel/e2e/connector/kafka/KafkaIT.java | 11 +++--
.../kafka_source_canal_cdc_to_pgsql.conf} | 4 --
.../kafka_source_canal_to_kafka.conf} | 3 --
.../kafka_source_json_to_console.conf} | 10 ----
.../kafka_default_sink_fake_to_kafka.conf | 3 --
...to_kafka.conf => kafka_sink_fake_to_kafka.conf} | 3 --
.../fake_source_to_text_sink_kafka.conf} | 3 --
.../kafka_source_text_to_console.conf} | 11 -----
9 files changed, 36 insertions(+), 67 deletions(-)
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/CanalToKafkaIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/CanalToKafkaIT.java
index f63381d19..0d8bb567a 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/CanalToKafkaIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/CanalToKafkaIT.java
@@ -65,11 +65,13 @@ import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
+import static org.awaitility.Awaitility.await;
import static org.awaitility.Awaitility.given;
@DisabledOnContainer(
value = {},
- type = {EngineType.SPARK})
+ type = {EngineType.SPARK},
+ disabledReason = "Spark engine will lose the row kind of record")
public class CanalToKafkaIT extends TestSuiteBase implements TestResource {
private static final Logger LOG =
LoggerFactory.getLogger(CanalToKafkaIT.class);
@@ -80,8 +82,6 @@ public class CanalToKafkaIT extends TestSuiteBase implements
TestResource {
private static final String CANAL_HOST = "canal_e2e";
- private static final int CANAL_PORT = 11111;
-
//
----------------------------------------------------------------------------
// kafka
private static final String KAFKA_IMAGE_NAME =
"confluentinc/cp-kafka:7.0.9";
@@ -98,7 +98,7 @@ public class CanalToKafkaIT extends TestSuiteBase implements
TestResource {
// mysql
private static final String MYSQL_HOST = "mysql_e2e";
- private static final MySqlContainer MYSQL_CONTAINER =
createMySqlContainer(MySqlVersion.V8_0);
+ private static final MySqlContainer MYSQL_CONTAINER =
createMySqlContainer();
private final UniqueDatabase inventoryDatabase =
new UniqueDatabase(MYSQL_CONTAINER, "canal", "mysqluser",
"mysqlpw");
@@ -124,18 +124,16 @@ public class CanalToKafkaIT extends TestSuiteBase
implements TestResource {
Assertions.assertEquals(0, extraCommands.getExitCode());
};
- private static MySqlContainer createMySqlContainer(MySqlVersion version) {
- MySqlContainer mySqlContainer =
- new MySqlContainer(version)
-
.withConfigurationOverride("docker/server-gtids/my.cnf")
- .withSetupSQL("docker/setup.sql")
- .withNetwork(NETWORK)
- .withNetworkAliases(MYSQL_HOST)
- .withDatabaseName("canal")
- .withUsername("st_user")
- .withPassword("seatunnel")
- .withLogConsumer(new Slf4jLogConsumer(LOG));
- return mySqlContainer;
+ private static MySqlContainer createMySqlContainer() {
+ return new MySqlContainer(MySqlVersion.V8_0)
+ .withConfigurationOverride("docker/server-gtids/my.cnf")
+ .withSetupSQL("docker/setup.sql")
+ .withNetwork(NETWORK)
+ .withNetworkAliases(MYSQL_HOST)
+ .withDatabaseName("canal")
+ .withUsername("st_user")
+ .withPassword("seatunnel")
+ .withLogConsumer(new Slf4jLogConsumer(LOG));
}
private void createCanalContainer() {
@@ -165,7 +163,7 @@ public class CanalToKafkaIT extends TestSuiteBase
implements TestResource {
DockerLoggerFactory.getLogger(KAFKA_IMAGE_NAME)));
}
- private void createPostgreSQLContainer() throws ClassNotFoundException {
+ private void createPostgreSQLContainer() {
POSTGRESQL_CONTAINER =
new PostgreSQLContainer<>(DockerImageName.parse(PG_IMAGE))
.withNetwork(NETWORK)
@@ -219,9 +217,9 @@ public class CanalToKafkaIT extends TestSuiteBase
implements TestResource {
@TestTemplate
public void testKafkaSinkCanalFormat(TestContainer container)
throws IOException, InterruptedException {
- Container.ExecResult execResult =
container.executeJob("/kafkasource_canal_to_kafka.conf");
+ Container.ExecResult execResult =
+
container.executeJob("/canalFormatIT/kafka_source_canal_to_kafka.conf");
Assertions.assertEquals(0, execResult.getExitCode(),
execResult.getStderr());
- ArrayList<Object> result = new ArrayList<>();
List<String> expectedResult =
Arrays.asList(
"{\"data\":{\"id\":101,\"name\":\"scooter\",\"description\":\"Small 2-wheel
scooter\",\"weight\":\"3.14\"},\"type\":\"INSERT\"}",
@@ -239,22 +237,27 @@ public class CanalToKafkaIT extends TestSuiteBase
implements TestResource {
"{\"data\":{\"id\":107,\"name\":\"rocks\",\"description\":\"box of assorted
rocks\",\"weight\":\"7.88\"},\"type\":\"INSERT\"}",
"{\"data\":{\"id\":109,\"name\":\"spare
tire\",\"description\":\"24 inch spare
tire\",\"weight\":\"22.2\"},\"type\":\"DELETE\"}");
+ ArrayList<String> result = new ArrayList<>();
ArrayList<String> topics = new ArrayList<>();
topics.add(KAFKA_TOPIC);
kafkaConsumer.subscribe(topics);
- ConsumerRecords<String, String> consumerRecords =
- kafkaConsumer.poll(Duration.ofSeconds(10000));
- for (ConsumerRecord<String, String> record : consumerRecords) {
- result.add(record.value());
- }
- Assertions.assertEquals(expectedResult, result);
+ await().atMost(60000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ ConsumerRecords<String, String> consumerRecords =
+
kafkaConsumer.poll(Duration.ofMillis(1000));
+ for (ConsumerRecord<String, String> record :
consumerRecords) {
+ result.add(record.value());
+ }
+ Assertions.assertEquals(expectedResult, result);
+ });
}
@TestTemplate
public void testCanalFormatKafkaCdcToPgsql(TestContainer container)
throws IOException, InterruptedException, SQLException {
Container.ExecResult execResult =
- container.executeJob("/kafkasource_canal_cdc_to_pgsql.conf");
+
container.executeJob("/canalFormatIT/kafka_source_canal_cdc_to_pgsql.conf");
Assertions.assertEquals(0, execResult.getExitCode(),
execResult.getStderr());
List<Object> actual = new ArrayList<>();
try (Connection connection =
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
index 1055adea5..922798c3d 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
@@ -137,7 +137,7 @@ public class KafkaIT extends TestSuiteBase implements
TestResource {
@TestTemplate
public void testSinkKafka(TestContainer container) throws IOException,
InterruptedException {
- Container.ExecResult execResult =
container.executeJob("/kafkasink_fake_to_kafka.conf");
+ Container.ExecResult execResult =
container.executeJob("/kafka_sink_fake_to_kafka.conf");
Assertions.assertEquals(0, execResult.getExitCode(),
execResult.getStderr());
String topicName = "test_topic";
@@ -153,7 +153,8 @@ public class KafkaIT extends TestSuiteBase implements
TestResource {
@TestTemplate
public void testTextFormatSinkKafka(TestContainer container)
throws IOException, InterruptedException {
- Container.ExecResult execResult =
container.executeJob("/kafkaTextsink_fake_to_kafka.conf");
+ Container.ExecResult execResult =
+
container.executeJob("/textFormatIT/fake_source_to_text_sink_kafka.conf");
Assertions.assertEquals(0, execResult.getExitCode(),
execResult.getStderr());
String topicName = "test_text_topic";
@@ -201,7 +202,8 @@ public class KafkaIT extends TestSuiteBase implements
TestResource {
row -> new ProducerRecord<>("test_topic_text", null,
serializer.serialize(row)),
0,
100);
- Container.ExecResult execResult =
container.executeJob("/kafkasource_text_to_console.conf");
+ Container.ExecResult execResult =
+
container.executeJob("/textFormatIT/kafka_source_text_to_console.conf");
Assertions.assertEquals(0, execResult.getExitCode(),
execResult.getStderr());
}
@@ -215,7 +217,8 @@ public class KafkaIT extends TestSuiteBase implements
TestResource {
DEFAULT_FORMAT,
DEFAULT_FIELD_DELIMITER);
generateTestData(row -> serializer.serializeRow(row), 0, 100);
- Container.ExecResult execResult =
container.executeJob("/kafkasource_json_to_console.conf");
+ Container.ExecResult execResult =
+
container.executeJob("/jsonFormatIT/kafka_source_json_to_console.conf");
Assertions.assertEquals(0, execResult.getExitCode(),
execResult.getStderr());
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_canal_cdc_to_pgsql.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/canalFormatIT/kafka_source_canal_cdc_to_pgsql.conf
similarity index 93%
rename from
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_canal_cdc_to_pgsql.conf
rename to
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/canalFormatIT/kafka_source_canal_cdc_to_pgsql.conf
index cd393455f..9ce69a234 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_canal_cdc_to_pgsql.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/canalFormatIT/kafka_source_canal_cdc_to_pgsql.conf
@@ -14,9 +14,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-######
-###### This config file is a demonstration of streaming processing in
seatunnel config
-######
env {
execution.parallelism = 1
@@ -48,7 +45,6 @@ source {
}
}
-
sink {
Jdbc {
driver = org.postgresql.Driver
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_canal_to_kafka.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/canalFormatIT/kafka_source_canal_to_kafka.conf
similarity index 93%
rename from
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_canal_to_kafka.conf
rename to
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/canalFormatIT/kafka_source_canal_to_kafka.conf
index c229c88f9..c51238c70 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_canal_to_kafka.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/canalFormatIT/kafka_source_canal_to_kafka.conf
@@ -14,9 +14,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-######
-###### This config file is a demonstration of streaming processing in
seatunnel config
-######
env {
execution.parallelism = 1
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_json_to_console.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/jsonFormatIT/kafka_source_json_to_console.conf
similarity index 87%
rename from
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_json_to_console.conf
rename to
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/jsonFormatIT/kafka_source_json_to_console.conf
index 02209f261..90b1a3ad5 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_json_to_console.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/jsonFormatIT/kafka_source_json_to_console.conf
@@ -14,9 +14,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-######
-###### This config file is a demonstration of streaming processing in
seatunnel config
-######
env {
execution.parallelism = 1
@@ -57,12 +54,6 @@ source {
}
}
}
-
- # If you would like to get more information about how to configure seatunnel
and see full list of source plugins,
- # please go to
https://seatunnel.apache.org/docs/connector-v2/source/KafkaSource
-}
-
-transform {
}
sink {
@@ -93,6 +84,5 @@ sink {
}
]
}
-
}
}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka_default_sink_fake_to_kafka.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka_default_sink_fake_to_kafka.conf
index a63214874..d47dd9e35 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka_default_sink_fake_to_kafka.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka_default_sink_fake_to_kafka.conf
@@ -56,9 +56,6 @@ source {
}
}
-transform {
-}
-
sink {
Kafka {
bootstrap.servers = "kafkaCluster:9092"
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasink_fake_to_kafka.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka_sink_fake_to_kafka.conf
similarity index 99%
rename from
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasink_fake_to_kafka.conf
rename to
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka_sink_fake_to_kafka.conf
index fe344a71d..83d931e6c 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasink_fake_to_kafka.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka_sink_fake_to_kafka.conf
@@ -56,9 +56,6 @@ source {
}
}
-transform {
-}
-
sink {
Kafka {
bootstrap.servers = "kafkaCluster:9092"
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkaTextsink_fake_to_kafka.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/fake_source_to_text_sink_kafka.conf
similarity index 99%
rename from
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkaTextsink_fake_to_kafka.conf
rename to
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/fake_source_to_text_sink_kafka.conf
index 17ec327b3..ecc38bf4d 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkaTextsink_fake_to_kafka.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/fake_source_to_text_sink_kafka.conf
@@ -56,9 +56,6 @@ source {
}
}
-transform {
-}
-
sink {
Kafka {
bootstrap.servers = "kafkaCluster:9092"
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_text_to_console.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_to_console.conf
similarity index 86%
rename from
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_text_to_console.conf
rename to
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_to_console.conf
index d794a0e6a..4c2783cd3 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_text_to_console.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/textFormatIT/kafka_source_text_to_console.conf
@@ -14,9 +14,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-######
-###### This config file is a demonstration of streaming processing in
seatunnel config
-######
env {
execution.parallelism = 1
@@ -57,15 +54,8 @@ source {
}
}
format = text
- # The default field delimiter is ","
field_delimiter = ","
}
-
- # If you would like to get more information about how to configure seatunnel
and see full list of source plugins,
- # please go to
https://seatunnel.apache.org/docs/connector-v2/source/KafkaSource
-}
-
-transform {
}
sink {
@@ -96,6 +86,5 @@ sink {
}
]
}
-
}
}
\ No newline at end of file