[hotfix][kafka] Do not return producers to a pool in abort for non EXACTLY_ONCE 
modes

Previously, on abort(...), producers were returned to the pool even in
non-EXACTLY_ONCE modes. This was a minor bug, probably without any negative
side effects; however, this patch fixes it and adds additional sanity checks
to guard against similar bugs in the future.

This closes #4915.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/51e9a015
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/51e9a015
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/51e9a015

Branch: refs/heads/master
Commit: 51e9a0159ba215e3779e4ef6e0ceb77d3df48f7d
Parents: 5058c3f
Author: Piotr Nowojski <[email protected]>
Authored: Fri Oct 27 15:47:26 2017 +0200
Committer: Tzu-Li (Gordon) Tai <[email protected]>
Committed: Thu Nov 2 12:44:16 2017 +0800

----------------------------------------------------------------------
 .../connectors/kafka/FlinkKafkaProducer011.java | 28 ++++++++++++--------
 1 file changed, 17 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/51e9a015/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
index f349df3..c27c620 100644
--- 
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
+++ 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
@@ -45,7 +45,6 @@ import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWra
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.NetUtils;
-import org.apache.flink.util.Preconditions;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
 
@@ -226,7 +225,7 @@ public class FlinkKafkaProducer011<IN>
        /**
         * Pool of KafkaProducers objects.
         */
-       private transient ProducersPool producersPool = new ProducersPool();
+       private transient Optional<ProducersPool> producersPool = 
Optional.empty();
 
        /**
         * Flag controlling whether we are writing the Flink record's timestamp 
into Kafka.
@@ -596,7 +595,7 @@ public class FlinkKafkaProducer011<IN>
                        asyncException = ExceptionUtils.firstOrSuppressed(e, 
asyncException);
                }
                try {
-                       producersPool.close();
+                       producersPool.ifPresent(pool -> pool.close());
                }
                catch (Exception e) {
                        asyncException = ExceptionUtils.firstOrSuppressed(e, 
asyncException);
@@ -628,7 +627,7 @@ public class FlinkKafkaProducer011<IN>
        }
 
        private FlinkKafkaProducer<byte[], byte[]> 
createOrGetProducerFromPool() throws Exception {
-               FlinkKafkaProducer<byte[], byte[]> producer = 
producersPool.poll();
+               FlinkKafkaProducer<byte[], byte[]> producer = 
getProducersPool().poll();
                if (producer == null) {
                        String transactionalId = 
availableTransactionalIds.poll();
                        if (transactionalId == null) {
@@ -661,7 +660,7 @@ public class FlinkKafkaProducer011<IN>
                switch (semantic) {
                        case EXACTLY_ONCE:
                                transaction.producer.commitTransaction();
-                               producersPool.add(transaction.producer);
+                               getProducersPool().add(transaction.producer);
                                break;
                        case AT_LEAST_ONCE:
                        case NONE:
@@ -703,11 +702,10 @@ public class FlinkKafkaProducer011<IN>
                switch (semantic) {
                        case EXACTLY_ONCE:
                                transaction.producer.abortTransaction();
-                               producersPool.add(transaction.producer);
+                               getProducersPool().add(transaction.producer);
                                break;
                        case AT_LEAST_ONCE:
                        case NONE:
-                               producersPool.add(transaction.producer);
                                break;
                        default:
                                throw new UnsupportedOperationException("Not 
implemented semantic");
@@ -760,7 +758,8 @@ public class FlinkKafkaProducer011<IN>
                nextTransactionalIdHintState.clear();
                // To avoid duplication only first subtask keeps track of next 
transactional id hint. Otherwise all of the
                // subtasks would write exactly same information.
-               if (getRuntimeContext().getIndexOfThisSubtask() == 0 && 
nextTransactionalIdHint != null) {
+               if (getRuntimeContext().getIndexOfThisSubtask() == 0 && 
semantic == Semantic.EXACTLY_ONCE) {
+                       checkState(nextTransactionalIdHint != null, 
"nextTransactionalIdHint must be set for EXACTLY_ONCE");
                        long nextFreeTransactionalId = 
nextTransactionalIdHint.nextFreeTransactionalId;
 
                        // If we scaled up, some (unknown) subtask must have 
created new transactional ids from scratch. In that
@@ -788,7 +787,10 @@ public class FlinkKafkaProducer011<IN>
 
                if (semantic != Semantic.EXACTLY_ONCE) {
                        nextTransactionalIdHint = null;
+                       producersPool = Optional.empty();
                } else {
+                       producersPool = Optional.of(new ProducersPool());
+
                        ArrayList<NextTransactionalIdHint> transactionalIdHints 
= Lists.newArrayList(nextTransactionalIdHintState.get());
                        if (transactionalIdHints.size() > 1) {
                                throw new IllegalStateException(
@@ -829,8 +831,7 @@ public class FlinkKafkaProducer011<IN>
        }
 
        private Set<String> generateNewTransactionalIds() {
-               Preconditions.checkState(nextTransactionalIdHint != null,
-                       "nextTransactionalIdHint must be present for 
EXACTLY_ONCE");
+               checkState(nextTransactionalIdHint != null, 
"nextTransactionalIdHint must be present for EXACTLY_ONCE");
 
                // range of available transactional ids is:
                // [nextFreeTransactionalId, nextFreeTransactionalId + 
parallelism * kafkaProducersPoolSize)
@@ -903,6 +904,11 @@ public class FlinkKafkaProducer011<IN>
                return initProducer(registerMetrics);
        }
 
+       private ProducersPool getProducersPool() {
+               checkState(producersPool.isPresent(), "Trying to access 
uninitialized producer pool");
+               return producersPool.get();
+       }
+
        private FlinkKafkaProducer<byte[], byte[]> initProducer(boolean 
registerMetrics) {
                FlinkKafkaProducer<byte[], byte[]> producer = new 
FlinkKafkaProducer<>(this.producerConfig);
 
@@ -958,7 +964,7 @@ public class FlinkKafkaProducer011<IN>
 
        private void readObject(java.io.ObjectInputStream in) throws 
IOException, ClassNotFoundException {
                in.defaultReadObject();
-               producersPool = new ProducersPool();
+               producersPool = Optional.empty();
        }
 
        private static Properties getPropertiesFromBrokerList(String 
brokerList) {

Reply via email to