[hotfix] [kafka connector] Minor code cleanups in the Kafka Producer

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f8cd9ba8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f8cd9ba8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f8cd9ba8

Branch: refs/heads/master
Commit: f8cd9ba8c4c528c316d99aa7c5c9a35b2f2dab47
Parents: 64aa7c8
Author: Stephan Ewen <[email protected]>
Authored: Tue Jul 5 10:18:38 2016 +0200
Committer: Stephan Ewen <[email protected]>
Committed: Thu Jul 14 21:11:48 2016 +0200

----------------------------------------------------------------------
 .../connectors/kafka/FlinkKafkaProducer08.java  |  1 -
 .../kafka/FlinkKafkaProducerBase.java           | 39 ++++++++------------
 2 files changed, 15 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f8cd9ba8/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
index e509d2f..65de5fc 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
@@ -129,7 +129,6 @@ public class FlinkKafkaProducer08<IN> extends 
FlinkKafkaProducerBase<IN>  {
        protected void flush() {
                // The Kafka 0.8 producer doesn't support flushing, we wait here
                // until all pending records are confirmed
-               //noinspection SynchronizeOnNonFinalField
                synchronized (pendingRecordsLock) {
                        while (pendingRecords > 0) {
                                try {

http://git-wip-us.apache.org/repos/asf/flink/blob/f8cd9ba8/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
index a9d4917..0e05f91 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.kafka;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.util.SerializableObject;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
@@ -60,7 +61,6 @@ import static java.util.Objects.requireNonNull;
  *
  * @param <IN> Type of the messages to write into Kafka.
  */
-@SuppressWarnings("SynchronizeOnNonFinalField")
 public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> 
implements Checkpointed<Serializable> {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(FlinkKafkaProducerBase.class);
@@ -112,7 +112,7 @@ public abstract class FlinkKafkaProducerBase<IN> extends 
RichSinkFunction<IN> im
        /**
         * If true, the producer will wait until all outstanding records have 
been send to the broker.
         */
-       private boolean flushOnCheckpoint = false;
+       private boolean flushOnCheckpoint;
        
        // -------------------------------- Runtime fields 
------------------------------------------
 
@@ -121,19 +121,15 @@ public abstract class FlinkKafkaProducerBase<IN> extends 
RichSinkFunction<IN> im
 
        /** The callback than handles error propagation or logging callbacks */
        protected transient Callback callback;
-       
+
        /** Errors encountered in the async producer are stored here */
        protected transient volatile Exception asyncException;
 
-       /**
-        * Number of unacknowledged records.
-        */
-       protected long pendingRecords = 0;
+       /** Lock for accessing the pending records */
+       protected final SerializableObject pendingRecordsLock = new 
SerializableObject();
 
-       /**
-        * Lock for accessing the pending records
-        */
-       protected transient Object pendingRecordsLock;
+       /** Number of unacknowledged records. */
+       protected long pendingRecords;
 
 
        /**
@@ -233,10 +229,10 @@ public abstract class FlinkKafkaProducerBase<IN> extends 
RichSinkFunction<IN> im
                }
 
                LOG.info("Starting FlinkKafkaProducer ({}/{}) to produce into 
topic {}", 
-                               ctx.getIndexOfThisSubtask(), 
ctx.getNumberOfParallelSubtasks(), defaultTopicId);
+                               ctx.getIndexOfThisSubtask() + 1, 
ctx.getNumberOfParallelSubtasks(), defaultTopicId);
 
                // register Kafka metrics to Flink accumulators
-               if 
(!Boolean.valueOf(producerConfig.getProperty(KEY_DISABLE_METRICS, "false"))) {
+               if 
(!Boolean.parseBoolean(producerConfig.getProperty(KEY_DISABLE_METRICS, 
"false"))) {
                        Map<MetricName, ? extends Metric> metrics = 
this.producer.metrics();
 
                        if (metrics == null) {
@@ -258,9 +254,6 @@ public abstract class FlinkKafkaProducerBase<IN> extends 
RichSinkFunction<IN> im
                        LOG.warn("Flushing on checkpoint is enabled, but 
checkpointing is not enabled. Disabling flushing.");
                        flushOnCheckpoint = false;
                }
-               if (flushOnCheckpoint) {
-                       pendingRecordsLock = new Object();
-               }
 
                if (logFailuresOnly) {
                        callback = new Callback() {
@@ -332,14 +325,12 @@ public abstract class FlinkKafkaProducerBase<IN> extends 
RichSinkFunction<IN> im
        // ------------------- Logic for handling checkpoint flushing 
-------------------------- //
 
        private void acknowledgeMessage() {
-               if (!flushOnCheckpoint) {
-                       // the logic is disabled
-                       return;
-               }
-               synchronized (pendingRecordsLock) {
-                       pendingRecords--;
-                       if (pendingRecords == 0) {
-                               pendingRecordsLock.notifyAll();
+               if (flushOnCheckpoint) {
+                       synchronized (pendingRecordsLock) {
+                               pendingRecords--;
+                               if (pendingRecords == 0) {
+                                       pendingRecordsLock.notifyAll();
+                               }
                        }
                }
        }

Reply via email to