This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 118d5cf [FLINK-26911] Introduce parallelism setter for table store
118d5cf is described below
commit 118d5cf61f3086afc1885bf6880dc78162cc8ba7
Author: Jingsong Lee <[email protected]>
AuthorDate: Sat Apr 2 14:27:09 2022 +0800
[FLINK-26911] Introduce parallelism setter for table store
This closes #67
---
.../flink/table/store/connector/TableStore.java | 61 +++++++--
.../table/store/connector/TableStoreFactory.java | 1 +
.../store/connector/TableStoreFactoryOptions.java | 23 ++++
.../table/store/connector/sink/TableStoreSink.java | 2 +
.../store/connector/source/TableStoreSource.java | 27 +++-
.../table/store/connector/FileStoreITCase.java | 16 +--
.../store/connector/ReadWriteTableITCase.java | 140 ++++++++++++++++++---
.../store/connector/sink/LogStoreSinkITCase.java | 5 +-
8 files changed, 232 insertions(+), 43 deletions(-)
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
index d222018..88826f0 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
@@ -148,6 +148,10 @@ public class TableStore {
return primaryKeyType.getFieldNames();
}
+ public Configuration options() {
+ return options;
+ }
+
public Configuration logOptions() {
return new DelegatingConfiguration(options, LOG_PREFIX);
}
@@ -227,6 +231,8 @@ public class TableStore {
private boolean isContinuous = false;
+ private StreamExecutionEnvironment env;
+
@Nullable private int[][] projectedFields;
@Nullable private Predicate partitionPredicate;
@@ -235,6 +241,13 @@ public class TableStore {
@Nullable private LogSourceProvider logSourceProvider;
+ @Nullable private Integer parallelism;
+
+ public SourceBuilder withEnv(StreamExecutionEnvironment env) {
+ this.env = env;
+ return this;
+ }
+
public SourceBuilder withProjection(int[][] projectedFields) {
this.projectedFields = projectedFields;
return this;
@@ -260,6 +273,11 @@ public class TableStore {
return this;
}
+ public SourceBuilder withParallelism(@Nullable Integer parallelism) {
+ this.parallelism = parallelism;
+ return this;
+ }
+
private long discoveryIntervalMills() {
return options.get(CONTINUOUS_DISCOVERY_INTERVAL).toMillis();
}
@@ -277,7 +295,7 @@ public class TableStore {
fieldPredicate);
}
- public Source<RowData, ?, ?> build() {
+ private Source<RowData, ?, ?> buildSource() {
if (isContinuous) {
LogStartupMode startupMode = logOptions().get(SCAN);
if (logSourceProvider == null) {
@@ -298,17 +316,27 @@ public class TableStore {
}
}
- public DataStreamSource<RowData> build(StreamExecutionEnvironment env)
{
+ public DataStreamSource<RowData> build() {
+ if (env == null) {
+ throw new IllegalArgumentException(
+ "StreamExecutionEnvironment should not be null.");
+ }
+
LogicalType produceType =
Optional.ofNullable(projectedFields)
.map(Projection::of)
.map(p -> p.project(type))
.orElse(type);
- return env.fromSource(
- build(),
- WatermarkStrategy.noWatermarks(),
- tableIdentifier.asSummaryString(),
- InternalTypeInfo.of(produceType));
+ DataStreamSource<RowData> dataStream =
+ env.fromSource(
+ buildSource(),
+ WatermarkStrategy.noWatermarks(),
+ tableIdentifier.asSummaryString(),
+ InternalTypeInfo.of(produceType));
+ if (parallelism != null) {
+ dataStream.setParallelism(parallelism);
+ }
+ return dataStream;
}
}
@@ -323,6 +351,8 @@ public class TableStore {
@Nullable private LogSinkProvider logSinkProvider;
+ @Nullable private Integer parallelism;
+
public SinkBuilder withInput(DataStream<RowData> input) {
this.input = input;
return this;
@@ -343,6 +373,11 @@ public class TableStore {
return this;
}
+ public SinkBuilder withParallelism(@Nullable Integer parallelism) {
+ this.parallelism = parallelism;
+ return this;
+ }
+
public DataStreamSink<?> build() {
FileStore fileStore = buildFileStore();
int numBucket = options.get(BUCKET);
@@ -350,10 +385,11 @@ public class TableStore {
BucketStreamPartitioner partitioner =
new BucketStreamPartitioner(
numBucket, type, partitions, primaryKeys,
logPrimaryKeys);
- DataStream<RowData> partitioned =
- new DataStream<>(
- input.getExecutionEnvironment(),
- new
PartitionTransformation<>(input.getTransformation(), partitioner));
+ PartitionTransformation<RowData> partitioned =
+ new PartitionTransformation<>(input.getTransformation(),
partitioner);
+ if (parallelism != null) {
+ partitioned.setParallelism(parallelism);
+ }
StoreSink<?, ?> sink =
new StoreSink<>(
@@ -366,7 +402,8 @@ public class TableStore {
lockFactory,
overwritePartition,
logSinkProvider);
- return GlobalCommittingSinkTranslator.translate(partitioned, sink);
+ return GlobalCommittingSinkTranslator.translate(
+ new DataStream<>(input.getExecutionEnvironment(),
partitioned), sink);
}
}
}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java
index 4cd73a8..78cba33 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java
@@ -177,6 +177,7 @@ public class TableStoreFactory
public Set<ConfigOption<?>> optionalOptions() {
Set<ConfigOption<?>> options = FileStoreOptions.allOptions();
options.addAll(MergeTreeOptions.allOptions());
+ options.addAll(TableStoreFactoryOptions.allOptions());
return options;
}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactoryOptions.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactoryOptions.java
index 8965370..4337ec5 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactoryOptions.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactoryOptions.java
@@ -20,6 +20,10 @@ package org.apache.flink.table.store.connector;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import java.util.HashSet;
+import java.util.Set;
/** Options for {@link TableStoreFactory}. */
public class TableStoreFactoryOptions {
@@ -29,4 +33,23 @@ public class TableStoreFactoryOptions {
.stringType()
.noDefaultValue()
.withDescription("The log system used to keep changes of
the table.");
+
+ public static final ConfigOption<Integer> SINK_PARALLELISM =
FactoryUtil.SINK_PARALLELISM;
+
+ public static final ConfigOption<Integer> SCAN_PARALLELISM =
+ ConfigOptions.key("scan.parallelism")
+ .intType()
+ .noDefaultValue()
+ .withDescription(
+ "Defines a custom parallelism for the scan source.
"
+ + "By default, if this option is not
defined, the planner will derive the parallelism "
+ + "for each statement individually by also
considering the global configuration.");
+
+ public static Set<ConfigOption<?>> allOptions() {
+ Set<ConfigOption<?>> allOptions = new HashSet<>();
+ allOptions.add(LOG_SYSTEM);
+ allOptions.add(SINK_PARALLELISM);
+ allOptions.add(SCAN_PARALLELISM);
+ return allOptions;
+ }
}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
index eba324d..962aa0f 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
@@ -42,6 +42,7 @@ import javax.annotation.Nullable;
import java.util.HashMap;
import java.util.Map;
+import static
org.apache.flink.table.store.connector.TableStoreFactoryOptions.SINK_PARALLELISM;
import static org.apache.flink.table.store.log.LogOptions.CHANGELOG_MODE;
/** Table sink to create {@link StoreSink}. */
@@ -138,6 +139,7 @@ public class TableStoreSink
.withLockFactory(lockFactory)
.withLogSinkProvider(finalLogSinkProvider)
.withOverwritePartition(overwrite ?
staticPartitions : null)
+
.withParallelism(tableStore.options().get(SINK_PARALLELISM))
.build();
}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
index e1681fe..6071c64 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
@@ -20,12 +20,16 @@ package org.apache.flink.table.store.connector.source;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.ProviderContext;
+import org.apache.flink.table.connector.source.DataStreamScanProvider;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
-import org.apache.flink.table.connector.source.SourceProvider;
import
org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import
org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
+import org.apache.flink.table.data.RowData;
import org.apache.flink.table.expressions.CallExpression;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.expressions.ExpressionVisitor;
@@ -47,6 +51,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
+import static
org.apache.flink.table.store.connector.TableStoreFactoryOptions.SCAN_PARALLELISM;
import static org.apache.flink.table.store.log.LogOptions.CHANGELOG_MODE;
import static org.apache.flink.table.store.log.LogOptions.CONSISTENCY;
import static org.apache.flink.table.store.log.LogOptions.LogChangelogMode.ALL;
@@ -130,15 +135,29 @@ public class TableStoreSource
},
projectFields);
}
- TableStore.SourceBuilder builder =
+
+ TableStore.SourceBuilder sourceBuilder =
tableStore
.sourceBuilder()
.withContinuousMode(streaming)
.withLogSourceProvider(logSourceProvider)
.withProjection(projectFields)
.withPartitionPredicate(PredicateConverter.convert(partitionFilters))
-
.withFieldPredicate(PredicateConverter.convert(fieldFilters));
- return SourceProvider.of(builder.build());
+
.withFieldPredicate(PredicateConverter.convert(fieldFilters))
+
.withParallelism(tableStore.options().get(SCAN_PARALLELISM));
+
+ return new DataStreamScanProvider() {
+ @Override
+ public DataStream<RowData> produceDataStream(
+ ProviderContext providerContext,
StreamExecutionEnvironment env) {
+ return sourceBuilder.withEnv(env).build();
+ }
+
+ @Override
+ public boolean isBounded() {
+ return !streaming;
+ }
+ };
}
@Override
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
index 07bc759..645e208 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
@@ -126,7 +126,7 @@ public class FileStoreITCase extends AbstractTestBase {
env.execute();
// read
- List<Row> results =
executeAndCollect(store.sourceBuilder().build(env));
+ List<Row> results =
executeAndCollect(store.sourceBuilder().withEnv(env).build());
// assert
Row[] expected =
@@ -145,7 +145,7 @@ public class FileStoreITCase extends AbstractTestBase {
env.execute();
// read
- List<Row> results =
executeAndCollect(store.sourceBuilder().build(env));
+ List<Row> results =
executeAndCollect(store.sourceBuilder().withEnv(env).build());
// assert
Row[] expected = new Row[] {Row.of(5, "p2", 1), Row.of(0, "p1", 2),
Row.of(3, "p2", 5)};
@@ -173,7 +173,7 @@ public class FileStoreITCase extends AbstractTestBase {
env.execute();
// read
- List<Row> results =
executeAndCollect(store.sourceBuilder().build(env));
+ List<Row> results =
executeAndCollect(store.sourceBuilder().withEnv(env).build());
Row[] expected = new Row[] {Row.of(9, "p2", 5), Row.of(5, "p1", 1),
Row.of(0, "p1", 2)};
assertThat(results).containsExactlyInAnyOrder(expected);
@@ -188,7 +188,7 @@ public class FileStoreITCase extends AbstractTestBase {
env.execute();
// read
- results = executeAndCollect(store.sourceBuilder().build(env));
+ results =
executeAndCollect(store.sourceBuilder().withEnv(env).build());
expected = new Row[] {Row.of(19, "p2", 6)};
assertThat(results).containsExactlyInAnyOrder(expected);
}
@@ -202,7 +202,7 @@ public class FileStoreITCase extends AbstractTestBase {
env.execute();
// read
- List<Row> results =
executeAndCollect(store.sourceBuilder().build(env));
+ List<Row> results =
executeAndCollect(store.sourceBuilder().withEnv(env).build());
// assert
// in streaming mode, expect origin data X 2 (FiniteTestSource)
@@ -242,7 +242,8 @@ public class FileStoreITCase extends AbstractTestBase {
executeAndCollect(
store.sourceBuilder()
.withProjection(projection.toNestedIndexes())
- .build(env),
+ .withEnv(env)
+ .build(),
converter);
// assert
@@ -281,7 +282,8 @@ public class FileStoreITCase extends AbstractTestBase {
BlockingIterator.of(
store.sourceBuilder()
.withContinuousMode(true)
- .build(env)
+ .withEnv(env)
+ .build()
.executeAndCollect(),
CONVERTER::toExternal);
Thread.sleep(ThreadLocalRandom.current().nextInt(1000));
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
index 6ea0367..88b608f 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
@@ -19,14 +19,33 @@
package org.apache.flink.table.store.connector;
import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.transformations.PartitionTransformation;
+import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DynamicTableFactory.Context;
+import org.apache.flink.table.factories.FactoryUtil;
+import
org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
+import org.apache.flink.table.store.connector.sink.TableStoreSink;
import org.apache.flink.table.store.file.FileStoreOptions;
import org.apache.flink.table.store.file.utils.BlockingIterator;
import org.apache.flink.table.store.kafka.KafkaTableTestBase;
@@ -46,11 +65,14 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static
org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow;
import static
org.apache.flink.table.planner.factories.TestValuesTableFactory.registerData;
import static
org.apache.flink.table.store.connector.TableStoreFactoryOptions.LOG_SYSTEM;
+import static
org.apache.flink.table.store.connector.TableStoreFactoryOptions.SCAN_PARALLELISM;
+import static
org.apache.flink.table.store.connector.TableStoreFactoryOptions.SINK_PARALLELISM;
import static
org.apache.flink.table.store.kafka.KafkaLogOptions.BOOTSTRAP_SERVERS;
import static org.apache.flink.table.store.log.LogOptions.LOG_PREFIX;
import static org.assertj.core.api.Assertions.assertThat;
@@ -1262,17 +1284,12 @@ public class ReadWriteTableITCase extends KafkaTableTestBase {
.close();
}
- private Tuple2<String, BlockingIterator<Row, Row>>
collectAndCheckUnderSameEnv(
+ private Tuple2<String, String> createSourceAndManagedTable(
boolean streaming,
boolean enableLogStore,
boolean insertOnly,
boolean partitioned,
- boolean hasPk,
- boolean writeFirst,
- Map<String, String> readHints,
- @Nullable String filter,
- List<String> projection,
- List<Row> expected)
+ boolean hasPk)
throws Exception {
Map<String, String> tableOptions = new HashMap<>();
rootPath = TEMPORARY_FOLDER.newFolder().getPath();
@@ -1305,6 +1322,26 @@ public class ReadWriteTableITCase extends KafkaTableTestBase {
tEnv = StreamTableEnvironment.create(env, builder.build());
tEnv.executeSql(helperTableDdl);
tEnv.executeSql(managedTableDdl);
+ return new Tuple2<>(sourceTable, managedTable);
+ }
+
+ private Tuple2<String, BlockingIterator<Row, Row>>
collectAndCheckUnderSameEnv(
+ boolean streaming,
+ boolean enableLogStore,
+ boolean insertOnly,
+ boolean partitioned,
+ boolean hasPk,
+ boolean writeFirst,
+ Map<String, String> readHints,
+ @Nullable String filter,
+ List<String> projection,
+ List<Row> expected)
+ throws Exception {
+ Tuple2<String, String> tables =
+ createSourceAndManagedTable(
+ streaming, enableLogStore, insertOnly, partitioned,
hasPk);
+ String sourceTable = tables.f0;
+ String managedTable = tables.f1;
String insertQuery = prepareInsertIntoQuery(sourceTable, managedTable);
String selectQuery = prepareSelectQuery(managedTable, readHints,
filter, projection);
@@ -1516,17 +1553,15 @@ public class ReadWriteTableITCase extends KafkaTableTestBase {
private static String buildHints(Map<String, String> hints) {
if (hints.size() > 0) {
- StringBuilder hintsBuilder = new StringBuilder("/*+ OPTIONS (");
- hints.forEach(
- (k, v) -> {
- hintsBuilder.append(String.format("'%s'", k));
- hintsBuilder.append(" = ");
- hintsBuilder.append(String.format("'%s', ", v));
- });
- int len = hintsBuilder.length();
- hintsBuilder.deleteCharAt(len - 2);
- hintsBuilder.append(") */");
- return hintsBuilder.toString();
+ String hintString =
+ hints.entrySet().stream()
+ .map(
+ entry ->
+ String.format(
+ "'%s' = '%s'",
+ entry.getKey(),
entry.getValue()))
+ .collect(Collectors.joining(", "));
+ return "/*+ OPTIONS (" + hintString + ") */";
}
return "";
}
@@ -1713,4 +1748,73 @@ public class ReadWriteTableITCase extends KafkaTableTestBase {
env.setParallelism(2);
return env;
}
+
+ @Test
+ public void testSourceParallelism() throws Exception {
+ String managedTable = createSourceAndManagedTable(false, false, false,
false, false).f1;
+
+ // without hint
+ String query = prepareSimpleSelectQuery(managedTable,
Collections.emptyMap());
+ assertThat(sourceParallelism(query)).isEqualTo(env.getParallelism());
+
+ // with hint
+ query =
+ prepareSimpleSelectQuery(
+ managedTable,
Collections.singletonMap(SCAN_PARALLELISM.key(), "66"));
+ assertThat(sourceParallelism(query)).isEqualTo(66);
+ }
+
+ private int sourceParallelism(String sql) {
+ DataStream<Row> stream = tEnv.toChangelogStream(tEnv.sqlQuery(sql));
+ return stream.getParallelism();
+ }
+
+ @Test
+ public void testSinkParallelism() {
+ testSinkParallelism(null, env.getParallelism());
+ testSinkParallelism(23, 23);
+ }
+
+ private void testSinkParallelism(Integer configParallelism, int
expectedParallelism) {
+ // 1. create a mock table sink
+ ResolvedSchema schema = ResolvedSchema.of(Column.physical("a",
DataTypes.STRING()));
+ Map<String, String> options = new HashMap<>();
+ if (configParallelism != null) {
+ options.put(SINK_PARALLELISM.key(), configParallelism.toString());
+ }
+
+ Context context =
+ new FactoryUtil.DefaultDynamicTableContext(
+ ObjectIdentifier.of("default", "default", "t1"),
+ new ResolvedCatalogTable(
+ CatalogTable.of(
+
Schema.newBuilder().fromResolvedSchema(schema).build(),
+ "mock context",
+ Collections.emptyList(),
+ options),
+ schema),
+ Collections.emptyMap(),
+ new Configuration(),
+ Thread.currentThread().getContextClassLoader(),
+ false);
+ DynamicTableSink tableSink = new
TableStoreFactory().createDynamicTableSink(context);
+ assertThat(tableSink).isInstanceOf(TableStoreSink.class);
+
+ // 2. get sink provider
+ DynamicTableSink.SinkRuntimeProvider provider =
+ tableSink.getSinkRuntimeProvider(new
SinkRuntimeProviderContext(false));
+ assertThat(provider).isInstanceOf(DataStreamSinkProvider.class);
+ DataStreamSinkProvider sinkProvider = (DataStreamSinkProvider)
provider;
+
+ // 3. assert parallelism from transformation
+ DataStream<RowData> mockSource =
+
env.fromCollection(Collections.singletonList(GenericRowData.of()));
+ DataStreamSink<?> sink = sinkProvider.consumeDataStream(null,
mockSource);
+ Transformation<?> transformation = sink.getTransformation();
+ // until a PartitionTransformation, see TableStore.SinkBuilder.build()
+ while (!(transformation instanceof PartitionTransformation)) {
+ assertThat(transformation.getParallelism()).isIn(1,
expectedParallelism);
+ transformation = transformation.getInputs().get(0);
+ }
+ }
}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java
index a58f517..07bd058 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java
@@ -134,7 +134,7 @@ public class LogStoreSinkITCase extends KafkaTableTestBase {
env.execute();
// read
- List<Row> results =
executeAndCollect(store.sourceBuilder().build(env));
+ List<Row> results =
executeAndCollect(store.sourceBuilder().withEnv(env).build());
Row[] expected;
if (hasPk) {
@@ -164,7 +164,8 @@ public class LogStoreSinkITCase extends KafkaTableTestBase {
store.sourceBuilder()
.withContinuousMode(true)
.withLogSourceProvider(sourceProvider)
- .build(buildStreamEnv())
+ .withEnv(buildStreamEnv())
+ .build()
.executeAndCollect(),
CONVERTER::toExternal);
results = iterator.collectAndClose(expected.length);