This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 077a8d27a [chore] Fix the words of `canal` & `kafka` (#4261)
077a8d27a is described below
commit 077a8d27a7ce94cfa3150c233caa8c3d3e7b8a60
Author: Zongwen Li <[email protected]>
AuthorDate: Sat Mar 4 17:36:33 2023 +0800
[chore] Fix the words of `canal` & `kafka` (#4261)
---
.../seatunnel/connectors/seatunnel/kafka/config/Config.java | 2 +-
.../seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java | 4 ++--
.../seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java | 4 ++--
.../connector/kafka/{CannalToKafakIT.java => CanalToKafkaIT.java} | 8 ++++----
4 files changed, 9 insertions(+), 9 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
index 4c2a2175c..376e4f26f 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
@@ -33,7 +33,7 @@ public class Config {
public static final String TEXT_FORMAT = "text";
- public static final String CANNAL_FORMAT = "canal-json";
+ public static final String CANAL_FORMAT = "canal-json";
/** The default field delimiter is “,” */
public static final String DEFAULT_FIELD_DELIMITER = ",";
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
index 58662b245..bae76c7a9 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
@@ -32,7 +32,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.List;
import java.util.function.Function;
-import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.CANNAL_FORMAT;
+import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.CANAL_FORMAT;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.DEFAULT_FORMAT;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TEXT_FORMAT;
@@ -88,7 +88,7 @@ public class DefaultSeaTunnelRowSerializer implements SeaTunnelRowSerializer<byt
.seaTunnelRowType(rowType)
.delimiter(delimiter)
.build();
- case CANNAL_FORMAT:
+ case CANAL_FORMAT:
return new CanalJsonSerializationSchema(rowType);
default:
throw new SeaTunnelJsonFormatException(
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
index be2dfe987..5aeea30bd 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
@@ -57,7 +57,7 @@ import java.util.Map;
import java.util.Properties;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.BOOTSTRAP_SERVERS;
-import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.CANNAL_FORMAT;
+import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.CANAL_FORMAT;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.COMMIT_ON_CHECKPOINT;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.CONSUMER_GROUP;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.DEFAULT_FIELD_DELIMITER;
@@ -246,7 +246,7 @@ public class KafkaSource
.delimiter(delimiter)
.build();
break;
- case CANNAL_FORMAT:
+ case CANAL_FORMAT:
deserializationSchema =
CanalJsonDeserializationSchema.builder(typeInfo)
.setIgnoreParseErrors(true)
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/CannalToKafakIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/CanalToKafkaIT.java
similarity index 98%
rename from seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/CannalToKafakIT.java
rename to seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/CanalToKafkaIT.java
index d18b131b5..42dca0860 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/CannalToKafakIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/CanalToKafkaIT.java
@@ -70,9 +70,9 @@ import static org.awaitility.Awaitility.given;
@DisabledOnContainer(
value = {},
type = {EngineType.FLINK, EngineType.SPARK})
-public class CannalToKafakIT extends TestSuiteBase implements TestResource {
+public class CanalToKafkaIT extends TestSuiteBase implements TestResource {
- private static final Logger LOG = LoggerFactory.getLogger(CannalToKafakIT.class);
+ private static final Logger LOG = LoggerFactory.getLogger(CanalToKafkaIT.class);
private static GenericContainer<?> CANAL_CONTAINER;
@@ -228,7 +228,7 @@ public class CannalToKafakIT extends TestSuiteBase implements TestResource {
}
@TestTemplate
- public void testKafakSinkCannalFormat(TestContainer container)
+ public void testKafkaSinkCanalFormat(TestContainer container)
throws IOException, InterruptedException {
Container.ExecResult execResult =
container.executeJob("/kafkasource_canal_to_kafka.conf");
Assertions.assertEquals(0, execResult.getExitCode(),
execResult.getStderr());
@@ -262,7 +262,7 @@ public class CannalToKafakIT extends TestSuiteBase implements TestResource {
}
@TestTemplate
- public void testCannalFormatKafakCdcToPgsql(TestContainer container)
+ public void testCanalFormatKafkaCdcToPgsql(TestContainer container)
throws IOException, InterruptedException, SQLException {
Container.ExecResult execResult =
container.executeJob("/kafkasource_canal_cdc_to_pgsql.conf");