Repository: flink
Updated Branches:
  refs/heads/release-1.4 3fe70d761 -> 2f3f8c773


[hotfix][kafka] Improve logging in FlinkKafkaProducer011


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1a6121a6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1a6121a6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1a6121a6

Branch: refs/heads/release-1.4
Commit: 1a6121a69df86aecd5cc252b3f6142394ef68047
Parents: 3fe70d7
Author: Piotr Nowojski <[email protected]>
Authored: Fri Nov 17 14:31:07 2017 +0100
Committer: Aljoscha Krettek <[email protected]>
Committed: Mon Nov 20 13:26:08 2017 +0100

----------------------------------------------------------------------
 .../streaming/connectors/kafka/FlinkKafkaProducer011.java     | 7 ++++++-
 .../connectors/kafka/internal/FlinkKafkaProducer.java         | 2 +-
 2 files changed, 7 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1a6121a6/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
index 08599d8..611a3d5 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
@@ -1022,7 +1022,12 @@ public class FlinkKafkaProducer011<IN>
 
                @Override
                public String toString() {
-                       return String.format("%s [transactionalId=%s]", this.getClass().getSimpleName(), transactionalId);
+                       return String.format(
+                               "%s [transactionalId=%s, producerId=%s, epoch=%s]",
+                               this.getClass().getSimpleName(),
+                               transactionalId,
+                               producerId,
+                               epoch);
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/1a6121a6/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java
index 9d50379..2f58d56 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java
@@ -188,7 +188,7 @@ public class FlinkKafkaProducer<K, V> implements Producer<K, V> {
         */
        public void resumeTransaction(long producerId, short epoch) {
                Preconditions.checkState(producerId >= 0 && epoch >= 0, "Incorrect values for producerId {} and epoch {}", producerId, epoch);
-               LOG.info("Attempting to resume transaction with producerId {} and epoch {}", producerId, epoch);
+               LOG.info("Attempting to resume transaction {} with producerId {} and epoch {}", transactionalId, producerId, epoch);
 
                Object transactionManager = getValue(kafkaProducer, "transactionManager");
                synchronized (transactionManager) {

Reply via email to