wuchong commented on a change in pull request #13902:
URL: https://github.com/apache/flink/pull/13902#discussion_r517071743



##########
File path: docs/dev/table/connectors/kafka.md
##########
@@ -184,6 +184,18 @@ Connector Options
       <td>String</td>
      <td>Defines the delivery semantic for the Kafka sink. Valid enumerations are <code>'at-least-once'</code>, <code>'exactly-once'</code> and <code>'none'</code>. See <a href='#consistency-guarantees'>Consistency guarantees</a> for more details.</td>
     </tr>
+    <tr>
+      <td><h5>sink.parallelism</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>Integer</td>
+      <td>Defines the parallelism for the Kafka sink. If not specified, the parallelism are
+      <ul>
+        <li><code>Chained</code>: Use upstream parallelism.</li>
+        <li><code>Non-Chained</code>: Use global parallelism setting.</li>
+      </ul>

Review comment:
       I think the phrase "parallelism are chained" is confusing.
   
   What do you think about:
   
   > Defines the parallelism of the Kafka sink operator. By default, the parallelism is determined by the framework using the same parallelism as the upstream chained operator.
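   
   For context only, here is a rough sketch (not part of this PR) of how the documented option would be set on a Kafka sink table via the Table API; the table name, schema, topic, and broker address are made-up placeholder values:
   
   ```java
   import org.apache.flink.table.api.EnvironmentSettings;
   import org.apache.flink.table.api.TableEnvironment;
   
   public class KafkaSinkParallelismExample {
   	public static void main(String[] args) {
   		TableEnvironment tEnv = TableEnvironment.create(
   				EnvironmentSettings.newInstance().inStreamingMode().build());
   
   		// Placeholder DDL: only 'sink.parallelism' is the option documented in this hunk;
   		// all other values are illustrative.
   		tEnv.executeSql(
   				"CREATE TABLE kafka_sink (\n"
   				+ "  user_id BIGINT,\n"
   				+ "  behavior STRING\n"
   				+ ") WITH (\n"
   				+ "  'connector' = 'kafka',\n"
   				+ "  'topic' = 'user_behavior',\n"
   				+ "  'properties.bootstrap.servers' = 'localhost:9092',\n"
   				+ "  'format' = 'json',\n"
   				+ "  'sink.parallelism' = '3'\n"
   				+ ")");
   	}
   }
   ```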
   
   
   

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java
##########
@@ -145,7 +150,17 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
               final FlinkKafkaProducer<RowData> kafkaProducer =
                               createKafkaProducer(keySerialization, valueSerialization);
 
-               return SinkFunctionProvider.of(kafkaProducer);
+               return new SinkFunctionProvider() {

Review comment:
       Could you add a new util method in `SinkFunctionProvider`? I think most connectors could use this new method.
   
   ```java
        /**
	 * Helper method for creating a SinkFunction provider with a provided sink parallelism.
	 */
	static SinkFunctionProvider of(SinkFunction<RowData> sinkFunction, Integer sinkParallelism) {
                return new SinkFunctionProvider() {
   
                        @Override
                        public SinkFunction<RowData> createSinkFunction() {
                                return sinkFunction;
                        }
   
                        @Override
                        public Optional<Integer> getParallelism() {
                                return Optional.ofNullable(sinkParallelism);
                        }
                };
        }
   ```
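   
   For illustration (just a sketch), the call site in `KafkaDynamicSink#getSinkRuntimeProvider` could then collapse back into a single call instead of the anonymous class:
   
   ```java
   final FlinkKafkaProducer<RowData> kafkaProducer =
   		createKafkaProducer(keySerialization, valueSerialization);
   
   // `parallelism` is the nullable Integer field introduced in this PR; the
   // proposed overload forwards it via Optional.ofNullable.
   return SinkFunctionProvider.of(kafkaProducer, parallelism);
   ```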
   
   

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java
##########
@@ -282,6 +288,20 @@ public int hashCode() {
               return format.createRuntimeEncoder(context, physicalFormatDataType);
        }
 
+       /**
+        * Returns the parallelism for this instance.
+        *
+        * <p>The parallelism denotes how many parallel instances of a source or sink will be spawned
+        * during the execution.
+        *
+        * @return empty if the connector does not provide a custom parallelism, then the planner will
+        * decide the number of parallel instances by itself.
+        */
+       @Override
+       public Optional<Integer> getParallelism() {
+               return parallelism != null ? Optional.of(parallelism) : Optional.empty();

Review comment:
       This can be simplified to `Optional.ofNullable(parallelism)`.
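   
   For example, a sketch of the simplified getter:
   
   ```java
   @Override
   public Optional<Integer> getParallelism() {
   	// Optional.ofNullable covers both branches of the ternary in one call.
   	return Optional.ofNullable(parallelism);
   }
   ```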




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

