Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/4919#discussion_r147692002
--- Diff:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
---
@@ -362,4 +374,213 @@ public void setContext(Optional<CONTEXT> context) {
this.context = context;
}
}
+
+ /**
+ * Custom {@link TypeSerializer} for the sink state.
+ */
+ static final class StateSerializer<TXN, CONTEXT> extends
TypeSerializer<State<TXN, CONTEXT>> {
+
--- End diff --
fixing
---