This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new a7fe39780 [flink][source] Supports disable infer scan parallelism by flink job dynamic option (#1958)
a7fe39780 is described below

commit a7fe397800923091d6129fcea68eff1148b93943
Author: Shammon FY <[email protected]>
AuthorDate: Mon Sep 11 10:20:44 2023 +0800

    [flink][source] Supports disable infer scan parallelism by flink job dynamic option (#1958)
---
 .../org/apache/paimon/options/OptionsUtils.java    |   2 +
 .../paimon/flink/source/DataTableSource.java       |  11 ++
 .../paimon/flink/source/DataTableSourceTest.java   | 135 +++++++++++++++++++++
 .../org/apache/paimon/hive/utils/HiveUtils.java    |   4 +-
 4 files changed, 150 insertions(+), 2 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/options/OptionsUtils.java b/paimon-common/src/main/java/org/apache/paimon/options/OptionsUtils.java
index a0943208f..86880c32f 100644
--- a/paimon-common/src/main/java/org/apache/paimon/options/OptionsUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/options/OptionsUtils.java
@@ -32,6 +32,8 @@ import static org.apache.paimon.options.StructuredOptionsSplitter.escapeWithSing
 /** Utility class for {@link Options} related helper functions. */
 public class OptionsUtils {
 
+    public static final String PAIMON_PREFIX = "paimon.";
+
     // 
--------------------------------------------------------------------------------------------
     //  Type conversion
     // 
--------------------------------------------------------------------------------------------
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
index 10195f1f8..9d9ce0eb5 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
@@ -39,6 +39,7 @@ import org.apache.paimon.table.source.Split;
 import org.apache.paimon.utils.Projection;
 
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.catalog.ObjectIdentifier;
@@ -68,6 +69,7 @@ import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_ALIGN
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_UPDATE_INTERVAL;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_EMIT_STRATEGY;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_IDLE_TIMEOUT;
+import static org.apache.paimon.options.OptionsUtils.PAIMON_PREFIX;
 
 /**
  * Table source to create {@link StaticFileStoreSource} or {@link 
ContinuousFileStoreSource} under
@@ -78,6 +80,9 @@ import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_IDLE_
  * LogSourceProvider}.
  */
 public class DataTableSource extends FlinkTableSource {
+    private static final String FLINK_INFER_SCAN_PARALLELISM =
+            String.format(
+                    "%s%s", PAIMON_PREFIX, 
FlinkConnectorOptions.INFER_SCAN_PARALLELISM.key());
 
     private final ObjectIdentifier tableIdentifier;
     protected final boolean streaming;
@@ -220,6 +225,12 @@ public class DataTableSource extends FlinkTableSource {
     private DataStream<RowData> configureSource(
             FlinkSourceBuilder sourceBuilder, StreamExecutionEnvironment env) {
         Options options = Options.fromMap(this.table.options());
+        Configuration envConfig = (Configuration) env.getConfiguration();
+        if (envConfig.containsKey(FLINK_INFER_SCAN_PARALLELISM)) {
+            options.set(
+                    FlinkConnectorOptions.INFER_SCAN_PARALLELISM,
+                    
Boolean.parseBoolean(envConfig.toMap().get(FLINK_INFER_SCAN_PARALLELISM)));
+        }
         Integer parallelism = 
options.get(FlinkConnectorOptions.SCAN_PARALLELISM);
         if (parallelism == null && 
options.get(FlinkConnectorOptions.INFER_SCAN_PARALLELISM)) {
             if (streaming) {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/DataTableSourceTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/DataTableSourceTest.java
new file mode 100644
index 000000000..e83117735
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/DataTableSourceTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source;
+
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.flink.FlinkConnectorOptions;
+import org.apache.paimon.flink.PaimonDataStreamScanProvider;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.sink.InnerTableWrite;
+import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.paimon.types.DataTypes;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.Collections;
+import java.util.Optional;
+
+import static org.apache.paimon.options.OptionsUtils.PAIMON_PREFIX;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+/** Tests for {@link DataTableSource} scan parallelism inference and its job-level override. */
+class DataTableSourceTest {
+    @Test
+    void testInferScanParallelism(@TempDir java.nio.file.Path path) throws Exception {
+        FileIO fileIO = LocalFileIO.create();
+        Path tablePath = new Path(path.toString());
+        SchemaManager schemaManager = new SchemaManager(fileIO, tablePath);
+        TableSchema tableSchema =
+                schemaManager.createTable(
+                        Schema.newBuilder()
+                                .column("a", DataTypes.INT())
+                                .column("b", DataTypes.BIGINT())
+                                .build());
+        FileStoreTable fileStoreTable =
+                FileStoreTableFactory.create(fileIO, tablePath, tableSchema);
+        InnerTableWrite writer = fileStoreTable.newWrite("test");
+        TableCommitImpl commit = fileStoreTable.newCommit("test");
+        writer.write(GenericRow.of(1, 2L));
+        writer.write(GenericRow.of(3, 4L));
+        writer.write(GenericRow.of(5, 6L));
+        writer.write(GenericRow.of(7, 8L));
+        writer.write(GenericRow.of(9, 10L));
+        writer.write(GenericRow.of(11, 12L));
+        writer.write(GenericRow.of(13, 14L));
+        writer.write(GenericRow.of(15, 16L));
+        writer.write(GenericRow.of(17, 18L));
+        commit.commit(writer.prepareCommit());
+
+        commit.close();
+        writer.close();
+
+        DataTableSource tableSource =
+                new DataTableSource(
+                        ObjectIdentifier.of("cat", "db", "table"),
+                        fileStoreTable,
+                        true,
+                        null,
+                        null);
+        PaimonDataStreamScanProvider runtimeProvider =
+                (PaimonDataStreamScanProvider)
+                        tableSource.getScanRuntimeProvider(
+                                new ScanTableSource.ScanContext() {
+                                    @Override
+                                    public <T> TypeInformation<T> createTypeInformation(
+                                            DataType dataType) {
+                                        throw new UnsupportedOperationException();
+                                    }
+
+                                    @Override
+                                    public <T> TypeInformation<T> createTypeInformation(
+                                            LogicalType logicalType) {
+                                        throw new UnsupportedOperationException();
+                                    }
+
+                                    @Override
+                                    public DynamicTableSource.DataStructureConverter
+                                            createDataStructureConverter(DataType dataType) {
+                                        throw new UnsupportedOperationException();
+                                    }
+                                });
+        StreamExecutionEnvironment sEnv1 = StreamExecutionEnvironment.createLocalEnvironment();
+        DataStream<RowData> sourceStream1 =
+                runtimeProvider.produceDataStream(s -> Optional.empty(), sEnv1);
+        assertThat(sourceStream1.getParallelism()).isEqualTo(1);
+
+        StreamExecutionEnvironment sEnv2 =
+                StreamExecutionEnvironment.createLocalEnvironment(
+                        4,
+                        Configuration.fromMap(
+                                Collections.singletonMap(
+                                        String.format(
+                                                "%s%s",
+                                                PAIMON_PREFIX,
+                                                FlinkConnectorOptions.INFER_SCAN_PARALLELISM.key()),
+                                        "false")));
+        DataStream<RowData> sourceStream2 =
+                runtimeProvider.produceDataStream(s -> Optional.empty(), sEnv2);
+        // Inference disabled: env parallelism (4) is used instead of the inferred value 1
+        assertThat(sourceStream2.getParallelism()).isEqualTo(4);
+    }
+}
diff --git a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveUtils.java b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveUtils.java
index 3ede93564..af7f7a051 100644
--- a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveUtils.java
+++ b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveUtils.java
@@ -40,11 +40,11 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
+import static org.apache.paimon.options.OptionsUtils.PAIMON_PREFIX;
+
 /** Utils for create {@link FileStoreTable} and {@link Predicate}. */
 public class HiveUtils {
 
-    private static final String PAIMON_PREFIX = "paimon.";
-
     public static FileStoreTable createFileStoreTable(JobConf jobConf) {
         Options options = extractCatalogConfig(jobConf);
         options.set(CoreOptions.PATH, 
LocationKeyExtractor.getPaimonLocation(jobConf));

Reply via email to