This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e83cee4e [flink] Supports infer parallelism (#584)
8e83cee4e is described below

commit 8e83cee4e4a68ee8198229420599eadd4eaf2bb8
Author: JunZhang <[email protected]>
AuthorDate: Sun Apr 2 15:16:51 2023 +0800

    [flink] Supports infer parallelism (#584)
---
 .../generated/flink_connector_configuration.html   |   8 +-
 .../apache/paimon/flink/FlinkConnectorOptions.java |  11 +-
 .../flink/source/CompactorSourceBuilder.java       |   3 +
 .../paimon/flink/source/DataTableSource.java       |  43 +++++++-
 .../source/FileStoreSourceSplitGenerator.java      |   7 ++
 .../paimon/flink/source/FlinkSourceBuilder.java    |  11 +-
 .../paimon/flink/source/StaticFileStoreSource.java |  27 +++--
 .../paimon/flink/source/SystemTableSource.java     |   6 +-
 .../apache/paimon/flink/ReadWriteTableITCase.java  | 122 +++++++++++++++++++++
 .../paimon/flink/util/ReadWriteTableTestUtil.java  |  24 +++-
 10 files changed, 244 insertions(+), 18 deletions(-)

diff --git 
a/docs/layouts/shortcodes/generated/flink_connector_configuration.html 
b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index 8060955d2..11cfa050b 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -30,7 +30,13 @@
             <td><h5>scan.parallelism</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>Integer</td>
-            <td>Define a custom parallelism for the scan source. By default, 
if this option is not defined, the planner will derive the parallelism for each 
statement individually by also considering the global configuration.</td>
+            <td>Define a custom parallelism for the scan source. By default, 
if this option is not defined, the planner will derive the parallelism for each 
statement individually by also considering the global configuration. If user 
enable the scan.infer-parallelism, the planner will derive the parallelism by 
inferred parallelism.</td>
+        </tr>
+        <tr>
+            <td><h5>scan.infer-parallelism</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>If it is false, parallelism of source are set by 
scan.parallelism. Otherwise, source parallelism is inferred from splits number 
(batch mode) or bucket number(streaming mode).</td>
         </tr>
         <tr>
             <td><h5>scan.split-enumerator.batch-size</h5></td>
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index c882ffb60..91dd1b8be 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -83,7 +83,16 @@ public class FlinkConnectorOptions {
                     .withDescription(
                             "Define a custom parallelism for the scan source. "
                                     + "By default, if this option is not 
defined, the planner will derive the parallelism "
-                                    + "for each statement individually by also 
considering the global configuration.");
+                                    + "for each statement individually by also 
considering the global configuration. "
+                                    + "If user enable the 
scan.infer-parallelism, the planner will derive the parallelism by inferred 
parallelism.");
+    public static final ConfigOption<Boolean> INFER_SCAN_PARALLELISM =
+            ConfigOptions.key("scan.infer-parallelism")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "If it is false, parallelism of source are set by "
+                                    + SCAN_PARALLELISM.key()
+                                    + ". Otherwise, source parallelism is 
inferred from splits number (batch mode) or bucket number(streaming mode).");
 
     public static final ConfigOption<Boolean> STREAMING_READ_ATOMIC =
             ConfigOptions.key("streaming-read-atomic")
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
index 9761c4cbe..f49b03309 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
@@ -25,6 +25,7 @@ import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.system.BucketsTable;
 import org.apache.paimon.types.RowType;
 
@@ -99,6 +100,7 @@ public class CompactorSourceBuilder {
                     null,
                     TableScanUtils.compactStreamScanFactory());
         } else {
+            List<Split> splits = readBuilder.newScan().plan().splits();
             return new StaticFileStoreSource(
                     readBuilder,
                     null,
@@ -106,6 +108,7 @@ public class CompactorSourceBuilder {
                             .coreOptions()
                             .toConfiguration()
                             
.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_BATCH_SIZE),
+                    splits,
                     TableScanUtils.compactBatchScanFactory());
         }
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
index 6529c8f04..f0de0825a 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.flink.source;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.CoreOptions.ChangelogProducer;
 import org.apache.paimon.CoreOptions.LogChangelogMode;
 import org.apache.paimon.CoreOptions.LogConsistency;
@@ -33,10 +34,15 @@ import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.table.AppendOnlyFileStoreTable;
 import org.apache.paimon.table.ChangelogValueCountFileStoreTable;
 import org.apache.paimon.table.ChangelogWithKeyFileStoreTable;
+import org.apache.paimon.table.DataTable;
 import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.Projection;
 
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.source.DynamicTableSource;
@@ -48,6 +54,7 @@ import org.apache.flink.table.factories.DynamicTableFactory;
 import javax.annotation.Nullable;
 
 import java.time.Duration;
+import java.util.List;
 import java.util.stream.IntStream;
 
 import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
@@ -197,13 +204,41 @@ public class DataTableSource extends FlinkTableSource
                         .withProjection(projectFields)
                         .withPredicate(predicate)
                         .withLimit(limit)
-                        .withParallelism(
-                                Options.fromMap(table.options())
-                                        
.get(FlinkConnectorOptions.SCAN_PARALLELISM))
                         .withWatermarkStrategy(watermarkStrategy);
 
         return new PaimonDataStreamScanProvider(
-                !streaming, env -> sourceBuilder.withEnv(env).build());
+                !streaming, env -> configureSource(sourceBuilder, env));
+    }
+
+    private DataStreamSource<RowData> configureSource(
+            FlinkSourceBuilder sourceBuilder, StreamExecutionEnvironment env) {
+        Options options = Options.fromMap(this.table.options());
+        Integer parallelism = 
options.get(FlinkConnectorOptions.SCAN_PARALLELISM);
+        List<Split> splits = null;
+        if (options.get(FlinkConnectorOptions.INFER_SCAN_PARALLELISM)) {
+            // In streaming mode, use the bucket number as the source parallelism.
+            if (streaming) {
+                parallelism = options.get(CoreOptions.BUCKET);
+            } else {
+
+                Preconditions.checkState(table instanceof DataTable);
+                DataTable dataTable = (DataTable) table;
+                splits = dataTable.newScan().plan().splits();
+
+                if (null != splits) {
+                    parallelism = splits.size();
+                }
+                if (null != limit && limit > 0) {
+                    int limitCount =
+                            limit >= Integer.MAX_VALUE ? Integer.MAX_VALUE : 
limit.intValue();
+                    parallelism = Math.min(parallelism, limitCount);
+                }
+
+                parallelism = null == parallelism ? 1 : Math.max(1, 
parallelism);
+            }
+        }
+
+        return 
sourceBuilder.withParallelism(parallelism).withSplits(splits).withEnv(env).build();
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitGenerator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitGenerator.java
index 5f884cc18..f19ec828e 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitGenerator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitGenerator.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.flink.source;
 
+import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.TableScan;
 
 import java.util.List;
@@ -41,6 +42,12 @@ public class FileStoreSourceSplitGenerator {
                 .collect(Collectors.toList());
     }
 
+    public List<FileStoreSourceSplit> createSplits(List<Split> splits) {
+        return splits.stream()
+                .map(s -> new FileStoreSourceSplit(getNextId(), s))
+                .collect(Collectors.toList());
+    }
+
     protected final String getNextId() {
         // because we just increment numbers, we increment the char representation directly,
         // rather than incrementing an integer and converting it to a string representation
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
index 7de7e3c2c..17aa346d9 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
@@ -27,6 +27,7 @@ import org.apache.paimon.flink.utils.TableScanUtils;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.Split;
 
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.connector.source.Boundedness;
@@ -42,6 +43,7 @@ import org.apache.flink.table.types.logical.RowType;
 
 import javax.annotation.Nullable;
 
+import java.util.List;
 import java.util.Optional;
 
 import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType;
@@ -64,6 +66,7 @@ public class FlinkSourceBuilder {
     @Nullable private Integer parallelism;
     @Nullable private Long limit;
     @Nullable private WatermarkStrategy<RowData> watermarkStrategy;
+    @Nullable private List<Split> splits;
 
     public FlinkSourceBuilder(ObjectIdentifier tableIdentifier, Table table) {
         this.tableIdentifier = tableIdentifier;
@@ -112,12 +115,18 @@ public class FlinkSourceBuilder {
         return this;
     }
 
+    public FlinkSourceBuilder withSplits(List<Split> splits) {
+        this.splits = splits;
+        return this;
+    }
+
     private StaticFileStoreSource buildStaticFileSource() {
         return new StaticFileStoreSource(
                 
table.newReadBuilder().withProjection(projectedFields).withFilter(predicate),
                 limit,
                 Options.fromMap(table.options())
-                        
.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_BATCH_SIZE));
+                        
.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_BATCH_SIZE),
+                splits);
     }
 
     private ContinuousFileStoreSource buildContinuousFileSource() {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java
index 219f2c13a..c0fa8c9e0 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSource.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.source;
 
 import org.apache.paimon.flink.utils.TableScanUtils;
 import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.Split;
 
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.SplitEnumerator;
@@ -28,6 +29,7 @@ import 
org.apache.flink.api.connector.source.SplitEnumeratorContext;
 import javax.annotation.Nullable;
 
 import java.util.Collection;
+import java.util.List;
 
 /** Bounded {@link FlinkSource} for reading records. It does not monitor new snapshots. */
 public class StaticFileStoreSource extends FlinkSource {
@@ -36,19 +38,25 @@ public class StaticFileStoreSource extends FlinkSource {
 
     private final int splitBatchSize;
     private final TableScanUtils.TableScanFactory scanFactory;
+    private final List<Split> splitList;
 
     public StaticFileStoreSource(
-            ReadBuilder readBuilder, @Nullable Long limit, int splitBatchSize) 
{
-        this(readBuilder, limit, splitBatchSize, ReadBuilder::newScan);
+            ReadBuilder readBuilder,
+            @Nullable Long limit,
+            int splitBatchSize,
+            List<Split> splitList) {
+        this(readBuilder, limit, splitBatchSize, splitList, 
ReadBuilder::newScan);
     }
 
     public StaticFileStoreSource(
             ReadBuilder readBuilder,
             @Nullable Long limit,
             int splitBatchSize,
+            List<Split> splitList,
             TableScanUtils.TableScanFactory scanFactory) {
         super(readBuilder, limit);
         this.splitBatchSize = splitBatchSize;
+        this.splitList = splitList;
         this.scanFactory = scanFactory;
     }
 
@@ -62,11 +70,16 @@ public class StaticFileStoreSource extends FlinkSource {
             SplitEnumeratorContext<FileStoreSourceSplit> context,
             PendingSplitsCheckpoint checkpoint) {
         Collection<FileStoreSourceSplit> splits =
-                checkpoint == null
-                        ? new FileStoreSourceSplitGenerator()
-                                
.createSplits(scanFactory.create(readBuilder).plan())
-                        : checkpoint.splits();
-
+                checkpoint == null ? getSplits() : checkpoint.splits();
         return new StaticFileStoreSplitEnumerator(context, null, splits, 
splitBatchSize);
     }
+
+    private List<FileStoreSourceSplit> getSplits() {
+        FileStoreSourceSplitGenerator splitGenerator = new 
FileStoreSourceSplitGenerator();
+        if (null != splitList) {
+            return splitGenerator.createSplits(splitList);
+        } else {
+            return 
splitGenerator.createSplits(scanFactory.create(readBuilder).plan());
+        }
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java
index a4d911540..bda8946b6 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java
@@ -24,6 +24,7 @@ import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.table.DataTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.Split;
 
 import org.apache.flink.api.connector.source.Source;
 import org.apache.flink.table.connector.ChangelogMode;
@@ -33,6 +34,8 @@ import org.apache.flink.table.data.RowData;
 
 import javax.annotation.Nullable;
 
+import java.util.List;
+
 /** A {@link FlinkTableSource} for system table. */
 public class SystemTableSource extends FlinkTableSource {
 
@@ -73,7 +76,8 @@ public class SystemTableSource extends FlinkTableSource {
         if (isStreamingMode && table instanceof DataTable) {
             source = new ContinuousFileStoreSource(readBuilder, 
table.options(), limit);
         } else {
-            source = new StaticFileStoreSource(readBuilder, limit, 
splitBatchSize);
+            List<Split> splits = readBuilder.newScan().plan().splits();
+            source = new StaticFileStoreSource(readBuilder, limit, 
splitBatchSize, splits);
         }
         return SourceProvider.of(source);
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
index cfe5f9f35..213625b6b 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
@@ -57,7 +57,11 @@ import java.util.Map;
 import java.util.UUID;
 
 import static 
org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow;
+import static org.apache.paimon.CoreOptions.BUCKET;
+import static org.apache.paimon.CoreOptions.SOURCE_SPLIT_OPEN_FILE_COST;
+import static org.apache.paimon.CoreOptions.SOURCE_SPLIT_TARGET_SIZE;
 import static 
org.apache.paimon.flink.AbstractFlinkTableFactory.buildPaimonTable;
+import static 
org.apache.paimon.flink.FlinkConnectorOptions.INFER_SCAN_PARALLELISM;
 import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_PARALLELISM;
 import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_PARALLELISM;
 import static org.apache.paimon.flink.FlinkTestBase.createResolvedTable;
@@ -1111,12 +1115,124 @@ public class ReadWriteTableITCase extends 
AbstractTestBase {
                                         "",
                                         new HashMap<String, String>() {
                                             {
+                                                
put(INFER_SCAN_PARALLELISM.key(), "false");
                                                 put(SCAN_PARALLELISM.key(), 
"66");
                                             }
                                         })))
                 .isEqualTo(66);
     }
 
+    @Test
+    public void testInferParallelism() throws Exception {
+        String table =
+                createTable(
+                        Arrays.asList("currency STRING", "rate BIGINT"),
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        new HashMap<String, String>() {
+                            {
+                                put(SOURCE_SPLIT_OPEN_FILE_COST.key(), "1KB");
+                                put(SOURCE_SPLIT_TARGET_SIZE.key(), "1KB");
+                                put(BUCKET.key(), "2");
+                            }
+                        });
+        // Empty table: the inferred parallelism should be at least 1
+        assertThat(
+                        sourceParallelism(
+                                buildQueryWithTableOptions(
+                                        table,
+                                        "*",
+                                        "",
+                                        new HashMap<String, String>() {
+                                            {
+                                                
put(INFER_SCAN_PARALLELISM.key(), "true");
+                                            }
+                                        })))
+                .isEqualTo(1);
+
+        // Invalid scan.parallelism: the inferred parallelism should still be at least 1
+        assertThat(
+                        sourceParallelism(
+                                buildQueryWithTableOptions(
+                                        table,
+                                        "*",
+                                        "",
+                                        new HashMap<String, String>() {
+                                            {
+                                                
put(INFER_SCAN_PARALLELISM.key(), "true");
+                                                put(SCAN_PARALLELISM.key(), 
"-1");
+                                            }
+                                        })))
+                .isEqualTo(1);
+
+        // 2 splits, the parallelism is splits num: 2
+        insertInto(table, "('Euro', 119)");
+        insertInto(table, "('US Dollar', 102)");
+        assertThat(
+                        sourceParallelism(
+                                buildQueryWithTableOptions(
+                                        table,
+                                        "*",
+                                        "",
+                                        new HashMap<String, String>() {
+                                            {
+                                                
put(INFER_SCAN_PARALLELISM.key(), "true");
+                                            }
+                                        })))
+                .isEqualTo(2);
+
+        // 2 splits and limit is 1, the parallelism is the limit value : 1
+        assertThat(
+                        sourceParallelism(
+                                buildQueryWithTableOptions(
+                                        table,
+                                        "*",
+                                        "",
+                                        1L,
+                                        new HashMap<String, String>() {
+                                            {
+                                                
put(INFER_SCAN_PARALLELISM.key(), "true");
+                                            }
+                                        })))
+                .isEqualTo(1);
+
+        // 2 splits, limit is 3: min(2, 3) = 2 is expected, yet 1 is asserted (TODO confirm)
+        assertThat(
+                        sourceParallelism(
+                                buildQueryWithTableOptions(
+                                        table,
+                                        "*",
+                                        "",
+                                        3L,
+                                        new HashMap<String, String>() {
+                                            {
+                                                
put(INFER_SCAN_PARALLELISM.key(), "true");
+                                            }
+                                        })))
+                .isEqualTo(1);
+
+        // 2 splits, infer parallelism is disabled: the parallelism is scan.parallelism
+        assertThat(
+                        sourceParallelism(
+                                buildQueryWithTableOptions(
+                                        table,
+                                        "*",
+                                        "",
+                                        new HashMap<String, String>() {
+                                            {
+                                                
put(INFER_SCAN_PARALLELISM.key(), "false");
+                                                put(SCAN_PARALLELISM.key(), 
"3");
+                                            }
+                                        })))
+                .isEqualTo(3);
+
+        // for streaming mode
+        assertThat(
+                        sourceParallelismStreaming(
+                                buildQueryWithTableOptions(table, "*", "", new 
HashMap<>())))
+                .isEqualTo(2);
+    }
+
     @Test
     public void testSinkParallelism() throws Exception {
         testSinkParallelism(null, bExeEnv.getParallelism());
@@ -1206,6 +1322,12 @@ public class ReadWriteTableITCase extends 
AbstractTestBase {
         return stream.getParallelism();
     }
 
+    private int sourceParallelismStreaming(String sql) {
+        DataStream<Row> stream =
+                ((StreamTableEnvironment) 
sEnv).toChangelogStream(sEnv.sqlQuery(sql));
+        return stream.getParallelism();
+    }
+
     private void testSinkParallelism(Integer configParallelism, int 
expectedParallelism)
             throws Exception {
         // 1. create a mock table sink
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java
index 8eba48b1e..3ffba193f 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/util/ReadWriteTableTestUtil.java
@@ -235,11 +235,29 @@ public class ReadWriteTableTestUtil {
         return buildQueryWithTableOptions(table, projection, filter, new 
HashMap<>());
     }
 
+    public static String buildQueryWithTableOptions(
+            String table,
+            String projection,
+            String filter,
+            Long limit,
+            Map<String, String> options) {
+        List<Object> params = new ArrayList<>();
+        params.add(projection);
+        params.add(table);
+        params.add(buildTableOptionsSpec(options));
+        params.add(filter);
+        StringBuilder queryFormat = new StringBuilder("SELECT %s FROM `%s` %s 
%s");
+        if (null != limit) {
+            queryFormat.append(" limit %s");
+            params.add(limit);
+        }
+
+        return String.format(queryFormat.toString(), params.toArray());
+    }
+
     public static String buildQueryWithTableOptions(
             String table, String projection, String filter, Map<String, 
String> options) {
-        String queryFormat = "SELECT %s FROM `%s` %s %s;";
-        return String.format(
-                queryFormat, projection, table, 
buildTableOptionsSpec(options), filter);
+        return buildQueryWithTableOptions(table, projection, filter, null, 
options);
     }
 
     public static void checkFileStorePath(String table, List<String> 
partitionSpec) {

Reply via email to