This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 3da19d444 [Improve][Connector-V2-kafka] Support setting read starting
offset or time at startup config (#3157)
3da19d444 is described below
commit 3da19d444408b4a8fc71d4c73a39d2754af467ca
Author: Carl-Zhou-CN <[email protected]>
AuthorDate: Tue Nov 8 19:17:22 2022 +0800
[Improve][Connector-V2-kafka] Support setting read starting offset or time
at startup config (#3157)
---
docs/en/connector-v2/source/kafka.md | 25 ++++-
.../connectors/seatunnel/kafka/config/Config.java | 15 +++
.../seatunnel/kafka/config/StartMode.java | 46 ++++++++
.../seatunnel/kafka/source/ConsumerMetadata.java | 32 ++++++
.../seatunnel/kafka/source/KafkaSource.java | 56 +++++++++-
.../kafka/source/KafkaSourceSplitEnumerator.java | 121 ++++++++++++++++-----
.../flink/v2/kafka/KafkaSourceJsonToConsoleIT.java | 65 +----------
.../kafka/KafkaSourceStartConfigToConsoleIT.java | 98 +++++++++++++++++
.../flink/v2/kafka/KafkaSourceTextToConsoleIT.java | 66 +----------
.../e2e/flink/v2/kafka/KafkaTestBaseIT.java | 90 +++++++++++++++
.../kafka/kafkasource_earliest_to_console.conf} | 89 +++++++--------
.../kafkasource_group_offset_to_console.conf} | 89 +++++++--------
.../kafka/kafkasource_latest_to_console.conf} | 89 +++++++--------
.../kafkasource_specific_offsets_to_console.conf} | 93 +++++++---------
.../kafka/kafkasource_timestamp_to_console.conf} | 90 +++++++--------
.../e2e/spark/v2/kafka/KafkaContainer.java | 2 +-
.../spark/v2/kafka/KafkaSourceJsonToConsoleIT.java | 62 +----------
.../kafka/KafkaSourceStartConfigToConsoleIT.java | 98 +++++++++++++++++
.../spark/v2/kafka/KafkaSourceTextToConsoleIT.java | 63 +----------
.../e2e/spark/v2/kafka/KafkaTestBaseIT.java | 90 +++++++++++++++
...e.conf => kafkasource_earliest_to_console.conf} | 85 ++++++---------
...nf => kafkasource_group_offset_to_console.conf} | 85 ++++++---------
.../kafka/kafkasource_json_to_console.conf | 4 +-
...ole.conf => kafkasource_latest_to_console.conf} | 85 ++++++---------
...> kafkasource_specific_offsets_to_console.conf} | 89 +++++++--------
.../kafka/kafkasource_text_to_console.conf | 4 +-
....conf => kafkasource_timestamp_to_console.conf} | 86 ++++++---------
27 files changed, 1020 insertions(+), 797 deletions(-)
diff --git a/docs/en/connector-v2/source/kafka.md
b/docs/en/connector-v2/source/kafka.md
index 425817ca5..966ebd972 100644
--- a/docs/en/connector-v2/source/kafka.md
+++ b/docs/en/connector-v2/source/kafka.md
@@ -18,7 +18,7 @@ Source connector for Apache Kafka.
## Options
| name | type | required | default value |
-| -------------------- | ------- | -------- | ------------------------ |
+|----------------------|---------| -------- |--------------------------|
| topic | String | yes | - |
| bootstrap.servers | String | yes | - |
| pattern | Boolean | no | false |
@@ -28,6 +28,9 @@ Source connector for Apache Kafka.
| common-options | | no | - |
| schema | | no | - |
| format | String | no | json |
+| start_mode | String | no | group_offsets |
+| start_mode.offsets | | no | |
+| start_mode.timestamp | Long | no | |
### topic [string]
@@ -66,6 +69,24 @@ The structure of the data, including field names and field
types.
Data format. The default format is json. Optional text format. The default
field separator is ", ".
If you customize the delimiter, add the "field_delimiter" option.
+### start_mode [string]
+The initial consumption mode of the consumer (default: group_offsets). Supported values (case-insensitive):
+[earliest], [group_offsets], [latest], [specific_offsets], [timestamp]
+
+### start_mode.timestamp [long]
+The start time in epoch milliseconds, required when start_mode is timestamp. It must not be negative and must not be later than the current time.
+
+### start_mode.offsets
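+The start offset for each partition, required when start_mode is specific_offsets. Each key has the form `<topic>-<partition>`; topic names that themselves contain "-" are not currently supported.
+For example: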
+```hocon
+ start_mode.offsets = {
+ info-0 = 70
+ info-1 = 10
+ info-2 = 10
+ }
+```
+
## Example
### Simple
@@ -84,7 +105,7 @@ source {
format = text
field_delimiter = "#“
topic = "topic_1,topic_2,topic_3"
- bootstrap.server = "localhost:9092"
+ bootstrap.servers = "localhost:9092"
kafka.max.poll.records = 500
kafka.client.id = client_1
}
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
index 2dedcd13d..f9c28d546 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
@@ -90,4 +90,19 @@ public class Config {
*/
public static final String PARTITION_KEY = "partition_key";
+ /**
+ * The initial consumption pattern of consumers
+ */
+ public static final String START_MODE = "start_mode";
+
+ /**
+ * The time required for consumption mode to be timestamp
+ */
+ public static final String START_MODE_TIMESTAMP = "start_mode.timestamp";
+
+ /**
+ * The offset required for consumption mode to be specific_offsets
+ */
+ public static final String START_MODE_OFFSETS = "start_mode.offsets";
+
}
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/StartMode.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/StartMode.java
new file mode 100644
index 000000000..21ec4ffb2
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/StartMode.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kafka.config;
+
+public enum StartMode {
+
+ EARLIEST("earliest"),
+
+ GROUP_OFFSETS("group_offsets"),
+
+ LATEST("latest"),
+
+ TIMESTAMP("timestamp"),
+
+ SPECIFIC_OFFSETS("specific_offsets");
+
+ private String mode;
+
+ StartMode(String mode) {
+ this.mode = mode;
+ }
+
+ public String getMode() {
+ return mode;
+ }
+
+ @Override
+ public String toString() {
+ return mode;
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/ConsumerMetadata.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/ConsumerMetadata.java
index ee942aaef..615d89416 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/ConsumerMetadata.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/ConsumerMetadata.java
@@ -17,7 +17,12 @@
package org.apache.seatunnel.connectors.seatunnel.kafka.source;
+import org.apache.seatunnel.connectors.seatunnel.kafka.config.StartMode;
+
+import org.apache.kafka.common.TopicPartition;
+
import java.io.Serializable;
+import java.util.Map;
import java.util.Properties;
/**
@@ -31,6 +36,9 @@ public class ConsumerMetadata implements Serializable {
private Properties properties;
private String consumerGroup;
private boolean commitOnCheckpoint = false;
+ private StartMode startMode = StartMode.GROUP_OFFSETS;
+ private Map<TopicPartition, Long> specificStartOffsets;
+ private Long startOffsetsTimestamp;
public boolean isCommitOnCheckpoint() {
return commitOnCheckpoint;
@@ -79,4 +87,28 @@ public class ConsumerMetadata implements Serializable {
public void setConsumerGroup(String consumerGroup) {
this.consumerGroup = consumerGroup;
}
+
+ public StartMode getStartMode() {
+ return startMode;
+ }
+
+ public void setStartMode(StartMode startMode) {
+ this.startMode = startMode;
+ }
+
+ public Map<TopicPartition, Long> getSpecificStartOffsets() {
+ return specificStartOffsets;
+ }
+
+ public void setSpecificStartOffsets(Map<TopicPartition, Long>
specificStartOffsets) {
+ this.specificStartOffsets = specificStartOffsets;
+ }
+
+ public Long getStartOffsetsTimestamp() {
+ return startOffsetsTimestamp;
+ }
+
+ public void setStartOffsetsTimestamp(Long startOffsetsTimestamp) {
+ this.startOffsetsTimestamp = startOffsetsTimestamp;
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
index 3c608f4a5..07882ea49 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
@@ -26,6 +26,9 @@ import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FIEL
import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FORMAT;
import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PATTERN;
import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.SCHEMA;
+import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.START_MODE;
+import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.START_MODE_OFFSETS;
+import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.START_MODE_TIMESTAMP;
import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TOPIC;
import org.apache.seatunnel.api.common.JobContext;
@@ -42,15 +45,22 @@ import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.config.TypesafeConfigUtils;
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
+import org.apache.seatunnel.connectors.seatunnel.kafka.config.StartMode;
import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSourceState;
import org.apache.seatunnel.format.json.JsonDeserializationSchema;
import org.apache.seatunnel.format.text.TextDeserializationSchema;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigRenderOptions;
+import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.auto.service.AutoService;
+import org.apache.kafka.common.TopicPartition;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Properties;
@AutoService(SeaTunnelSource.class)
@@ -96,6 +106,40 @@ public class KafkaSource implements
SeaTunnelSource<SeaTunnelRow, KafkaSourceSpl
this.metadata.setCommitOnCheckpoint(config.getBoolean(COMMIT_ON_CHECKPOINT));
}
+ if (config.hasPath(START_MODE)) {
+ StartMode startMode =
StartMode.valueOf(config.getString(START_MODE).toUpperCase());
+ this.metadata.setStartMode(startMode);
+ switch (startMode) {
+ case TIMESTAMP:
+ long startOffsetsTimestamp =
config.getLong(START_MODE_TIMESTAMP);
+ long currentTimestamp = System.currentTimeMillis();
+ if (startOffsetsTimestamp < 0 || startOffsetsTimestamp >
currentTimestamp) {
+ throw new
IllegalArgumentException("start_mode.timestamp The value is smaller than 0 or
smaller than the current time");
+ }
+
this.metadata.setStartOffsetsTimestamp(startOffsetsTimestamp);
+ break;
+ case SPECIFIC_OFFSETS:
+ Config offsets = config.getConfig(START_MODE_OFFSETS);
+ ConfigRenderOptions options =
ConfigRenderOptions.concise();
+ String offsetsJson = offsets.root().render(options);
+ if (offsetsJson == null) {
+ throw new IllegalArgumentException("start mode is " +
StartMode.SPECIFIC_OFFSETS + "but no specific offsets were specified.");
+ }
+ Map<TopicPartition, Long> specificStartOffsets = new
HashMap<>();
+ ObjectNode jsonNodes = JsonUtils.parseObject(offsetsJson);
+ jsonNodes.fieldNames().forEachRemaining(key -> {
+ String[] topicAndPartition = key.split("-");
+ long offset = jsonNodes.get(key).asLong();
+ TopicPartition topicPartition = new
TopicPartition(topicAndPartition[0], Integer.valueOf(topicAndPartition[1]));
+ specificStartOffsets.put(topicPartition, offset);
+ });
+
this.metadata.setSpecificStartOffsets(specificStartOffsets);
+ break;
+ default:
+ break;
+ }
+ }
+
TypesafeConfigUtils.extractSubConfig(config, "kafka.",
false).entrySet().forEach(e -> {
this.metadata.getProperties().put(e.getKey(),
String.valueOf(e.getValue().unwrapped()));
});
@@ -144,9 +188,9 @@ public class KafkaSource implements
SeaTunnelSource<SeaTunnelRow, KafkaSourceSpl
delimiter = config.getString(FIELD_DELIMITER);
}
deserializationSchema = TextDeserializationSchema.builder()
- .seaTunnelRowType(typeInfo)
- .delimiter(delimiter)
- .build();
+ .seaTunnelRowType(typeInfo)
+ .delimiter(delimiter)
+ .build();
} else {
// TODO: use format SPI
throw new UnsupportedOperationException("Unsupported format: "
+ format);
@@ -154,9 +198,9 @@ public class KafkaSource implements
SeaTunnelSource<SeaTunnelRow, KafkaSourceSpl
} else {
typeInfo = SeaTunnelSchema.buildSimpleTextSchema();
this.deserializationSchema = TextDeserializationSchema.builder()
- .seaTunnelRowType(typeInfo)
- .delimiter(String.valueOf('\002'))
- .build();
+ .seaTunnelRowType(typeInfo)
+ .delimiter(String.valueOf('\002'))
+ .build();
}
}
}
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
index 0852fe8fd..59c1aba7f 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
@@ -23,7 +23,7 @@ import
org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSourceState;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
-import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.TopicPartition;
@@ -33,7 +33,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -51,14 +50,14 @@ public class KafkaSourceSplitEnumerator implements
SourceSplitEnumerator<KafkaSo
private final Context<KafkaSourceSplit> context;
private AdminClient adminClient;
- private Set<KafkaSourceSplit> pendingSplit;
- private final Set<KafkaSourceSplit> assignedSplit;
+ private Map<TopicPartition, KafkaSourceSplit> pendingSplit;
+ private final Map<TopicPartition, KafkaSourceSplit> assignedSplit;
KafkaSourceSplitEnumerator(ConsumerMetadata metadata,
Context<KafkaSourceSplit> context) {
this.metadata = metadata;
this.context = context;
- this.assignedSplit = new HashSet<>();
- this.pendingSplit = new HashSet<>();
+ this.assignedSplit = new HashMap<>();
+ this.pendingSplit = new HashMap<>();
}
KafkaSourceSplitEnumerator(ConsumerMetadata metadata,
Context<KafkaSourceSplit> context,
@@ -74,13 +73,45 @@ public class KafkaSourceSplitEnumerator implements
SourceSplitEnumerator<KafkaSo
@Override
public void run() throws ExecutionException, InterruptedException {
getTopicInfo().forEach(split -> {
- if (!assignedSplit.contains(split)) {
- pendingSplit.add(split);
+ if (!assignedSplit.containsKey(split.getTopicPartition())) {
+ if (!pendingSplit.containsKey(split.getTopicPartition())) {
+ pendingSplit.put(split.getTopicPartition(), split);
+ }
}
});
+ setPartitionStartOffset();
assignSplit();
}
+ private void setPartitionStartOffset() throws ExecutionException,
InterruptedException {
+ Collection<TopicPartition> topicPartitions = pendingSplit.keySet();
+ Map<TopicPartition, Long> topicPartitionOffsets = null;
+ switch (metadata.getStartMode()) {
+ case EARLIEST:
+ topicPartitionOffsets = listOffsets(topicPartitions,
OffsetSpec.earliest());
+ break;
+ case GROUP_OFFSETS:
+ topicPartitionOffsets =
listConsumerGroupOffsets(topicPartitions);
+ break;
+ case LATEST:
+ topicPartitionOffsets = listOffsets(topicPartitions,
OffsetSpec.latest());
+ break;
+ case TIMESTAMP:
+ topicPartitionOffsets = listOffsets(topicPartitions,
OffsetSpec.forTimestamp(metadata.getStartOffsetsTimestamp()));
+ break;
+ case SPECIFIC_OFFSETS:
+ topicPartitionOffsets = metadata.getSpecificStartOffsets();
+ break;
+ default:
+ break;
+ }
+ topicPartitionOffsets.entrySet().forEach(entry -> {
+ if (pendingSplit.containsKey(entry.getKey())) {
+
pendingSplit.get(entry.getKey()).setStartOffset(entry.getValue());
+ }
+ });
+ }
+
@Override
public void close() throws IOException {
if (this.adminClient != null) {
@@ -91,20 +122,20 @@ public class KafkaSourceSplitEnumerator implements
SourceSplitEnumerator<KafkaSo
@Override
public void addSplitsBack(List<KafkaSourceSplit> splits, int subtaskId) {
if (!splits.isEmpty()) {
- pendingSplit.addAll(convertToNextSplit(splits));
+ pendingSplit.putAll(convertToNextSplit(splits));
assignSplit();
}
}
- private Collection<? extends KafkaSourceSplit>
convertToNextSplit(List<KafkaSourceSplit> splits) {
+ private Map<TopicPartition, ? extends KafkaSourceSplit>
convertToNextSplit(List<KafkaSourceSplit> splits) {
try {
- Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo>
listOffsets =
-
getKafkaPartitionLatestOffset(splits.stream().map(KafkaSourceSplit::getTopicPartition).collect(Collectors.toList()));
+ Map<TopicPartition, Long> listOffsets =
+
listOffsets(splits.stream().map(KafkaSourceSplit::getTopicPartition).collect(Collectors.toList()),
OffsetSpec.latest());
splits.forEach(split -> {
split.setStartOffset(split.getEndOffset() + 1);
-
split.setEndOffset(listOffsets.get(split.getTopicPartition()).offset());
+ split.setEndOffset(listOffsets.get(split.getTopicPartition()));
});
- return splits;
+ return splits.stream().collect(Collectors.toMap(split ->
split.getTopicPartition(), split -> split));
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -129,7 +160,7 @@ public class KafkaSourceSplitEnumerator implements
SourceSplitEnumerator<KafkaSo
@Override
public KafkaSourceState snapshotState(long checkpointId) throws Exception {
- return new KafkaSourceState(assignedSplit);
+ return new
KafkaSourceState(assignedSplit.values().stream().collect(Collectors.toSet()));
}
@Override
@@ -158,34 +189,30 @@ public class KafkaSourceSplitEnumerator implements
SourceSplitEnumerator<KafkaSo
Collection<TopicPartition> partitions =
adminClient.describeTopics(topics).all().get().values().stream().flatMap(t ->
t.partitions().stream()
.map(p -> new TopicPartition(t.name(),
p.partition()))).collect(Collectors.toSet());
- return
getKafkaPartitionLatestOffset(partitions).entrySet().stream().map(partition -> {
- KafkaSourceSplit split = new KafkaSourceSplit(partition.getKey());
- split.setEndOffset(partition.getValue().offset());
+ Map<TopicPartition, Long> latestOffsets = listOffsets(partitions,
OffsetSpec.latest());
+ return partitions.stream().map(partition -> {
+ KafkaSourceSplit split = new KafkaSourceSplit(partition);
+ split.setEndOffset(latestOffsets.get(split.getTopicPartition()));
return split;
}).collect(Collectors.toSet());
}
- private Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo>
getKafkaPartitionLatestOffset(Collection<TopicPartition> partitions) throws
InterruptedException, ExecutionException {
- return
adminClient.listOffsets(partitions.stream().collect(Collectors.toMap(p -> p, p
-> OffsetSpec.latest())))
- .all().get();
- }
-
private void assignSplit() {
Map<Integer, List<KafkaSourceSplit>> readySplit = new
HashMap<>(Common.COLLECTION_SIZE);
for (int taskID = 0; taskID < context.currentParallelism(); taskID++) {
readySplit.computeIfAbsent(taskID, id -> new ArrayList<>());
}
- pendingSplit.forEach(s -> {
- if (!assignedSplit.contains(s)) {
- readySplit.get(getSplitOwner(s.getTopicPartition(),
context.currentParallelism()))
- .add(s);
+ pendingSplit.entrySet().forEach(s -> {
+ if (!assignedSplit.containsKey(s.getKey())) {
+ readySplit.get(getSplitOwner(s.getKey(),
context.currentParallelism()))
+ .add(s.getValue());
}
});
readySplit.forEach(context::assignSplit);
- assignedSplit.addAll(pendingSplit);
+ assignedSplit.putAll(pendingSplit);
pendingSplit.clear();
}
@@ -195,4 +222,42 @@ public class KafkaSourceSplitEnumerator implements
SourceSplitEnumerator<KafkaSo
return (startIndex + tp.partition()) % numReaders;
}
+ private Map<TopicPartition, Long> listOffsets(Collection<TopicPartition>
partitions, OffsetSpec offsetSpec) throws ExecutionException,
InterruptedException {
+ Map<TopicPartition, OffsetSpec> topicPartitionOffsets =
partitions.stream().collect(Collectors.toMap(partition -> partition, __ ->
offsetSpec));
+
+ return adminClient.listOffsets(topicPartitionOffsets).all()
+ .thenApply(
+ result -> {
+ Map<TopicPartition, Long>
+ offsets = new HashMap<>();
+ result.forEach(
+ (tp, offsetsResultInfo) -> {
+ if (offsetsResultInfo != null) {
+ offsets.put(tp, offsetsResultInfo.offset());
+ }
+ });
+ return offsets;
+ })
+ .get();
+ }
+
+ public Map<TopicPartition, Long>
listConsumerGroupOffsets(Collection<TopicPartition> partitions) throws
ExecutionException, InterruptedException {
+ ListConsumerGroupOffsetsOptions options = new
ListConsumerGroupOffsetsOptions().topicPartitions(new ArrayList<>(partitions));
+ return adminClient
+ .listConsumerGroupOffsets(metadata.getConsumerGroup(), options)
+ .partitionsToOffsetAndMetadata()
+ .thenApply(
+ result -> {
+ Map<TopicPartition, Long> offsets = new HashMap<>();
+ result.forEach(
+ (tp, oam) -> {
+ if (oam != null) {
+ offsets.put(tp, oam.offset());
+ }
+ });
+ return offsets;
+ })
+ .get();
+ }
+
}
diff --git
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaSourceJsonToConsoleIT.java
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaSourceJsonToConsoleIT.java
index e3468e8dd..c1499b14e 100644
---
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaSourceJsonToConsoleIT.java
+++
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaSourceJsonToConsoleIT.java
@@ -27,69 +27,28 @@ import
org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import
org.apache.seatunnel.connectors.seatunnel.kafka.serialize.DefaultSeaTunnelRowSerializer;
-import org.apache.seatunnel.e2e.flink.FlinkContainer;
import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.Container;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.lifecycle.Startables;
-import org.testcontainers.shaded.com.google.common.collect.Lists;
-import org.testcontainers.shaded.org.awaitility.Awaitility;
-import org.testcontainers.utility.DockerImageName;
-import org.testcontainers.utility.DockerLoggerFactory;
import java.io.IOException;
import java.math.BigDecimal;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.Collections;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Stream;
/**
* This test case is used to verify that the kafka source is able to send data
to the console.
- * Make sure the SeaTunnel job can submit successfully on spark engine.
+ * Make sure the SeaTunnel job can submit successfully on flink engine.
*/
@Slf4j
-public class KafkaSourceJsonToConsoleIT extends FlinkContainer {
-
- private static final int KAFKA_PORT = 9093;
-
- private static final String KAFKA_HOST = "kafkaCluster";
-
- private KafkaProducer<byte[], byte[]> producer;
-
- private KafkaContainer kafkaContainer;
-
- @BeforeEach
- public void startKafkaContainer() {
- kafkaContainer = new
KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
- .withNetwork(NETWORK)
- .withNetworkAliases(KAFKA_HOST)
- .withLogConsumer(new
Slf4jLogConsumer(DockerLoggerFactory.getLogger("confluentinc/cp-kafka:6.2.1")));
- kafkaContainer.setPortBindings(Lists.newArrayList(
- String.format("%s:%s", KAFKA_PORT, KAFKA_PORT)));
- Startables.deepStart(Stream.of(kafkaContainer)).join();
- log.info("Kafka container started");
- Awaitility.given().ignoreExceptions()
- .atLeast(100, TimeUnit.MILLISECONDS)
- .pollInterval(500, TimeUnit.MILLISECONDS)
- .atMost(180, TimeUnit.SECONDS)
- .untilAsserted(() -> initKafkaProducer());
- generateTestData();
- }
+public class KafkaSourceJsonToConsoleIT extends KafkaTestBaseIT {
@SuppressWarnings("checkstyle:Indentation")
- private void generateTestData() {
+ protected void generateTestData() {
SeaTunnelRowType seatunnelRowType = new SeaTunnelRowType(
new String[]{
@@ -160,22 +119,4 @@ public class KafkaSourceJsonToConsoleIT extends
FlinkContainer {
Assertions.assertEquals(0, execResult.getExitCode(),
execResult.getStderr());
}
- private void initKafkaProducer() {
- Properties props = new Properties();
- String bootstrapServers = kafkaContainer.getBootstrapServers();
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
ByteArraySerializer.class);
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
ByteArraySerializer.class);
- producer = new KafkaProducer<>(props);
- }
-
- @AfterEach
- public void close() {
- if (producer != null) {
- producer.close();
- }
- if (kafkaContainer != null) {
- kafkaContainer.close();
- }
- }
}
diff --git
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaSourceStartConfigToConsoleIT.java
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaSourceStartConfigToConsoleIT.java
new file mode 100644
index 000000000..1d7225ae8
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaSourceStartConfigToConsoleIT.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.v2.kafka;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import
org.apache.seatunnel.connectors.seatunnel.kafka.serialize.DefaultSeaTunnelRowSerializer;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+
+import java.io.IOException;
+
+@Slf4j
+public class KafkaSourceStartConfigToConsoleIT extends KafkaTestBaseIT {
+ @Override
+ protected void generateTestData() {
+ generateStepTestData(0, 100);
+ }
+
+ private void generateStepTestData(int start, int end) {
+
+ SeaTunnelRowType seatunnelRowType = new SeaTunnelRowType(
+ new String[]{
+ "id"
+ },
+ new SeaTunnelDataType[]{
+ BasicType.LONG_TYPE
+ }
+ );
+
+ DefaultSeaTunnelRowSerializer serializer = new
DefaultSeaTunnelRowSerializer("test_topic", seatunnelRowType);
+ for (int i = start; i < end; i++) {
+ SeaTunnelRow row = new SeaTunnelRow(
+ new Object[]{
+ Long.valueOf(i)
+ });
+ ProducerRecord<byte[], byte[]> producerRecord =
serializer.serializeRow(row);
+ producer.send(producerRecord);
+ }
+ }
+
+ @Test
+ public void testKafka() throws IOException, InterruptedException {
+ testKafkaLatestToConsole();
+ testKafkaEarliestToConsole();
+ testKafkaSpecificOffsetsToConsole();
+ testKafkaGroupOffsetsToConsole();
+ testKafkaTimestampToConsole();
+ }
+
+ public void testKafkaLatestToConsole() throws IOException,
InterruptedException {
+ Container.ExecResult execResult =
executeSeaTunnelFlinkJob("/kafka/kafkasource_latest_to_console.conf");
+ Assertions.assertEquals(0, execResult.getExitCode(),
execResult.getStderr());
+ }
+
+ public void testKafkaEarliestToConsole() throws IOException,
InterruptedException {
+ Container.ExecResult execResult =
executeSeaTunnelFlinkJob("/kafka/kafkasource_earliest_to_console.conf");
+ Assertions.assertEquals(0, execResult.getExitCode(),
execResult.getStderr());
+ }
+
+ public void testKafkaSpecificOffsetsToConsole() throws IOException,
InterruptedException {
+ Container.ExecResult execResult =
executeSeaTunnelFlinkJob("/kafka/kafkasource_specific_offsets_to_console.conf");
+ Assertions.assertEquals(0, execResult.getExitCode(),
execResult.getStderr());
+ }
+
+ public void testKafkaGroupOffsetsToConsole() throws IOException,
InterruptedException {
+ generateStepTestData(100, 150);
+ Container.ExecResult execResult =
executeSeaTunnelFlinkJob("/kafka/kafkasource_group_offset_to_console.conf");
+ Assertions.assertEquals(0, execResult.getExitCode(),
execResult.getStderr());
+ }
+
+ public void testKafkaTimestampToConsole() throws IOException,
InterruptedException {
+ Container.ExecResult execResult =
executeSeaTunnelFlinkJob("/kafka/kafkasource_timestamp_to_console.conf");
+ Assertions.assertEquals(0, execResult.getExitCode(),
execResult.getStderr());
+ }
+
+}
diff --git
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaSourceTextToConsoleIT.java
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaSourceTextToConsoleIT.java
index f0855b455..bfe37078d 100644
---
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaSourceTextToConsoleIT.java
+++
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaSourceTextToConsoleIT.java
@@ -26,70 +26,28 @@ import
org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.e2e.flink.FlinkContainer;
import org.apache.seatunnel.format.text.TextSerializationSchema;
import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.Container;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.lifecycle.Startables;
-import org.testcontainers.shaded.com.google.common.collect.Lists;
-import org.testcontainers.shaded.org.awaitility.Awaitility;
-import org.testcontainers.utility.DockerImageName;
-import org.testcontainers.utility.DockerLoggerFactory;
import java.io.IOException;
import java.math.BigDecimal;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.Collections;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Stream;
/**
* This test case is used to verify that the kafka source is able to send data
to the console.
- * Make sure the SeaTunnel job can submit successfully on spark engine.
+ * Make sure the SeaTunnel job can submit successfully on flink engine.
*/
@Slf4j
-public class KafkaSourceTextToConsoleIT extends FlinkContainer {
-
- private static final int KAFKA_PORT = 9093;
-
- private static final String KAFKA_HOST = "kafkaCluster";
-
- private KafkaProducer<byte[], byte[]> producer;
-
- private KafkaContainer kafkaContainer;
-
- @BeforeEach
- public void startKafkaContainer() {
- kafkaContainer = new
KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
- .withNetwork(NETWORK)
- .withNetworkAliases(KAFKA_HOST)
- .withLogConsumer(new
Slf4jLogConsumer(DockerLoggerFactory.getLogger("confluentinc/cp-kafka:6.2.1")));
- kafkaContainer.setPortBindings(Lists.newArrayList(
- String.format("%s:%s", KAFKA_PORT, KAFKA_PORT)));
- Startables.deepStart(Stream.of(kafkaContainer)).join();
- log.info("Kafka container started");
- Awaitility.given().ignoreExceptions()
- .atLeast(100, TimeUnit.MILLISECONDS)
- .pollInterval(500, TimeUnit.MILLISECONDS)
- .atMost(180, TimeUnit.SECONDS)
- .untilAsserted(() -> initKafkaProducer());
- generateTestData();
- }
-
+public class KafkaSourceTextToConsoleIT extends KafkaTestBaseIT {
@SuppressWarnings("checkstyle:Indentation")
- private void generateTestData() {
+ protected void generateTestData() {
SeaTunnelRowType seatunnelRowType = new SeaTunnelRowType(
new String[]{
@@ -163,22 +121,4 @@ public class KafkaSourceTextToConsoleIT extends
FlinkContainer {
Assertions.assertEquals(0, execResult.getExitCode(),
execResult.getStderr());
}
- private void initKafkaProducer() {
- Properties props = new Properties();
- String bootstrapServers = kafkaContainer.getBootstrapServers();
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
ByteArraySerializer.class);
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
ByteArraySerializer.class);
- producer = new KafkaProducer<>(props);
- }
-
- @AfterEach
- public void close() {
- if (producer != null) {
- producer.close();
- }
- if (kafkaContainer != null) {
- kafkaContainer.close();
- }
- }
}
diff --git
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaTestBaseIT.java
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaTestBaseIT.java
new file mode 100644
index 000000000..0c339cccc
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaTestBaseIT.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.v2.kafka;
+
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.shaded.com.google.common.collect.Lists;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+/**
+ * Base class for the Kafka source e2e tests on the flink engine. Before each test it starts a
+ * Kafka container on the shared test network, creates a byte-array {@link KafkaProducer} for it
+ * and calls {@link #generateTestData()}; after each test it closes the producer and the container.
+ */
+@Slf4j
+public class KafkaTestBaseIT extends FlinkContainer {
+    protected static final int KAFKA_PORT = 9093;
+
+    protected static final String KAFKA_HOST = "kafkaCluster";
+
+    private static final String KAFKA_IMAGE = "confluentinc/cp-kafka:6.2.1";
+
+    protected KafkaProducer<byte[], byte[]> producer;
+
+    protected KafkaContainer kafkaContainer;
+
+    @BeforeEach
+    public void startKafkaContainer() {
+        kafkaContainer = new KafkaContainer(DockerImageName.parse(KAFKA_IMAGE))
+            .withNetwork(NETWORK)
+            .withNetworkAliases(KAFKA_HOST)
+            .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(KAFKA_IMAGE)));
+        kafkaContainer.setPortBindings(Lists.newArrayList(
+            String.format("%s:%s", KAFKA_PORT, KAFKA_PORT)));
+        Startables.deepStart(Stream.of(kafkaContainer)).join();
+        log.info("Kafka container started");
+        Awaitility.given().ignoreExceptions()
+            .atLeast(100, TimeUnit.MILLISECONDS)
+            .pollInterval(500, TimeUnit.MILLISECONDS)
+            .atMost(180, TimeUnit.SECONDS)
+            .untilAsserted(() -> initKafkaProducer());
+        generateTestData();
+        // KafkaProducer#send is asynchronous; block until the send requests for every record
+        // written by generateTestData() have completed before any job is submitted.
+        producer.flush();
+    }
+
+    /**
+     * Creates {@link #producer} with byte-array key/value serializers, pointed at the
+     * bootstrap servers of {@link #kafkaContainer}.
+     */
+    protected void initKafkaProducer() {
+        Properties props = new Properties();
+        String bootstrapServers = kafkaContainer.getBootstrapServers();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+        producer = new KafkaProducer<>(props);
+    }
+
+    /**
+     * Hook called once per test by {@link #startKafkaContainer()} after the producer is ready.
+     * Subclasses override it to write their records through {@link #producer}; the default
+     * writes nothing.
+     */
+    protected void generateTestData() {
+        // no test data by default
+    }
+
+    @AfterEach
+    public void close() {
+        if (producer != null) {
+            producer.close();
+        }
+        if (kafkaContainer != null) {
+            kafkaContainer.close();
+        }
+    }
+}
diff --git
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_earliest_to_console.conf
similarity index 51%
copy from
seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
copy to
seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_earliest_to_console.conf
index 91c920237..97f9ddb5d 100644
---
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
+++
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_earliest_to_console.conf
@@ -19,38 +19,25 @@
######
env {
- # You can set spark configuration here
- spark.app.name = "SeaTunnel"
- source.parallelism = 1
- job.mode = "BATCH"
+ # You can set flink configuration here
+ execution.parallelism = 1
+ #execution.checkpoint.interval = 10000
+ #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}
-
source {
Kafka {
bootstrap.servers = "kafkaCluster:9093"
topic = "test_topic"
result_table_name = "kafka_table"
- kafka.auto.offset.reset = "earliest"
+ # The default format is json, which is optional
+ format = json
+ start_mode = earliest
schema = {
- fields {
- id = bigint
- c_map = "map<string, smallint>"
- c_array = "array<tinyint>"
- c_string = string
- c_boolean = boolean
- c_tinyint = tinyint
- c_smallint = smallint
- c_int = int
- c_bigint = bigint
- c_float = float
- c_double = double
- c_decimal = "decimal(2, 1)"
- c_bytes = bytes
- c_date = date
- c_timestamp = timestamp
- }
- }
+ fields {
+ id = bigint
+ }
+ }
}
# If you would like to get more information about how to configure seatunnel
and see full list of source plugins,
@@ -58,33 +45,31 @@ source {
}
transform {
-}
+ }
-sink {
- Console {}
- Assert {
- rules =
- {
- field_rules = [
- {
- field_name = id
- field_type = long
- field_value = [
- {
- rule_type = NOT_NULL
- },
- {
- rule_type = MIN
- rule_value = 0
- },
- {
- rule_type = MAX
- rule_value = 99
- }
- ]
- }
- ]
- }
+sink {
+ Console {}
+
+ Assert {
+ rules =
+ {
+ field_rules = [
+ {
+ field_name = id
+ field_type = long
+ field_value = [
- }
-}
\ No newline at end of file
+ {
+ rule_type = MIN
+ rule_value = 0
+ },
+ {
+ rule_type = MAX
+ rule_value = 99
+ }
+ ]
+ }
+ ]
+ }
+ }
+ }
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_group_offset_to_console.conf
similarity index 51%
copy from
seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
copy to
seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_group_offset_to_console.conf
index 91c920237..2a7f9828c 100644
---
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
+++
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_group_offset_to_console.conf
@@ -19,38 +19,25 @@
######
env {
- # You can set spark configuration here
- spark.app.name = "SeaTunnel"
- source.parallelism = 1
- job.mode = "BATCH"
+ # You can set flink configuration here
+ execution.parallelism = 1
+ #execution.checkpoint.interval = 10000
+ #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}
-
source {
Kafka {
bootstrap.servers = "kafkaCluster:9093"
topic = "test_topic"
result_table_name = "kafka_table"
- kafka.auto.offset.reset = "earliest"
+ # The default format is json, which is optional
+ format = json
+ start_mode = group_offsets
schema = {
- fields {
- id = bigint
- c_map = "map<string, smallint>"
- c_array = "array<tinyint>"
- c_string = string
- c_boolean = boolean
- c_tinyint = tinyint
- c_smallint = smallint
- c_int = int
- c_bigint = bigint
- c_float = float
- c_double = double
- c_decimal = "decimal(2, 1)"
- c_bytes = bytes
- c_date = date
- c_timestamp = timestamp
- }
- }
+ fields {
+ id = bigint
+ }
+ }
}
# If you would like to get more information about how to configure seatunnel
and see full list of source plugins,
@@ -58,33 +45,31 @@ source {
}
transform {
-}
+ }
-sink {
- Console {}
- Assert {
- rules =
- {
- field_rules = [
- {
- field_name = id
- field_type = long
- field_value = [
- {
- rule_type = NOT_NULL
- },
- {
- rule_type = MIN
- rule_value = 0
- },
- {
- rule_type = MAX
- rule_value = 99
- }
- ]
- }
- ]
- }
+sink {
+ Console {}
+
+ Assert {
+ rules =
+ {
+ field_rules = [
+ {
+ field_name = id
+ field_type = long
+ field_value = [
- }
-}
\ No newline at end of file
+ {
+ rule_type = MIN
+ rule_value = 100
+ },
+ {
+ rule_type = MAX
+ rule_value = 149
+ }
+ ]
+ }
+ ]
+ }
+ }
+ }
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_latest_to_console.conf
similarity index 51%
copy from
seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
copy to
seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_latest_to_console.conf
index 91c920237..736104ea0 100644
---
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
+++
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_latest_to_console.conf
@@ -19,38 +19,25 @@
######
env {
- # You can set spark configuration here
- spark.app.name = "SeaTunnel"
- source.parallelism = 1
- job.mode = "BATCH"
+ # You can set flink configuration here
+ execution.parallelism = 1
+ #execution.checkpoint.interval = 10000
+ #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}
-
source {
Kafka {
bootstrap.servers = "kafkaCluster:9093"
topic = "test_topic"
result_table_name = "kafka_table"
- kafka.auto.offset.reset = "earliest"
+ # The default format is json, which is optional
+ format = json
+ start_mode = latest
schema = {
- fields {
- id = bigint
- c_map = "map<string, smallint>"
- c_array = "array<tinyint>"
- c_string = string
- c_boolean = boolean
- c_tinyint = tinyint
- c_smallint = smallint
- c_int = int
- c_bigint = bigint
- c_float = float
- c_double = double
- c_decimal = "decimal(2, 1)"
- c_bytes = bytes
- c_date = date
- c_timestamp = timestamp
- }
- }
+ fields {
+ id = bigint
+ }
+ }
}
# If you would like to get more information about how to configure seatunnel
and see full list of source plugins,
@@ -58,33 +45,31 @@ source {
}
transform {
-}
+ }
-sink {
- Console {}
- Assert {
- rules =
- {
- field_rules = [
- {
- field_name = id
- field_type = long
- field_value = [
- {
- rule_type = NOT_NULL
- },
- {
- rule_type = MIN
- rule_value = 0
- },
- {
- rule_type = MAX
- rule_value = 99
- }
- ]
- }
- ]
- }
+sink {
+ Console {}
+
+ Assert {
+ rules =
+ {
+ field_rules = [
+ {
+ field_name = id
+ field_type = long
+ field_value = [
- }
-}
\ No newline at end of file
+ {
+ rule_type = MIN
+ rule_value = 99
+ },
+ {
+ rule_type = MAX
+ rule_value = 99
+ }
+ ]
+ }
+ ]
+ }
+ }
+ }
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_specific_offsets_to_console.conf
similarity index 51%
copy from
seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
copy to
seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_specific_offsets_to_console.conf
index 91c920237..b75756327 100644
---
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
+++
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_specific_offsets_to_console.conf
@@ -19,38 +19,29 @@
######
env {
- # You can set spark configuration here
- spark.app.name = "SeaTunnel"
- source.parallelism = 1
- job.mode = "BATCH"
+ # You can set flink configuration here
+ execution.parallelism = 1
+ #execution.checkpoint.interval = 10000
+ #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}
-
source {
Kafka {
bootstrap.servers = "kafkaCluster:9093"
topic = "test_topic"
result_table_name = "kafka_table"
- kafka.auto.offset.reset = "earliest"
+ # The default format is json, which is optional
+ format = json
+ start_mode = specific_offsets
schema = {
- fields {
- id = bigint
- c_map = "map<string, smallint>"
- c_array = "array<tinyint>"
- c_string = string
- c_boolean = boolean
- c_tinyint = tinyint
- c_smallint = smallint
- c_int = int
- c_bigint = bigint
- c_float = float
- c_double = double
- c_decimal = "decimal(2, 1)"
- c_bytes = bytes
- c_date = date
- c_timestamp = timestamp
- }
- }
+ fields {
+ id = bigint
+ }
+ }
+
+ start_mode.offsets = {
+ test_topic-0 = 50
+ }
}
# If you would like to get more information about how to configure seatunnel
and see full list of source plugins,
@@ -58,33 +49,31 @@ source {
}
transform {
-}
+ }
-sink {
- Console {}
- Assert {
- rules =
- {
- field_rules = [
- {
- field_name = id
- field_type = long
- field_value = [
- {
- rule_type = NOT_NULL
- },
- {
- rule_type = MIN
- rule_value = 0
- },
- {
- rule_type = MAX
- rule_value = 99
- }
- ]
- }
- ]
- }
+sink {
+ Console {}
+
+ Assert {
+ rules =
+ {
+ field_rules = [
+ {
+ field_name = id
+ field_type = long
+ field_value = [
- }
-}
\ No newline at end of file
+ {
+ rule_type = MIN
+ rule_value = 50
+ },
+ {
+ rule_type = MAX
+ rule_value = 99
+ }
+ ]
+ }
+ ]
+ }
+ }
+ }
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_timestamp_to_console.conf
similarity index 51%
copy from
seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
copy to
seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_timestamp_to_console.conf
index 91c920237..b491c6546 100644
---
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
+++
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_timestamp_to_console.conf
@@ -19,38 +19,26 @@
######
env {
- # You can set spark configuration here
- spark.app.name = "SeaTunnel"
- source.parallelism = 1
- job.mode = "BATCH"
+ # You can set flink configuration here
+ execution.parallelism = 1
+ #execution.checkpoint.interval = 10000
+ #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}
-
source {
Kafka {
bootstrap.servers = "kafkaCluster:9093"
topic = "test_topic"
result_table_name = "kafka_table"
- kafka.auto.offset.reset = "earliest"
+ # The default format is json, which is optional
+ format = json
+ start_mode = timestamp
schema = {
- fields {
- id = bigint
- c_map = "map<string, smallint>"
- c_array = "array<tinyint>"
- c_string = string
- c_boolean = boolean
- c_tinyint = tinyint
- c_smallint = smallint
- c_int = int
- c_bigint = bigint
- c_float = float
- c_double = double
- c_decimal = "decimal(2, 1)"
- c_bytes = bytes
- c_date = date
- c_timestamp = timestamp
- }
- }
+ fields {
+ id = bigint
+ }
+ }
+ start_mode.timestamp = 1667179890315
}
# If you would like to get more information about how to configure seatunnel
and see full list of source plugins,
@@ -58,33 +46,31 @@ source {
}
transform {
-}
+ }
-sink {
- Console {}
- Assert {
- rules =
- {
- field_rules = [
- {
- field_name = id
- field_type = long
- field_value = [
- {
- rule_type = NOT_NULL
- },
- {
- rule_type = MIN
- rule_value = 0
- },
- {
- rule_type = MAX
- rule_value = 99
- }
- ]
- }
- ]
- }
+sink {
+ Console {}
+
+ Assert {
+ rules =
+ {
+ field_rules = [
+ {
+ field_name = id
+ field_type = long
+ field_value = [
- }
-}
\ No newline at end of file
+ {
+ rule_type = MIN
+ rule_value = 0
+ },
+ {
+ rule_type = MAX
+ rule_value = 149
+ }
+ ]
+ }
+ ]
+ }
+ }
+ }
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaContainer.java
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaContainer.java
index 431df9205..117daf78e 100644
---
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaContainer.java
+++
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaContainer.java
@@ -35,7 +35,7 @@ public class KafkaContainer extends
GenericContainer<KafkaContainer> {
private static final DockerImageName DEFAULT_IMAGE_NAME =
DockerImageName.parse("confluentinc/cp-kafka");
- public static final int KAFKA_PORT = 9093;
+ public static final int KAFKA_PORT = 9094;
public static final int ZOOKEEPER_PORT = 2181;
diff --git
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceJsonToConsoleIT.java
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceJsonToConsoleIT.java
index 8b9a425a9..de6abf829 100644
---
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceJsonToConsoleIT.java
+++
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceJsonToConsoleIT.java
@@ -27,68 +27,28 @@ import
org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import
org.apache.seatunnel.connectors.seatunnel.kafka.serialize.DefaultSeaTunnelRowSerializer;
-import org.apache.seatunnel.e2e.spark.SparkContainer;
import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.Container;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.lifecycle.Startables;
-import org.testcontainers.shaded.com.google.common.collect.Lists;
-import org.testcontainers.shaded.org.awaitility.Awaitility;
-import org.testcontainers.utility.DockerImageName;
import java.io.IOException;
import java.math.BigDecimal;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.Collections;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Stream;
/**
* This test case is used to verify that the kafka source is able to send data
to the console.
* Make sure the SeaTunnel job can submit successfully on spark engine.
*/
@SuppressWarnings("checkstyle:EmptyLineSeparator")
@Slf4j
-public class KafkaSourceJsonToConsoleIT extends SparkContainer {
-
- private static final int KAFKA_PORT = 9093;
-
- private static final String KAFKA_HOST = "kafkaCluster";
-
- private KafkaProducer<byte[], byte[]> producer;
-
- private KafkaContainer kafkaContainer;
-
- @BeforeEach
- public void startKafkaContainer() {
- kafkaContainer = new
KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
- .withNetwork(NETWORK)
- .withNetworkAliases(KAFKA_HOST)
- .withLogConsumer(new Slf4jLogConsumer(log));
- kafkaContainer.setPortBindings(Lists.newArrayList(
- String.format("%s:%s", KAFKA_PORT, KAFKA_PORT)));
- Startables.deepStart(Stream.of(kafkaContainer)).join();
- log.info("Kafka container started");
- Awaitility.given().ignoreExceptions()
- .atLeast(100, TimeUnit.MILLISECONDS)
- .pollInterval(500, TimeUnit.MILLISECONDS)
- .atMost(180, TimeUnit.SECONDS)
- .untilAsserted(() -> initKafkaProducer());
- generateTestData();
- }
+public class KafkaSourceJsonToConsoleIT extends KafkaTestBaseIT {
@SuppressWarnings("checkstyle:Indentation")
- private void generateTestData() {
+ protected void generateTestData() {
SeaTunnelRowType seatunnelRowType = new SeaTunnelRowType(
new String[]{
@@ -159,22 +119,4 @@ public class KafkaSourceJsonToConsoleIT extends
SparkContainer {
Assertions.assertEquals(0, execResult.getExitCode(),
execResult.getStderr());
}
- private void initKafkaProducer() {
- Properties props = new Properties();
- String bootstrapServers = kafkaContainer.getBootstrapServers();
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
ByteArraySerializer.class);
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
ByteArraySerializer.class);
- producer = new KafkaProducer<>(props);
- }
-
- @AfterEach
- public void close() {
- if (producer != null) {
- producer.close();
- }
- if (kafkaContainer != null) {
- kafkaContainer.close();
- }
- }
}
diff --git
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceStartConfigToConsoleIT.java
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceStartConfigToConsoleIT.java
new file mode 100644
index 000000000..3ff1faff3
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceStartConfigToConsoleIT.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.spark.v2.kafka;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.kafka.serialize.DefaultSeaTunnelRowSerializer;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+
+import java.io.IOException;
+
+/**
+ * Runs the Kafka source {@code start_mode} job configs (latest, earliest, specific offsets,
+ * group offsets and timestamp) on the spark engine. All cases share the Kafka container prepared
+ * by {@link KafkaTestBaseIT} and run in the fixed order of {@link #testKafka()}; each one submits
+ * a job config and expects it to exit with code 0.
+ */
+@Slf4j
+public class KafkaSourceStartConfigToConsoleIT extends KafkaTestBaseIT {
+    @Override
+    protected void generateTestData() {
+        generateStepTestData(0, 100);
+    }
+
+    /**
+     * Writes one row per id in {@code [start, end)} to {@code test_topic}, using a single
+     * {@code id} column of type {@code LONG}, and blocks until the sends have completed.
+     */
+    private void generateStepTestData(int start, int end) {
+        SeaTunnelRowType seatunnelRowType = new SeaTunnelRowType(
+            new String[]{
+                "id"
+            },
+            new SeaTunnelDataType[]{
+                BasicType.LONG_TYPE
+            }
+        );
+
+        DefaultSeaTunnelRowSerializer serializer = new DefaultSeaTunnelRowSerializer("test_topic", seatunnelRowType);
+        for (int i = start; i < end; i++) {
+            SeaTunnelRow row = new SeaTunnelRow(
+                new Object[]{
+                    Long.valueOf(i)
+                });
+            ProducerRecord<byte[], byte[]> producerRecord = serializer.serializeRow(row);
+            producer.send(producerRecord);
+        }
+        // KafkaProducer#send is asynchronous; wait for these records before a job reads the topic.
+        producer.flush();
+    }
+
+    @Test
+    public void testKafka() throws IOException, InterruptedException {
+        testKafkaLatestToConsole();
+        testKafkaEarliestToConsole();
+        testKafkaSpecificOffsetsToConsole();
+        testKafkaGroupOffsetsToConsole();
+        testKafkaTimestampToConsole();
+    }
+
+    public void testKafkaLatestToConsole() throws IOException, InterruptedException {
+        Container.ExecResult execResult = executeSeaTunnelSparkJob("/kafka/kafkasource_latest_to_console.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+    }
+
+    public void testKafkaEarliestToConsole() throws IOException, InterruptedException {
+        Container.ExecResult execResult = executeSeaTunnelSparkJob("/kafka/kafkasource_earliest_to_console.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+    }
+
+    public void testKafkaSpecificOffsetsToConsole() throws IOException, InterruptedException {
+        Container.ExecResult execResult = executeSeaTunnelSparkJob("/kafka/kafkasource_specific_offsets_to_console.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+    }
+
+    public void testKafkaGroupOffsetsToConsole() throws IOException, InterruptedException {
+        generateStepTestData(100, 150);
+        Container.ExecResult execResult = executeSeaTunnelSparkJob("/kafka/kafkasource_group_offset_to_console.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+    }
+
+    public void testKafkaTimestampToConsole() throws IOException, InterruptedException {
+        Container.ExecResult execResult = executeSeaTunnelSparkJob("/kafka/kafkasource_timestamp_to_console.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+    }
+
+}
diff --git
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceTextToConsoleIT.java
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceTextToConsoleIT.java
index c2842557c..f4575f4e8 100644
---
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceTextToConsoleIT.java
+++
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceTextToConsoleIT.java
@@ -26,70 +26,29 @@ import
org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.e2e.spark.SparkContainer;
import org.apache.seatunnel.format.text.TextSerializationSchema;
import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.Container;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.lifecycle.Startables;
-import org.testcontainers.shaded.com.google.common.collect.Lists;
-import org.testcontainers.shaded.org.awaitility.Awaitility;
-import org.testcontainers.utility.DockerImageName;
-import org.testcontainers.utility.DockerLoggerFactory;
import java.io.IOException;
import java.math.BigDecimal;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.Collections;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Stream;
/**
* This test case is used to verify that the kafka source is able to send data
to the console.
* Make sure the SeaTunnel job can submit successfully on spark engine.
*/
@Slf4j
-public class KafkaSourceTextToConsoleIT extends SparkContainer {
-
- private static final int KAFKA_PORT = 9093;
-
- private static final String KAFKA_HOST = "kafkaCluster";
-
- private KafkaProducer<byte[], byte[]> producer;
-
- private KafkaContainer kafkaContainer;
-
- @BeforeEach
- public void startKafkaContainer() {
- kafkaContainer = new
KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
- .withNetwork(NETWORK)
- .withNetworkAliases(KAFKA_HOST)
- .withLogConsumer(new
Slf4jLogConsumer(DockerLoggerFactory.getLogger("confluentinc/cp-kafka:6.2.1")));
- kafkaContainer.setPortBindings(Lists.newArrayList(
- String.format("%s:%s", KAFKA_PORT, KAFKA_PORT)));
- Startables.deepStart(Stream.of(kafkaContainer)).join();
- log.info("Kafka container started");
- Awaitility.given().ignoreExceptions()
- .atLeast(100, TimeUnit.MILLISECONDS)
- .pollInterval(500, TimeUnit.MILLISECONDS)
- .atMost(180, TimeUnit.SECONDS)
- .untilAsserted(() -> initKafkaProducer());
- generateTestData();
- }
+public class KafkaSourceTextToConsoleIT extends KafkaTestBaseIT {
@SuppressWarnings("checkstyle:Indentation")
- private void generateTestData() {
+ protected void generateTestData() {
SeaTunnelRowType seatunnelRowType = new SeaTunnelRowType(
new String[]{
@@ -163,22 +122,4 @@ public class KafkaSourceTextToConsoleIT extends
SparkContainer {
Assertions.assertEquals(0, execResult.getExitCode(),
execResult.getStderr());
}
- private void initKafkaProducer() {
- Properties props = new Properties();
- String bootstrapServers = kafkaContainer.getBootstrapServers();
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
ByteArraySerializer.class);
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
ByteArraySerializer.class);
- producer = new KafkaProducer<>(props);
- }
-
- @AfterEach
- public void close() {
- if (producer != null) {
- producer.close();
- }
- if (kafkaContainer != null) {
- kafkaContainer.close();
- }
- }
}
diff --git
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaTestBaseIT.java
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaTestBaseIT.java
new file mode 100644
index 000000000..b6d5354a9
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaTestBaseIT.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.spark.v2.kafka;
+
+import org.apache.seatunnel.e2e.spark.SparkContainer;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.shaded.com.google.common.collect.Lists;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+@Slf4j
+public class KafkaTestBaseIT extends SparkContainer {
+ protected static final int KAFKA_PORT = 9094;
+
+ protected static final String KAFKA_HOST = "kafkaCluster";
+
+ protected KafkaProducer<byte[], byte[]> producer;
+
+ protected KafkaContainer kafkaContainer;
+
+ @BeforeEach
+ public void startKafkaContainer() {
+ kafkaContainer = new
KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
+ .withNetwork(NETWORK)
+ .withNetworkAliases(KAFKA_HOST)
+ .withLogConsumer(new
Slf4jLogConsumer(DockerLoggerFactory.getLogger("confluentinc/cp-kafka:6.2.1")));
+ kafkaContainer.setPortBindings(Lists.newArrayList(
+ String.format("%s:%s", KAFKA_PORT, KAFKA_PORT)));
+ Startables.deepStart(Stream.of(kafkaContainer)).join();
+ log.info("Kafka container started");
+ Awaitility.given().ignoreExceptions()
+ .atLeast(100, TimeUnit.MILLISECONDS)
+ .pollInterval(500, TimeUnit.MILLISECONDS)
+ .atMost(180, TimeUnit.SECONDS)
+ .untilAsserted(() -> initKafkaProducer());
+ generateTestData();
+ }
+
+ protected void initKafkaProducer() {
+ Properties props = new Properties();
+ String bootstrapServers = kafkaContainer.getBootstrapServers();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
ByteArraySerializer.class);
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
ByteArraySerializer.class);
+ producer = new KafkaProducer<>(props);
+ }
+
+ @SuppressWarnings("checkstyle:Indentation")
+ protected void generateTestData() {
+
+ }
+
+ @AfterEach
+ public void close() {
+ if (producer != null) {
+ producer.close();
+ }
+ if (kafkaContainer != null) {
+ kafkaContainer.close();
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_earliest_to_console.conf
similarity index 53%
copy from
seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
copy to
seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_earliest_to_console.conf
index 91c920237..c1674e192 100644
---
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
+++
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_earliest_to_console.conf
@@ -20,37 +20,24 @@
env {
# You can set spark configuration here
- spark.app.name = "SeaTunnel"
+ job.name = "SeaTunnel"
source.parallelism = 1
job.mode = "BATCH"
}
-
source {
Kafka {
- bootstrap.servers = "kafkaCluster:9093"
+ bootstrap.servers = "kafkaCluster:9094"
topic = "test_topic"
result_table_name = "kafka_table"
- kafka.auto.offset.reset = "earliest"
+ # The format option is optional and defaults to json
+ format = json
+ start_mode = earliest
schema = {
- fields {
- id = bigint
- c_map = "map<string, smallint>"
- c_array = "array<tinyint>"
- c_string = string
- c_boolean = boolean
- c_tinyint = tinyint
- c_smallint = smallint
- c_int = int
- c_bigint = bigint
- c_float = float
- c_double = double
- c_decimal = "decimal(2, 1)"
- c_bytes = bytes
- c_date = date
- c_timestamp = timestamp
- }
- }
+ fields {
+ id = bigint
+ }
+ }
}
# If you would like to get more information about how to configure seatunnel
and see full list of source plugins,
@@ -58,33 +45,31 @@ source {
}
transform {
-}
+ }
-sink {
- Console {}
- Assert {
- rules =
- {
- field_rules = [
- {
- field_name = id
- field_type = long
- field_value = [
- {
- rule_type = NOT_NULL
- },
- {
- rule_type = MIN
- rule_value = 0
- },
- {
- rule_type = MAX
- rule_value = 99
- }
- ]
- }
- ]
- }
+sink {
+ Console {}
+
+ Assert {
+ rules =
+ {
+ field_rules = [
+ {
+ field_name = id
+ field_type = long
+ field_value = [
- }
-}
\ No newline at end of file
+ {
+ rule_type = MIN
+ rule_value = 0
+ },
+ {
+ rule_type = MAX
+ rule_value = 99
+ }
+ ]
+ }
+ ]
+ }
+ }
+ }
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_group_offset_to_console.conf
similarity index 53%
copy from
seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
copy to
seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_group_offset_to_console.conf
index 91c920237..336d364c0 100644
---
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
+++
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_group_offset_to_console.conf
@@ -20,37 +20,24 @@
env {
# You can set spark configuration here
- spark.app.name = "SeaTunnel"
+ job.name = "SeaTunnel"
source.parallelism = 1
job.mode = "BATCH"
}
-
source {
Kafka {
- bootstrap.servers = "kafkaCluster:9093"
+ bootstrap.servers = "kafkaCluster:9094"
topic = "test_topic"
result_table_name = "kafka_table"
- kafka.auto.offset.reset = "earliest"
+ # The format option is optional and defaults to json
+ format = json
+ start_mode = group_offsets
schema = {
- fields {
- id = bigint
- c_map = "map<string, smallint>"
- c_array = "array<tinyint>"
- c_string = string
- c_boolean = boolean
- c_tinyint = tinyint
- c_smallint = smallint
- c_int = int
- c_bigint = bigint
- c_float = float
- c_double = double
- c_decimal = "decimal(2, 1)"
- c_bytes = bytes
- c_date = date
- c_timestamp = timestamp
- }
- }
+ fields {
+ id = bigint
+ }
+ }
}
# If you would like to get more information about how to configure seatunnel
and see full list of source plugins,
@@ -58,33 +45,31 @@ source {
}
transform {
-}
+ }
-sink {
- Console {}
- Assert {
- rules =
- {
- field_rules = [
- {
- field_name = id
- field_type = long
- field_value = [
- {
- rule_type = NOT_NULL
- },
- {
- rule_type = MIN
- rule_value = 0
- },
- {
- rule_type = MAX
- rule_value = 99
- }
- ]
- }
- ]
- }
+sink {
+ Console {}
+
+ Assert {
+ rules =
+ {
+ field_rules = [
+ {
+ field_name = id
+ field_type = long
+ field_value = [
- }
-}
\ No newline at end of file
+ {
+ rule_type = MIN
+ rule_value = 100
+ },
+ {
+ rule_type = MAX
+ rule_value = 149
+ }
+ ]
+ }
+ ]
+ }
+ }
+ }
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
index 91c920237..cf5c67743 100644
---
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
+++
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
@@ -20,7 +20,7 @@
env {
# You can set spark configuration here
- spark.app.name = "SeaTunnel"
+ job.name = "SeaTunnel"
source.parallelism = 1
job.mode = "BATCH"
}
@@ -28,7 +28,7 @@ env {
source {
Kafka {
- bootstrap.servers = "kafkaCluster:9093"
+ bootstrap.servers = "kafkaCluster:9094"
topic = "test_topic"
result_table_name = "kafka_table"
kafka.auto.offset.reset = "earliest"
diff --git
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_latest_to_console.conf
similarity index 53%
copy from
seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
copy to
seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_latest_to_console.conf
index 91c920237..0c20ebc9a 100644
---
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
+++
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_latest_to_console.conf
@@ -20,37 +20,24 @@
env {
# You can set spark configuration here
- spark.app.name = "SeaTunnel"
+ job.name = "SeaTunnel"
source.parallelism = 1
job.mode = "BATCH"
}
-
source {
Kafka {
- bootstrap.servers = "kafkaCluster:9093"
+ bootstrap.servers = "kafkaCluster:9094"
topic = "test_topic"
result_table_name = "kafka_table"
- kafka.auto.offset.reset = "earliest"
+ # The format option is optional and defaults to json
+ format = json
+ start_mode = latest
schema = {
- fields {
- id = bigint
- c_map = "map<string, smallint>"
- c_array = "array<tinyint>"
- c_string = string
- c_boolean = boolean
- c_tinyint = tinyint
- c_smallint = smallint
- c_int = int
- c_bigint = bigint
- c_float = float
- c_double = double
- c_decimal = "decimal(2, 1)"
- c_bytes = bytes
- c_date = date
- c_timestamp = timestamp
- }
- }
+ fields {
+ id = bigint
+ }
+ }
}
# If you would like to get more information about how to configure seatunnel
and see full list of source plugins,
@@ -58,33 +45,31 @@ source {
}
transform {
-}
+ }
-sink {
- Console {}
- Assert {
- rules =
- {
- field_rules = [
- {
- field_name = id
- field_type = long
- field_value = [
- {
- rule_type = NOT_NULL
- },
- {
- rule_type = MIN
- rule_value = 0
- },
- {
- rule_type = MAX
- rule_value = 99
- }
- ]
- }
- ]
- }
+sink {
+ Console {}
+
+ Assert {
+ rules =
+ {
+ field_rules = [
+ {
+ field_name = id
+ field_type = long
+ field_value = [
- }
-}
\ No newline at end of file
+ {
+ rule_type = MIN
+ rule_value = 99
+ },
+ {
+ rule_type = MAX
+ rule_value = 99
+ }
+ ]
+ }
+ ]
+ }
+ }
+ }
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_specific_offsets_to_console.conf
similarity index 53%
copy from
seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
copy to
seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_specific_offsets_to_console.conf
index 91c920237..8f6f00a68 100644
---
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
+++
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_specific_offsets_to_console.conf
@@ -20,37 +20,28 @@
env {
# You can set spark configuration here
- spark.app.name = "SeaTunnel"
+ job.name = "SeaTunnel"
source.parallelism = 1
job.mode = "BATCH"
}
-
source {
Kafka {
- bootstrap.servers = "kafkaCluster:9093"
+ bootstrap.servers = "kafkaCluster:9094"
topic = "test_topic"
result_table_name = "kafka_table"
- kafka.auto.offset.reset = "earliest"
+ # The format option is optional and defaults to json
+ format = json
+ start_mode = specific_offsets
schema = {
- fields {
- id = bigint
- c_map = "map<string, smallint>"
- c_array = "array<tinyint>"
- c_string = string
- c_boolean = boolean
- c_tinyint = tinyint
- c_smallint = smallint
- c_int = int
- c_bigint = bigint
- c_float = float
- c_double = double
- c_decimal = "decimal(2, 1)"
- c_bytes = bytes
- c_date = date
- c_timestamp = timestamp
- }
- }
+ fields {
+ id = bigint
+ }
+ }
+
+ start_mode.offsets = {
+ test_topic-0 = 50
+ }
}
# If you would like to get more information about how to configure seatunnel
and see full list of source plugins,
@@ -58,33 +49,31 @@ source {
}
transform {
-}
+ }
-sink {
- Console {}
- Assert {
- rules =
- {
- field_rules = [
- {
- field_name = id
- field_type = long
- field_value = [
- {
- rule_type = NOT_NULL
- },
- {
- rule_type = MIN
- rule_value = 0
- },
- {
- rule_type = MAX
- rule_value = 99
- }
- ]
- }
- ]
- }
+sink {
+ Console {}
+
+ Assert {
+ rules =
+ {
+ field_rules = [
+ {
+ field_name = id
+ field_type = long
+ field_value = [
- }
-}
\ No newline at end of file
+ {
+ rule_type = MIN
+ rule_value = 50
+ },
+ {
+ rule_type = MAX
+ rule_value = 99
+ }
+ ]
+ }
+ ]
+ }
+ }
+ }
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_text_to_console.conf
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_text_to_console.conf
index 369f34a2f..94af38e64 100644
---
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_text_to_console.conf
+++
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_text_to_console.conf
@@ -20,7 +20,7 @@
env {
# You can set spark configuration here
- spark.app.name = "SeaTunnel"
+ job.name = "SeaTunnel"
source.parallelism = 1
job.mode = "BATCH"
}
@@ -28,7 +28,7 @@ env {
source {
Kafka {
- bootstrap.servers = "kafkaCluster:9093"
+ bootstrap.servers = "kafkaCluster:9094"
topic = "test_topic"
result_table_name = "kafka_table"
kafka.auto.offset.reset = "earliest"
diff --git
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_timestamp_to_console.conf
similarity index 53%
copy from
seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
copy to
seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_timestamp_to_console.conf
index 91c920237..e7ec35c7d 100644
---
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf
+++
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_timestamp_to_console.conf
@@ -20,37 +20,25 @@
env {
# You can set spark configuration here
- spark.app.name = "SeaTunnel"
+ job.name = "SeaTunnel"
source.parallelism = 1
job.mode = "BATCH"
}
-
source {
Kafka {
- bootstrap.servers = "kafkaCluster:9093"
+ bootstrap.servers = "kafkaCluster:9094"
topic = "test_topic"
result_table_name = "kafka_table"
- kafka.auto.offset.reset = "earliest"
+ # The format option is optional and defaults to json
+ format = json
+ start_mode = timestamp
schema = {
- fields {
- id = bigint
- c_map = "map<string, smallint>"
- c_array = "array<tinyint>"
- c_string = string
- c_boolean = boolean
- c_tinyint = tinyint
- c_smallint = smallint
- c_int = int
- c_bigint = bigint
- c_float = float
- c_double = double
- c_decimal = "decimal(2, 1)"
- c_bytes = bytes
- c_date = date
- c_timestamp = timestamp
- }
- }
+ fields {
+ id = bigint
+ }
+ }
+ start_mode.timestamp = 1667179890315
}
# If you would like to get more information about how to configure seatunnel
and see full list of source plugins,
@@ -58,33 +46,31 @@ source {
}
transform {
-}
+ }
-sink {
- Console {}
- Assert {
- rules =
- {
- field_rules = [
- {
- field_name = id
- field_type = long
- field_value = [
- {
- rule_type = NOT_NULL
- },
- {
- rule_type = MIN
- rule_value = 0
- },
- {
- rule_type = MAX
- rule_value = 99
- }
- ]
- }
- ]
- }
+sink {
+ Console {}
+
+ Assert {
+ rules =
+ {
+ field_rules = [
+ {
+ field_name = id
+ field_type = long
+ field_value = [
- }
-}
\ No newline at end of file
+ {
+ rule_type = MIN
+ rule_value = 0
+ },
+ {
+ rule_type = MAX
+ rule_value = 149
+ }
+ ]
+ }
+ ]
+ }
+ }
+ }
\ No newline at end of file