Hisoka-X commented on code in PR #5100:
URL: https://github.com/apache/seatunnel/pull/5100#discussion_r1413318044


##########
seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/FileSourceSplitEnumerator.java:
##########
@@ -54,15 +57,18 @@ public FileSourceSplitEnumerator(
 
     @Override
     public void open() {
-        this.pendingSplit = new HashSet<>();
+        this.pendingSplit.addAll(discoverySplits());
     }
 
     @Override
     public void run() {
-        // do nothing
+        for (int i = 0; i < context.currentParallelism(); i++) {
+            LOGGER.info("Assigned splits to reader [{}]", i);
+            assignSplit(i);
+        }

Review Comment:
   Invoking `run` does not mean the readers have already started. That is why 
we need to do the assignment after `registerReader`.
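
A minimal sketch of the ordering concern, not the SeaTunnel implementation: the class, its fields, and the hash-based ownership rule below are all hypothetical and only illustrate deferring assignment until a reader has registered.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for a split enumerator; not the SeaTunnel API.
public class RegisterReaderAssignmentSketch {

    private final List<String> pendingSplits = new ArrayList<>();
    private final Map<Integer, List<String>> assigned = new HashMap<>();
    private final int parallelism;

    public RegisterReaderAssignmentSketch(List<String> splits, int parallelism) {
        this.pendingSplits.addAll(splits);
        this.parallelism = parallelism;
    }

    // run() may be called before any reader has started, so it assigns nothing.
    public void run() {
        // intentionally empty
    }

    // Called once a reader has actually started; only now hand it its share.
    public void registerReader(int subtaskId) {
        List<String> mine = new ArrayList<>();
        for (String split : pendingSplits) {
            // Hash-based ownership is an assumption made for this sketch.
            if (Math.floorMod(split.hashCode(), parallelism) == subtaskId) {
                mine.add(split);
            }
        }
        pendingSplits.removeAll(mine);
        assigned.computeIfAbsent(subtaskId, k -> new ArrayList<>()).addAll(mine);
    }

    public List<String> assignedTo(int subtaskId) {
        return assigned.getOrDefault(subtaskId, new ArrayList<>());
    }

    public int pendingCount() {
        return pendingSplits.size();
    }
}
```

In this sketch, calling `run()` leaves every split pending, and splits only move once `registerReader` is called for a given subtask.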



##########
seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceSplitEnumerator.java:
##########
@@ -103,6 +100,7 @@ private void assignSplit(Set<Integer> readers) {
                     pendingSplits.put(reader, assignmentForReader);
                 }
             }
+            enumeratorContext.signalNoMoreSplits(reader);
         }

Review Comment:
   This method is also invoked on restore:
https://github.com/apache/seatunnel/pull/5100/files#diff-50ae2032ed74589f0bf5dbfca6e5ad7ace518c5647e8b844bf343c7ea0ae3ad1L143
So I don't think moving `enumeratorContext.signalNoMoreSplits(reader);` here is 
a good approach.
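
One possible arrangement, offered as an assumption rather than the fix requested here: keep the shared `assignSplit` helper free of signalling and emit the "no more splits" signal only from the registration path. Every name in this sketch is hypothetical and does not mirror the SeaTunnel API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an enumerator; names do not mirror the SeaTunnel API.
public class SignalPlacementSketch {

    // Records every "no more splits" signal so the behaviour can be inspected.
    public final List<Integer> signalled = new ArrayList<>();
    // Records every call to the shared assignment helper.
    public final List<String> log = new ArrayList<>();

    // Shared helper: used by both first-time registration and restore.
    private void assignSplit(int reader) {
        log.add("assign:" + reader);
    }

    // Normal path: the reader has started and receives its assignment.
    public void registerReader(int reader) {
        assignSplit(reader);
        signalled.add(reader); // signal here, outside the shared helper
    }

    // Restore path: splits come back after a failure; reassign without signalling.
    public void addSplitsBack(int reader) {
        assignSplit(reader);
    }
}
```

Because the signal lives outside the helper, the restore path in this sketch reuses `assignSplit` without telling a reader that no more splits are coming.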



##########
seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java:
##########
@@ -76,24 +67,14 @@ public List<DataStreamTableInfo> execute(List<DataStreamTableInfo> upstreamDataS
             SourceTableInfo sourceTableInfo = plugins.get(i);
             SeaTunnelSource internalSource = sourceTableInfo.getSource();
             Config pluginConfig = pluginConfigs.get(i);
-            BaseSeaTunnelSourceFunction sourceFunction;
             if (internalSource instanceof SupportCoordinate) {
-                sourceFunction = new SeaTunnelCoordinatedSource(internalSource, envConfigs);
-
                 registerAppendStream(pluginConfig);
-            } else {
-                sourceFunction = new SeaTunnelParallelSource(internalSource, envConfigs);
             }
-            boolean bounded =
-                    internalSource.getBoundedness()
-                            == org.apache.seatunnel.api.source.Boundedness.BOUNDED;
+            FlinkSource flinkSource = new FlinkSource<>(internalSource);
 
-            DataStreamSource<Row> sourceStream =
-                    addSource(
-                            executionEnvironment,
-                            sourceFunction,
-                            "SeaTunnel " + internalSource.getClass().getSimpleName(),
-                            bounded);
+            DataStreamSource sourceStream =
+                    executionEnvironment.fromSource(
+                            flinkSource, WatermarkStrategy.noWatermarks(), "st-source");

Review Comment:
   The name is too simple and less clear than the old one.



##########
seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceReader.java:
##########
@@ -83,16 +84,24 @@ public void close() {
 
     @Override
     @SuppressWarnings("magicnumber")
-    public void pollNext(Collector<SeaTunnelRow> output) {
-        while (!pendingSplits.isEmpty()) {
-            synchronized (output.getCheckpointLock()) {
-                AmazonDynamoDBSourceSplit split = pendingSplits.poll();
+    public void pollNext(Collector<SeaTunnelRow> output) throws InterruptedException {
+        synchronized (output.getCheckpointLock()) {
+            AmazonDynamoDBSourceSplit split = pendingSplits.poll();
+            if (split == null) {
+                log.info(
+                        "AmazonDynamoDB Source Reader [{}] waiting for splits",
+                        context.getIndexOfSubtask());
+                if (noMoreSplit) {
+                    // signal to the source that we have reached the end of the data.
+                    log.info("Closed the bounded amazonDynamodb source");
+                    context.signalNoMoreElement();
+                    Thread.sleep(2000L);

Review Comment:
   Why is `Thread.sleep(2000L);` needed?



##########
seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/AbstractSingleSplitReader.java:
##########
@@ -25,6 +26,23 @@
 
 public abstract class AbstractSingleSplitReader<T> implements SourceReader<T, SingleSplit> {
 
+    protected final Object lock = new Object();
+
+    protected volatile boolean noMoreSplits = false;
+
+    @Override
+    public void pollNext(Collector<T> output) throws Exception {
+        synchronized (lock) {

Review Comment:
   Why do we need a lock here?



