This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new c203ef5f8 [hotfix][kafka] Fix the problem that the partition
information cannot be obtained when kafka is restored (#4764)
c203ef5f8 is described below
commit c203ef5f8d6ad8bfb7f284e32a9ce5f2bc1fe142
Author: Guangdong Liu <[email protected]>
AuthorDate: Tue Jun 6 10:23:56 2023 +0800
[hotfix][kafka] Fix the problem that the partition information cannot be
obtained when kafka is restored (#4764)
---
.../kafka/source/KafkaSourceSplitEnumerator.java | 34 ++++++++++------------
.../context/SeaTunnelSplitEnumeratorContext.java | 7 +++++
.../server/task/flow/SourceFlowLifeCycle.java | 8 ++---
3 files changed, 26 insertions(+), 23 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
index 7f5d88762..f1de236cf 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
@@ -36,8 +36,10 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
@@ -57,18 +59,19 @@ public class KafkaSourceSplitEnumerator
private final ConsumerMetadata metadata;
private final Context<KafkaSourceSplit> context;
private long discoveryIntervalMillis;
- private AdminClient adminClient;
+ private final AdminClient adminClient;
- private Map<TopicPartition, KafkaSourceSplit> pendingSplit;
+ private final Map<TopicPartition, KafkaSourceSplit> pendingSplit;
private final Map<TopicPartition, KafkaSourceSplit> assignedSplit;
private ScheduledExecutorService executor;
- private ScheduledFuture scheduledFuture;
+ private ScheduledFuture<?> scheduledFuture;
KafkaSourceSplitEnumerator(ConsumerMetadata metadata,
Context<KafkaSourceSplit> context) {
this.metadata = metadata;
this.context = context;
this.assignedSplit = new HashMap<>();
this.pendingSplit = new HashMap<>();
+ this.adminClient = initAdminClient(this.metadata.getProperties());
}
KafkaSourceSplitEnumerator(
@@ -97,7 +100,6 @@ public class KafkaSourceSplitEnumerator
@Override
public void open() {
- this.adminClient = initAdminClient(this.metadata.getProperties());
if (discoveryIntervalMillis > 0) {
this.executor =
Executors.newScheduledThreadPool(
@@ -180,7 +182,6 @@ public class KafkaSourceSplitEnumerator
public void addSplitsBack(List<KafkaSourceSplit> splits, int subtaskId) {
if (!splits.isEmpty()) {
pendingSplit.putAll(convertToNextSplit(splits));
- assignSplit();
}
}
@@ -191,6 +192,7 @@ public class KafkaSourceSplitEnumerator
listOffsets(
splits.stream()
.map(KafkaSourceSplit::getTopicPartition)
+ .filter(Objects::nonNull)
.collect(Collectors.toList()),
OffsetSpec.latest());
splits.forEach(
@@ -199,7 +201,7 @@ public class KafkaSourceSplitEnumerator
split.setEndOffset(listOffsets.get(split.getTopicPartition()));
});
return splits.stream()
-                .collect(Collectors.toMap(split -> split.getTopicPartition(), split -> split));
+                .collect(Collectors.toMap(KafkaSourceSplit::getTopicPartition, split -> split));
} catch (Exception e) {
throw new KafkaConnectorException(
KafkaConnectorErrorCode.ADD_SPLIT_BACK_TO_ENUMERATOR_FAILED, e);
@@ -225,7 +227,7 @@ public class KafkaSourceSplitEnumerator
@Override
public KafkaSourceState snapshotState(long checkpointId) throws Exception {
-        return new KafkaSourceState(assignedSplit.values().stream().collect(Collectors.toSet()));
+        return new KafkaSourceState(new HashSet<>(assignedSplit.values()));
}
@Override
@@ -291,18 +293,12 @@ public class KafkaSourceSplitEnumerator
readySplit.computeIfAbsent(taskID, id -> new ArrayList<>());
}
- pendingSplit
- .entrySet()
- .forEach(
- s -> {
- if (!assignedSplit.containsKey(s.getKey())) {
- readySplit
- .get(
- getSplitOwner(
-                                                s.getKey(), context.currentParallelism()))
- .add(s.getValue());
- }
- });
+ pendingSplit.forEach(
+ (key, value) -> {
+ if (!assignedSplit.containsKey(key)) {
+                    readySplit.get(getSplitOwner(key, context.currentParallelism())).add(value);
+ }
+ });
readySplit.forEach(
(id, split) -> {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SeaTunnelSplitEnumeratorContext.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SeaTunnelSplitEnumeratorContext.java
index abb58a4f1..c3cce03d3 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SeaTunnelSplitEnumeratorContext.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SeaTunnelSplitEnumeratorContext.java
@@ -25,11 +25,14 @@ import org.apache.seatunnel.common.utils.SerializationUtils;
import org.apache.seatunnel.engine.server.task.SourceSplitEnumeratorTask;
import org.apache.seatunnel.engine.server.task.operation.source.AssignSplitOperation;
+import lombok.extern.slf4j.Slf4j;
+
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+@Slf4j
public class SeaTunnelSplitEnumeratorContext<SplitT extends SourceSplit>
implements SourceSplitEnumerator.Context<SplitT> {
@@ -60,6 +63,10 @@ public class SeaTunnelSplitEnumeratorContext<SplitT extends SourceSplit>
@Override
public void assignSplit(int subtaskIndex, List<SplitT> splits) {
+ if (registeredReaders().isEmpty()) {
+ log.warn("No reader is obtained, skip this assign!");
+ return;
+ }
task.getExecutionContext()
.sendToMember(
new AssignSplitOperation<>(
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
index 8430f6898..e4928343c 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
@@ -168,7 +168,7 @@ public class SourceFlowLifeCycle<T, SplitT extends SourceSplit> extends ActionFl
enumeratorTaskAddress)
.get();
} catch (InterruptedException | ExecutionException e) {
- log.warn("source register failed {}", e);
+ log.warn("source register failed.", e);
throw new RuntimeException(e);
}
}
@@ -182,7 +182,7 @@ public class SourceFlowLifeCycle<T, SplitT extends SourceSplit> extends ActionFl
enumeratorTaskAddress)
.get();
} catch (InterruptedException | ExecutionException e) {
- log.warn("source request split failed [{}]", e);
+ log.warn("source request split failed.", e);
throw new RuntimeException(e);
}
}
@@ -197,7 +197,7 @@ public class SourceFlowLifeCycle<T, SplitT extends SourceSplit> extends ActionFl
enumeratorTaskAddress)
.get();
} catch (InterruptedException | ExecutionException e) {
- log.warn("source request split failed {}", e);
+ log.warn("source request split failed.", e);
throw new RuntimeException(e);
}
}
@@ -263,7 +263,7 @@ public class SourceFlowLifeCycle<T, SplitT extends SourceSplit> extends ActionFl
enumeratorTaskAddress)
.get();
} catch (InterruptedException | ExecutionException e) {
- log.warn("source request split failed {}", e);
+ log.warn("source request split failed.", e);
throw new RuntimeException(e);
}
}