This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.7 by this push:
     new ff4c143  [FLINK-10455][Connectors/Kafka] Ensure all the KafkaProducers closed on an exception
ff4c143 is described below

commit ff4c14331db69d5142c03555ac900e4eaffd32b4
Author: Jiangjie (Becket) Qin <[email protected]>
AuthorDate: Tue Jun 11 16:47:37 2019 +0800

    [FLINK-10455][Connectors/Kafka] Ensure all the KafkaProducers closed on an exception
    
    The patch fixes the bugs reported in FLINK-10455 by making sure all the KafkaProducers are closed when FlinkKafkaProducer is closed. The same fix was applied to both the universal FlinkKafkaProducer and FlinkKafkaProducer011.
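    
    The essence of the change is that producer cleanup now happens in a
    try/finally block, so every internal KafkaProducer is closed even when
    flush() or super.close() throws. The sketch below shows the same pattern
    in isolation; the class and method names (CloseAllOnException and its
    helpers) are hypothetical stand-ins, not Flink code:
    
    import java.io.Closeable;
    import java.io.IOException;
    import java.util.Arrays;
    import java.util.List;
    
    // Minimal sketch (hypothetical names, not Flink code): close every
    // producer even if the regular shutdown path throws, mirroring the
    // try/finally structure introduced by this patch.
    public class CloseAllOnException {
    
        static void close(Closeable current, List<Closeable> pending) throws IOException {
            IOException firstError = null;
            try {
                // Regular shutdown: may throw before reaching the rest.
                if (current != null) {
                    current.close();
                }
            } catch (IOException e) {
                firstError = e; // remember the first error, rethrow later
            } finally {
                // Guaranteed cleanup: a closeQuietly-style pass over all
                // producers, so one failure cannot leak the others.
                closeQuietly(current);
                for (Closeable producer : pending) {
                    closeQuietly(producer);
                }
            }
            if (firstError != null) {
                throw firstError; // propagate the original failure last
            }
        }
    
        static void closeQuietly(Closeable c) {
            if (c == null) {
                return;
            }
            try {
                c.close();
            } catch (IOException ignored) {
                // swallow secondary failures so cleanup continues
            }
        }
    
        public static void main(String[] args) {
            Closeable failing = () -> { throw new IOException("flush failed"); };
            Closeable pending1 = () -> System.out.println("pending producer 1 closed");
            Closeable pending2 = () -> System.out.println("pending producer 2 closed");
            try {
                close(failing, Arrays.asList(pending1, pending2));
            } catch (IOException e) {
                // Both pending producers were closed before this propagates.
                System.out.println("propagated: " + e.getMessage());
            }
        }
    }
    
    The checkProducerLeak() helper added to both ITCases asserts exactly this
    property: it fails the test if any "kafka-producer-network-thread" is
    still alive after close().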
---
 .../connectors/kafka/FlinkKafkaProducer011.java    | 52 +++++++++--------
 .../kafka/FlinkKafkaProducer011ITCase.java         | 67 +++++++++++++++-------
 .../src/test/resources/log4j-test.properties       |  3 +-
 .../connectors/kafka/FlinkKafkaProducer.java       | 53 +++++++++--------
 .../connectors/kafka/FlinkKafkaProducerITCase.java | 67 +++++++++++++++-------
 .../src/test/resources/log4j-test.properties       |  3 +-
 6 files changed, 155 insertions(+), 90 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
index 3e7cf2b..e401164 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
@@ -648,33 +648,39 @@ public class FlinkKafkaProducer011<IN>
 
        @Override
        public void close() throws FlinkKafka011Exception {
-               final KafkaTransactionState currentTransaction = currentTransaction();
-               if (currentTransaction != null) {
-                       // to avoid exceptions on aborting transactions with some pending records
-                       flush(currentTransaction);
-
-                       // normal abort for AT_LEAST_ONCE and NONE do not clean up resources because of producer reusing, thus
-                       // we need to close it manually
-                       switch (semantic) {
-                               case EXACTLY_ONCE:
-                                       break;
-                               case AT_LEAST_ONCE:
-                               case NONE:
-                                       currentTransaction.producer.close();
-                                       break;
-                       }
-               }
+               // First close the producer for current transaction.
                try {
+                       final KafkaTransactionState currentTransaction = currentTransaction();
+                       if (currentTransaction != null) {
+                               // to avoid exceptions on aborting transactions with some pending records
+                               flush(currentTransaction);
+
+                               // normal abort for AT_LEAST_ONCE and NONE do not clean up resources because of producer reusing, thus
+                               // we need to close it manually
+                               switch (semantic) {
+                                       case EXACTLY_ONCE:
+                                               break;
+                                       case AT_LEAST_ONCE:
+                                       case NONE:
+                                               currentTransaction.producer.close();
+                                               break;
+                               }
+                       }
                        super.close();
-               }
-               catch (Exception e) {
+               } catch (Exception e) {
                       asyncException = ExceptionUtils.firstOrSuppressed(e, asyncException);
+               } finally {
+                       // We may have to close producer of the current transaction in case some exception was thrown before
+                       // the normal close routine finishes.
+                       if (currentTransaction() != null) {
+                               IOUtils.closeQuietly(currentTransaction().producer);
+                       }
+                       // Make sure all the producers for pending transactions are closed.
+                       pendingTransactions().forEach(transaction -> IOUtils.closeQuietly(transaction.getValue().producer)
+                       );
+                       // make sure we propagate pending errors
+                       checkErroneous();
                }
-               // make sure we propagate pending errors
-               checkErroneous();
-               pendingTransactions().forEach(transaction ->
-                       IOUtils.closeQuietly(transaction.getValue().producer)
-               );
        }
 
        // ------------------- Logic for handling checkpoint flushing -------------------------- //
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
index ca75b1a..13f8dd2 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
@@ -116,6 +116,7 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBase {
                        else {
                                initialActiveThreads = Optional.of(Thread.activeCount());
                        }
+                       checkProducerLeak();
                }
        }
 
@@ -164,6 +165,7 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBase {
                                throw ex;
                        }
                }
+               checkProducerLeak();
        }
 
        @Test
@@ -206,6 +208,7 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBase {
                assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42, 43));
 
                deleteTestTopic(topic);
+               checkProducerLeak();
        }
 
        @Test
@@ -266,32 +269,32 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBase {
        public void testFailBeforeNotifyAndResumeWorkAfterwards() throws Exception {
                String topic = "flink-kafka-producer-fail-before-notify";
 
-               OneInputStreamOperatorTestHarness<Integer, Object> testHarness = createTestHarness(topic);
-
-               testHarness.setup();
-               testHarness.open();
-               testHarness.processElement(42, 0);
-               testHarness.snapshot(0, 1);
-               testHarness.processElement(43, 2);
-               OperatorSubtaskState snapshot1 = testHarness.snapshot(1, 3);
+               OneInputStreamOperatorTestHarness<Integer, Object> testHarness1 = createTestHarness(topic);
+               checkProducerLeak();
+               testHarness1.setup();
+               testHarness1.open();
+               testHarness1.processElement(42, 0);
+               testHarness1.snapshot(0, 1);
+               testHarness1.processElement(43, 2);
+               OperatorSubtaskState snapshot1 = testHarness1.snapshot(1, 3);
 
-               testHarness.processElement(44, 4);
-               testHarness.snapshot(2, 5);
-               testHarness.processElement(45, 6);
+               testHarness1.processElement(44, 4);
+               testHarness1.snapshot(2, 5);
+               testHarness1.processElement(45, 6);
 
                // do not close previous testHarness to make sure that closing do not clean up something (in case of failure
                // there might not be any close)
-               testHarness = createTestHarness(topic);
-               testHarness.setup();
+               OneInputStreamOperatorTestHarness<Integer, Object> testHarness2 = createTestHarness(topic);
+               testHarness2.setup();
                // restore from snapshot1, transactions with records 44 and 45 should be aborted
-               testHarness.initializeState(snapshot1);
-               testHarness.open();
+               testHarness2.initializeState(snapshot1);
+               testHarness2.open();
 
                // write and commit more records, after potentially lingering transactions
-               testHarness.processElement(46, 7);
-               testHarness.snapshot(4, 8);
-               testHarness.processElement(47, 9);
-               testHarness.notifyOfCompletedCheckpoint(4);
+               testHarness2.processElement(46, 7);
+               testHarness2.snapshot(4, 8);
+               testHarness2.processElement(47, 9);
+               testHarness2.notifyOfCompletedCheckpoint(4);
 
                //now we should have:
                // - records 42 and 43 in committed transactions
@@ -300,8 +303,18 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBase {
                // - pending transaction with record 47
                assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42, 43, 46));
 
-               testHarness.close();
+               try {
+                       testHarness1.close();
+               } catch (Exception e) {
+                       // The only acceptable exception is ProducerFencedException because testHarness2 uses the same
+                       // transactional ID.
+                       if (!(e.getCause() instanceof ProducerFencedException)) {
+                               fail("Received unexpected exception " + e);
+                       }
+               }
+               testHarness2.close();
                deleteTestTopic(topic);
+               checkProducerLeak();
        }
 
        @Test
@@ -349,6 +362,7 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBase {
                // - aborted transactions with records 44 and 45
                assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42, 43));
                deleteTestTopic(topic);
+               checkProducerLeak();
        }
 
        /**
@@ -407,6 +421,7 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBase {
                        closeIgnoringProducerFenced(operatorToClose);
                }
                deleteTestTopic(topic);
+               checkProducerLeak();
        }
 
        /**
@@ -470,6 +485,7 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBase {
                        0,
                        IntStream.range(0, parallelism1 + parallelism2 + parallelism3).boxed().collect(Collectors.toList()));
                deleteTestTopic(topic);
+               checkProducerLeak();
        }
 
        private List<OperatorStateHandle> repartitionAndExecute(
@@ -543,6 +559,7 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBase {
                assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42));
 
                deleteTestTopic(topic);
+               checkProducerLeak();
        }
 
        @Test
@@ -565,6 +582,7 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBase {
                        }
                }
                deleteTestTopic(topic);
+               checkProducerLeak();
        }
 
        @Test
@@ -609,6 +627,7 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBase {
                        testHarness.notifyOfCompletedCheckpoint(2);
                        testHarness.processElement(47, 9);
                }
+               checkProducerLeak();
        }
 
        // shut down a Kafka broker
@@ -696,4 +715,12 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBase {
                }
                return false;
        }
+
+       private void checkProducerLeak() {
+               for (Thread t : Thread.getAllStackTraces().keySet()) {
+                       if (t.getName().contains("kafka-producer-network-thread")) {
+                               fail("Detected producer leak. Thread name: " + t.getName());
+                       }
+               }
+       }
 }
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/resources/log4j-test.properties b/flink-connectors/flink-connector-kafka-0.11/src/test/resources/log4j-test.properties
index fbeb110..2ce32f0 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/resources/log4j-test.properties
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/resources/log4j-test.properties
@@ -16,7 +16,7 @@
 # limitations under the License.
 ################################################################################
 
-log4j.rootLogger=INFO, testlogger
+log4j.rootLogger=OFF, testlogger
 
 log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
 log4j.appender.testlogger.target = System.err
@@ -24,7 +24,6 @@ log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
 log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
 
 # suppress the irrelevant (wrong) warnings from the netty channel handler
-log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
 log4j.logger.org.apache.zookeeper=OFF, testlogger
 log4j.logger.state.change.logger=OFF, testlogger
 log4j.logger.kafka=OFF, testlogger
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
index 10e8ef1..97ecc6f 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
@@ -650,33 +650,40 @@ public class FlinkKafkaProducer<IN>
 
        @Override
        public void close() throws FlinkKafkaException {
-               final FlinkKafkaProducer.KafkaTransactionState currentTransaction = currentTransaction();
-               if (currentTransaction != null) {
-                       // to avoid exceptions on aborting transactions with some pending records
-                       flush(currentTransaction);
-
-                       // normal abort for AT_LEAST_ONCE and NONE do not clean up resources because of producer reusing, thus
-                       // we need to close it manually
-                       switch (semantic) {
-                               case EXACTLY_ONCE:
-                                       break;
-                               case AT_LEAST_ONCE:
-                               case NONE:
-                                       currentTransaction.producer.close();
-                                       break;
-                       }
-               }
+               // First close the producer for current transaction.
                try {
+                       final KafkaTransactionState currentTransaction = currentTransaction();
+                       if (currentTransaction != null) {
+                               // to avoid exceptions on aborting transactions with some pending records
+                               flush(currentTransaction);
+
+                               // normal abort for AT_LEAST_ONCE and NONE do not clean up resources because of producer reusing, thus
+                               // we need to close it manually
+                               switch (semantic) {
+                                       case EXACTLY_ONCE:
+                                               break;
+                                       case AT_LEAST_ONCE:
+                                       case NONE:
+                                               currentTransaction.producer.close();
+                                               break;
+                               }
+                       }
                        super.close();
-               }
-               catch (Exception e) {
+               } catch (Exception e) {
                       asyncException = ExceptionUtils.firstOrSuppressed(e, asyncException);
+               } finally {
+                       // We may have to close producer of the current transaction in case some exception was thrown before
+                       // the normal close routine finishes.
+                       if (currentTransaction() != null) {
+                               IOUtils.closeQuietly(currentTransaction().producer);
+                       }
+                       // Make sure all the producers for pending transactions are closed.
+                       pendingTransactions().forEach(transaction ->
+                                       IOUtils.closeQuietly(transaction.getValue().producer)
+                       );
+                       );
+                       // make sure we propagate pending errors
+                       checkErroneous();
                }
-               // make sure we propagate pending errors
-               checkErroneous();
-               pendingTransactions().forEach(transaction ->
-                       IOUtils.closeQuietly(transaction.getValue().producer)
-               );
        }
 
        // ------------------- Logic for handling checkpoint flushing -------------------------- //
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java
index 29f157f..e4c9da3 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java
@@ -113,6 +113,7 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
                                initialActiveThreads = Optional.of(Thread.activeCount());
                        }
                }
+               checkProducerLeak();
        }
 
        /**
@@ -160,6 +161,7 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
                                throw ex;
                        }
                }
+               checkProducerLeak();
        }
 
        @Test
@@ -202,6 +204,7 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
                assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42, 43));
 
                deleteTestTopic(topic);
+               checkProducerLeak();
        }
 
        @Test
@@ -262,32 +265,32 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
        public void testFailBeforeNotifyAndResumeWorkAfterwards() throws Exception {
                String topic = "flink-kafka-producer-fail-before-notify";
 
-               OneInputStreamOperatorTestHarness<Integer, Object> testHarness = createTestHarness(topic);
-
-               testHarness.setup();
-               testHarness.open();
-               testHarness.processElement(42, 0);
-               testHarness.snapshot(0, 1);
-               testHarness.processElement(43, 2);
-               OperatorSubtaskState snapshot1 = testHarness.snapshot(1, 3);
+               OneInputStreamOperatorTestHarness<Integer, Object> testHarness1 = createTestHarness(topic);
+               checkProducerLeak();
+               testHarness1.setup();
+               testHarness1.open();
+               testHarness1.processElement(42, 0);
+               testHarness1.snapshot(0, 1);
+               testHarness1.processElement(43, 2);
+               OperatorSubtaskState snapshot1 = testHarness1.snapshot(1, 3);
 
-               testHarness.processElement(44, 4);
-               testHarness.snapshot(2, 5);
-               testHarness.processElement(45, 6);
+               testHarness1.processElement(44, 4);
+               testHarness1.snapshot(2, 5);
+               testHarness1.processElement(45, 6);
 
                // do not close previous testHarness to make sure that closing do not clean up something (in case of failure
                // there might not be any close)
-               testHarness = createTestHarness(topic);
-               testHarness.setup();
+               OneInputStreamOperatorTestHarness<Integer, Object> testHarness2 = createTestHarness(topic);
+               testHarness2.setup();
                // restore from snapshot1, transactions with records 44 and 45 should be aborted
-               testHarness.initializeState(snapshot1);
-               testHarness.open();
+               testHarness2.initializeState(snapshot1);
+               testHarness2.open();
 
                // write and commit more records, after potentially lingering transactions
-               testHarness.processElement(46, 7);
-               testHarness.snapshot(4, 8);
-               testHarness.processElement(47, 9);
-               testHarness.notifyOfCompletedCheckpoint(4);
+               testHarness2.processElement(46, 7);
+               testHarness2.snapshot(4, 8);
+               testHarness2.processElement(47, 9);
+               testHarness2.notifyOfCompletedCheckpoint(4);
 
                //now we should have:
                // - records 42 and 43 in committed transactions
@@ -296,8 +299,18 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
                // - pending transaction with record 47
                assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42, 43, 46));
 
-               testHarness.close();
+               try {
+                       testHarness1.close();
+               } catch (Exception e) {
+                       // The only acceptable exception is ProducerFencedException because testHarness2 uses the same
+                       // transactional ID.
+                       if (!(e.getCause() instanceof ProducerFencedException)) {
+                               fail("Received unexpected exception " + e);
+                       }
+               }
+               testHarness2.close();
                deleteTestTopic(topic);
+               checkProducerLeak();
        }
 
        @Test
@@ -345,6 +358,7 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
                // - aborted transactions with records 44 and 45
                assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42, 43));
                deleteTestTopic(topic);
+               checkProducerLeak();
        }
 
        /**
@@ -403,6 +417,7 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
                        closeIgnoringProducerFenced(operatorToClose);
                }
                deleteTestTopic(topic);
+               checkProducerLeak();
        }
 
        /**
@@ -466,6 +481,7 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
                        0,
                        IntStream.range(0, parallelism1 + parallelism2 + parallelism3).boxed().collect(Collectors.toList()));
                deleteTestTopic(topic);
+               checkProducerLeak();
        }
 
        private List<OperatorStateHandle> repartitionAndExecute(
@@ -539,6 +555,7 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
                assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42));
 
                deleteTestTopic(topic);
+               checkProducerLeak();
        }
 
        @Test
@@ -561,6 +578,7 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
                        }
                }
                deleteTestTopic(topic);
+               checkProducerLeak();
        }
 
        @Test
@@ -605,6 +623,7 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
                        testHarness.notifyOfCompletedCheckpoint(2);
                        testHarness.processElement(47, 9);
                }
+               checkProducerLeak();
        }
 
        // -----------------------------------------------------------------------------------------------------------------
@@ -695,4 +714,12 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
                return false;
        }
 
+       private void checkProducerLeak() {
+               for (Thread t : Thread.getAllStackTraces().keySet()) {
+                       if (t.getName().contains("kafka-producer-network-thread")) {
+                               fail("Detected producer leak. Thread name: " + t.getName());
+                       }
+               }
+       }
+
 }
diff --git a/flink-connectors/flink-connector-kafka/src/test/resources/log4j-test.properties b/flink-connectors/flink-connector-kafka/src/test/resources/log4j-test.properties
index fbeb110..2ce32f0 100644
--- a/flink-connectors/flink-connector-kafka/src/test/resources/log4j-test.properties
+++ b/flink-connectors/flink-connector-kafka/src/test/resources/log4j-test.properties
@@ -16,7 +16,7 @@
 # limitations under the License.
 ################################################################################
 
-log4j.rootLogger=INFO, testlogger
+log4j.rootLogger=OFF, testlogger
 
 log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
 log4j.appender.testlogger.target = System.err
@@ -24,7 +24,6 @@ log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
 log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
 
 # suppress the irrelevant (wrong) warnings from the netty channel handler
-log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
 log4j.logger.org.apache.zookeeper=OFF, testlogger
 log4j.logger.state.change.logger=OFF, testlogger
 log4j.logger.kafka=OFF, testlogger
