This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 2484af7e07 [Flink] Fix ReadOperator to stop reading after LIMIT on dedicated split path. (#7991)
2484af7e07 is described below

commit 2484af7e072548427266991fe3ccc738937682e6
Author: wangwj <[email protected]>
AuthorDate: Mon Jun 1 20:53:08 2026 +0800

    [Flink] Fix ReadOperator to stop reading after LIMIT on dedicated split path. (#7991)
---
 .../paimon/flink/source/operator/ReadOperator.java |  15 +-
 .../operator/DedicatedSplitReadLimitTest.java      | 163 +++++++++++++++++++++
 .../flink/source/operator/OperatorSourceTest.java  |   8 +-
 3 files changed, 174 insertions(+), 12 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java
index a9b9767041..30b2ab0d62 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java
@@ -22,6 +22,7 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.flink.FlinkRowData;
 import org.apache.paimon.flink.NestedProjectedRowData;
+import org.apache.paimon.flink.source.RecordLimiter;
 import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.Split;
@@ -68,6 +69,7 @@ public class ReadOperator extends AbstractStreamOperator<RowData>
     private transient long emitEventTimeLag = FileStoreSourceReaderMetrics.UNDEFINED;
     private transient long idleStartTime = FileStoreSourceReaderMetrics.ACTIVE;
     private transient Counter numRecordsIn;
+    @Nullable private transient RecordLimiter recordLimiter;
     @Nullable private final Long limit;
 
     public ReadOperator(
@@ -98,6 +100,7 @@ public class ReadOperator extends AbstractStreamOperator<RowData>
                                 .getIOManager()
                                 .getSpillingDirectoriesPaths());
         this.read = readSupplier.get().withIOManager(ioManager);
+        this.recordLimiter = RecordLimiter.create(limit);
         this.reuseRow = new FlinkRowData(null);
         this.reuseRecord = new StreamRecord<>(null);
         this.idlingStarted();
@@ -122,7 +125,7 @@ public class ReadOperator extends AbstractStreamOperator<RowData>
         boolean firstRecord = true;
         try (CloseableIterator<InternalRow> iterator =
                 read.createReader(split).toCloseableIterator()) {
-            while (iterator.hasNext()) {
+            while (!reachLimit() && iterator.hasNext()) {
                 emitEventTimeLag = System.currentTimeMillis() - eventTime;
 
                 // each Split is already counted as one input record,
@@ -133,10 +136,6 @@ public class ReadOperator extends AbstractStreamOperator<RowData>
                     numRecordsIn.inc();
                 }
 
-                if (reachLimit()) {
-                    return;
-                }
-
                 reuseRow.replace(iterator.next());
                 if (nestedProjectedRowData == null) {
                     reuseRecord.replace(reuseRow);
@@ -145,6 +144,10 @@ public class ReadOperator extends AbstractStreamOperator<RowData>
                     reuseRecord.replace(nestedProjectedRowData);
                 }
                 output.collect(reuseRecord);
+
+                if (recordLimiter != null) {
+                    recordLimiter.increment();
+                }
             }
         }
         // start idle when data sending is completed
@@ -160,7 +163,7 @@ public class ReadOperator extends AbstractStreamOperator<RowData>
     }
 
     private boolean reachLimit() {
-        if (limit != null && numRecordsIn.getCount() > limit) {
+        if (recordLimiter != null && recordLimiter.reachLimit()) {
             LOG.info("Reader {} reach the limit record {}.", this, limit);
             return true;
         }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/DedicatedSplitReadLimitTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/DedicatedSplitReadLimitTest.java
new file mode 100644
index 0000000000..9ef7757dfa
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/DedicatedSplitReadLimitTest.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source.operator;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.metrics.MetricRegistry;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.types.DataTypes;
+
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalSerializers;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests {@link ReadOperator} limit on the dedicated split read path. */
+public class DedicatedSplitReadLimitTest {
+
+    private static final int LIMIT = 10;
+
+    @TempDir Path tempDir;
+
+    private Table table;
+
+    @BeforeEach
+    public void before()
+            throws Catalog.TableAlreadyExistException, Catalog.DatabaseNotExistException,
+                    Catalog.TableNotExistException, Catalog.DatabaseAlreadyExistException {
+        Catalog catalog =
+                CatalogFactory.createCatalog(
+                        CatalogContext.create(new org.apache.paimon.fs.Path(tempDir.toUri())));
+        Schema schema =
+                Schema.newBuilder()
+                        .column("a", DataTypes.INT())
+                        .column("b", DataTypes.INT())
+                        .column("c", DataTypes.INT())
+                        .primaryKey("a")
+                        .option("bucket", "1")
+                        .build();
+        Identifier identifier = Identifier.create("default", "t");
+        catalog.createDatabase("default", false);
+        catalog.createTable(identifier, schema, false);
+        this.table = catalog.getTable(identifier);
+    }
+
+    @Test
+    public void testReadOperatorStopsAfterLimit() throws Exception {
+        BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+        BatchTableWrite write = writeBuilder.newWrite();
+        for (int i = 0; i < 100; i++) {
+            write.write(GenericRow.of(i, i, i));
+        }
+        BatchTableCommit commit = writeBuilder.newCommit();
+        commit.commit(write.prepareCommit());
+        write.close();
+        commit.close();
+
+        ReadBatchCountingRead countingRead =
+                new ReadBatchCountingRead(table.newReadBuilder().newRead());
+        ReadOperator readOperator = new ReadOperator(() -> countingRead, null, (long) LIMIT);
+
+        OneInputStreamOperatorTestHarness<Split, RowData> harness =
+                new OneInputStreamOperatorTestHarness<>(readOperator);
+        harness.setup(
+                InternalSerializers.create(
+                        RowType.of(new IntType(), new IntType(), new IntType())));
+        harness.open();
+        for (Split split : table.newReadBuilder().newScan().plan().splits()) {
+            harness.processElement(new StreamRecord<>(split));
+        }
+
+        assertThat(harness.getOutput()).hasSize(LIMIT);
+        assertThat(countingRead.readBatchInvocations()).isEqualTo(1);
+    }
+
+    private static class ReadBatchCountingRead implements TableRead {
+
+        private final TableRead delegate;
+        private final AtomicInteger readBatchInvocations = new AtomicInteger();
+
+        private ReadBatchCountingRead(TableRead delegate) {
+            this.delegate = delegate;
+        }
+
+        int readBatchInvocations() {
+            return readBatchInvocations.get();
+        }
+
+        @Override
+        public TableRead withMetricRegistry(MetricRegistry registry) {
+            delegate.withMetricRegistry(registry);
+            return this;
+        }
+
+        @Override
+        public TableRead executeFilter() {
+            delegate.executeFilter();
+            return this;
+        }
+
+        @Override
+        public TableRead withIOManager(IOManager ioManager) {
+            delegate.withIOManager(ioManager);
+            return this;
+        }
+
+        @Override
+        public RecordReader<InternalRow> createReader(Split split) throws IOException {
+            RecordReader<InternalRow> reader = delegate.createReader(split);
+            return new RecordReader<InternalRow>() {
+                @Override
+                public RecordIterator<InternalRow> readBatch() throws IOException {
+                    readBatchInvocations.incrementAndGet();
+                    return reader.readBatch();
+                }
+
+                @Override
+                public void close() throws IOException {
+                    reader.close();
+                }
+            };
+        }
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java
index e4ab4ec157..9c57f27b86 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java
@@ -238,15 +238,11 @@ public class OperatorSourceTest {
         }
         ArrayList<Object> values = new ArrayList<>(harness.getOutput());
 
-        // In ReadOperator each Split is already counted as one input record. But in this case it
-        // will not happen.
-        // So in this case the result values's size if 3 even if the limit is 2.
-        // The IT case see BatchFileStoreITCase#testBatchReadSourceWithSnapshot.
+        // ReadOperator limit is enforced on emitted records.
         assertThat(values)
                 .containsExactlyInAnyOrder(
                         new StreamRecord<>(GenericRowData.of(1, 1, 1)),
-                        new StreamRecord<>(GenericRowData.of(2, 2, 2)),
-                        new StreamRecord<>(GenericRowData.of(3, 3, 3)));
+                        new StreamRecord<>(GenericRowData.of(2, 2, 2)));
     }
 
     @Test

Reply via email to