This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new dc4552491 [flink] support to infer parallelism for system table (#3201)
dc4552491 is described below

commit dc4552491a3f679f6e6591e816a465443cb1d211
Author: Aitozi <[email protected]>
AuthorDate: Fri Apr 12 13:30:24 2024 +0800

    [flink] support to infer parallelism for system table (#3201)
---
 .../paimon/flink/AbstractFlinkTableFactory.java    |   5 +-
 .../paimon/flink/source/DataTableSource.java       |  79 +----------
 .../paimon/flink/source/FlinkTableSource.java      |  77 +++++++++++
 .../paimon/flink/source/SystemTableSource.java     |  32 ++++-
 .../paimon/flink/source/DataTableSourceTest.java   | 147 ++++++++++++++-------
 5 files changed, 211 insertions(+), 129 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
index 782a9804e..c42780afa 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
@@ -100,7 +100,10 @@ public abstract class AbstractFlinkTableFactory
                         == RuntimeExecutionMode.STREAMING;
         if (origin instanceof SystemCatalogTable) {
             return new PushedTableSource(
-                    new SystemTableSource(((SystemCatalogTable) origin).table(), isStreamingMode));
+                    new SystemTableSource(
+                            ((SystemCatalogTable) origin).table(),
+                            isStreamingMode,
+                            context.getObjectIdentifier()));
         } else {
             Table table = buildPaimonTable(context);
             if (table instanceof FileStoreTable) {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
index fd29dd048..ff0312ee5 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
@@ -22,7 +22,6 @@ import org.apache.paimon.CoreOptions;
 import org.apache.paimon.CoreOptions.ChangelogProducer;
 import org.apache.paimon.CoreOptions.LogChangelogMode;
 import org.apache.paimon.CoreOptions.LogConsistency;
-import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.FlinkConnectorOptions.WatermarkEmitStrategy;
 import org.apache.paimon.flink.PaimonDataStreamScanProvider;
 import org.apache.paimon.flink.log.LogSourceProvider;
@@ -32,13 +31,9 @@ import org.apache.paimon.flink.lookup.LookupRuntimeProviderFactory;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.table.Table;
-import org.apache.paimon.table.source.Split;
 import org.apache.paimon.utils.Projection;
 
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.source.LookupTableSource.LookupContext;
@@ -67,7 +62,6 @@ import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_ALIGN
 import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_UPDATE_INTERVAL;
 import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_EMIT_STRATEGY;
 import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_IDLE_TIMEOUT;
-import static org.apache.paimon.options.OptionsUtils.PAIMON_PREFIX;
 import static org.apache.paimon.utils.Preconditions.checkState;
 
 /**
@@ -79,9 +73,6 @@ import static org.apache.paimon.utils.Preconditions.checkState;
  * LogSourceProvider}.
  */
 public class DataTableSource extends FlinkTableSource {
-    private static final String FLINK_INFER_SCAN_PARALLELISM =
-            String.format(
-                    "%s%s", PAIMON_PREFIX, FlinkConnectorOptions.INFER_SCAN_PARALLELISM.key());
 
     private final ObjectIdentifier tableIdentifier;
     private final boolean streaming;
@@ -90,8 +81,6 @@ public class DataTableSource extends FlinkTableSource {
 
     @Nullable private WatermarkStrategy<RowData> watermarkStrategy;
 
-    private SplitStatistics splitStatistics;
-
     @Nullable private List<String> dynamicPartitionFilteringFields;
 
     public DataTableSource(
@@ -211,48 +200,12 @@ public class DataTableSource extends FlinkTableSource {
                         .withDynamicPartitionFilteringFields(dynamicPartitionFilteringFields);
 
         return new PaimonDataStreamScanProvider(
-                !streaming, env -> configureSource(sourceBuilder, env));
-    }
-
-    private DataStream<RowData> configureSource(
-            FlinkSourceBuilder sourceBuilder, StreamExecutionEnvironment env) {
-        Options options = Options.fromMap(this.table.options());
-        Configuration envConfig = (Configuration) env.getConfiguration();
-        if (envConfig.containsKey(FLINK_INFER_SCAN_PARALLELISM)) {
-            options.set(
-                    FlinkConnectorOptions.INFER_SCAN_PARALLELISM,
-                    Boolean.parseBoolean(envConfig.toMap().get(FLINK_INFER_SCAN_PARALLELISM)));
-        }
-        Integer parallelism = options.get(FlinkConnectorOptions.SCAN_PARALLELISM);
-        if (parallelism == null && options.get(FlinkConnectorOptions.INFER_SCAN_PARALLELISM)) {
-            if (streaming) {
-                parallelism = options.get(CoreOptions.BUCKET);
-            } else {
-                scanSplitsForInference();
-                parallelism = splitStatistics.splitNumber();
-                if (null != limit && limit > 0) {
-                    int limitCount =
-                            limit >= Integer.MAX_VALUE ? Integer.MAX_VALUE : limit.intValue();
-                    parallelism = Math.min(parallelism, limitCount);
-                }
-
-                parallelism = Math.max(1, parallelism);
-            }
-            parallelism =
-                    Math.min(
-                            parallelism,
-                            options.get(FlinkConnectorOptions.INFER_SCAN_MAX_PARALLELISM));
-        }
-
-        return sourceBuilder.withParallelism(parallelism).withEnv(env).build();
-    }
-
-    private void scanSplitsForInference() {
-        if (splitStatistics == null) {
-            List<Split> splits =
-                    table.newReadBuilder().withFilter(predicate).newScan().plan().splits();
-            splitStatistics = new SplitStatistics(splits);
-        }
+                !streaming,
+                env ->
+                        sourceBuilder
+                                .withParallelism(inferSourceParallelism(env))
+                                .withEnv(env)
+                                .build());
     }
 
     @Override
@@ -335,24 +288,4 @@ public class DataTableSource extends FlinkTableSource {
     public boolean isStreaming() {
         return streaming;
     }
-
-    /** Split statistics for inferring row count and parallelism size. */
-    protected static class SplitStatistics {
-
-        private final int splitNumber;
-        private final long totalRowCount;
-
-        private SplitStatistics(List<Split> splits) {
-            this.splitNumber = splits.size();
-            this.totalRowCount = splits.stream().mapToLong(Split::rowCount).sum();
-        }
-
-        public int splitNumber() {
-            return splitNumber;
-        }
-
-        public long totalRowCount() {
-            return totalRowCount;
-        }
-    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
index 68209240e..7254eefaa 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
@@ -18,15 +18,21 @@
 
 package org.apache.paimon.flink.source;
 
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.LogicalTypeConversion;
 import org.apache.paimon.flink.PredicateConverter;
+import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.PartitionPredicateVisitor;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.predicate.PredicateVisitor;
 import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.Split;
 
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.source.LookupTableSource.LookupContext;
 import org.apache.flink.table.connector.source.LookupTableSource.LookupRuntimeProvider;
@@ -46,16 +52,23 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
 
+import static org.apache.paimon.options.OptionsUtils.PAIMON_PREFIX;
+
 /** A Flink {@link ScanTableSource} for paimon. */
 public abstract class FlinkTableSource {
 
     private static final Logger LOG = LoggerFactory.getLogger(FlinkTableSource.class);
 
+    protected static final String FLINK_INFER_SCAN_PARALLELISM =
+            String.format(
+                    "%s%s", PAIMON_PREFIX, FlinkConnectorOptions.INFER_SCAN_PARALLELISM.key());
+
     protected final Table table;
 
     @Nullable protected Predicate predicate;
     @Nullable protected int[][] projectFields;
     @Nullable protected Long limit;
+    protected SplitStatistics splitStatistics;
 
     public FlinkTableSource(Table table) {
         this(table, null, null, null);
@@ -132,4 +145,68 @@ public abstract class FlinkTableSource {
     public abstract void applyDynamicFiltering(List<String> candidateFilterFields);
 
     public abstract boolean isStreaming();
+
+    @Nullable
+    protected Integer inferSourceParallelism(StreamExecutionEnvironment env) {
+        Options options = Options.fromMap(this.table.options());
+        Configuration envConfig = (Configuration) env.getConfiguration();
+        if (envConfig.containsKey(FLINK_INFER_SCAN_PARALLELISM)) {
+            options.set(
+                    FlinkConnectorOptions.INFER_SCAN_PARALLELISM,
+                    Boolean.parseBoolean(envConfig.toMap().get(FLINK_INFER_SCAN_PARALLELISM)));
+        }
+        Integer parallelism = options.get(FlinkConnectorOptions.SCAN_PARALLELISM);
+        if (parallelism == null && options.get(FlinkConnectorOptions.INFER_SCAN_PARALLELISM)) {
+            if (isStreaming()) {
+                parallelism = Math.max(1, options.get(CoreOptions.BUCKET));
+            } else {
+                scanSplitsForInference();
+                parallelism = splitStatistics.splitNumber();
+                if (null != limit && limit > 0) {
+                    int limitCount =
+                            limit >= Integer.MAX_VALUE ? Integer.MAX_VALUE : limit.intValue();
+                    parallelism = Math.min(parallelism, limitCount);
+                }
+
+                parallelism = Math.max(1, parallelism);
+                parallelism =
+                        Math.min(
+                                parallelism,
+                                options.get(FlinkConnectorOptions.INFER_SCAN_MAX_PARALLELISM));
+            }
+        }
+        return parallelism;
+    }
+
+    protected void scanSplitsForInference() {
+        if (splitStatistics == null) {
+            List<Split> splits =
+                    table.newReadBuilder().withFilter(predicate).newScan().plan().splits();
+            splitStatistics = new SplitStatistics(splits);
+        }
+    }
+
+    /** Split statistics for inferring row count and parallelism size. */
+    protected static class SplitStatistics {
+
+        private final int splitNumber;
+        private final long totalRowCount;
+
+        protected SplitStatistics(List<Split> splits) {
+            this.splitNumber = splits.size();
+            this.totalRowCount = splits.stream().mapToLong(Split::rowCount).sum();
+        }
+
+        public int splitNumber() {
+            return splitNumber;
+        }
+
+        public long totalRowCount() {
+            return totalRowCount;
+        }
+    }
+
+    public Table getTable() {
+        return table;
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java
index b577a73aa..49ed0c0b8 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.flink.source;
 
 import org.apache.paimon.flink.FlinkConnectorOptions;
+import org.apache.paimon.flink.PaimonDataStreamScanProvider;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.table.DataTable;
@@ -26,12 +27,14 @@ import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.ReadBuilder;
 
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.source.LookupTableSource;
 import org.apache.flink.table.connector.source.ScanTableSource.ScanContext;
 import org.apache.flink.table.connector.source.ScanTableSource.ScanRuntimeProvider;
-import org.apache.flink.table.connector.source.SourceProvider;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.plan.stats.TableStats;
 
@@ -46,13 +49,16 @@ public class SystemTableSource extends FlinkTableSource {
     private final boolean isStreamingMode;
     private final int splitBatchSize;
     private final FlinkConnectorOptions.SplitAssignMode splitAssignMode;
+    private final ObjectIdentifier tableIdentifier;
 
-    public SystemTableSource(Table table, boolean isStreamingMode) {
+    public SystemTableSource(
+            Table table, boolean isStreamingMode, ObjectIdentifier tableIdentifier) {
         super(table);
         this.isStreamingMode = isStreamingMode;
         Options options = Options.fromMap(table.options());
         this.splitBatchSize = options.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_BATCH_SIZE);
         this.splitAssignMode = options.get(FlinkConnectorOptions.SCAN_SPLIT_ENUMERATOR_ASSIGN_MODE);
+        this.tableIdentifier = tableIdentifier;
     }
 
     public SystemTableSource(
@@ -62,11 +68,13 @@ public class SystemTableSource extends FlinkTableSource {
             @Nullable int[][] projectFields,
             @Nullable Long limit,
             int splitBatchSize,
-            FlinkConnectorOptions.SplitAssignMode splitAssignMode) {
+            FlinkConnectorOptions.SplitAssignMode splitAssignMode,
+            ObjectIdentifier tableIdentifier) {
         super(table, predicate, projectFields, limit);
         this.isStreamingMode = isStreamingMode;
         this.splitBatchSize = splitBatchSize;
         this.splitAssignMode = splitAssignMode;
+        this.tableIdentifier = tableIdentifier;
     }
 
     @Override
@@ -85,7 +93,20 @@ public class SystemTableSource extends FlinkTableSource {
         } else {
             source = new StaticFileStoreSource(readBuilder, limit, splitBatchSize, splitAssignMode);
         }
-        return SourceProvider.of(source);
+        return new PaimonDataStreamScanProvider(
+                source.getBoundedness() == Boundedness.BOUNDED,
+                env -> {
+                    Integer parallelism = inferSourceParallelism(env);
+                    DataStreamSource<RowData> dataStreamSource =
+                            env.fromSource(
+                                    source,
+                                    WatermarkStrategy.noWatermarks(),
+                                    tableIdentifier.asSummaryString());
+                    if (parallelism != null) {
+                        dataStreamSource.setParallelism(parallelism);
+                    }
+                    return dataStreamSource;
+                });
     }
 
     @Override
@@ -97,7 +118,8 @@ public class SystemTableSource extends FlinkTableSource {
                 projectFields,
                 limit,
                 splitBatchSize,
-                splitAssignMode);
+                splitAssignMode,
+                tableIdentifier);
     }
 
     @Override
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/DataTableSourceTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/DataTableSourceTest.java
index 91af8071e..b98762ae2 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/DataTableSourceTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/DataTableSourceTest.java
@@ -31,10 +31,15 @@ import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.FileStoreTableFactory;
 import org.apache.paimon.table.sink.InnerTableWrite;
 import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.paimon.table.system.ReadOptimizedTable;
 import org.apache.paimon.types.DataTypes;
 
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.catalog.ObjectIdentifier;
@@ -47,6 +52,7 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
 import java.util.Collections;
+import java.util.Map;
 import java.util.Optional;
 
 import static org.apache.paimon.options.OptionsUtils.PAIMON_PREFIX;
@@ -55,35 +61,12 @@ import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
 /** Tests for {@link DataTableSource}. */
 class DataTableSourceTest {
 
-    @Test
-    void testInferScanParallelism(@TempDir java.nio.file.Path path) throws Exception {
-        FileIO fileIO = LocalFileIO.create();
-        Path tablePath = new Path(path.toString());
-        SchemaManager schemaManager = new SchemaManager(fileIO, tablePath);
-        TableSchema tableSchema =
-                schemaManager.createTable(
-                        Schema.newBuilder()
-                                .column("a", DataTypes.INT())
-                                .column("b", DataTypes.BIGINT())
-                                .option("bucket", "1")
-                                .build());
-        FileStoreTable fileStoreTable =
-                FileStoreTableFactory.create(fileIO, tablePath, tableSchema);
-        InnerTableWrite writer = fileStoreTable.newWrite("test");
-        TableCommitImpl commit = fileStoreTable.newCommit("test");
-        writer.write(GenericRow.of(1, 2L));
-        writer.write(GenericRow.of(3, 4L));
-        writer.write(GenericRow.of(5, 6L));
-        writer.write(GenericRow.of(7, 8L));
-        writer.write(GenericRow.of(9, 10L));
-        writer.write(GenericRow.of(11, 12L));
-        writer.write(GenericRow.of(13, 14L));
-        writer.write(GenericRow.of(15, 16L));
-        writer.write(GenericRow.of(17, 18L));
-        commit.commit(writer.prepareCommit());
+    @TempDir java.nio.file.Path path;
 
-        commit.close();
-        writer.close();
+    @Test
+    void testInferScanParallelism() throws Exception {
+        FileStoreTable fileStoreTable = createTable(ImmutableMap.of("bucket", "1"));
+        writeData(fileStoreTable);
 
         DataTableSource tableSource =
                 new DataTableSource(
@@ -92,28 +75,7 @@ class DataTableSourceTest {
                         true,
                         null,
                         null);
-        PaimonDataStreamScanProvider runtimeProvider =
-                (PaimonDataStreamScanProvider)
-                        tableSource.getScanRuntimeProvider(
-                                new ScanTableSource.ScanContext() {
-                                    @Override
-                                    public <T> TypeInformation<T> createTypeInformation(
-                                            DataType dataType) {
-                                        throw new UnsupportedOperationException();
-                                    }
-
-                                    @Override
-                                    public <T> TypeInformation<T> createTypeInformation(
-                                            LogicalType logicalType) {
-                                        throw new UnsupportedOperationException();
-                                    }
-
-                                    @Override
-                                    public DynamicTableSource.DataStructureConverter
-                                            createDataStructureConverter(DataType dataType) {
-                                        throw new UnsupportedOperationException();
-                                    }
-                                });
+        PaimonDataStreamScanProvider runtimeProvider = runtimeProvider(tableSource);
         StreamExecutionEnvironment sEnv1 = StreamExecutionEnvironment.createLocalEnvironment();
         DataStream<RowData> sourceStream1 =
                 runtimeProvider.produceDataStream(s -> Optional.empty(), sEnv1);
@@ -134,4 +96,89 @@ class DataTableSourceTest {
         assertThat(sourceStream2.getParallelism()).isNotEqualTo(1);
         assertThat(sourceStream2.getParallelism()).isEqualTo(sEnv2.getParallelism());
     }
+
+    @Test
+    public void testInferStreamParallelism() throws Exception {
+        FileStoreTable fileStoreTable = createTable(ImmutableMap.of("bucket", "-1"));
+
+        DataTableSource tableSource =
+                new DataTableSource(
+                        ObjectIdentifier.of("cat", "db", "table"),
+                        fileStoreTable,
+                        true,
+                        null,
+                        null);
+        PaimonDataStreamScanProvider runtimeProvider = runtimeProvider(tableSource);
+
+        StreamExecutionEnvironment sEnv1 = StreamExecutionEnvironment.createLocalEnvironment();
+        DataStream<RowData> sourceStream1 =
+                runtimeProvider.produceDataStream(s -> Optional.empty(), sEnv1);
+        // parallelism = 1 for table with -1 bucket.
+        assertThat(sourceStream1.getParallelism()).isEqualTo(1);
+    }
+
+    @Test
+    public void testSystemTableParallelism() throws Exception {
+        FileStoreTable fileStoreTable =
+                createTable(ImmutableMap.of("bucket", "1", "scan.parallelism", "3"));
+
+        ReadOptimizedTable ro = new ReadOptimizedTable(fileStoreTable);
+
+        SystemTableSource tableSource =
+                new SystemTableSource(ro, false, ObjectIdentifier.of("cat", "db", "table"));
+        PaimonDataStreamScanProvider runtimeProvider = runtimeProvider(tableSource);
+
+        Configuration configuration = new Configuration();
+        configuration.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);
+        StreamExecutionEnvironment sEnv1 = StreamExecutionEnvironment.createLocalEnvironment();
+        DataStream<RowData> sourceStream1 =
+                runtimeProvider.produceDataStream(s -> Optional.empty(), sEnv1);
+        assertThat(sourceStream1.getParallelism()).isEqualTo(3);
+    }
+
+    private PaimonDataStreamScanProvider runtimeProvider(FlinkTableSource tableSource) {
+        return (PaimonDataStreamScanProvider)
+                tableSource.getScanRuntimeProvider(
+                        new ScanTableSource.ScanContext() {
+                            @Override
+                            public <T> TypeInformation<T> createTypeInformation(DataType dataType) {
+                                throw new UnsupportedOperationException();
+                            }
+
+                            @Override
+                            public <T> TypeInformation<T> createTypeInformation(
+                                    LogicalType logicalType) {
+                                throw new UnsupportedOperationException();
+                            }
+
+                            @Override
+                            public DynamicTableSource.DataStructureConverter
+                                    createDataStructureConverter(DataType dataType) {
+                                throw new UnsupportedOperationException();
+                            }
+                        });
+    }
+
+    private FileStoreTable createTable(Map<String, String> options) throws Exception {
+        FileIO fileIO = LocalFileIO.create();
+        Path tablePath = new Path(path.toString());
+        SchemaManager schemaManager = new SchemaManager(fileIO, tablePath);
+        TableSchema tableSchema =
+                schemaManager.createTable(
+                        Schema.newBuilder()
+                                .column("a", DataTypes.INT())
+                                .column("b", DataTypes.BIGINT())
+                                .options(options)
+                                .build());
+        return FileStoreTableFactory.create(fileIO, tablePath, tableSchema);
+    }
+
+    private void writeData(FileStoreTable table) throws Exception {
+        InnerTableWrite writer = table.newWrite("test");
+        TableCommitImpl commit = table.newCommit("test");
+        writer.write(GenericRow.of(1, 2L));
+        commit.commit(writer.prepareCommit());
+        commit.close();
+        writer.close();
+    }
 }

Reply via email to