JingGe commented on a change in pull request #18428:
URL: https://github.com/apache/flink/pull/18428#discussion_r793602829
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterStateHandler.java
##########
@@ -17,34 +17,27 @@
package org.apache.flink.streaming.runtime.operators.sink;
+import org.apache.flink.api.connector.sink2.Sink.InitContext;
+import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.runtime.state.StateInitializationContext;
-import org.apache.flink.util.function.FunctionWithException;
-
-import java.io.Serializable;
-import java.util.List;
/**
* Manages the state of a {@link
org.apache.flink.api.connector.sink.SinkWriter}. There are only two
* flavors: stateless handled by {@link StatelessSinkWriterStateHandler} and
stateful handled with
* {@link StatefulSinkWriterStateHandler}.
*
- * @param <WriterStateT>
+ * @param <InputT> the input type
*/
-interface SinkWriterStateHandler<WriterStateT> extends Serializable {
- /**
- * Extracts the writer state from the {@link StateInitializationContext}.
The state will be used
- * to create the writer.
- */
- List<WriterStateT> initializeState(StateInitializationContext context)
throws Exception;
+interface SinkWriterStateHandler<InputT> {
/**
- * Stores the state of the supplier. The supplier should only be queried
once.
+ * Stores the state of the writer.
*
- * @param stateExtractor
- * @param checkpointId
+ * @param checkpointId the checkpointId
*/
- void snapshotState(
- FunctionWithException<Long, List<WriterStateT>, Exception>
stateExtractor,
- long checkpointId)
+ void snapshotState(long checkpointId) throws Exception;
+
+ /** Creates a writer, potentially using state from {@link
StateInitializationContext}. */
+ SinkWriter<InputT> createWriter(InitContext initContext,
StateInitializationContext context)
Review comment:
This is the hidden magic method that neither the class name nor the java
doc has described. I would suggest using a more feasible class name to cover
this functionality.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]