This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.1
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git

commit 621d54a7accafe82dee51a0c0fa17d9724f856d7
Author: Jingsong Lee <[email protected]>
AuthorDate: Sat Apr 2 14:27:09 2022 +0800

    [FLINK-26911] Introduce parallelism setter for table store
    
    This closes #67
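    
    A minimal usage sketch (the table and values are hypothetical; the option
    keys are the ones added in TableStoreFactoryOptions by this change:
    'sink.parallelism' reuses FactoryUtil.SINK_PARALLELISM and
    'scan.parallelism' is newly defined):
    
        -- pin the write parallelism of the managed table via a table option
        CREATE TABLE word_count (word STRING, cnt BIGINT)
        WITH ('sink.parallelism' = '4');
    
        -- override the scan parallelism for a single query via a dynamic option hint
        SELECT * FROM word_count /*+ OPTIONS('scan.parallelism' = '2') */;
    
    When neither option is set, the planner keeps deriving the parallelism from
    the global configuration as before.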
---
 .../flink/table/store/connector/TableStore.java    |  61 +++++++--
 .../table/store/connector/TableStoreFactory.java   |   1 +
 .../store/connector/TableStoreFactoryOptions.java  |  23 ++++
 .../table/store/connector/sink/TableStoreSink.java |   2 +
 .../store/connector/source/TableStoreSource.java   |  27 +++-
 .../table/store/connector/FileStoreITCase.java     |  16 +--
 .../store/connector/ReadWriteTableITCase.java      | 140 ++++++++++++++++++---
 .../store/connector/sink/LogStoreSinkITCase.java   |   5 +-
 8 files changed, 232 insertions(+), 43 deletions(-)

diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
index d222018..88826f0 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
@@ -148,6 +148,10 @@ public class TableStore {
         return primaryKeyType.getFieldNames();
     }
 
+    public Configuration options() {
+        return options;
+    }
+
     public Configuration logOptions() {
         return new DelegatingConfiguration(options, LOG_PREFIX);
     }
@@ -227,6 +231,8 @@ public class TableStore {
 
         private boolean isContinuous = false;
 
+        private StreamExecutionEnvironment env;
+
         @Nullable private int[][] projectedFields;
 
         @Nullable private Predicate partitionPredicate;
@@ -235,6 +241,13 @@ public class TableStore {
 
         @Nullable private LogSourceProvider logSourceProvider;
 
+        @Nullable private Integer parallelism;
+
+        public SourceBuilder withEnv(StreamExecutionEnvironment env) {
+            this.env = env;
+            return this;
+        }
+
         public SourceBuilder withProjection(int[][] projectedFields) {
             this.projectedFields = projectedFields;
             return this;
@@ -260,6 +273,11 @@ public class TableStore {
             return this;
         }
 
+        public SourceBuilder withParallelism(@Nullable Integer parallelism) {
+            this.parallelism = parallelism;
+            return this;
+        }
+
         private long discoveryIntervalMills() {
             return options.get(CONTINUOUS_DISCOVERY_INTERVAL).toMillis();
         }
@@ -277,7 +295,7 @@ public class TableStore {
                     fieldPredicate);
         }
 
-        public Source<RowData, ?, ?> build() {
+        private Source<RowData, ?, ?> buildSource() {
             if (isContinuous) {
                 LogStartupMode startupMode = logOptions().get(SCAN);
                 if (logSourceProvider == null) {
@@ -298,17 +316,27 @@ public class TableStore {
             }
         }
 
-        public DataStreamSource<RowData> build(StreamExecutionEnvironment env) {
+        public DataStreamSource<RowData> build() {
+            if (env == null) {
+                throw new IllegalArgumentException(
+                        "StreamExecutionEnvironment should not be null.");
+            }
+
             LogicalType produceType =
                     Optional.ofNullable(projectedFields)
                             .map(Projection::of)
                             .map(p -> p.project(type))
                             .orElse(type);
-            return env.fromSource(
-                    build(),
-                    WatermarkStrategy.noWatermarks(),
-                    tableIdentifier.asSummaryString(),
-                    InternalTypeInfo.of(produceType));
+            DataStreamSource<RowData> dataStream =
+                    env.fromSource(
+                            buildSource(),
+                            WatermarkStrategy.noWatermarks(),
+                            tableIdentifier.asSummaryString(),
+                            InternalTypeInfo.of(produceType));
+            if (parallelism != null) {
+                dataStream.setParallelism(parallelism);
+            }
+            return dataStream;
         }
     }
 
@@ -323,6 +351,8 @@ public class TableStore {
 
         @Nullable private LogSinkProvider logSinkProvider;
 
+        @Nullable private Integer parallelism;
+
         public SinkBuilder withInput(DataStream<RowData> input) {
             this.input = input;
             return this;
@@ -343,6 +373,11 @@ public class TableStore {
             return this;
         }
 
+        public SinkBuilder withParallelism(@Nullable Integer parallelism) {
+            this.parallelism = parallelism;
+            return this;
+        }
+
         public DataStreamSink<?> build() {
             FileStore fileStore = buildFileStore();
             int numBucket = options.get(BUCKET);
@@ -350,10 +385,11 @@ public class TableStore {
             BucketStreamPartitioner partitioner =
                     new BucketStreamPartitioner(
                             numBucket, type, partitions, primaryKeys, logPrimaryKeys);
-            DataStream<RowData> partitioned =
-                    new DataStream<>(
-                            input.getExecutionEnvironment(),
-                            new PartitionTransformation<>(input.getTransformation(), partitioner));
+            PartitionTransformation<RowData> partitioned =
+                    new PartitionTransformation<>(input.getTransformation(), partitioner);
+            if (parallelism != null) {
+                partitioned.setParallelism(parallelism);
+            }
 
             StoreSink<?, ?> sink =
                     new StoreSink<>(
@@ -366,7 +402,8 @@ public class TableStore {
                             lockFactory,
                             overwritePartition,
                             logSinkProvider);
-            return GlobalCommittingSinkTranslator.translate(partitioned, sink);
+            return GlobalCommittingSinkTranslator.translate(
+                    new DataStream<>(input.getExecutionEnvironment(), partitioned), sink);
         }
     }
 }
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java
index 4cd73a8..78cba33 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactory.java
@@ -177,6 +177,7 @@ public class TableStoreFactory
     public Set<ConfigOption<?>> optionalOptions() {
         Set<ConfigOption<?>> options = FileStoreOptions.allOptions();
         options.addAll(MergeTreeOptions.allOptions());
+        options.addAll(TableStoreFactoryOptions.allOptions());
         return options;
     }
 
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactoryOptions.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactoryOptions.java
index 8965370..4337ec5 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactoryOptions.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStoreFactoryOptions.java
@@ -20,6 +20,10 @@ package org.apache.flink.table.store.connector;
 
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import java.util.HashSet;
+import java.util.Set;
 
 /** Options for {@link TableStoreFactory}. */
 public class TableStoreFactoryOptions {
@@ -29,4 +33,23 @@ public class TableStoreFactoryOptions {
                     .stringType()
                     .noDefaultValue()
                     .withDescription("The log system used to keep changes of the table.");
+
+    public static final ConfigOption<Integer> SINK_PARALLELISM = FactoryUtil.SINK_PARALLELISM;
+
+    public static final ConfigOption<Integer> SCAN_PARALLELISM =
+            ConfigOptions.key("scan.parallelism")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Defines a custom parallelism for the scan source. "
+                                    + "By default, if this option is not defined, the planner will derive the parallelism "
+                                    + "for each statement individually by also considering the global configuration.");
+
+    public static Set<ConfigOption<?>> allOptions() {
+        Set<ConfigOption<?>> allOptions = new HashSet<>();
+        allOptions.add(LOG_SYSTEM);
+        allOptions.add(SINK_PARALLELISM);
+        allOptions.add(SCAN_PARALLELISM);
+        return allOptions;
+    }
 }
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
index eba324d..962aa0f 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
@@ -42,6 +42,7 @@ import javax.annotation.Nullable;
 import java.util.HashMap;
 import java.util.Map;
 
+import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.SINK_PARALLELISM;
 import static org.apache.flink.table.store.log.LogOptions.CHANGELOG_MODE;
 
 /** Table sink to create {@link StoreSink}. */
@@ -138,6 +139,7 @@ public class TableStoreSink
                                 .withLockFactory(lockFactory)
                                 .withLogSinkProvider(finalLogSinkProvider)
                                 .withOverwritePartition(overwrite ? staticPartitions : null)
+                                .withParallelism(tableStore.options().get(SINK_PARALLELISM))
                                 .build();
     }
 
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
index e1681fe..6071c64 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
@@ -20,12 +20,16 @@ package org.apache.flink.table.store.connector.source;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.ProviderContext;
+import org.apache.flink.table.connector.source.DataStreamScanProvider;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.connector.source.ScanTableSource;
-import org.apache.flink.table.connector.source.SourceProvider;
 import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
 import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
+import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.expressions.CallExpression;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.ExpressionVisitor;
@@ -47,6 +51,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.SCAN_PARALLELISM;
 import static org.apache.flink.table.store.log.LogOptions.CHANGELOG_MODE;
 import static org.apache.flink.table.store.log.LogOptions.CONSISTENCY;
 import static org.apache.flink.table.store.log.LogOptions.LogChangelogMode.ALL;
@@ -130,15 +135,29 @@ public class TableStoreSource
                             },
                             projectFields);
         }
-        TableStore.SourceBuilder builder =
+
+        TableStore.SourceBuilder sourceBuilder =
                 tableStore
                         .sourceBuilder()
                         .withContinuousMode(streaming)
                         .withLogSourceProvider(logSourceProvider)
                         .withProjection(projectFields)
                         .withPartitionPredicate(PredicateConverter.convert(partitionFilters))
-                        .withFieldPredicate(PredicateConverter.convert(fieldFilters));
-        return SourceProvider.of(builder.build());
+                        .withFieldPredicate(PredicateConverter.convert(fieldFilters))
+                        .withParallelism(tableStore.options().get(SCAN_PARALLELISM));
+
+        return new DataStreamScanProvider() {
+            @Override
+            public DataStream<RowData> produceDataStream(
+                    ProviderContext providerContext, StreamExecutionEnvironment env) {
+                return sourceBuilder.withEnv(env).build();
+            }
+
+            @Override
+            public boolean isBounded() {
+                return !streaming;
+            }
+        };
     }
 
     @Override
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
index 07bc759..645e208 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreITCase.java
@@ -126,7 +126,7 @@ public class FileStoreITCase extends AbstractTestBase {
         env.execute();
 
         // read
-        List<Row> results = executeAndCollect(store.sourceBuilder().build(env));
+        List<Row> results = executeAndCollect(store.sourceBuilder().withEnv(env).build());
 
         // assert
         Row[] expected =
@@ -145,7 +145,7 @@ public class FileStoreITCase extends AbstractTestBase {
         env.execute();
 
         // read
-        List<Row> results = executeAndCollect(store.sourceBuilder().build(env));
+        List<Row> results = executeAndCollect(store.sourceBuilder().withEnv(env).build());
 
         // assert
         Row[] expected = new Row[] {Row.of(5, "p2", 1), Row.of(0, "p1", 2), Row.of(3, "p2", 5)};
@@ -173,7 +173,7 @@ public class FileStoreITCase extends AbstractTestBase {
         env.execute();
 
         // read
-        List<Row> results = executeAndCollect(store.sourceBuilder().build(env));
+        List<Row> results = executeAndCollect(store.sourceBuilder().withEnv(env).build());
 
         Row[] expected = new Row[] {Row.of(9, "p2", 5), Row.of(5, "p1", 1), Row.of(0, "p1", 2)};
         assertThat(results).containsExactlyInAnyOrder(expected);
@@ -188,7 +188,7 @@ public class FileStoreITCase extends AbstractTestBase {
         env.execute();
 
         // read
-        results = executeAndCollect(store.sourceBuilder().build(env));
+        results = executeAndCollect(store.sourceBuilder().withEnv(env).build());
         expected = new Row[] {Row.of(19, "p2", 6)};
         assertThat(results).containsExactlyInAnyOrder(expected);
     }
@@ -202,7 +202,7 @@ public class FileStoreITCase extends AbstractTestBase {
         env.execute();
 
         // read
-        List<Row> results = executeAndCollect(store.sourceBuilder().build(env));
+        List<Row> results = executeAndCollect(store.sourceBuilder().withEnv(env).build());
 
         // assert
         // in streaming mode, expect origin data X 2 (FiniteTestSource)
@@ -242,7 +242,8 @@ public class FileStoreITCase extends AbstractTestBase {
                 executeAndCollect(
                         store.sourceBuilder()
                                 .withProjection(projection.toNestedIndexes())
-                                .build(env),
+                                .withEnv(env)
+                                .build(),
                         converter);
 
         // assert
@@ -281,7 +282,8 @@ public class FileStoreITCase extends AbstractTestBase {
                 BlockingIterator.of(
                         store.sourceBuilder()
                                 .withContinuousMode(true)
-                                .build(env)
+                                .withEnv(env)
+                                .build()
                                 .executeAndCollect(),
                         CONVERTER::toExternal);
         Thread.sleep(ThreadLocalRandom.current().nextInt(1000));
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
index 6ea0367..88b608f 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ReadWriteTableITCase.java
@@ -19,14 +19,33 @@
 package org.apache.flink.table.store.connector;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.transformations.PartitionTransformation;
+import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DynamicTableFactory.Context;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
+import org.apache.flink.table.store.connector.sink.TableStoreSink;
 import org.apache.flink.table.store.file.FileStoreOptions;
 import org.apache.flink.table.store.file.utils.BlockingIterator;
 import org.apache.flink.table.store.kafka.KafkaTableTestBase;
@@ -46,11 +65,14 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow;
 import static org.apache.flink.table.planner.factories.TestValuesTableFactory.registerData;
 import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.LOG_SYSTEM;
+import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.SCAN_PARALLELISM;
+import static org.apache.flink.table.store.connector.TableStoreFactoryOptions.SINK_PARALLELISM;
 import static org.apache.flink.table.store.kafka.KafkaLogOptions.BOOTSTRAP_SERVERS;
 import static org.apache.flink.table.store.log.LogOptions.LOG_PREFIX;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -1262,17 +1284,12 @@ public class ReadWriteTableITCase extends KafkaTableTestBase {
                 .close();
     }
 
-    private Tuple2<String, BlockingIterator<Row, Row>> collectAndCheckUnderSameEnv(
+    private Tuple2<String, String> createSourceAndManagedTable(
             boolean streaming,
             boolean enableLogStore,
             boolean insertOnly,
             boolean partitioned,
-            boolean hasPk,
-            boolean writeFirst,
-            Map<String, String> readHints,
-            @Nullable String filter,
-            List<String> projection,
-            List<Row> expected)
+            boolean hasPk)
             throws Exception {
         Map<String, String> tableOptions = new HashMap<>();
         rootPath = TEMPORARY_FOLDER.newFolder().getPath();
@@ -1305,6 +1322,26 @@ public class ReadWriteTableITCase extends KafkaTableTestBase {
         tEnv = StreamTableEnvironment.create(env, builder.build());
         tEnv.executeSql(helperTableDdl);
         tEnv.executeSql(managedTableDdl);
+        return new Tuple2<>(sourceTable, managedTable);
+    }
+
+    private Tuple2<String, BlockingIterator<Row, Row>> collectAndCheckUnderSameEnv(
+            boolean streaming,
+            boolean enableLogStore,
+            boolean insertOnly,
+            boolean partitioned,
+            boolean hasPk,
+            boolean writeFirst,
+            Map<String, String> readHints,
+            @Nullable String filter,
+            List<String> projection,
+            List<Row> expected)
+            throws Exception {
+        Tuple2<String, String> tables =
+                createSourceAndManagedTable(
+                        streaming, enableLogStore, insertOnly, partitioned, hasPk);
+        String sourceTable = tables.f0;
+        String managedTable = tables.f1;
 
         String insertQuery = prepareInsertIntoQuery(sourceTable, managedTable);
         String selectQuery = prepareSelectQuery(managedTable, readHints, filter, projection);
@@ -1516,17 +1553,15 @@ public class ReadWriteTableITCase extends KafkaTableTestBase {
 
     private static String buildHints(Map<String, String> hints) {
         if (hints.size() > 0) {
-            StringBuilder hintsBuilder = new StringBuilder("/*+ OPTIONS (");
-            hints.forEach(
-                    (k, v) -> {
-                        hintsBuilder.append(String.format("'%s'", k));
-                        hintsBuilder.append(" = ");
-                        hintsBuilder.append(String.format("'%s', ", v));
-                    });
-            int len = hintsBuilder.length();
-            hintsBuilder.deleteCharAt(len - 2);
-            hintsBuilder.append(") */");
-            return hintsBuilder.toString();
+            String hintString =
+                    hints.entrySet().stream()
+                            .map(
+                                    entry ->
+                                            String.format(
+                                                    "'%s' = '%s'",
+                                                    entry.getKey(), entry.getValue()))
+                            .collect(Collectors.joining(", "));
+            return "/*+ OPTIONS (" + hintString + ") */";
         }
         return "";
     }
@@ -1713,4 +1748,73 @@ public class ReadWriteTableITCase extends KafkaTableTestBase {
         env.setParallelism(2);
         return env;
     }
+
+    @Test
+    public void testSourceParallelism() throws Exception {
+        String managedTable = createSourceAndManagedTable(false, false, false, false, false).f1;
+
+        // without hint
+        String query = prepareSimpleSelectQuery(managedTable, Collections.emptyMap());
+        assertThat(sourceParallelism(query)).isEqualTo(env.getParallelism());
+
+        // with hint
+        query =
+                prepareSimpleSelectQuery(
+                        managedTable, Collections.singletonMap(SCAN_PARALLELISM.key(), "66"));
+        assertThat(sourceParallelism(query)).isEqualTo(66);
+    }
+
+    private int sourceParallelism(String sql) {
+        DataStream<Row> stream = tEnv.toChangelogStream(tEnv.sqlQuery(sql));
+        return stream.getParallelism();
+    }
+
+    @Test
+    public void testSinkParallelism() {
+        testSinkParallelism(null, env.getParallelism());
+        testSinkParallelism(23, 23);
+    }
+
+    private void testSinkParallelism(Integer configParallelism, int expectedParallelism) {
+        // 1. create a mock table sink
+        ResolvedSchema schema = ResolvedSchema.of(Column.physical("a", DataTypes.STRING()));
+        Map<String, String> options = new HashMap<>();
+        if (configParallelism != null) {
+            options.put(SINK_PARALLELISM.key(), configParallelism.toString());
+        }
+
+        Context context =
+                new FactoryUtil.DefaultDynamicTableContext(
+                        ObjectIdentifier.of("default", "default", "t1"),
+                        new ResolvedCatalogTable(
+                                CatalogTable.of(
+                                        Schema.newBuilder().fromResolvedSchema(schema).build(),
+                                        "mock context",
+                                        Collections.emptyList(),
+                                        options),
+                                schema),
+                        Collections.emptyMap(),
+                        new Configuration(),
+                        Thread.currentThread().getContextClassLoader(),
+                        false);
+        DynamicTableSink tableSink = new TableStoreFactory().createDynamicTableSink(context);
+        assertThat(tableSink).isInstanceOf(TableStoreSink.class);
+
+        // 2. get sink provider
+        DynamicTableSink.SinkRuntimeProvider provider =
+                tableSink.getSinkRuntimeProvider(new SinkRuntimeProviderContext(false));
+        assertThat(provider).isInstanceOf(DataStreamSinkProvider.class);
+        DataStreamSinkProvider sinkProvider = (DataStreamSinkProvider) provider;
+
+        // 3. assert parallelism from transformation
+        DataStream<RowData> mockSource =
+                env.fromCollection(Collections.singletonList(GenericRowData.of()));
+        DataStreamSink<?> sink = sinkProvider.consumeDataStream(null, mockSource);
+        Transformation<?> transformation = sink.getTransformation();
+        // until a PartitionTransformation, see TableStore.SinkBuilder.build()
+        while (!(transformation instanceof PartitionTransformation)) {
+            assertThat(transformation.getParallelism()).isIn(1, expectedParallelism);
+            transformation = transformation.getInputs().get(0);
+        }
+    }
 }
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java
index a58f517..07bd058 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.java
@@ -134,7 +134,7 @@ public class LogStoreSinkITCase extends KafkaTableTestBase {
             env.execute();
 
             // read
-            List<Row> results = executeAndCollect(store.sourceBuilder().build(env));
+            List<Row> results = executeAndCollect(store.sourceBuilder().withEnv(env).build());
 
             Row[] expected;
             if (hasPk) {
@@ -164,7 +164,8 @@ public class LogStoreSinkITCase extends KafkaTableTestBase {
                             store.sourceBuilder()
                                     .withContinuousMode(true)
                                     .withLogSourceProvider(sourceProvider)
-                                    .build(buildStreamEnv())
+                                    .withEnv(buildStreamEnv())
+                                    .build()
                                     .executeAndCollect(),
                             CONVERTER::toExternal);
             results = iterator.collectAndClose(expected.length);
