This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-0.1 in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
commit 621d54a7accafe82dee51a0c0fa17d9724f856d7 Author: Jingsong Lee <[email protected]> AuthorDate: Sat Apr 2 14:27:09 2022 +0800 [FLINK-26911] Introduce parallelism setter for table store This closes #67 --- .../flink/table/store/connector/TableStore.java | 61 +++++++-- .../table/store/connector/TableStoreFactory.java | 1 + .../store/connector/TableStoreFactoryOptions.java | 23 ++++ .../table/store/connector/sink/TableStoreSink.java | 2 + .../store/connector/source/TableStoreSource.java | 27 +++- .../table/store/connector/FileStoreITCase.java | 16 +-- .../store/connector/ReadWriteTableITCase.java | 140 ++++++++++++++++++--- .../store/connector/sink/LogStoreSinkITCase.java | 5 +- 8 files changed, 232 insertions(+), 43 deletions(-) diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java index d222018..88826f0 100644 --- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java +++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java @@ -148,6 +148,10 @@ public class TableStore { return primaryKeyType.getFieldNames(); } + public Configuration options() { + return options; + } + public Configuration logOptions() { return new DelegatingConfiguration(options, LOG_PREFIX); } @@ -227,6 +231,8 @@ public class TableStore { private boolean isContinuous = false; + private StreamExecutionEnvironment env; + @Nullable private int[][] projectedFields; @Nullable private Predicate partitionPredicate; @@ -235,6 +241,13 @@ public class TableStore { @Nullable private LogSourceProvider logSourceProvider; + @Nullable private Integer parallelism; + + public SourceBuilder withEnv(StreamExecutionEnvironment env) { + this.env = env; + return this; + } + public SourceBuilder withProjection(int[][] projectedFields) { this.projectedFields = projectedFields; return this; @@ -260,6 +273,11 @@ public class TableStore { return this; } + public SourceBuilder withParallelism(@Nullable Integer parallelism) { + this.parallelism = parallelism; + return this; + } + private long discoveryIntervalMills() { return options.get(CONTINUOUS_DISCOVERY_INTERVAL).toMillis(); } @@ -277,7 +295,7 @@ public class TableStore { fieldPredicate); } - public Source<RowData, ?, ?> build() { + private Source<RowData, ?, ?> buildSource() { if (isContinuous) { LogStartupMode startupMode = logOptions().get(SCAN); if (logSourceProvider == null) { @@ -298,17 +316,27 @@ public class TableStore { } } - public DataStreamSource<RowData> build(StreamExecutionEnvironment env) { + public DataStreamSource<RowData> build() { + if (env == null) { + throw new IllegalArgumentException( + "StreamExecutionEnvironment should not be null."); + } + LogicalType produceType = Optional.ofNullable(projectedFields) .map(Projection::of) .map(p -> p.project(type)) .orElse(type); - return env.fromSource( - build(), - WatermarkStrategy.noWatermarks(), - tableIdentifier.asSummaryString(), - InternalTypeInfo.of(produceType)); + DataStreamSource<RowData> dataStream = + env.fromSource( + buildSource(), + WatermarkStrategy.noWatermarks(), + tableIdentifier.asSummaryString(), + InternalTypeInfo.of(produceType)); + if (parallelism != null) { + dataStream.setParallelism(parallelism); + } + return dataStream; } } @@ -323,6 +351,8 @@ public class TableStore { @Nullable private LogSinkProvider logSinkProvider; + @Nullable private Integer parallelism; + public SinkBuilder withInput(DataStream<RowData> input) { this.input = input; return this; @@ -343,6 +373,11 @@ public class TableStore { return this; } + public SinkBuilder withParallelism(@Nullable Integer parallelism) { + this.parallelism = parallelism; + return this; + } + public DataStreamSink<?> build() { FileStore fileStore = buildFileStore(); int numBucket = options.get(BUCKET); @@ -350,10 +385,11 @@ public class TableStore { BucketStreamPartitioner partitioner = new BucketStreamPartitioner( numBucket, type, partitions, primaryKeys, logPrimaryKeys); - DataStream<RowData> partitioned = - new DataStream<>( - input.getExecutionEnvironment(), - new PartitionTransformation<>(input.getTransformation(), partitioner)); + PartitionTransformation<RowData> partitioned = + new PartitionTransformation<>(input.getTransformation(), partitioner); + if (parallelism != null) { + partitioned.setParallelism(parallelism); + } StoreSink<?, ?> sink = new StoreSink<>( @@ -366,7 +402,8 @@ public class TableStore { lockFactory, overwritePartition, logSinkProvider); - return GlobalCommittingSinkTranslator.translate(partitioned, sink); + return GlobalCommittingSinkTranslator.translate( + new DataStream<>(input.getExecutionEnvironment(), partitioned), sink); } } } diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java index 4cd73a8..78cba33 100644 --- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java +++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java @@ -177,6 +177,7 @@ public class TableStoreFactory public Set<ConfigOption<?>> optionalOptions() { Set<ConfigOption<?>> options = FileStoreOptions.allOptions(); options.addAll(MergeTreeOptions.allOptions()); + options.addAll(TableStoreFactoryOptions.allOptions()); return options; } diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactoryOptions.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactoryOptions.java index 8965370..4337ec5 100644 --- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactoryOptions.java +++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactoryOptions.java @@ -20,6 +20,10 @@ package org.apache.flink.table.store.connector; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.table.factories.FactoryUtil; + +import java.util.HashSet; +import java.util.Set; /** Options for {@link TableStoreFactory}. */ public class TableStoreFactoryOptions { @@ -29,4 +33,23 @@ public class TableStoreFactoryOptions { .stringType() .noDefaultValue() .withDescription("The log system used to keep changes of the table."); + + public static final ConfigOption<Integer> SINK_PARALLELISM = FactoryUtil.SINK_PARALLELISM; + + public static final ConfigOption<Integer> SCAN_PARALLELISM = + ConfigOptions.key("scan.parallelism") + .intType() + .noDefaultValue() + .withDescription( + "Defines a custom parallelism for the scan source. " + + "By default, if this option is not defined, the planner will derive the parallelism " + + "for each statement individually by also considering the global configuration."); + + public static Set<ConfigOption<?>> allOptions() { + Set<ConfigOption<?>> allOptions = new HashSet<>(); + allOptions.add(LOG_SYSTEM); + allOptions.add(SINK_PARALLELISM); + allOptions.add(SCAN_PARALLELISM); + return allOptions; + } } diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java index eba324d..962aa0f 100644 --- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java +++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java @@ -42,6 +42,7 @@ import javax.annotation.Nullable; import java.util.HashMap; import java.util.Map; +import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.SINK_PARALLELISM; import static org.apache.flink.table.store.log.LogOptions.CHANGELOG_MODE; /** Table sink to create {@link StoreSink}. */ @@ -138,6 +139,7 @@ public class TableStoreSink .withLockFactory(lockFactory) .withLogSinkProvider(finalLogSinkProvider) .withOverwritePartition(overwrite ? staticPartitions : null) + .withParallelism(tableStore.options().get(SINK_PARALLELISM)) .build(); } diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java index e1681fe..6071c64 100644 --- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java +++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java @@ -20,12 +20,16 @@ package org.apache.flink.table.store.connector.source; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.ProviderContext; +import org.apache.flink.table.connector.source.DataStreamScanProvider; import org.apache.flink.table.connector.source.DynamicTableSource; import org.apache.flink.table.connector.source.ScanTableSource; -import org.apache.flink.table.connector.source.SourceProvider; import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown; import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown; +import org.apache.flink.table.data.RowData; import org.apache.flink.table.expressions.CallExpression; import org.apache.flink.table.expressions.Expression; import org.apache.flink.table.expressions.ExpressionVisitor; @@ -47,6 +51,7 @@ import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; +import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.SCAN_PARALLELISM; import static org.apache.flink.table.store.log.LogOptions.CHANGELOG_MODE; import static org.apache.flink.table.store.log.LogOptions.CONSISTENCY; import static org.apache.flink.table.store.log.LogOptions.LogChangelogMode.ALL; @@ -130,15 +135,29 @@ public class TableStoreSource }, projectFields); } - TableStore.SourceBuilder builder = + + TableStore.SourceBuilder sourceBuilder = tableStore .sourceBuilder() .withContinuousMode(streaming) .withLogSourceProvider(logSourceProvider) .withProjection(projectFields) .withPartitionPredicate(PredicateConverter.convert(partitionFilters)) - .withFieldPredicate(PredicateConverter.convert(fieldFilters)); - return SourceProvider.of(builder.build()); + .withFieldPredicate(PredicateConverter.convert(fieldFilters)) + .withParallelism(tableStore.options().get(SCAN_PARALLELISM)); + + return new DataStreamScanProvider() { + @Override + public DataStream<RowData> produceDataStream( + ProviderContext providerContext, StreamExecutionEnvironment env) { + return sourceBuilder.withEnv(env).build(); + } + + @Override + public boolean isBounded() { + return !streaming; + } + }; } @Override diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java index 07bc759..645e208 100644 --- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java +++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java @@ -126,7 +126,7 @@ public class FileStoreITCase extends AbstractTestBase { env.execute(); // read - List<Row> results = executeAndCollect(store.sourceBuilder().build(env)); + List<Row> results = executeAndCollect(store.sourceBuilder().withEnv(env).build()); // assert Row[] expected = @@ -145,7 +145,7 @@ public class FileStoreITCase extends AbstractTestBase { env.execute(); // read - List<Row> results = executeAndCollect(store.sourceBuilder().build(env)); + List<Row> results = executeAndCollect(store.sourceBuilder().withEnv(env).build()); // assert Row[] expected = new Row[] {Row.of(5, "p2", 1), Row.of(0, "p1", 2), Row.of(3, "p2", 5)}; @@ -173,7 +173,7 @@ public class FileStoreITCase extends AbstractTestBase { env.execute(); // read - List<Row> results = executeAndCollect(store.sourceBuilder().build(env)); + List<Row> results = executeAndCollect(store.sourceBuilder().withEnv(env).build()); Row[] expected = new Row[] {Row.of(9, "p2", 5), Row.of(5, "p1", 1), Row.of(0, "p1", 2)}; assertThat(results).containsExactlyInAnyOrder(expected); @@ -188,7 +188,7 @@ public class FileStoreITCase extends AbstractTestBase { env.execute(); // read - results = executeAndCollect(store.sourceBuilder().build(env)); + results = executeAndCollect(store.sourceBuilder().withEnv(env).build()); expected = new Row[] {Row.of(19, "p2", 6)}; assertThat(results).containsExactlyInAnyOrder(expected); } @@ -202,7 +202,7 @@ public class FileStoreITCase extends AbstractTestBase { env.execute(); // read - List<Row> results = executeAndCollect(store.sourceBuilder().build(env)); + List<Row> results = executeAndCollect(store.sourceBuilder().withEnv(env).build()); // assert // in streaming mode, expect origin data X 2 (FiniteTestSource) @@ -242,7 +242,8 @@ public class FileStoreITCase extends AbstractTestBase { executeAndCollect( store.sourceBuilder() .withProjection(projection.toNestedIndexes()) - .build(env), + .withEnv(env) + .build(), converter); // assert @@ -281,7 +282,8 @@ public class FileStoreITCase extends AbstractTestBase { BlockingIterator.of( store.sourceBuilder() .withContinuousMode(true) - .build(env) + .withEnv(env) + .build() .executeAndCollect(), CONVERTER::toExternal); Thread.sleep(ThreadLocalRandom.current().nextInt(1000)); diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java index 6ea0367..88b608f 100644 --- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java +++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java @@ -19,14 +19,33 @@ package org.apache.flink.table.store.connector; import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.api.dag.Transformation; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.transformations.PartitionTransformation; +import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.Schema; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.Column; import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.ResolvedCatalogTable; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.connector.sink.DataStreamSinkProvider; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.factories.DynamicTableFactory.Context; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext; +import org.apache.flink.table.store.connector.sink.TableStoreSink; import org.apache.flink.table.store.file.FileStoreOptions; import org.apache.flink.table.store.file.utils.BlockingIterator; import org.apache.flink.table.store.kafka.KafkaTableTestBase; @@ -46,11 +65,14 @@ import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import java.util.stream.IntStream; import static org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow; import static org.apache.flink.table.planner.factories.TestValuesTableFactory.registerData; import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.LOG_SYSTEM; +import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.SCAN_PARALLELISM; +import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.SINK_PARALLELISM; import static org.apache.flink.table.store.kafka.KafkaLogOptions.BOOTSTRAP_SERVERS; import static org.apache.flink.table.store.log.LogOptions.LOG_PREFIX; import static org.assertj.core.api.Assertions.assertThat; @@ -1262,17 +1284,12 @@ public class ReadWriteTableITCase extends KafkaTableTestBase { .close(); } - private Tuple2<String, BlockingIterator<Row, Row>> collectAndCheckUnderSameEnv( + private Tuple2<String, String> createSourceAndManagedTable( boolean streaming, boolean enableLogStore, boolean insertOnly, boolean partitioned, - boolean hasPk, - boolean writeFirst, - Map<String, String> readHints, - @Nullable String filter, - List<String> projection, - List<Row> expected) + boolean hasPk) throws Exception { Map<String, String> tableOptions = new HashMap<>(); rootPath = TEMPORARY_FOLDER.newFolder().getPath(); @@ -1305,6 +1322,26 @@ public class ReadWriteTableITCase extends KafkaTableTestBase { tEnv = StreamTableEnvironment.create(env, builder.build()); tEnv.executeSql(helperTableDdl); tEnv.executeSql(managedTableDdl); + return new Tuple2<>(sourceTable, managedTable); + } + + private Tuple2<String, BlockingIterator<Row, Row>> collectAndCheckUnderSameEnv( + boolean streaming, + boolean enableLogStore, + boolean insertOnly, + boolean partitioned, + boolean hasPk, + boolean writeFirst, + Map<String, String> readHints, + @Nullable String filter, + List<String> projection, + List<Row> expected) + throws Exception { + Tuple2<String, String> tables = + createSourceAndManagedTable( + streaming, enableLogStore, insertOnly, partitioned, hasPk); + String sourceTable = tables.f0; + String managedTable = tables.f1; String insertQuery = prepareInsertIntoQuery(sourceTable, managedTable); String selectQuery = prepareSelectQuery(managedTable, readHints, filter, projection); @@ -1516,17 +1553,15 @@ public class ReadWriteTableITCase extends KafkaTableTestBase { private static String buildHints(Map<String, String> hints) { if (hints.size() > 0) { - StringBuilder hintsBuilder = new StringBuilder("/*+ OPTIONS ("); - hints.forEach( - (k, v) -> { - hintsBuilder.append(String.format("'%s'", k)); - hintsBuilder.append(" = "); - hintsBuilder.append(String.format("'%s', ", v)); - }); - int len = hintsBuilder.length(); - hintsBuilder.deleteCharAt(len - 2); - hintsBuilder.append(") */"); - return hintsBuilder.toString(); + String hintString = + hints.entrySet().stream() + .map( + entry -> + String.format( + "'%s' = '%s'", + entry.getKey(), entry.getValue())) + .collect(Collectors.joining(", ")); + return "/*+ OPTIONS (" + hintString + ") */"; } return ""; } @@ -1713,4 +1748,73 @@ public class ReadWriteTableITCase extends KafkaTableTestBase { env.setParallelism(2); return env; } + + @Test + public void testSourceParallelism() throws Exception { + String managedTable = createSourceAndManagedTable(false, false, false, false, false).f1; + + // without hint + String query = prepareSimpleSelectQuery(managedTable, Collections.emptyMap()); + assertThat(sourceParallelism(query)).isEqualTo(env.getParallelism()); + + // with hint + query = + prepareSimpleSelectQuery( + managedTable, Collections.singletonMap(SCAN_PARALLELISM.key(), "66")); + assertThat(sourceParallelism(query)).isEqualTo(66); + } + + private int sourceParallelism(String sql) { + DataStream<Row> stream = tEnv.toChangelogStream(tEnv.sqlQuery(sql)); + return stream.getParallelism(); + } + + @Test + public void testSinkParallelism() { + testSinkParallelism(null, env.getParallelism()); + testSinkParallelism(23, 23); + } + + private void testSinkParallelism(Integer configParallelism, int expectedParallelism) { + // 1. create a mock table sink + ResolvedSchema schema = ResolvedSchema.of(Column.physical("a", DataTypes.STRING())); + Map<String, String> options = new HashMap<>(); + if (configParallelism != null) { + options.put(SINK_PARALLELISM.key(), configParallelism.toString()); + } + + Context context = + new FactoryUtil.DefaultDynamicTableContext( + ObjectIdentifier.of("default", "default", "t1"), + new ResolvedCatalogTable( + CatalogTable.of( + Schema.newBuilder().fromResolvedSchema(schema).build(), + "mock context", + Collections.emptyList(), + options), + schema), + Collections.emptyMap(), + new Configuration(), + Thread.currentThread().getContextClassLoader(), + false); + DynamicTableSink tableSink = new TableStoreFactory().createDynamicTableSink(context); + assertThat(tableSink).isInstanceOf(TableStoreSink.class); + + // 2. get sink provider + DynamicTableSink.SinkRuntimeProvider provider = + tableSink.getSinkRuntimeProvider(new SinkRuntimeProviderContext(false)); + assertThat(provider).isInstanceOf(DataStreamSinkProvider.class); + DataStreamSinkProvider sinkProvider = (DataStreamSinkProvider) provider; + + // 3. assert parallelism from transformation + DataStream<RowData> mockSource = + env.fromCollection(Collections.singletonList(GenericRowData.of())); + DataStreamSink<?> sink = sinkProvider.consumeDataStream(null, mockSource); + Transformation<?> transformation = sink.getTransformation(); + // until a PartitionTransformation, see TableStore.SinkBuilder.build() + while (!(transformation instanceof PartitionTransformation)) { + assertThat(transformation.getParallelism()).isIn(1, expectedParallelism); + transformation = transformation.getInputs().get(0); + } + } } diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java index a58f517..07bd058 100644 --- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java +++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java @@ -134,7 +134,7 @@ public class LogStoreSinkITCase extends KafkaTableTestBase { env.execute(); // read - List<Row> results = executeAndCollect(store.sourceBuilder().build(env)); + List<Row> results = executeAndCollect(store.sourceBuilder().withEnv(env).build()); Row[] expected; if (hasPk) { @@ -164,7 +164,8 @@ public class LogStoreSinkITCase extends KafkaTableTestBase { store.sourceBuilder() .withContinuousMode(true) .withLogSourceProvider(sourceProvider) - .build(buildStreamEnv()) + .withEnv(buildStreamEnv()) + .build() .executeAndCollect(), CONVERTER::toExternal); results = iterator.collectAndClose(expected.length);
