This is an automated email from the ASF dual-hosted git repository.
libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 862a7129d27 [FLINK-33264][table] Support source parallelism setting for DataGen connector
862a7129d27 is described below
commit 862a7129d2730b4c70a21826a5b858fc541a4470
Author: Zhanghao Chen <[email protected]>
AuthorDate: Thu Jan 18 21:26:42 2024 +0800
[FLINK-33264][table] Support source parallelism setting for DataGen connector
Close apache/flink#24133
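
The new option is set in a table's WITH clause like the existing DataGen options. A minimal
illustrative DDL (table and column names are invented for this example and are not part of
the patch):

    CREATE TABLE orders_gen (
        order_id BIGINT,
        price    DOUBLE
    ) WITH (
        'connector' = 'datagen',
        'rows-per-second' = '100',  -- existing option
        'scan.parallelism' = '4'    -- new: fixed source parallelism instead of the global default
    );
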
---
docs/content.zh/docs/connectors/table/datagen.md | 7 ++++
docs/content/docs/connectors/table/datagen.md | 7 ++++
.../f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e | 4 +-
.../src/test/resources/sql/table.q | 1 +
.../src/test/resources/sql/table.q | 1 +
.../datagen/table/DataGenConnectorOptions.java | 3 ++
.../datagen/table/DataGenTableSource.java | 11 ++++--
.../datagen/table/DataGenTableSourceFactory.java | 5 ++-
.../factories/DataGenTableSourceFactoryTest.java | 46 ++++++++++++++++++++++
.../stream/table/DataGeneratorConnectorITCase.java | 25 ++++++++++++
10 files changed, 104 insertions(+), 6 deletions(-)
diff --git a/docs/content.zh/docs/connectors/table/datagen.md b/docs/content.zh/docs/connectors/table/datagen.md
index 642382a7768..b90183fcc8c 100644
--- a/docs/content.zh/docs/connectors/table/datagen.md
+++ b/docs/content.zh/docs/connectors/table/datagen.md
@@ -266,6 +266,13 @@ CREATE TABLE Orders (
<td>Long</td>
<td>生成数据的总行数。默认情况下,该表是无界的。</td>
</tr>
+ <tr>
+ <td><h5>scan.parallelism</h5></td>
+ <td>可选</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>Integer</td>
+ <td>定义算子并行度。不设置将使用全局默认并发。</td>
+ </tr>
<tr>
<td><h5>fields.#.kind</h5></td>
<td>可选</td>
diff --git a/docs/content/docs/connectors/table/datagen.md b/docs/content/docs/connectors/table/datagen.md
index 70253786bff..1f105076897 100644
--- a/docs/content/docs/connectors/table/datagen.md
+++ b/docs/content/docs/connectors/table/datagen.md
@@ -270,6 +270,13 @@ Connector Options
<td>Long</td>
<td>The total number of rows to emit. By default, the table is unbounded.</td>
</tr>
+ <tr>
+ <td><h5>scan.parallelism</h5></td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>Integer</td>
+      <td>Defines the parallelism of the source. If not set, the global default parallelism is used.</td>
+ </tr>
<tr>
<td><h5>fields.#.kind</h5></td>
<td>optional</td>
diff --git a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e
index 4e44bbf3505..0fd86b29430 100644
--- a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e
+++ b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e
@@ -76,8 +76,8 @@ Constructor <org.apache.flink.connector.datagen.source.GeneratingIteratorSourceR
Constructor <org.apache.flink.connector.datagen.source.GeneratorSourceReaderFactory.<init>(org.apache.flink.connector.datagen.source.GeneratorFunction, org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy)> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object)> in (GeneratorSourceReaderFactory.java:54)
Constructor <org.apache.flink.connector.datagen.source.GeneratorSourceReaderFactory.<init>(org.apache.flink.connector.datagen.source.GeneratorFunction, org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy)> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object)> in (GeneratorSourceReaderFactory.java:55)
Constructor <org.apache.flink.connector.datagen.source.GeneratorSourceReaderFactory.<init>(org.apache.flink.connector.datagen.source.GeneratorFunction, org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy)> has parameter of type <org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy> in (GeneratorSourceReaderFactory.java:0)
-Constructor <org.apache.flink.connector.datagen.table.DataGenTableSource.<init>([Lorg.apache.flink.streaming.api.functions.source.datagen.DataGenerator;, java.lang.String, org.apache.flink.table.types.DataType, long, java.lang.Long)> depends on component type <org.apache.flink.streaming.api.functions.source.datagen.DataGenerator> in (DataGenTableSource.java:0)
-Constructor <org.apache.flink.connector.datagen.table.DataGenTableSource.<init>([Lorg.apache.flink.streaming.api.functions.source.datagen.DataGenerator;, java.lang.String, org.apache.flink.table.types.DataType, long, java.lang.Long)> has parameter of type <[Lorg.apache.flink.streaming.api.functions.source.datagen.DataGenerator;> in (DataGenTableSource.java:0)
+Constructor <org.apache.flink.connector.datagen.table.DataGenTableSource.<init>([Lorg.apache.flink.streaming.api.functions.source.datagen.DataGenerator;, java.lang.String, org.apache.flink.table.types.DataType, long, java.lang.Long, java.lang.Integer)> depends on component type <org.apache.flink.streaming.api.functions.source.datagen.DataGenerator> in (DataGenTableSource.java:0)
+Constructor <org.apache.flink.connector.datagen.table.DataGenTableSource.<init>([Lorg.apache.flink.streaming.api.functions.source.datagen.DataGenerator;, java.lang.String, org.apache.flink.table.types.DataType, long, java.lang.Long, java.lang.Integer)> has parameter of type <[Lorg.apache.flink.streaming.api.functions.source.datagen.DataGenerator;> in (DataGenTableSource.java:0)
Constructor <org.apache.flink.connector.datagen.table.DataGenVisitorBase.<init>(java.lang.String, org.apache.flink.configuration.ReadableConfig)> calls constructor <org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.<init>()> in (DataGenVisitorBase.java:49)
Constructor <org.apache.flink.connector.datagen.table.DataGeneratorContainer.<init>(org.apache.flink.streaming.api.functions.source.datagen.DataGenerator, java.util.Set)> has parameter of type <org.apache.flink.streaming.api.functions.source.datagen.DataGenerator> in (DataGeneratorContainer.java:0)
Constructor <org.apache.flink.connector.datagen.table.RandomGeneratorVisitor$1.<init>(int)> calls constructor <org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator.<init>()> in (RandomGeneratorVisitor.java:458)
diff --git a/flink-table/flink-sql-client/src/test/resources/sql/table.q b/flink-table/flink-sql-client/src/test/resources/sql/table.q
index f5a23835530..57452546242 100644
--- a/flink-table/flink-sql-client/src/test/resources/sql/table.q
+++ b/flink-table/flink-sql-client/src/test/resources/sql/table.q
@@ -455,6 +455,7 @@ fields.user.min
fields.user.null-rate
number-of-rows
rows-per-second
+scan.parallelism
!error
# ==========================================================================
diff --git a/flink-table/flink-sql-gateway/src/test/resources/sql/table.q b/flink-table/flink-sql-gateway/src/test/resources/sql/table.q
index f94a6ef379d..597281a6e33 100644
--- a/flink-table/flink-sql-gateway/src/test/resources/sql/table.q
+++ b/flink-table/flink-sql-gateway/src/test/resources/sql/table.q
@@ -410,6 +410,7 @@ fields.user.min
fields.user.null-rate
number-of-rows
rows-per-second
+scan.parallelism
!error
# ==========================================================================
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenConnectorOptions.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenConnectorOptions.java
index aa900754111..ef696b20045 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenConnectorOptions.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenConnectorOptions.java
@@ -21,6 +21,7 @@ package org.apache.flink.connector.datagen.table;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.table.factories.FactoryUtil;
import java.time.Duration;
@@ -53,6 +54,8 @@ public class DataGenConnectorOptions {
.withDescription(
"Total number of rows to emit. By default, the
source is unbounded.");
+ public static final ConfigOption<Integer> SOURCE_PARALLELISM =
FactoryUtil.SOURCE_PARALLELISM;
+
// --------------------------------------------------------------------------------------------
// Placeholder options
// --------------------------------------------------------------------------------------------
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenTableSource.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenTableSource.java
index 9163c2a63f2..93fa9a9a8d8 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenTableSource.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenTableSource.java
@@ -32,6 +32,8 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.table.types.DataType;
+import javax.annotation.Nullable;
+
/** A {@link StreamTableSource} that emits generated data rows. */
@Internal
public class DataGenTableSource implements ScanTableSource, SupportsLimitPushDown {
@@ -41,24 +43,27 @@ public class DataGenTableSource implements ScanTableSource, SupportsLimitPushDow
private final DataType rowDataType;
private final long rowsPerSecond;
private Long numberOfRows;
+ private final @Nullable Integer parallelism;
public DataGenTableSource(
DataGenerator<?>[] fieldGenerators,
String tableName,
DataType rowDataType,
long rowsPerSecond,
- Long numberOfRows) {
+ Long numberOfRows,
+ Integer parallelism) {
this.fieldGenerators = fieldGenerators;
this.tableName = tableName;
this.rowDataType = rowDataType;
this.rowsPerSecond = rowsPerSecond;
this.numberOfRows = numberOfRows;
+ this.parallelism = parallelism;
}
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
boolean isBounded = numberOfRows != null;
- return SourceFunctionProvider.of(createSource(), isBounded);
+ return SourceFunctionProvider.of(createSource(), isBounded, parallelism);
}
@VisibleForTesting
@@ -72,7 +77,7 @@ public class DataGenTableSource implements ScanTableSource, SupportsLimitPushDow
@Override
public DynamicTableSource copy() {
return new DataGenTableSource(
- fieldGenerators, tableName, rowDataType, rowsPerSecond, numberOfRows);
+ fieldGenerators, tableName, rowDataType, rowsPerSecond, numberOfRows, parallelism);
}
@Override
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenTableSourceFactory.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenTableSourceFactory.java
index 72398c34b96..fc55764fddf 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenTableSourceFactory.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenTableSourceFactory.java
@@ -64,6 +64,7 @@ public class DataGenTableSourceFactory implements DynamicTableSourceFactory {
Set<ConfigOption<?>> options = new HashSet<>();
options.add(DataGenConnectorOptions.ROWS_PER_SECOND);
options.add(DataGenConnectorOptions.NUMBER_OF_ROWS);
+ options.add(DataGenConnectorOptions.SOURCE_PARALLELISM);
// Placeholder options
options.add(DataGenConnectorOptions.FIELD_KIND);
@@ -116,6 +117,7 @@ public class DataGenTableSourceFactory implements DynamicTableSourceFactory {
consumedOptionKeys.add(CONNECTOR.key());
consumedOptionKeys.add(DataGenConnectorOptions.ROWS_PER_SECOND.key());
consumedOptionKeys.add(DataGenConnectorOptions.NUMBER_OF_ROWS.key());
+ consumedOptionKeys.add(DataGenConnectorOptions.SOURCE_PARALLELISM.key());
optionalOptions.stream().map(ConfigOption::key).forEach(consumedOptionKeys::add);
FactoryUtil.validateUnconsumedKeys(
factoryIdentifier(), options.keySet(), consumedOptionKeys);
@@ -126,7 +128,8 @@ public class DataGenTableSourceFactory implements DynamicTableSourceFactory {
name,
rowDataType,
options.get(DataGenConnectorOptions.ROWS_PER_SECOND),
- options.get(DataGenConnectorOptions.NUMBER_OF_ROWS));
+ options.get(DataGenConnectorOptions.NUMBER_OF_ROWS),
+ options.getOptional(DataGenConnectorOptions.SOURCE_PARALLELISM).orElse(null));
}
private DataGeneratorContainer createContainer(
diff --git a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java
index 882ec105f01..687d479839c 100644
--- a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java
+++ b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.factories;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.datagen.table.DataGenConnectorOptions;
import org.apache.flink.connector.datagen.table.DataGenConnectorOptionsUtil;
import org.apache.flink.connector.datagen.table.DataGenTableSource;
@@ -34,9 +35,13 @@ import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.util.InstantiationUtil;
import org.junit.jupiter.api.Test;
@@ -45,8 +50,10 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
@@ -581,6 +588,27 @@ class DataGenTableSourceFactoryTest {
anyCauseMatches("Could not parse value 'Wrong' for key 'fields.f0.start'"));
}
+ @Test
+ void testWithParallelism() {
+ ResolvedSchema schema = ResolvedSchema.of(Column.physical("f0", DataTypes.CHAR(1)));
+
+ Map<String, String> options = new HashMap<>();
+ options.put(FactoryUtil.CONNECTOR.key(), "datagen");
+ options.put(DataGenConnectorOptions.SOURCE_PARALLELISM.key(), "10");
+
+ DynamicTableSource source = createTableSource(schema, options);
+ assertThat(source).isInstanceOf(DataGenTableSource.class);
+
+ DataGenTableSource dataGenTableSource = (DataGenTableSource) source;
+ ScanTableSource.ScanRuntimeProvider scanRuntimeProvider =
+ dataGenTableSource.getScanRuntimeProvider(new TestScanContext());
+ assertThat(scanRuntimeProvider).isInstanceOf(SourceFunctionProvider.class);
+
+ SourceFunctionProvider sourceFunctionProvider =
+ (SourceFunctionProvider) scanRuntimeProvider;
+ assertThat(sourceFunctionProvider.getParallelism()).hasValue(10);
+ }
+
private void assertException(
ResolvedSchema schema,
DescriptorProperties descriptor,
@@ -649,4 +677,22 @@ class DataGenTableSourceFactoryTest {
@Override
public void close() {}
}
+
+ private static class TestScanContext implements ScanTableSource.ScanContext {
+ @Override
+ public <T> TypeInformation<T> createTypeInformation(DataType producedDataType) {
+ return null;
+ }
+
+ @Override
+ public <T> TypeInformation<T> createTypeInformation(LogicalType producedLogicalType) {
+ return null;
+ }
+
+ @Override
+ public DynamicTableSource.DataStructureConverter createDataStructureConverter(
+ DataType producedDataType) {
+ return null;
+ }
+ }
}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/DataGeneratorConnectorITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/DataGeneratorConnectorITCase.java
index c017d1f6b5d..f62d161affe 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/DataGeneratorConnectorITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/DataGeneratorConnectorITCase.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.planner.runtime.stream.table;
import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.planner.runtime.utils.BatchTestBase;
@@ -102,4 +103,28 @@ class DataGeneratorConnectorITCase extends BatchTestBase {
.as("Unexpected number of results")
.hasSize(5);
}
+
+ @Test
+ void testWithParallelism() {
+ final TestingTableEnvironment env =
+ TestingTableEnvironment.create(
+ EnvironmentSettings.newInstance().inStreamingMode().build(),
+ null,
+ TableConfig.getDefault());
+
+ env.executeSql(
+ "CREATE TABLE datagen_t (\n"
+ + " f0 CHAR(1)\n"
+ + ") WITH ("
+ + " 'connector' = 'datagen',"
+ + " 'scan.parallelism' = '2'"
+ + ")");
+
+ final Table table = env.sqlQuery("select * from datagen_t");
+ final String explain = table.explain(ExplainDetail.JSON_EXECUTION_PLAN);
+ final String expectedPhysicalExecutionPlanFragment =
+ "table=[[default_catalog, default_database, datagen_t]], fields=[f0])\",\n"
+ + " \"parallelism\" : 2";
+ assertThat(explain).contains(expectedPhysicalExecutionPlanFragment);
+ }
}