RocMarshal commented on code in PR #134:
URL: https://github.com/apache/flink-connector-kafka/pull/134#discussion_r1863022748


##########
docs/content.zh/docs/connectors/table/upsert-kafka.md:
##########
@@ -180,6 +180,14 @@ of all available metadata fields.
        </ul>
        </td>
     </tr>
+    <tr>
+      <td><h5>scan.parallelism</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>Integer</td>
+      <td>Defines the parallelism of the upsert-kafka source operator. By default, the global default concurrency is used.</td>

Review Comment:
   ```suggestion
      <td>Defines the parallelism of the upsert-kafka source operator. By default, the global default parallelism is used.</td>
   ```
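
   For context, a minimal sketch of how this option is meant to be used from the Java Table API, assuming the `scan.parallelism` key from this PR; the table name, topic, servers, and schema are illustrative placeholders:

   ```java
   import org.apache.flink.table.api.EnvironmentSettings;
   import org.apache.flink.table.api.TableEnvironment;

   public class UpsertKafkaScanParallelismExample {
       public static void main(String[] args) {
           TableEnvironment tEnv =
                   TableEnvironment.create(EnvironmentSettings.inStreamingMode());

           // Hypothetical table; only the 'scan.parallelism' option is the point here.
           tEnv.executeSql(
                   "CREATE TABLE pageviews_per_region (\n"
                           + "  region STRING,\n"
                           + "  view_count BIGINT,\n"
                           + "  PRIMARY KEY (region) NOT ENFORCED\n"
                           + ") WITH (\n"
                           + "  'connector' = 'upsert-kafka',\n"
                           + "  'topic' = 'pageviews_per_region',\n"
                           + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
                           + "  'key.format' = 'json',\n"
                           + "  'value.format' = 'json',\n"
                           // Source operator runs with parallelism 4 instead of the global default.
                           + "  'scan.parallelism' = '4'\n"
                           + ")");
       }
   }
   ```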



##########
flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java:
##########
@@ -212,14 +212,56 @@ public void testTableSource() {
                         KAFKA_SOURCE_PROPERTIES,
                         StartupMode.SPECIFIC_OFFSETS,
                         specificOffsets,
-                        0);
+                        0,
+                        null);
         assertThat(actualKafkaSource).isEqualTo(expectedKafkaSource);
 
         ScanTableSource.ScanRuntimeProvider provider =
                 actualKafkaSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
         assertKafkaSource(provider);
     }
 
+    @Test
+    public void testTableSourceWithParallelism() {
+        final Map<String, String> modifiedOptions =
+                getModifiedOptions(
+                        getBasicSourceOptions(), options -> options.put("scan.parallelism", "100"));
+        final DynamicTableSource actualSource = createTableSource(SCHEMA, modifiedOptions);
+        final KafkaDynamicSource actualKafkaSource = (KafkaDynamicSource) actualSource;
+
+        final Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
+        specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_0), OFFSET_0);
+        specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_1), OFFSET_1);
+
+        final DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat =
+                new DecodingFormatMock(",", true);
+
+        // Test scan source equals
+        final KafkaDynamicSource expectedKafkaSource =
+                createExpectedScanSource(
+                        SCHEMA_DATA_TYPE,
+                        null,
+                        valueDecodingFormat,
+                        new int[0],
+                        new int[] {0, 1, 2},
+                        null,
+                        Collections.singletonList(TOPIC),
+                        null,
+                        KAFKA_SOURCE_PROPERTIES,
+                        StartupMode.SPECIFIC_OFFSETS,
+                        specificOffsets,
+                        0,
+                        100);
+        assertThat(actualKafkaSource).isEqualTo(expectedKafkaSource);
+
+        ScanTableSource.ScanRuntimeProvider provider =
+                actualKafkaSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
+        assertThat(provider).isInstanceOf(DataStreamScanProvider.class);
+        final DataStreamScanProvider sourceProvider = (DataStreamScanProvider) provider;
+        assertThat(sourceProvider.getParallelism().isPresent()).isTrue();
+        assertThat(sourceProvider.getParallelism().get()).isEqualTo(100);

Review Comment:
   ```suggestion
          assertThat(sourceProvider.getParallelism()).isPresent();
          assertThat(sourceProvider.getParallelism()).hasValue(100);
   ```
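
   (`hasValue(...)` already fails on an empty `Optional`, so the `isPresent()` check is belt-and-braces.) For reference, a self-contained sketch of AssertJ's `Optional` assertions, independent of the PR's classes:

   ```java
   import static org.assertj.core.api.Assertions.assertThat;

   import java.util.Optional;

   public class OptionalAssertSketch {
       public static void main(String[] args) {
           Optional<Integer> parallelism = Optional.of(100);

           // hasValue(...) implies presence; isPresent() is shown only for comparison.
           assertThat(parallelism).isPresent();
           assertThat(parallelism).hasValue(100);
       }
   }
   ```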



##########
flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java:
##########
@@ -177,6 +180,37 @@ public void testTableSource() {
         assertKafkaSource(provider);
     }
 
+    @Test
+    public void testTableSourceWithParallelism() {
+        final DataType producedDataType = SOURCE_SCHEMA.toPhysicalRowDataType();
+        // Construct table source using options and table source factory
+        final Map<String, String> modifiedOptions =
+                getModifiedOptions(
+                        getFullSourceOptions(), options -> options.put("scan.parallelism", "100"));

Review Comment:
   ditto



##########
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java:
##########
@@ -150,6 +152,8 @@ public DynamicTableSource createDynamicTableSource(Context context) {
 
         final BoundedOptions boundedOptions = getBoundedOptions(tableOptions);
 
+        Integer parallelism = tableOptions.get(SCAN_PARALLELISM);

Review Comment:
   A trivial comment, and I'm not sure if it's appropriate:
   what about inlining this line into line 174?
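
   A sketch of what that inlining could look like, assuming a `ReadableConfig` and a stand-in for the real constructor call (the actual `KafkaDynamicSource` constructor takes many more arguments):

   ```java
   import org.apache.flink.configuration.ConfigOption;
   import org.apache.flink.configuration.ConfigOptions;
   import org.apache.flink.configuration.ReadableConfig;

   class InlineOptionSketch {
       // Stand-in for the connector's SCAN_PARALLELISM option; the key matches this PR.
       static final ConfigOption<Integer> SCAN_PARALLELISM =
               ConfigOptions.key("scan.parallelism").intType().noDefaultValue();

       // Before: a dedicated local that is used exactly once.
       static SourceStub createBefore(ReadableConfig tableOptions) {
           Integer parallelism = tableOptions.get(SCAN_PARALLELISM);
           return new SourceStub(parallelism);
       }

       // After (the reviewer's suggestion): the lookup inlined at the call site.
       static SourceStub createAfter(ReadableConfig tableOptions) {
           return new SourceStub(tableOptions.get(SCAN_PARALLELISM));
       }

       // Hypothetical stand-in for the real source constructor.
       static class SourceStub {
           final Integer parallelism;

           SourceStub(Integer parallelism) {
               this.parallelism = parallelism;
           }
       }
   }
   ```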



##########
flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java:
##########
@@ -212,14 +212,56 @@ public void testTableSource() {
                         KAFKA_SOURCE_PROPERTIES,
                         StartupMode.SPECIFIC_OFFSETS,
                         specificOffsets,
-                        0);
+                        0,
+                        null);
         assertThat(actualKafkaSource).isEqualTo(expectedKafkaSource);
 
         ScanTableSource.ScanRuntimeProvider provider =
                 actualKafkaSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
         assertKafkaSource(provider);
     }
 
+    @Test
+    public void testTableSourceWithParallelism() {
+        final Map<String, String> modifiedOptions =
+                getModifiedOptions(
+                        getBasicSourceOptions(), options -> options.put("scan.parallelism", "100"));

Review Comment:
   what about replacing the string 'scan.parallelism' with SOURCE_PARALLELISM.key()?
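
   A minimal sketch of that change, with `SOURCE_PARALLELISM` declared locally as a stand-in for whichever `ConfigOption` constant the connector exposes for this key:

   ```java
   import java.util.HashMap;
   import java.util.Map;

   import org.apache.flink.configuration.ConfigOption;
   import org.apache.flink.configuration.ConfigOptions;

   public class OptionKeySketch {
       // Stand-in for the real SOURCE_PARALLELISM constant referenced above.
       static final ConfigOption<Integer> SOURCE_PARALLELISM =
               ConfigOptions.key("scan.parallelism").intType().noDefaultValue();

       public static void main(String[] args) {
           Map<String, String> options = new HashMap<>();

           // Before: a string literal that can silently drift from the option definition.
           options.put("scan.parallelism", "100");

           // After: the key derived from the constant, so a rename stays in sync.
           options.put(SOURCE_PARALLELISM.key(), "100");
       }
   }
   ```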



##########
flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java:
##########
@@ -177,6 +180,37 @@ public void testTableSource() {
         assertKafkaSource(provider);
     }
 
+    @Test
+    public void testTableSourceWithParallelism() {
+        final DataType producedDataType = SOURCE_SCHEMA.toPhysicalRowDataType();
+        // Construct table source using options and table source factory
+        final Map<String, String> modifiedOptions =
+                getModifiedOptions(
+                        getFullSourceOptions(), options -> options.put("scan.parallelism", "100"));
+        final DynamicTableSource actualSource = createTableSource(SOURCE_SCHEMA, modifiedOptions);
+
+        final KafkaDynamicSource expectedSource =
+                createExpectedScanSource(
+                        producedDataType,
+                        keyDecodingFormat,
+                        valueDecodingFormat,
+                        SOURCE_KEY_FIELDS,
+                        SOURCE_VALUE_FIELDS,
+                        null,
+                        Collections.singletonList(SOURCE_TOPIC),
+                        UPSERT_KAFKA_SOURCE_PROPERTIES,
+                        100);
+        assertThat(actualSource).isEqualTo(expectedSource);
+
+        final KafkaDynamicSource actualUpsertKafkaSource = (KafkaDynamicSource) actualSource;
+        ScanTableSource.ScanRuntimeProvider provider =
+                actualUpsertKafkaSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
+        assertThat(provider).isInstanceOf(DataStreamScanProvider.class);
+        final DataStreamScanProvider sourceProvider = (DataStreamScanProvider) provider;
+        assertThat(sourceProvider.getParallelism().isPresent()).isTrue();
+        assertThat(sourceProvider.getParallelism().get()).isEqualTo(100);

Review Comment:
   ditto~



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
