[hotfix] [kafka connector] Minor code cleanups in the Kafka Producer
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f8cd9ba8 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f8cd9ba8 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f8cd9ba8 Branch: refs/heads/master Commit: f8cd9ba8c4c528c316d99aa7c5c9a35b2f2dab47 Parents: 64aa7c8 Author: Stephan Ewen <[email protected]> Authored: Tue Jul 5 10:18:38 2016 +0200 Committer: Stephan Ewen <[email protected]> Committed: Thu Jul 14 21:11:48 2016 +0200 ---------------------------------------------------------------------- .../connectors/kafka/FlinkKafkaProducer08.java | 1 - .../kafka/FlinkKafkaProducerBase.java | 39 ++++++++------------ 2 files changed, 15 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/f8cd9ba8/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java index e509d2f..65de5fc 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java @@ -129,7 +129,6 @@ public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN> { protected void flush() { // The Kafka 0.8 producer doesn't support flushing, we wait here // until all pending records are confirmed - //noinspection SynchronizeOnNonFinalField synchronized (pendingRecordsLock) { while (pendingRecords > 0) { try { http://git-wip-us.apache.org/repos/asf/flink/blob/f8cd9ba8/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java index a9d4917..0e05f91 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java @@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.kafka; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.java.ClosureCleaner; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.util.SerializableObject; import org.apache.flink.streaming.api.checkpoint.Checkpointed; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; @@ -60,7 +61,6 @@ import static java.util.Objects.requireNonNull; * * @param <IN> Type of the messages to write into Kafka. */ -@SuppressWarnings("SynchronizeOnNonFinalField") public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> implements Checkpointed<Serializable> { private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaProducerBase.class); @@ -112,7 +112,7 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im /** * If true, the producer will wait until all outstanding records have been send to the broker. */ - private boolean flushOnCheckpoint = false; + private boolean flushOnCheckpoint; // -------------------------------- Runtime fields ------------------------------------------ @@ -121,19 +121,15 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im /** The callback than handles error propagation or logging callbacks */ protected transient Callback callback; - + /** Errors encountered in the async producer are stored here */ protected transient volatile Exception asyncException; - /** - * Number of unacknowledged records. - */ - protected long pendingRecords = 0; + /** Lock for accessing the pending records */ + protected final SerializableObject pendingRecordsLock = new SerializableObject(); - /** - * Lock for accessing the pending records - */ - protected transient Object pendingRecordsLock; + /** Number of unacknowledged records. */ + protected long pendingRecords; /** @@ -233,10 +229,10 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im } LOG.info("Starting FlinkKafkaProducer ({}/{}) to produce into topic {}", - ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks(), defaultTopicId); + ctx.getIndexOfThisSubtask() + 1, ctx.getNumberOfParallelSubtasks(), defaultTopicId); // register Kafka metrics to Flink accumulators - if (!Boolean.valueOf(producerConfig.getProperty(KEY_DISABLE_METRICS, "false"))) { + if (!Boolean.parseBoolean(producerConfig.getProperty(KEY_DISABLE_METRICS, "false"))) { Map<MetricName, ? extends Metric> metrics = this.producer.metrics(); if (metrics == null) { @@ -258,9 +254,6 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im LOG.warn("Flushing on checkpoint is enabled, but checkpointing is not enabled. Disabling flushing."); flushOnCheckpoint = false; } - if (flushOnCheckpoint) { - pendingRecordsLock = new Object(); - } if (logFailuresOnly) { callback = new Callback() { @@ -332,14 +325,12 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im // ------------------- Logic for handling checkpoint flushing -------------------------- // private void acknowledgeMessage() { - if (!flushOnCheckpoint) { - // the logic is disabled - return; - } - synchronized (pendingRecordsLock) { - pendingRecords--; - if (pendingRecords == 0) { - pendingRecordsLock.notifyAll(); + if (flushOnCheckpoint) { + synchronized (pendingRecordsLock) { + pendingRecords--; + if (pendingRecords == 0) { + pendingRecordsLock.notifyAll(); + } } } }
