kl0u commented on a change in pull request #13678:
URL: https://github.com/apache/flink/pull/13678#discussion_r507834050



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/GlobalStreamingCommitterOperatorFactory.java
##########
@@ -0,0 +1,36 @@
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+
+/**
+ * A {@link org.apache.flink.streaming.api.operators.StreamOperatorFactory} 
for {@link GlobalStreamingCommitterOperator}.
+ *
+ * @param <CommT> The committable type of the {@link GlobalCommitter}.
+ * @param <GlobalCommT> The global committable type of the {@link 
GlobalCommitter}.
+ */
+public class GlobalStreamingCommitterOperatorFactory<CommT, GlobalCommT> 
extends AbstractStreamingCommitterOperatorFactory<CommT, GlobalCommT> {
+
+       private final Sink<?, CommT, ?, GlobalCommT> sink;
+
+       public GlobalStreamingCommitterOperatorFactory(Sink<?, CommT, ?, 
GlobalCommT> sink) {
+               this.sink = sink;
+       }
+
+       @Override
+       AbstractStreamingCommitterOperator<CommT, GlobalCommT> 
createStreamingCommitterOperator() {
+               return new GlobalStreamingCommitterOperator<>(
+                               sink.createGlobalCommitter()
+                                               .orElseThrow(() -> new 
IllegalArgumentException(

Review comment:
       Probably throw an `IllegalStateException` instead of an 
`IllegalArgumentException`. What do you think @guoweiM ?

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/GlobalStreamingCommitterOperatorFactory.java
##########
@@ -0,0 +1,36 @@
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+
+/**
+ * A {@link org.apache.flink.streaming.api.operators.StreamOperatorFactory} 
for {@link GlobalStreamingCommitterOperator}.
+ *
+ * @param <CommT> The committable type of the {@link GlobalCommitter}.
+ * @param <GlobalCommT> The global committable type of the {@link 
GlobalCommitter}.
+ */
+public class GlobalStreamingCommitterOperatorFactory<CommT, GlobalCommT> 
extends AbstractStreamingCommitterOperatorFactory<CommT, GlobalCommT> {
+
+       private final Sink<?, CommT, ?, GlobalCommT> sink;
+
+       public GlobalStreamingCommitterOperatorFactory(Sink<?, CommT, ?, 
GlobalCommT> sink) {
+               this.sink = sink;
+       }
+
+       @Override
+       AbstractStreamingCommitterOperator<CommT, GlobalCommT> 
createStreamingCommitterOperator() {
+               return new GlobalStreamingCommitterOperator<>(
+                               sink.createGlobalCommitter()
+                                               .orElseThrow(() -> new 
IllegalArgumentException(
+                                                               "Could not 
create global committer from the sink")),
+                               sink.getGlobalCommittableSerializer()
+                                               .orElseThrow(() -> new 
IllegalArgumentException(

Review comment:
       Same as above.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to