This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 2484af7e07 [Flink] Fix ReadOperator to stop reading after LIMIT on
dedicated split path. (#7991)
2484af7e07 is described below
commit 2484af7e072548427266991fe3ccc738937682e6
Author: wangwj <[email protected]>
AuthorDate: Mon Jun 1 20:53:08 2026 +0800
[Flink] Fix ReadOperator to stop reading after LIMIT on dedicated split
path. (#7991)
---
.../paimon/flink/source/operator/ReadOperator.java | 15 +-
.../operator/DedicatedSplitReadLimitTest.java | 163 +++++++++++++++++++++
.../flink/source/operator/OperatorSourceTest.java | 8 +-
3 files changed, 174 insertions(+), 12 deletions(-)
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java
index a9b9767041..30b2ab0d62 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java
@@ -22,6 +22,7 @@ import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.flink.FlinkRowData;
import org.apache.paimon.flink.NestedProjectedRowData;
+import org.apache.paimon.flink.source.RecordLimiter;
import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.Split;
@@ -68,6 +69,7 @@ public class ReadOperator extends
AbstractStreamOperator<RowData>
private transient long emitEventTimeLag =
FileStoreSourceReaderMetrics.UNDEFINED;
private transient long idleStartTime = FileStoreSourceReaderMetrics.ACTIVE;
private transient Counter numRecordsIn;
+ @Nullable private transient RecordLimiter recordLimiter;
@Nullable private final Long limit;
public ReadOperator(
@@ -98,6 +100,7 @@ public class ReadOperator extends
AbstractStreamOperator<RowData>
.getIOManager()
.getSpillingDirectoriesPaths());
this.read = readSupplier.get().withIOManager(ioManager);
+ this.recordLimiter = RecordLimiter.create(limit);
this.reuseRow = new FlinkRowData(null);
this.reuseRecord = new StreamRecord<>(null);
this.idlingStarted();
@@ -122,7 +125,7 @@ public class ReadOperator extends
AbstractStreamOperator<RowData>
boolean firstRecord = true;
try (CloseableIterator<InternalRow> iterator =
read.createReader(split).toCloseableIterator()) {
- while (iterator.hasNext()) {
+ while (!reachLimit() && iterator.hasNext()) {
emitEventTimeLag = System.currentTimeMillis() - eventTime;
// each Split is already counted as one input record,
@@ -133,10 +136,6 @@ public class ReadOperator extends
AbstractStreamOperator<RowData>
numRecordsIn.inc();
}
- if (reachLimit()) {
- return;
- }
-
reuseRow.replace(iterator.next());
if (nestedProjectedRowData == null) {
reuseRecord.replace(reuseRow);
@@ -145,6 +144,10 @@ public class ReadOperator extends
AbstractStreamOperator<RowData>
reuseRecord.replace(nestedProjectedRowData);
}
output.collect(reuseRecord);
+
+ if (recordLimiter != null) {
+ recordLimiter.increment();
+ }
}
}
// start idle when data sending is completed
@@ -160,7 +163,7 @@ public class ReadOperator extends
AbstractStreamOperator<RowData>
}
private boolean reachLimit() {
- if (limit != null && numRecordsIn.getCount() > limit) {
+ if (recordLimiter != null && recordLimiter.reachLimit()) {
LOG.info("Reader {} reach the limit record {}.", this, limit);
return true;
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/DedicatedSplitReadLimitTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/DedicatedSplitReadLimitTest.java
new file mode 100644
index 0000000000..9ef7757dfa
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/DedicatedSplitReadLimitTest.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source.operator;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.metrics.MetricRegistry;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.types.DataTypes;
+
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalSerializers;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests that {@link ReadOperator} stops reading once its record limit is reached on the dedicated
+ * split read path.
+ *
+ * <p>Besides checking the number of emitted records, the test counts {@link
+ * RecordReader#readBatch()} invocations to verify that the operator does not keep pulling batches
+ * from the underlying reader after the limit has been hit.
+ */
+public class DedicatedSplitReadLimitTest {
+
+    /** Maximum number of records the operator under test may emit. */
+    private static final int LIMIT = 10;
+
+    /** Number of rows written to the table; must be larger than {@link #LIMIT}. */
+    private static final int NUM_ROWS = 100;
+
+    @TempDir Path tempDir;
+
+    private Table table;
+
+    /** Creates table {@code default.t} (primary key {@code a}, one bucket) under the temp dir. */
+    @BeforeEach
+    public void before() throws Exception {
+        Catalog catalog =
+                CatalogFactory.createCatalog(
+                        CatalogContext.create(new org.apache.paimon.fs.Path(tempDir.toUri())));
+        Schema schema =
+                Schema.newBuilder()
+                        .column("a", DataTypes.INT())
+                        .column("b", DataTypes.INT())
+                        .column("c", DataTypes.INT())
+                        .primaryKey("a")
+                        .option("bucket", "1")
+                        .build();
+        Identifier identifier = Identifier.create("default", "t");
+        catalog.createDatabase("default", false);
+        catalog.createTable(identifier, schema, false);
+        this.table = catalog.getTable(identifier);
+    }
+
+    @Test
+    public void testReadOperatorStopsAfterLimit() throws Exception {
+        writeRows(NUM_ROWS);
+
+        ReadBatchCountingRead countingRead =
+                new ReadBatchCountingRead(table.newReadBuilder().newRead());
+        ReadOperator readOperator = new ReadOperator(() -> countingRead, null, (long) LIMIT);
+
+        // try-with-resources closes the harness (and thus the operator) even when an
+        // assertion below fails.
+        try (OneInputStreamOperatorTestHarness<Split, RowData> harness =
+                new OneInputStreamOperatorTestHarness<>(readOperator)) {
+            harness.setup(
+                    InternalSerializers.create(
+                            RowType.of(new IntType(), new IntType(), new IntType())));
+            harness.open();
+            for (Split split : table.newReadBuilder().newScan().plan().splits()) {
+                harness.processElement(new StreamRecord<>(split));
+            }
+
+            assertThat(harness.getOutput()).hasSize(LIMIT);
+            // NOTE: assumes the written rows come back in a single batch, so exactly one
+            // readBatch() call is needed; a second call would mean the operator kept reading
+            // after it had already reached the limit.
+            assertThat(countingRead.readBatchInvocations()).isEqualTo(1);
+        }
+    }
+
+    /** Writes rows {@code (i, i, i)} for {@code i} in {@code [0, numRows)} and commits them. */
+    private void writeRows(int numRows) throws Exception {
+        BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+        try (BatchTableWrite write = writeBuilder.newWrite();
+                BatchTableCommit commit = writeBuilder.newCommit()) {
+            for (int i = 0; i < numRows; i++) {
+                write.write(GenericRow.of(i, i, i));
+            }
+            commit.commit(write.prepareCommit());
+        }
+    }
+
+    /**
+     * {@link TableRead} decorator that delegates to another read and counts how often {@link
+     * RecordReader#readBatch()} is called on the readers it creates.
+     */
+    private static class ReadBatchCountingRead implements TableRead {
+
+        private final TableRead delegate;
+        private final AtomicInteger readBatchInvocations = new AtomicInteger();
+
+        private ReadBatchCountingRead(TableRead delegate) {
+            this.delegate = delegate;
+        }
+
+        /** Returns the total number of {@code readBatch()} calls across all created readers. */
+        int readBatchInvocations() {
+            return readBatchInvocations.get();
+        }
+
+        // NOTE(review): the with*/executeFilter overrides ignore the delegate's return value and
+        // return this wrapper; this assumes the delegate configures itself in place.
+        @Override
+        public TableRead withMetricRegistry(MetricRegistry registry) {
+            delegate.withMetricRegistry(registry);
+            return this;
+        }
+
+        @Override
+        public TableRead executeFilter() {
+            delegate.executeFilter();
+            return this;
+        }
+
+        @Override
+        public TableRead withIOManager(IOManager ioManager) {
+            delegate.withIOManager(ioManager);
+            return this;
+        }
+
+        /** Wraps the delegate's reader so every {@code readBatch()} call is counted. */
+        @Override
+        public RecordReader<InternalRow> createReader(Split split) throws IOException {
+            RecordReader<InternalRow> reader = delegate.createReader(split);
+            return new RecordReader<InternalRow>() {
+                @Override
+                public RecordIterator<InternalRow> readBatch() throws IOException {
+                    readBatchInvocations.incrementAndGet();
+                    return reader.readBatch();
+                }
+
+                @Override
+                public void close() throws IOException {
+                    reader.close();
+                }
+            };
+        }
+    }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java
index e4ab4ec157..9c57f27b86 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java
@@ -238,15 +238,11 @@ public class OperatorSourceTest {
}
ArrayList<Object> values = new ArrayList<>(harness.getOutput());
- // In ReadOperator each Split is already counted as one input record.
But in this case it
- // will not happen.
- // So in this case the result values's size if 3 even if the limit is
2.
- // The IT case see
BatchFileStoreITCase#testBatchReadSourceWithSnapshot.
+ // ReadOperator enforces the limit on emitted records, so exactly 2 rows (the limit) are output.
assertThat(values)
.containsExactlyInAnyOrder(
new StreamRecord<>(GenericRowData.of(1, 1, 1)),
- new StreamRecord<>(GenericRowData.of(2, 2, 2)),
- new StreamRecord<>(GenericRowData.of(3, 3, 3)));
+ new StreamRecord<>(GenericRowData.of(2, 2, 2)));
}
@Test