This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new c203ef5f8 [hotfix][kafka] Fix the problem that the partition information cannot be obtained when Kafka is restored (#4764)
c203ef5f8 is described below

commit c203ef5f8d6ad8bfb7f284e32a9ce5f2bc1fe142
Author: Guangdong Liu <[email protected]>
AuthorDate: Tue Jun 6 10:23:56 2023 +0800

    [hotfix][kafka] Fix the problem that the partition information cannot be obtained when Kafka is restored (#4764)
---
 .../kafka/source/KafkaSourceSplitEnumerator.java   | 34 ++++++++++------------
 .../context/SeaTunnelSplitEnumeratorContext.java   |  7 +++++
 .../server/task/flow/SourceFlowLifeCycle.java      |  8 ++---
 3 files changed, 26 insertions(+), 23 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
index 7f5d88762..f1de236cf 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
@@ -36,8 +36,10 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
@@ -57,18 +59,19 @@ public class KafkaSourceSplitEnumerator
     private final ConsumerMetadata metadata;
     private final Context<KafkaSourceSplit> context;
     private long discoveryIntervalMillis;
-    private AdminClient adminClient;
+    private final AdminClient adminClient;
 
-    private Map<TopicPartition, KafkaSourceSplit> pendingSplit;
+    private final Map<TopicPartition, KafkaSourceSplit> pendingSplit;
     private final Map<TopicPartition, KafkaSourceSplit> assignedSplit;
     private ScheduledExecutorService executor;
-    private ScheduledFuture scheduledFuture;
+    private ScheduledFuture<?> scheduledFuture;
 
     KafkaSourceSplitEnumerator(ConsumerMetadata metadata, 
Context<KafkaSourceSplit> context) {
         this.metadata = metadata;
         this.context = context;
         this.assignedSplit = new HashMap<>();
         this.pendingSplit = new HashMap<>();
+        this.adminClient = initAdminClient(this.metadata.getProperties());
     }
 
     KafkaSourceSplitEnumerator(
@@ -97,7 +100,6 @@ public class KafkaSourceSplitEnumerator
 
     @Override
     public void open() {
-        this.adminClient = initAdminClient(this.metadata.getProperties());
         if (discoveryIntervalMillis > 0) {
             this.executor =
                     Executors.newScheduledThreadPool(
@@ -180,7 +182,6 @@ public class KafkaSourceSplitEnumerator
     public void addSplitsBack(List<KafkaSourceSplit> splits, int subtaskId) {
         if (!splits.isEmpty()) {
             pendingSplit.putAll(convertToNextSplit(splits));
-            assignSplit();
         }
     }
 
@@ -191,6 +192,7 @@ public class KafkaSourceSplitEnumerator
                     listOffsets(
                             splits.stream()
                                     .map(KafkaSourceSplit::getTopicPartition)
+                                    .filter(Objects::nonNull)
                                     .collect(Collectors.toList()),
                             OffsetSpec.latest());
             splits.forEach(
@@ -199,7 +201,7 @@ public class KafkaSourceSplitEnumerator
                         
split.setEndOffset(listOffsets.get(split.getTopicPartition()));
                     });
             return splits.stream()
-                    .collect(Collectors.toMap(split -> 
split.getTopicPartition(), split -> split));
+                    
.collect(Collectors.toMap(KafkaSourceSplit::getTopicPartition, split -> split));
         } catch (Exception e) {
             throw new KafkaConnectorException(
                     
KafkaConnectorErrorCode.ADD_SPLIT_BACK_TO_ENUMERATOR_FAILED, e);
@@ -225,7 +227,7 @@ public class KafkaSourceSplitEnumerator
 
     @Override
     public KafkaSourceState snapshotState(long checkpointId) throws Exception {
-        return new 
KafkaSourceState(assignedSplit.values().stream().collect(Collectors.toSet()));
+        return new KafkaSourceState(new HashSet<>(assignedSplit.values()));
     }
 
     @Override
@@ -291,18 +293,12 @@ public class KafkaSourceSplitEnumerator
             readySplit.computeIfAbsent(taskID, id -> new ArrayList<>());
         }
 
-        pendingSplit
-                .entrySet()
-                .forEach(
-                        s -> {
-                            if (!assignedSplit.containsKey(s.getKey())) {
-                                readySplit
-                                        .get(
-                                                getSplitOwner(
-                                                        s.getKey(), 
context.currentParallelism()))
-                                        .add(s.getValue());
-                            }
-                        });
+        pendingSplit.forEach(
+                (key, value) -> {
+                    if (!assignedSplit.containsKey(key)) {
+                        readySplit.get(getSplitOwner(key, 
context.currentParallelism())).add(value);
+                    }
+                });
 
         readySplit.forEach(
                 (id, split) -> {
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SeaTunnelSplitEnumeratorContext.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SeaTunnelSplitEnumeratorContext.java
index abb58a4f1..c3cce03d3 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SeaTunnelSplitEnumeratorContext.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SeaTunnelSplitEnumeratorContext.java
@@ -25,11 +25,14 @@ import org.apache.seatunnel.common.utils.SerializationUtils;
 import org.apache.seatunnel.engine.server.task.SourceSplitEnumeratorTask;
 import 
org.apache.seatunnel.engine.server.task.operation.source.AssignSplitOperation;
 
+import lombok.extern.slf4j.Slf4j;
+
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
+@Slf4j
 public class SeaTunnelSplitEnumeratorContext<SplitT extends SourceSplit>
         implements SourceSplitEnumerator.Context<SplitT> {
 
@@ -60,6 +63,10 @@ public class SeaTunnelSplitEnumeratorContext<SplitT extends 
SourceSplit>
 
     @Override
     public void assignSplit(int subtaskIndex, List<SplitT> splits) {
+        if (registeredReaders().isEmpty()) {
+            log.warn("No reader is obtained, skip this assign!");
+            return;
+        }
         task.getExecutionContext()
                 .sendToMember(
                         new AssignSplitOperation<>(
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
index 8430f6898..e4928343c 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
@@ -168,7 +168,7 @@ public class SourceFlowLifeCycle<T, SplitT extends 
SourceSplit> extends ActionFl
                             enumeratorTaskAddress)
                     .get();
         } catch (InterruptedException | ExecutionException e) {
-            log.warn("source register failed {}", e);
+            log.warn("source register failed.", e);
             throw new RuntimeException(e);
         }
     }
@@ -182,7 +182,7 @@ public class SourceFlowLifeCycle<T, SplitT extends 
SourceSplit> extends ActionFl
                             enumeratorTaskAddress)
                     .get();
         } catch (InterruptedException | ExecutionException e) {
-            log.warn("source request split failed [{}]", e);
+            log.warn("source request split failed.", e);
             throw new RuntimeException(e);
         }
     }
@@ -197,7 +197,7 @@ public class SourceFlowLifeCycle<T, SplitT extends 
SourceSplit> extends ActionFl
                             enumeratorTaskAddress)
                     .get();
         } catch (InterruptedException | ExecutionException e) {
-            log.warn("source request split failed {}", e);
+            log.warn("source request split failed.", e);
             throw new RuntimeException(e);
         }
     }
@@ -263,7 +263,7 @@ public class SourceFlowLifeCycle<T, SplitT extends 
SourceSplit> extends ActionFl
                             enumeratorTaskAddress)
                     .get();
         } catch (InterruptedException | ExecutionException e) {
-            log.warn("source request split failed {}", e);
+            log.warn("source request split failed.", e);
             throw new RuntimeException(e);
         }
     }

Reply via email to