Repository: flink Updated Branches: refs/heads/release-1.4 3fe70d761 -> 2f3f8c773
[hotfix][kafka] Improve logging in FlinkKafkaProducer011 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1a6121a6 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1a6121a6 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1a6121a6 Branch: refs/heads/release-1.4 Commit: 1a6121a69df86aecd5cc252b3f6142394ef68047 Parents: 3fe70d7 Author: Piotr Nowojski <[email protected]> Authored: Fri Nov 17 14:31:07 2017 +0100 Committer: Aljoscha Krettek <[email protected]> Committed: Mon Nov 20 13:26:08 2017 +0100 ---------------------------------------------------------------------- .../streaming/connectors/kafka/FlinkKafkaProducer011.java | 7 ++++++- .../connectors/kafka/internal/FlinkKafkaProducer.java | 2 +- 2 files changed, 7 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/1a6121a6/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java index 08599d8..611a3d5 100644 --- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java +++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java @@ -1022,7 +1022,12 @@ public class FlinkKafkaProducer011<IN> @Override public String toString() { - return String.format("%s [transactionalId=%s]", this.getClass().getSimpleName(), transactionalId); + return String.format( + "%s [transactionalId=%s, producerId=%s, epoch=%s]", + this.getClass().getSimpleName(), + transactionalId, + producerId, + epoch); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/1a6121a6/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java index 9d50379..2f58d56 100644 --- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java +++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java @@ -188,7 +188,7 @@ public class FlinkKafkaProducer<K, V> implements Producer<K, V> { */ public void resumeTransaction(long producerId, short epoch) { Preconditions.checkState(producerId >= 0 && epoch >= 0, "Incorrect values for producerId {} and epoch {}", producerId, epoch); - LOG.info("Attempting to resume transaction with producerId {} and epoch {}", producerId, epoch); + LOG.info("Attempting to resume transaction {} with producerId {} and epoch {}", transactionalId, producerId, epoch); Object transactionManager = getValue(kafkaProducer, "transactionManager"); synchronized (transactionManager) {
