wuchong commented on a change in pull request #13902:
URL: https://github.com/apache/flink/pull/13902#discussion_r517071743
##########
File path: docs/dev/table/connectors/kafka.md
##########
@@ -184,6 +184,18 @@ Connector Options
<td>String</td>
<td>Defines the delivery semantic for the Kafka sink. Valid enumerations are <code>'at-least-once'</code>, <code>'exactly-once'</code> and <code>'none'</code>. See <a href='#consistency-guarantees'>Consistency guarantees</a> for more details.</td>
</tr>
+ <tr>
+ <td><h5>sink.parallelism</h5></td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>Integer</td>
+ <td>Defines the parallelism for the Kafka sink. If not specified, the parallelism are
+ <ul>
+ <li><code>Chained</code>: Use upstream parallelism.</li>
+ <li><code>Non-Chained</code>: Use global parallelism setting.</li>
+ </ul>
Review comment:
I think the wording "parallelism are chained" is confusing.
What do you think about:
> Defines the parallelism of the Kafka sink operator. By default, the parallelism is determined by the framework using the same parallelism of the upstream chained operator.
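For context, a hypothetical end-to-end use of the option from the Table API, assuming it lands as `'sink.parallelism'` as proposed in this PR (the table name, schema, topic, and broker address below are made up for illustration):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class KafkaSinkParallelismExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());

        // Hypothetical sink table: 'sink.parallelism' = '4' pins the Kafka sink operator to
        // 4 parallel instances instead of the parallelism derived from the upstream operator.
        tEnv.executeSql(
            "CREATE TABLE kafka_sink (\n"
                + "  user_id BIGINT,\n"
                + "  item_id BIGINT\n"
                + ") WITH (\n"
                + "  'connector' = 'kafka',\n"
                + "  'topic' = 'user_behavior',\n"
                + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
                + "  'format' = 'json',\n"
                + "  'sink.parallelism' = '4'\n"
                + ")");
    }
}
```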
##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java
##########
@@ -145,7 +150,17 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
final FlinkKafkaProducer<RowData> kafkaProducer = createKafkaProducer(keySerialization, valueSerialization);
- return SinkFunctionProvider.of(kafkaProducer);
+ return new SinkFunctionProvider() {
Review comment:
Could you add a new util method in `SinkFunctionProvider`? I think most connectors can use this new method.
```java
/**
 * Helper method for creating a SinkFunction provider with a provided sink parallelism.
 */
static SinkFunctionProvider of(SinkFunction<RowData> sinkFunction, Integer sinkParallelism) {
    return new SinkFunctionProvider() {
        @Override
        public SinkFunction<RowData> createSinkFunction() {
            return sinkFunction;
        }

        @Override
        public Optional<Integer> getParallelism() {
            return Optional.ofNullable(sinkParallelism);
        }
    };
}
```
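With that helper, `getSinkRuntimeProvider` could then shrink back to a one-liner, roughly (a sketch, assuming the nullable `parallelism` field introduced by this PR):

```java
final FlinkKafkaProducer<RowData> kafkaProducer =
        createKafkaProducer(keySerialization, valueSerialization);
// A null parallelism keeps the framework-determined parallelism, i.e. the previous behavior.
return SinkFunctionProvider.of(kafkaProducer, parallelism);
```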
##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java
##########
@@ -282,6 +288,20 @@ public int hashCode() {
return format.createRuntimeEncoder(context, physicalFormatDataType);
}
+ /**
+ * Returns the parallelism for this instance.
+ *
+ * <p>The parallelism denotes how many parallel instances of a source or sink will be spawned
+ * during the execution.
+ *
+ * @return empty if the connector does not provide a custom parallelism, then the planner will
+ * decide the number of parallel instances by itself.
+ */
+ @Override
+ public Optional<Integer> getParallelism() {
+ return parallelism != null ? Optional.of(parallelism) : Optional.empty();
Review comment:
Can simplify to `Optional.ofNullable(parallelism)`.
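That is, the override could simply read:

```java
@Override
public Optional<Integer> getParallelism() {
    return Optional.ofNullable(parallelism);
}
```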
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]