[ https://issues.apache.org/jira/browse/FLINK-7902?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16224838#comment-16224838 ]
ASF GitHub Bot commented on FLINK-7902:
---------------------------------------
Github user kl0u commented on a diff in the pull request:
https://github.com/apache/flink/pull/4919#discussion_r147678569
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java ---
@@ -79,19 +92,18 @@
* TwoPhaseCommitSinkFunction(TypeInformation.of(new TypeHint<State<TXN, CONTEXT>>() {}));
* }
* </pre>
- * @param stateTypeInformation {@link TypeInformation} for POJO holding state of opened transactions.
- */
- public TwoPhaseCommitSinkFunction(TypeInformation<State<TXN, CONTEXT>> stateTypeInformation) {
- this(new ListStateDescriptor<State<TXN, CONTEXT>>("state", stateTypeInformation));
- }
-
- /**
- * Instantiate {@link TwoPhaseCommitSinkFunction} with custom state descriptors.
*
- * @param stateDescriptor descriptor for transactions POJO.
+ * @param transactionSerializer {@link TypeSerializer} for the transaction type of this sink
+ * @param contextSerializer {@link TypeSerializer} for the context type of this sink
*/
- public TwoPhaseCommitSinkFunction(ListStateDescriptor<State<TXN, CONTEXT>> stateDescriptor) {
- this.stateDescriptor = requireNonNull(stateDescriptor, "stateDescriptor is null");
+ public TwoPhaseCommitSinkFunction(
--- End diff --
I know this is not part of this PR, but the fields on lines 81, 82, and 84 should be `transient`.
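For context on the `transient` request: a Flink user function such as this sink is serialized with Java serialization when the job is submitted, so fields that only hold runtime objects should be marked `transient` and rebuilt when the function is initialized on the worker. The fields on those lines are not visible in the diff, so the following is only a minimal, stdlib-only sketch with hypothetical names: `HypotheticalSink`, `StateHandle`, and an `initialize()` method standing in for the framework's initialization hook.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical runtime object; it does not implement Serializable.
class StateHandle {
    final List<String> entries = new ArrayList<>();
}

// Hypothetical sink: configuration is serialized, runtime-only fields are transient.
class HypotheticalSink implements Serializable {
    private static final long serialVersionUID = 1L;

    // Configuration needed on the worker: shipped with the function.
    final String topic;

    // Runtime-only fields. Without 'transient', writeObject would throw
    // NotSerializableException once 'state' is non-null.
    transient StateHandle state;
    transient String currentTransaction;

    HypotheticalSink(String topic) {
        this.topic = topic;
    }

    // Stands in for the framework hook that rebuilds runtime fields on the worker.
    void initialize() {
        state = new StateHandle();
        currentTransaction = "txn-0";
    }
}

public class TransientDemo {
    // Round-trips an object through Java serialization, as when a job is shipped to workers.
    static <T extends Serializable> T roundTrip(T value) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            @SuppressWarnings("unchecked")
            T copy = (T) in.readObject();
            return copy;
        }
    }

    public static void main(String[] args) throws Exception {
        HypotheticalSink sink = new HypotheticalSink("events");
        sink.initialize();

        HypotheticalSink shipped = roundTrip(sink);
        System.out.println(shipped.topic);              // events
        System.out.println(shipped.currentTransaction); // null (transient, not shipped)

        shipped.initialize();                           // rebuilt on the "worker"
        System.out.println(shipped.currentTransaction); // txn-0
    }
}
```

After the round trip the configuration field is intact, while the transient fields stay `null` until `initialize()` runs again; a non-transient `StateHandle` field would instead make `writeObject` fail with `NotSerializableException` once it is set.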
> TwoPhaseCommitSinkFunctions should use custom TypeSerializer
> ------------------------------------------------------------
>
> Key: FLINK-7902
> URL: https://issues.apache.org/jira/browse/FLINK-7902
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Affects Versions: 1.4.0
> Reporter: Aljoscha Krettek
> Assignee: Aljoscha Krettek
> Priority: Blocker
> Fix For: 1.4.0
>
>
> Currently, the {{FlinkKafkaProducer011}} uses
> {{TypeInformation.of(new TypeHint<State<KafkaTransactionState, KafkaTransactionContext>>() {})}}
> to create a {{TypeInformation}}, which in turn is used to create a
> {{StateDescriptor}} for the state that the Kafka sink stores.
> Behind the scenes, this would be analysed roughly as a
> {{PojoType(GenericType<KafkaTransactionState>, GenericType<KafkaTransactionContext>)}},
> which means we have no explicit control over the serialisation format and we
> also rely on Kryo (the default for {{GenericTypeInfo}}). This can be
> problematic if we want to evolve the state schema in the future or if we want
> to change Kryo versions.
> We should change {{TwoPhaseCommitSinkFunction}} to have only this constructor:
> {code}
> public TwoPhaseCommitSinkFunction(TypeSerializer<State<TXN, CONTEXT>> stateSerializer) {
> {code}
> and we should then change the {{FlinkKafkaProducer011}} to hand in a
> custom-made {{TypeSerializer}} for the state.
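The proposed change can be sketched without Flink: a state serializer built from a transaction serializer and a context serializer (the two parameters documented for the new constructor in the diff) that writes an explicit, versioned byte layout instead of leaving the format to Kryo. This is a minimal sketch under stated assumptions: `SimpleSerializer`, `State`, `StateSerializer`, `Txn`, `TxnSerializer`, and `IdSetSerializer` are hypothetical stand-ins, not Flink's `TypeSerializer` API (which has further methods such as `copy` and `snapshotConfiguration`), and the `Txn` fields are illustrative rather than those of `KafkaTransactionState`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical minimal serializer contract; NOT Flink's TypeSerializer API.
interface SimpleSerializer<T> {
    void serialize(T value, DataOutputStream out) throws IOException;
    T deserialize(DataInputStream in) throws IOException;
}

// Hypothetical stand-in for State<TXN, CONTEXT>: one transaction plus its context.
final class State<TXN, CONTEXT> {
    final TXN transaction;
    final CONTEXT context;
    State(TXN transaction, CONTEXT context) { this.transaction = transaction; this.context = context; }
}

// Composes the state serializer from a transaction serializer and a context serializer
// and writes an explicit version byte, so the byte layout is defined here rather than by Kryo.
final class StateSerializer<TXN, CONTEXT> implements SimpleSerializer<State<TXN, CONTEXT>> {
    private static final int VERSION = 1;
    private final SimpleSerializer<TXN> transactionSerializer;
    private final SimpleSerializer<CONTEXT> contextSerializer;
    StateSerializer(SimpleSerializer<TXN> transactionSerializer, SimpleSerializer<CONTEXT> contextSerializer) {
        this.transactionSerializer = transactionSerializer;
        this.contextSerializer = contextSerializer;
    }
    public void serialize(State<TXN, CONTEXT> value, DataOutputStream out) throws IOException {
        out.writeByte(VERSION);
        transactionSerializer.serialize(value.transaction, out);
        contextSerializer.serialize(value.context, out);
    }
    public State<TXN, CONTEXT> deserialize(DataInputStream in) throws IOException {
        int version = in.readByte();
        if (version != VERSION) {
            // A later release could migrate older versions here instead of failing.
            throw new IOException("Unsupported state format version: " + version);
        }
        TXN transaction = transactionSerializer.deserialize(in);
        return new State<>(transaction, contextSerializer.deserialize(in));
    }
}

// Hypothetical transaction type; the fields are illustrative only.
final class Txn {
    final String id;
    final long producerId;
    final short epoch;
    Txn(String id, long producerId, short epoch) { this.id = id; this.producerId = producerId; this.epoch = epoch; }
}

final class TxnSerializer implements SimpleSerializer<Txn> {
    public void serialize(Txn txn, DataOutputStream out) throws IOException {
        out.writeUTF(txn.id);
        out.writeLong(txn.producerId);
        out.writeShort(txn.epoch);
    }
    public Txn deserialize(DataInputStream in) throws IOException {
        return new Txn(in.readUTF(), in.readLong(), in.readShort()); // read in write order
    }
}

// Hypothetical context type: a set of ids, written as a count followed by each id.
final class IdSetSerializer implements SimpleSerializer<Set<String>> {
    public void serialize(Set<String> ids, DataOutputStream out) throws IOException {
        out.writeInt(ids.size());
        for (String id : ids) out.writeUTF(id);
    }
    public Set<String> deserialize(DataInputStream in) throws IOException {
        int size = in.readInt();
        Set<String> ids = new LinkedHashSet<>();
        for (int i = 0; i < size; i++) ids.add(in.readUTF());
        return ids;
    }
}

public class StateSerializerDemo {
    // DataOutputStream writes straight through to the byte array, so no flush is needed.
    static <T> T roundTrip(SimpleSerializer<T> serializer, T value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        serializer.serialize(value, new DataOutputStream(bytes));
        return serializer.deserialize(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
    }

    public static void main(String[] args) throws IOException {
        StateSerializer<Txn, Set<String>> serializer = new StateSerializer<>(new TxnSerializer(), new IdSetSerializer());
        Set<String> ids = new LinkedHashSet<>(Arrays.asList("sink-0-0", "sink-0-1"));
        State<Txn, Set<String>> copy = roundTrip(serializer, new State<>(new Txn("sink-0-0", 42L, (short) 3), ids));
        System.out.println(copy.transaction.id + " " + copy.transaction.producerId + " " + copy.transaction.epoch); // sink-0-0 42 3
        System.out.println(copy.context); // [sink-0-0, sink-0-1]
    }
}
```

Because the version byte is written by this code, a later release can recognise older state and migrate it in `deserialize`, which is the kind of schema-evolution control the issue says is missing when the state goes through `GenericTypeInfo` and Kryo.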
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)