This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 680608bb48 [flink] Remove useless blocking queue in 
AlignedSourceReader (#5119)
680608bb48 is described below

commit 680608bb48dd2cb617166a1069f6706f250b3c25
Author: tsreaper <[email protected]>
AuthorDate: Thu Feb 20 16:54:43 2025 +0800

    [flink] Remove useless blocking queue in AlignedSourceReader (#5119)
---
 .../flink/source/align/AlignedContinuousFileStoreSource.java  | 11 +----------
 .../apache/paimon/flink/source/align/AlignedSourceReader.java | 10 ----------
 .../paimon/flink/source/align/AlignedSourceReaderTest.java    |  2 --
 3 files changed, 1 insertion(+), 22 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java
index 63b3f63f7f..1b3e7b5b19 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java
@@ -35,8 +35,6 @@ import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.SourceReaderContext;
 import org.apache.flink.api.connector.source.SplitEnumerator;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
-import org.apache.flink.connector.base.source.reader.SourceReaderOptions;
-import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
 import org.apache.flink.table.data.RowData;
 
 import javax.annotation.Nullable;
@@ -68,14 +66,7 @@ public class AlignedContinuousFileStoreSource extends 
ContinuousFileStoreSource
         FileStoreSourceReaderMetrics sourceReaderMetrics =
                 new FileStoreSourceReaderMetrics(context.metricGroup());
         return new AlignedSourceReader(
-                context,
-                readBuilder.newRead(),
-                sourceReaderMetrics,
-                ioManager,
-                limit,
-                new FutureCompletingBlockingQueue<>(
-                        
context.getConfiguration().get(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY)),
-                rowData);
+                context, readBuilder.newRead(), sourceReaderMetrics, 
ioManager, limit, rowData);
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedSourceReader.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedSourceReader.java
index 7d6f47296a..38e5a37e77 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedSourceReader.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedSourceReader.java
@@ -29,9 +29,6 @@ import org.apache.paimon.table.source.TableRead;
 import org.apache.flink.api.connector.source.ExternallyInducedSourceReader;
 import org.apache.flink.api.connector.source.SourceEvent;
 import org.apache.flink.api.connector.source.SourceReaderContext;
-import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
-import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
-import org.apache.flink.connector.file.src.reader.BulkFormat;
 import org.apache.flink.table.data.RowData;
 
 import javax.annotation.Nullable;
@@ -46,9 +43,6 @@ import java.util.Optional;
 public class AlignedSourceReader extends FileStoreSourceReader
         implements ExternallyInducedSourceReader<RowData, 
FileStoreSourceSplit> {
 
-    private final FutureCompletingBlockingQueue<
-                    RecordsWithSplitIds<BulkFormat.RecordIterator<RowData>>>
-            elementsQueue;
     private Long nextCheckpointId;
 
     public AlignedSourceReader(
@@ -57,11 +51,8 @@ public class AlignedSourceReader extends 
FileStoreSourceReader
             FileStoreSourceReaderMetrics metrics,
             IOManager ioManager,
             @Nullable Long limit,
-            
FutureCompletingBlockingQueue<RecordsWithSplitIds<BulkFormat.RecordIterator<RowData>>>
-                    elementsQueue,
             @Nullable NestedProjectedRowData rowData) {
         super(readerContext, tableRead, metrics, ioManager, limit, rowData);
-        this.elementsQueue = elementsQueue;
         this.nextCheckpointId = null;
     }
 
@@ -69,7 +60,6 @@ public class AlignedSourceReader extends FileStoreSourceReader
     public void handleSourceEvents(SourceEvent sourceEvent) {
         if (sourceEvent instanceof CheckpointEvent) {
             nextCheckpointId = ((CheckpointEvent) 
sourceEvent).getCheckpointId();
-            elementsQueue.notifyAvailable();
         } else {
             super.handleSourceEvents(sourceEvent);
         }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedSourceReaderTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedSourceReaderTest.java
index f815dbe632..00e1babcff 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedSourceReaderTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedSourceReaderTest.java
@@ -24,7 +24,6 @@ import 
org.apache.paimon.flink.source.FileStoreSourceReaderTest;
 import org.apache.paimon.flink.source.TestChangelogDataReadWrite;
 import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics;
 
-import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
 import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
 import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput;
 import org.apache.flink.table.data.RowData;
@@ -77,7 +76,6 @@ public class AlignedSourceReaderTest extends 
FileStoreSourceReaderTest {
                 new FileStoreSourceReaderMetrics(new DummyMetricGroup()),
                 IOManager.create(tempDir.toString()),
                 null,
-                new FutureCompletingBlockingQueue<>(2),
                 null);
     }
 }

Reply via email to