Github user kl0u commented on a diff in the pull request:
https://github.com/apache/flink/pull/4919#discussion_r147688912
--- Diff:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
---
@@ -362,4 +374,213 @@ public void setContext(Optional<CONTEXT> context) {
this.context = context;
}
}
+
+ /**
+ * Custom {@link TypeSerializer} for the sink state.
+ */
+ static final class StateSerializer<TXN, CONTEXT> extends
TypeSerializer<State<TXN, CONTEXT>> {
+
+ private final TypeSerializer<TXN> transactionSerializer;
+ private final TypeSerializer<CONTEXT> contextSerializer;
+
+ public StateSerializer(
+ TypeSerializer<TXN> transactionSerializer,
+ TypeSerializer<CONTEXT> contextSerializer) {
+ this.transactionSerializer =
checkNotNull(transactionSerializer);
+ this.contextSerializer =
checkNotNull(contextSerializer);
+ }
+
+ @Override
+ public boolean isImmutableType() {
+ return transactionSerializer.isImmutableType() &&
contextSerializer.isImmutableType();
+ }
+
+ @Override
+ public TypeSerializer<State<TXN, CONTEXT>> duplicate() {
+ return new StateSerializer<>(
+ transactionSerializer.duplicate(),
contextSerializer.duplicate());
+ }
+
+ @Override
+ public State<TXN, CONTEXT> createInstance() {
+ return null;
+ }
+
+ @Override
+ public State<TXN, CONTEXT> copy(State<TXN, CONTEXT> from) {
+ TXN copiedPendingTransaction =
transactionSerializer.copy(from.getPendingTransaction());
+ List<TXN> copiedPendingCommitTransactions = new
ArrayList<>();
+ for (TXN txn : from.getPendingCommitTransactions()) {
+
copiedPendingCommitTransactions.add(transactionSerializer.copy(txn));
+ }
+ Optional<CONTEXT> copiedContext =
from.getContext().map(contextSerializer::copy);
+ return new State<>(copiedPendingTransaction,
copiedPendingCommitTransactions, copiedContext);
+ }
+
+ @Override
+ public State<TXN, CONTEXT> copy(
+ State<TXN, CONTEXT> from,
+ State<TXN, CONTEXT> reuse) {
+ return copy(from);
+ }
+
+ @Override
+ public int getLength() {
+ return -1;
+ }
+
+ @Override
+ public void serialize(
+ State<TXN, CONTEXT> record,
+ DataOutputView target) throws IOException {
+
transactionSerializer.serialize(record.getPendingTransaction(), target);
+ List<TXN> pendingCommitTransactions =
record.getPendingCommitTransactions();
+ target.writeInt(pendingCommitTransactions.size());
+ for (TXN pendingTxn : pendingCommitTransactions) {
+ transactionSerializer.serialize(pendingTxn,
target);
+ }
+ Optional<CONTEXT> context = record.getContext();
+ if (context.isPresent()) {
+ target.writeBoolean(true);
+ contextSerializer.serialize(context.get(),
target);
+ } else {
+ target.writeBoolean(false);
+ }
+ }
+
+ @Override
+ public State<TXN, CONTEXT> deserialize(DataInputView source)
throws IOException {
+ TXN pendingTxn =
transactionSerializer.deserialize(source);
+ int numPendingCommitTxns = source.readInt();
+ List<TXN> pendingCommitTxns = new
ArrayList<>(numPendingCommitTxns);
+ for (int i = 0; i < numPendingCommitTxns; i++) {
+
pendingCommitTxns.add(transactionSerializer.deserialize(source));
+ }
+ Optional<CONTEXT> context = Optional.empty();
+ boolean hasContext = source.readBoolean();
+ if (hasContext) {
+ context =
Optional.of(contextSerializer.deserialize(source));
+ }
+ return new State<>(pendingTxn, pendingCommitTxns,
context);
+ }
+
+ @Override
+ public State<TXN, CONTEXT> deserialize(
+ State<TXN, CONTEXT> reuse,
+ DataInputView source) throws IOException {
+ return deserialize(source);
+ }
+
+ @Override
+ public void copy(
+ DataInputView source, DataOutputView target)
throws IOException {
+ TXN pendingTxn =
transactionSerializer.deserialize(source);
+ transactionSerializer.serialize(pendingTxn, target);
+ int numPendingCommitTxns = source.readInt();
+ target.writeInt(numPendingCommitTxns);
+ for (int i = 0; i < numPendingCommitTxns; i++) {
+ TXN pendingCommitTxn =
transactionSerializer.deserialize(source);
+
transactionSerializer.serialize(pendingCommitTxn, target);
+ }
+ boolean hasContext = source.readBoolean();
--- End diff --
Here you should also write the `hasContext` to the `target`.
---