[ https://issues.apache.org/jira/browse/FLINK-7902?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16224863#comment-16224863 ]
ASF GitHub Bot commented on FLINK-7902:
---------------------------------------
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/4919#discussion_r147692117
--- Diff:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
---
@@ -362,4 +374,213 @@ public void setContext(Optional<CONTEXT> context) {
this.context = context;
}
}
+
+ /**
+ * Custom {@link TypeSerializer} for the sink state.
+ */
+ static final class StateSerializer<TXN, CONTEXT> extends
TypeSerializer<State<TXN, CONTEXT>> {
+
+ private final TypeSerializer<TXN> transactionSerializer;
+ private final TypeSerializer<CONTEXT> contextSerializer;
+
+ public StateSerializer(
+ TypeSerializer<TXN> transactionSerializer,
+ TypeSerializer<CONTEXT> contextSerializer) {
+ this.transactionSerializer =
checkNotNull(transactionSerializer);
+ this.contextSerializer =
checkNotNull(contextSerializer);
+ }
+
+ @Override
+ public boolean isImmutableType() {
+ return transactionSerializer.isImmutableType() &&
contextSerializer.isImmutableType();
+ }
+
+ @Override
+ public TypeSerializer<State<TXN, CONTEXT>> duplicate() {
+ return new StateSerializer<>(
+ transactionSerializer.duplicate(),
contextSerializer.duplicate());
+ }
+
+ @Override
+ public State<TXN, CONTEXT> createInstance() {
+ return null;
+ }
+
+ @Override
+ public State<TXN, CONTEXT> copy(State<TXN, CONTEXT> from) {
+ TXN copiedPendingTransaction =
transactionSerializer.copy(from.getPendingTransaction());
+ List<TXN> copiedPendingCommitTransactions = new
ArrayList<>();
+ for (TXN txn : from.getPendingCommitTransactions()) {
+
copiedPendingCommitTransactions.add(transactionSerializer.copy(txn));
+ }
+ Optional<CONTEXT> copiedContext =
from.getContext().map(contextSerializer::copy);
+ return new State<>(copiedPendingTransaction,
copiedPendingCommitTransactions, copiedContext);
+ }
+
+ @Override
+ public State<TXN, CONTEXT> copy(
+ State<TXN, CONTEXT> from,
+ State<TXN, CONTEXT> reuse) {
+ return copy(from);
+ }
+
+ @Override
+ public int getLength() {
+ return -1;
+ }
+
+ @Override
+ public void serialize(
+ State<TXN, CONTEXT> record,
+ DataOutputView target) throws IOException {
+
transactionSerializer.serialize(record.getPendingTransaction(), target);
+ List<TXN> pendingCommitTransactions =
record.getPendingCommitTransactions();
+ target.writeInt(pendingCommitTransactions.size());
+ for (TXN pendingTxn : pendingCommitTransactions) {
+ transactionSerializer.serialize(pendingTxn,
target);
+ }
+ Optional<CONTEXT> context = record.getContext();
+ if (context.isPresent()) {
+ target.writeBoolean(true);
+ contextSerializer.serialize(context.get(),
target);
+ } else {
+ target.writeBoolean(false);
+ }
+ }
+
+ @Override
+ public State<TXN, CONTEXT> deserialize(DataInputView source)
throws IOException {
+ TXN pendingTxn =
transactionSerializer.deserialize(source);
+ int numPendingCommitTxns = source.readInt();
+ List<TXN> pendingCommitTxns = new
ArrayList<>(numPendingCommitTxns);
+ for (int i = 0; i < numPendingCommitTxns; i++) {
+
pendingCommitTxns.add(transactionSerializer.deserialize(source));
+ }
+ Optional<CONTEXT> context = Optional.empty();
+ boolean hasContext = source.readBoolean();
+ if (hasContext) {
+ context =
Optional.of(contextSerializer.deserialize(source));
+ }
+ return new State<>(pendingTxn, pendingCommitTxns,
context);
+ }
+
+ @Override
+ public State<TXN, CONTEXT> deserialize(
+ State<TXN, CONTEXT> reuse,
+ DataInputView source) throws IOException {
+ return deserialize(source);
+ }
+
+ @Override
+ public void copy(
+ DataInputView source, DataOutputView target)
throws IOException {
+ TXN pendingTxn =
transactionSerializer.deserialize(source);
+ transactionSerializer.serialize(pendingTxn, target);
+ int numPendingCommitTxns = source.readInt();
+ target.writeInt(numPendingCommitTxns);
+ for (int i = 0; i < numPendingCommitTxns; i++) {
+ TXN pendingCommitTxn =
transactionSerializer.deserialize(source);
+
transactionSerializer.serialize(pendingCommitTxn, target);
+ }
+ boolean hasContext = source.readBoolean();
--- End diff --
fixing
> TwoPhaseCommitSinkFunctions should use custom TypeSerializer
> ------------------------------------------------------------
>
> Key: FLINK-7902
> URL: https://issues.apache.org/jira/browse/FLINK-7902
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Affects Versions: 1.4.0
> Reporter: Aljoscha Krettek
> Assignee: Aljoscha Krettek
> Priority: Blocker
> Fix For: 1.4.0
>
>
> Currently, the {{FlinkKafkaProducer011}} uses
> {{TypeInformation.of(new TypeHint<State<KafkaTransactionState, KafkaTransactionContext>>() {})}}
> to create a {{TypeInformation}}, which in turn is used to create a
> {{StateDescriptor}} for the state that the Kafka sink stores.
> Behind the scenes, this would be roughly analysed as a
> {{PojoType(GenericType<KafkaTransactionState>, GenericType<KafkaTransactionContext>)}},
> which means we don't have explicit control over the serialisation format and
> that the fields are serialised with Kryo (the default for {{GenericTypeInfo}}).
> This can be problematic if we want to evolve the state schema in the future or
> if we want to change Kryo versions.
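> We should change {{TwoPhaseCommitSinkFunction}} to have only this constructor, so that every subclass must supply an explicit serializer for its state instead of relying on type extraction: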
> {code}
> public TwoPhaseCommitSinkFunction(TypeSerializer<State<TXN, CONTEXT>>
> stateSerializer) {
> {code}
> and we should then change the {{FlinkKafkaProducer011}} to hand in a
> custom-made {{TypeSerializer}} for the state.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)