autophagy commented on a change in pull request #16941:
URL: https://github.com/apache/flink/pull/16941#discussion_r698580433
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java
##########
@@ -61,13 +62,25 @@ private void commitTransaction(KafkaCommittable
committable) {
new
FlinkKafkaInternalProducer<>(createKafkaProducerConfig(transactionalId))) {
producer.resumeTransaction(committable.getProducerId(),
committable.getEpoch());
producer.commitTransaction();
- } catch (InvalidTxnStateException | ProducerFencedException e) {
- // That means we have committed this transaction before.
+ } catch (InvalidTxnStateException e) {
LOG.warn(
- "Encountered error {} while recovering transaction {}. "
- + "Presumably this transaction has been already
committed before",
- e,
- committable);
+ "Unable to commit recovered transaction ({}) because it's
in invalid state. "
+ + "Most likely the transaction has been aborted
for some reason. Please check Kafka logs for more details.",
+ committable,
+ e);
+ } catch (ProducerFencedException e) {
+ LOG.warn(
+ "Unable to commit recovered transaction ({}) because its
producer is already fenced."
+ + " This means that you either have a different
producer with the same '{}' (this is"
+ + " unlikely with the '{}' as all generated ids
are unique and shouldn't be reused)"
+ + " or recovery took longer than '{}' ({}ms). In
both cases this most likely signals data loss,"
+ + " please consult documentation for more
details.",
Review comment:
Would it be possible to link to some documentation on this here or
provide an indication where the user might look?
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
##########
@@ -1355,6 +1353,7 @@ String getTransactionalId() {
FlinkKafkaInternalProducer<byte[], byte[]> producer =
initTransactionalProducer(transactionalId, true);
producer.initTransactions();
+ System.out.println("NEW PRODUCER " + transactionalId);
Review comment:
Was this intentionally comitted? All the other code uses `LOG`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]