This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 862a7129d27 [FLINK-33264][table] Support source parallelism setting for DataGen connector
862a7129d27 is described below

commit 862a7129d2730b4c70a21826a5b858fc541a4470
Author: Zhanghao Chen <[email protected]>
AuthorDate: Thu Jan 18 21:26:42 2024 +0800

    [FLINK-33264][table] Support source parallelism setting for DataGen connector
    
    Close apache/flink#24133
---
 docs/content.zh/docs/connectors/table/datagen.md   |  7 ++++
 docs/content/docs/connectors/table/datagen.md      |  7 ++++
 .../f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e           |  4 +-
 .../src/test/resources/sql/table.q                 |  1 +
 .../src/test/resources/sql/table.q                 |  1 +
 .../datagen/table/DataGenConnectorOptions.java     |  3 ++
 .../datagen/table/DataGenTableSource.java          | 11 ++++--
 .../datagen/table/DataGenTableSourceFactory.java   |  5 ++-
 .../factories/DataGenTableSourceFactoryTest.java   | 46 ++++++++++++++++++++++
 .../stream/table/DataGeneratorConnectorITCase.java | 25 ++++++++++++
 10 files changed, 104 insertions(+), 6 deletions(-)

diff --git a/docs/content.zh/docs/connectors/table/datagen.md 
b/docs/content.zh/docs/connectors/table/datagen.md
index 642382a7768..b90183fcc8c 100644
--- a/docs/content.zh/docs/connectors/table/datagen.md
+++ b/docs/content.zh/docs/connectors/table/datagen.md
@@ -266,6 +266,13 @@ CREATE TABLE Orders (
       <td>Long</td>
       <td>生成数据的总行数。默认情况下,该表是无界的。</td>
     </tr>
+    <tr>
+      <td><h5>scan.parallelism</h5></td>
+      <td>可选</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>Integer</td>
+      <td>定义 source 的并行度。不设置将使用全局默认并行度。</td>
+    </tr>
     <tr>
       <td><h5>fields.#.kind</h5></td>
       <td>可选</td>
diff --git a/docs/content/docs/connectors/table/datagen.md 
b/docs/content/docs/connectors/table/datagen.md
index 70253786bff..1f105076897 100644
--- a/docs/content/docs/connectors/table/datagen.md
+++ b/docs/content/docs/connectors/table/datagen.md
@@ -270,6 +270,13 @@ Connector Options
       <td>Long</td>
      <td>The total number of rows to emit. By default, the table is unbounded.</td>
     </tr>
+    <tr>
+      <td><h5>scan.parallelism</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>Integer</td>
+      <td>Defines the parallelism of the source. If not set, the global default parallelism is used.</td>
+    </tr>
     <tr>
       <td><h5>fields.#.kind</h5></td>
       <td>optional</td>
diff --git 
a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e
 
b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e
index 4e44bbf3505..0fd86b29430 100644
--- 
a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e
+++ 
b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e
@@ -76,8 +76,8 @@ Constructor 
<org.apache.flink.connector.datagen.source.GeneratingIteratorSourceR
 Constructor 
<org.apache.flink.connector.datagen.source.GeneratorSourceReaderFactory.<init>(org.apache.flink.connector.datagen.source.GeneratorFunction,
 org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy)> 
calls method 
<org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object)> in 
(GeneratorSourceReaderFactory.java:54)
 Constructor 
<org.apache.flink.connector.datagen.source.GeneratorSourceReaderFactory.<init>(org.apache.flink.connector.datagen.source.GeneratorFunction,
 org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy)> 
calls method 
<org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object)> in 
(GeneratorSourceReaderFactory.java:55)
 Constructor 
<org.apache.flink.connector.datagen.source.GeneratorSourceReaderFactory.<init>(org.apache.flink.connector.datagen.source.GeneratorFunction,
 org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy)> has 
parameter of type 
<org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy> in 
(GeneratorSourceReaderFactory.java:0)
-Constructor 
<org.apache.flink.connector.datagen.table.DataGenTableSource.<init>([Lorg.apache.flink.streaming.api.functions.source.datagen.DataGenerator;,
 java.lang.String, org.apache.flink.table.types.DataType, long, 
java.lang.Long)> depends on component type 
<org.apache.flink.streaming.api.functions.source.datagen.DataGenerator> in 
(DataGenTableSource.java:0)
-Constructor 
<org.apache.flink.connector.datagen.table.DataGenTableSource.<init>([Lorg.apache.flink.streaming.api.functions.source.datagen.DataGenerator;,
 java.lang.String, org.apache.flink.table.types.DataType, long, 
java.lang.Long)> has parameter of type 
<[Lorg.apache.flink.streaming.api.functions.source.datagen.DataGenerator;> in 
(DataGenTableSource.java:0)
+Constructor 
<org.apache.flink.connector.datagen.table.DataGenTableSource.<init>([Lorg.apache.flink.streaming.api.functions.source.datagen.DataGenerator;,
 java.lang.String, org.apache.flink.table.types.DataType, long, java.lang.Long, 
java.lang.Integer)> depends on component type 
<org.apache.flink.streaming.api.functions.source.datagen.DataGenerator> in 
(DataGenTableSource.java:0)
+Constructor 
<org.apache.flink.connector.datagen.table.DataGenTableSource.<init>([Lorg.apache.flink.streaming.api.functions.source.datagen.DataGenerator;,
 java.lang.String, org.apache.flink.table.types.DataType, long, java.lang.Long, 
java.lang.Integer)> has parameter of type 
<[Lorg.apache.flink.streaming.api.functions.source.datagen.DataGenerator;> in 
(DataGenTableSource.java:0)
 Constructor 
<org.apache.flink.connector.datagen.table.DataGenVisitorBase.<init>(java.lang.String,
 org.apache.flink.configuration.ReadableConfig)> calls constructor 
<org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.<init>()> 
in (DataGenVisitorBase.java:49)
 Constructor 
<org.apache.flink.connector.datagen.table.DataGeneratorContainer.<init>(org.apache.flink.streaming.api.functions.source.datagen.DataGenerator,
 java.util.Set)> has parameter of type 
<org.apache.flink.streaming.api.functions.source.datagen.DataGenerator> in 
(DataGeneratorContainer.java:0)
 Constructor 
<org.apache.flink.connector.datagen.table.RandomGeneratorVisitor$1.<init>(int)> 
calls constructor 
<org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator.<init>()>
 in (RandomGeneratorVisitor.java:458)
diff --git a/flink-table/flink-sql-client/src/test/resources/sql/table.q 
b/flink-table/flink-sql-client/src/test/resources/sql/table.q
index f5a23835530..57452546242 100644
--- a/flink-table/flink-sql-client/src/test/resources/sql/table.q
+++ b/flink-table/flink-sql-client/src/test/resources/sql/table.q
@@ -455,6 +455,7 @@ fields.user.min
 fields.user.null-rate
 number-of-rows
 rows-per-second
+scan.parallelism
 !error
 
 # ==========================================================================
diff --git a/flink-table/flink-sql-gateway/src/test/resources/sql/table.q 
b/flink-table/flink-sql-gateway/src/test/resources/sql/table.q
index f94a6ef379d..597281a6e33 100644
--- a/flink-table/flink-sql-gateway/src/test/resources/sql/table.q
+++ b/flink-table/flink-sql-gateway/src/test/resources/sql/table.q
@@ -410,6 +410,7 @@ fields.user.min
 fields.user.null-rate
 number-of-rows
 rows-per-second
+scan.parallelism
 !error
 
 # ==========================================================================
diff --git 
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenConnectorOptions.java
 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenConnectorOptions.java
index aa900754111..ef696b20045 100644
--- 
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenConnectorOptions.java
+++ 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenConnectorOptions.java
@@ -21,6 +21,7 @@ package org.apache.flink.connector.datagen.table;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.table.factories.FactoryUtil;
 
 import java.time.Duration;
 
@@ -53,6 +54,8 @@ public class DataGenConnectorOptions {
                     .withDescription(
                             "Total number of rows to emit. By default, the 
source is unbounded.");
 
+    public static final ConfigOption<Integer> SOURCE_PARALLELISM = 
FactoryUtil.SOURCE_PARALLELISM;
+
     // 
--------------------------------------------------------------------------------------------
     // Placeholder options
     // 
--------------------------------------------------------------------------------------------
diff --git 
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenTableSource.java
 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenTableSource.java
index 9163c2a63f2..93fa9a9a8d8 100644
--- 
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenTableSource.java
+++ 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenTableSource.java
@@ -32,6 +32,8 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.sources.StreamTableSource;
 import org.apache.flink.table.types.DataType;
 
+import javax.annotation.Nullable;
+
 /** A {@link StreamTableSource} that emits generated data rows. */
 @Internal
 public class DataGenTableSource implements ScanTableSource, 
SupportsLimitPushDown {
@@ -41,24 +43,27 @@ public class DataGenTableSource implements ScanTableSource, 
SupportsLimitPushDow
     private final DataType rowDataType;
     private final long rowsPerSecond;
     private Long numberOfRows;
+    private final @Nullable Integer parallelism;
 
     public DataGenTableSource(
             DataGenerator<?>[] fieldGenerators,
             String tableName,
             DataType rowDataType,
             long rowsPerSecond,
-            Long numberOfRows) {
+            Long numberOfRows,
+            Integer parallelism) {
         this.fieldGenerators = fieldGenerators;
         this.tableName = tableName;
         this.rowDataType = rowDataType;
         this.rowsPerSecond = rowsPerSecond;
         this.numberOfRows = numberOfRows;
+        this.parallelism = parallelism;
     }
 
     @Override
     public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
         boolean isBounded = numberOfRows != null;
-        return SourceFunctionProvider.of(createSource(), isBounded);
+        return SourceFunctionProvider.of(createSource(), isBounded, 
parallelism);
     }
 
     @VisibleForTesting
@@ -72,7 +77,7 @@ public class DataGenTableSource implements ScanTableSource, 
SupportsLimitPushDow
     @Override
     public DynamicTableSource copy() {
         return new DataGenTableSource(
-                fieldGenerators, tableName, rowDataType, rowsPerSecond, 
numberOfRows);
+                fieldGenerators, tableName, rowDataType, rowsPerSecond, 
numberOfRows, parallelism);
     }
 
     @Override
diff --git 
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenTableSourceFactory.java
 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenTableSourceFactory.java
index 72398c34b96..fc55764fddf 100644
--- 
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenTableSourceFactory.java
+++ 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenTableSourceFactory.java
@@ -64,6 +64,7 @@ public class DataGenTableSourceFactory implements 
DynamicTableSourceFactory {
         Set<ConfigOption<?>> options = new HashSet<>();
         options.add(DataGenConnectorOptions.ROWS_PER_SECOND);
         options.add(DataGenConnectorOptions.NUMBER_OF_ROWS);
+        options.add(DataGenConnectorOptions.SOURCE_PARALLELISM);
 
         // Placeholder options
         options.add(DataGenConnectorOptions.FIELD_KIND);
@@ -116,6 +117,7 @@ public class DataGenTableSourceFactory implements 
DynamicTableSourceFactory {
         consumedOptionKeys.add(CONNECTOR.key());
         consumedOptionKeys.add(DataGenConnectorOptions.ROWS_PER_SECOND.key());
         consumedOptionKeys.add(DataGenConnectorOptions.NUMBER_OF_ROWS.key());
+        
consumedOptionKeys.add(DataGenConnectorOptions.SOURCE_PARALLELISM.key());
         
optionalOptions.stream().map(ConfigOption::key).forEach(consumedOptionKeys::add);
         FactoryUtil.validateUnconsumedKeys(
                 factoryIdentifier(), options.keySet(), consumedOptionKeys);
@@ -126,7 +128,8 @@ public class DataGenTableSourceFactory implements 
DynamicTableSourceFactory {
                 name,
                 rowDataType,
                 options.get(DataGenConnectorOptions.ROWS_PER_SECOND),
-                options.get(DataGenConnectorOptions.NUMBER_OF_ROWS));
+                options.get(DataGenConnectorOptions.NUMBER_OF_ROWS),
+                
options.getOptional(DataGenConnectorOptions.SOURCE_PARALLELISM).orElse(null));
     }
 
     private DataGeneratorContainer createContainer(
diff --git 
a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java
 
b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java
index 882ec105f01..687d479839c 100644
--- 
a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java
+++ 
b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.factories;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.connector.datagen.table.DataGenConnectorOptions;
 import org.apache.flink.connector.datagen.table.DataGenConnectorOptionsUtil;
 import org.apache.flink.connector.datagen.table.DataGenTableSource;
@@ -34,9 +35,13 @@ import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceFunctionProvider;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.util.InstantiationUtil;
 
 import org.junit.jupiter.api.Test;
@@ -45,8 +50,10 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
@@ -581,6 +588,27 @@ class DataGenTableSourceFactoryTest {
                         anyCauseMatches("Could not parse value 'Wrong' for key 
'fields.f0.start'"));
     }
 
+    @Test
+    void testWithParallelism() {
+        ResolvedSchema schema = ResolvedSchema.of(Column.physical("f0", 
DataTypes.CHAR(1)));
+
+        Map<String, String> options = new HashMap<>();
+        options.put(FactoryUtil.CONNECTOR.key(), "datagen");
+        options.put(DataGenConnectorOptions.SOURCE_PARALLELISM.key(), "10");
+
+        DynamicTableSource source = createTableSource(schema, options);
+        assertThat(source).isInstanceOf(DataGenTableSource.class);
+
+        DataGenTableSource dataGenTableSource = (DataGenTableSource) source;
+        ScanTableSource.ScanRuntimeProvider scanRuntimeProvider =
+                dataGenTableSource.getScanRuntimeProvider(new 
TestScanContext());
+        
assertThat(scanRuntimeProvider).isInstanceOf(SourceFunctionProvider.class);
+
+        SourceFunctionProvider sourceFunctionProvider =
+                (SourceFunctionProvider) scanRuntimeProvider;
+        assertThat(sourceFunctionProvider.getParallelism()).hasValue(10);
+    }
+
     private void assertException(
             ResolvedSchema schema,
             DescriptorProperties descriptor,
@@ -649,4 +677,22 @@ class DataGenTableSourceFactoryTest {
         @Override
         public void close() {}
     }
+
+    private static class TestScanContext implements 
ScanTableSource.ScanContext {
+        @Override
+        public <T> TypeInformation<T> createTypeInformation(DataType 
producedDataType) {
+            return null;
+        }
+
+        @Override
+        public <T> TypeInformation<T> createTypeInformation(LogicalType 
producedLogicalType) {
+            return null;
+        }
+
+        @Override
+        public DynamicTableSource.DataStructureConverter 
createDataStructureConverter(
+                DataType producedDataType) {
+            return null;
+        }
+    }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/DataGeneratorConnectorITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/DataGeneratorConnectorITCase.java
index c017d1f6b5d..f62d161affe 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/DataGeneratorConnectorITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/DataGeneratorConnectorITCase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.planner.runtime.stream.table;
 
 import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.ExplainDetail;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase;
@@ -102,4 +103,28 @@ class DataGeneratorConnectorITCase extends BatchTestBase {
                 .as("Unexpected number of results")
                 .hasSize(5);
     }
+
+    @Test
+    void testWithParallelism() {
+        final TestingTableEnvironment env =
+                TestingTableEnvironment.create(
+                        
EnvironmentSettings.newInstance().inStreamingMode().build(),
+                        null,
+                        TableConfig.getDefault());
+
+        env.executeSql(
+                "CREATE TABLE datagen_t (\n"
+                        + "    f0 CHAR(1)\n"
+                        + ") WITH ("
+                        + "    'connector' = 'datagen',"
+                        + "    'scan.parallelism' = '2'"
+                        + ")");
+
+        final Table table = env.sqlQuery("select * from datagen_t");
+        final String explain = 
table.explain(ExplainDetail.JSON_EXECUTION_PLAN);
+        final String expectedPhysicalExecutionPlanFragment =
+                "table=[[default_catalog, default_database, datagen_t]], 
fields=[f0])\",\n"
+                        + "    \"parallelism\" : 2";
+        assertThat(explain).contains(expectedPhysicalExecutionPlanFragment);
+    }
 }

Reply via email to