This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.8 by this push:
     new 765eda1  [FLINK-10455][Connectors/Kafka] Ensure all the KafkaProducers closed on an exception
765eda1 is described below

commit 765eda1ec412192ace4635892c7b459e5dcd7bfb
Author: Jiangjie (Becket) Qin <[email protected]>
AuthorDate: Tue Jun 11 16:47:37 2019 +0800

    [FLINK-10455][Connectors/Kafka] Ensure all the KafkaProducers closed on an exception
    
    The patch fixes the bugs reported in FLINK-10455 by making sure all the KafkaProducers are closed when FlinkKafkaProducer is closed. The same fix was applied to the universal FlinkKafkaProducer and FlinkKafkaProducer011.
---
 .../connectors/kafka/FlinkKafkaProducer011.java    | 52 +++++++++--------
 .../kafka/FlinkKafkaProducer011ITCase.java         | 67 +++++++++++++++-------
 .../src/test/resources/log4j-test.properties       |  3 +-
 .../connectors/kafka/FlinkKafkaProducer.java       | 53 +++++++++--------
 .../connectors/kafka/FlinkKafkaProducerITCase.java | 67 +++++++++++++++-------
 .../src/test/resources/log4j-test.properties       |  3 +-
 6 files changed, 155 insertions(+), 90 deletions(-)
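
In essence, the fix below restructures close(): the happy path (flush the current transaction, close its producer for AT_LEAST_ONCE/NONE, call super.close()) can throw, so producer cleanup now also runs in a finally block, and the first failure is rethrown only after cleanup. A minimal, self-contained sketch of that shape (illustrative names only, not Flink API; plain java.io.Closeable stands in for the Kafka producers):

import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;

// Sketch of the close()-in-finally pattern from the diffs below:
// record the first failure, close every producer quietly regardless,
// then rethrow so pending errors still propagate.
class ProducerCloser {
    private final List<Closeable> producers = new ArrayList<>();
    private Exception asyncException;

    void close() throws Exception {
        try {
            for (Closeable producer : producers) {
                producer.close(); // happy path, may throw
            }
        } catch (Exception e) {
            asyncException = e; // defer instead of skipping cleanup
        } finally {
            for (Closeable producer : producers) {
                try {
                    producer.close(); // quiet second close, like IOUtils.closeQuietly
                } catch (Exception ignored) {
                    // closing an already-closed resource is a no-op
                }
            }
            if (asyncException != null) {
                throw asyncException; // propagate pending errors last
            }
        }
    }
}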

diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
index 8b3cccd..5d22cc5 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
@@ -650,33 +650,39 @@ public class FlinkKafkaProducer011<IN>
 
        @Override
        public void close() throws FlinkKafka011Exception {
-               final KafkaTransactionState currentTransaction = currentTransaction();
-               if (currentTransaction != null) {
-                       // to avoid exceptions on aborting transactions with some pending records
-                       flush(currentTransaction);
-
-                       // normal abort for AT_LEAST_ONCE and NONE do not clean up resources because of producer reusing, thus
-                       // we need to close it manually
-                       switch (semantic) {
-                               case EXACTLY_ONCE:
-                                       break;
-                               case AT_LEAST_ONCE:
-                               case NONE:
-                                       currentTransaction.producer.close();
-                                       break;
-                       }
-               }
+               // First close the producer for current transaction.
                try {
+                       final KafkaTransactionState currentTransaction = currentTransaction();
+                       if (currentTransaction != null) {
+                               // to avoid exceptions on aborting transactions with some pending records
+                               flush(currentTransaction);
+
+                               // normal abort for AT_LEAST_ONCE and NONE do not clean up resources because of producer reusing, thus
+                               // we need to close it manually
+                               switch (semantic) {
+                                       case EXACTLY_ONCE:
+                                               break;
+                                       case AT_LEAST_ONCE:
+                                       case NONE:
+                                               currentTransaction.producer.close();
+                                               break;
+                               }
+                       }
                        super.close();
-               }
-               catch (Exception e) {
+               } catch (Exception e) {
                        asyncException = ExceptionUtils.firstOrSuppressed(e, asyncException);
+               } finally {
+                       // We may have to close producer of the current transaction in case some exception was thrown before
+                       // the normal close routine finishes.
+                       if (currentTransaction() != null) {
+                               IOUtils.closeQuietly(currentTransaction().producer);
+                       }
+                       // Make sure all the producers for pending transactions are closed.
+                       pendingTransactions().forEach(transaction -> IOUtils.closeQuietly(transaction.getValue().producer)
+                       );
+                       // make sure we propagate pending errors
+                       checkErroneous();
                }
-               // make sure we propagate pending errors
-               checkErroneous();
-               pendingTransactions().forEach(transaction ->
-                       IOUtils.closeQuietly(transaction.getValue().producer)
-               );
        }
 
        // ------------------- Logic for handling checkpoint flushing -------------------------- //
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
index 5e4b0d5..0932d42 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
@@ -115,6 +115,7 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBaseWithFlink {
                        else {
                                initialActiveThreads = Optional.of(Thread.activeCount());
                        }
+                       checkProducerLeak();
                }
        }
 
@@ -163,6 +164,7 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBaseWithFlink {
                                throw ex;
                        }
                }
+               checkProducerLeak();
        }
 
        @Test
@@ -205,6 +207,7 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBaseWithFlink {
                assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42, 43));
 
                deleteTestTopic(topic);
+               checkProducerLeak();
        }
 
        /**
@@ -216,32 +219,32 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBaseWithFlink {
        public void testFailBeforeNotifyAndResumeWorkAfterwards() throws Exception {
                String topic = "flink-kafka-producer-fail-before-notify";
 
-               OneInputStreamOperatorTestHarness<Integer, Object> testHarness = createTestHarness(topic);
+               OneInputStreamOperatorTestHarness<Integer, Object> testHarness1 = createTestHarness(topic);
+               checkProducerLeak();
+               testHarness1.setup();
+               testHarness1.open();
+               testHarness1.processElement(42, 0);
+               testHarness1.snapshot(0, 1);
+               testHarness1.processElement(43, 2);
+               OperatorSubtaskState snapshot1 = testHarness1.snapshot(1, 3);
 
-               testHarness.setup();
-               testHarness.open();
-               testHarness.processElement(42, 0);
-               testHarness.snapshot(0, 1);
-               testHarness.processElement(43, 2);
-               OperatorSubtaskState snapshot1 = testHarness.snapshot(1, 3);
-
-               testHarness.processElement(44, 4);
-               testHarness.snapshot(2, 5);
-               testHarness.processElement(45, 6);
+               testHarness1.processElement(44, 4);
+               testHarness1.snapshot(2, 5);
+               testHarness1.processElement(45, 6);
 
                // do not close previous testHarness to make sure that closing do not clean up something (in case of failure
                // there might not be any close)
-               testHarness = createTestHarness(topic);
-               testHarness.setup();
+               OneInputStreamOperatorTestHarness<Integer, Object> testHarness2 = createTestHarness(topic);
+               testHarness2.setup();
                // restore from snapshot1, transactions with records 44 and 45 should be aborted
-               testHarness.initializeState(snapshot1);
-               testHarness.open();
+               testHarness2.initializeState(snapshot1);
+               testHarness2.open();
 
                // write and commit more records, after potentially lingering transactions
-               testHarness.processElement(46, 7);
-               testHarness.snapshot(4, 8);
-               testHarness.processElement(47, 9);
-               testHarness.notifyOfCompletedCheckpoint(4);
+               testHarness2.processElement(46, 7);
+               testHarness2.snapshot(4, 8);
+               testHarness2.processElement(47, 9);
+               testHarness2.notifyOfCompletedCheckpoint(4);
 
                //now we should have:
                // - records 42 and 43 in committed transactions
@@ -250,8 +253,18 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBaseWithFlink {
                // - pending transaction with record 47
                assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42, 43, 46));
 
-               testHarness.close();
+               try {
+                       testHarness1.close();
+               } catch (Exception e) {
+                       // The only acceptable exception is ProducerFencedException because testHarness2 uses the same
+                       // transactional ID.
+                       if (!(e.getCause() instanceof ProducerFencedException)) {
+                               fail("Received unexpected exception " + e);
+                       }
+               }
+               testHarness2.close();
                deleteTestTopic(topic);
+               checkProducerLeak();
        }
 
        @Test
@@ -299,6 +312,7 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBaseWithFlink {
                // - aborted transactions with records 44 and 45
                assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42, 43));
                deleteTestTopic(topic);
+               checkProducerLeak();
        }
 
        /**
@@ -357,6 +371,7 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBaseWithFlink {
                        closeIgnoringProducerFenced(operatorToClose);
                }
                deleteTestTopic(topic);
+               checkProducerLeak();
        }
 
        /**
@@ -424,6 +439,7 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBaseWithFlink {
                        0,
                        IntStream.range(0, parallelism1 + parallelism2 + parallelism3).boxed().collect(Collectors.toList()));
                deleteTestTopic(topic);
+               checkProducerLeak();
        }
 
        private OperatorSubtaskState repartitionAndExecute(
@@ -498,6 +514,7 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBaseWithFlink {
                assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42));
 
                deleteTestTopic(topic);
+               checkProducerLeak();
        }
 
        @Test
@@ -520,6 +537,7 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBaseWithFlink {
                        }
                }
                deleteTestTopic(topic);
+               checkProducerLeak();
        }
 
        @Test
@@ -564,6 +582,7 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBaseWithFlink {
                        testHarness.notifyOfCompletedCheckpoint(2);
                        testHarness.processElement(47, 9);
                }
+               checkProducerLeak();
        }
 
        // shut down a Kafka broker
@@ -651,4 +670,12 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBaseWithFlink {
                }
                return false;
        }
+
+       private void checkProducerLeak() {
+               for (Thread t : Thread.getAllStackTraces().keySet()) {
+                       if (t.getName().contains("kafka-producer-network-thread")) {
+                               fail("Detected producer leak. Thread name: " + t.getName());
+                       }
+               }
+       }
 }
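
For context on the ProducerFencedException handling above: Kafka fences the older of two producers that register the same transactional.id, which is exactly the situation testHarness1 ends up in after testHarness2 restores its state. A rough sketch against the plain Kafka client showing how fencing arises (hypothetical broker address, topic, and transactional id; requires a running broker and is not part of this commit):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.ProducerFencedException;

public class FencingDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("transactional.id", "demo-txn-id");     // shared on purpose
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> first = new KafkaProducer<>(props);
        first.initTransactions();
        first.beginTransaction();
        first.send(new ProducerRecord<>("demo-topic", "key", "value"));

        // Registering a second producer with the same transactional.id
        // bumps the epoch on the broker and fences the first producer.
        KafkaProducer<String, String> second = new KafkaProducer<>(props);
        second.initTransactions();

        try {
            first.commitTransaction(); // rejected: this producer is fenced
        } catch (ProducerFencedException expected) {
            // the same exception the test unwraps from testHarness1.close()
        } finally {
            first.close();
            second.close();
        }
    }
}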
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/resources/log4j-test.properties b/flink-connectors/flink-connector-kafka-0.11/src/test/resources/log4j-test.properties
index fbeb110..2ce32f0 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/resources/log4j-test.properties
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/resources/log4j-test.properties
@@ -16,7 +16,7 @@
 # limitations under the License.
 ################################################################################
 
-log4j.rootLogger=INFO, testlogger
+log4j.rootLogger=OFF, testlogger
 
 log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
 log4j.appender.testlogger.target = System.err
@@ -24,7 +24,6 @@ log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
 log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
 
 # suppress the irrelevant (wrong) warnings from the netty channel handler
-log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
 log4j.logger.org.apache.zookeeper=OFF, testlogger
 log4j.logger.state.change.logger=OFF, testlogger
 log4j.logger.kafka=OFF, testlogger
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
index 9eb2df8..d7c8129 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
@@ -652,33 +652,40 @@ public class FlinkKafkaProducer<IN>
 
        @Override
        public void close() throws FlinkKafkaException {
-               final FlinkKafkaProducer.KafkaTransactionState currentTransaction = currentTransaction();
-               if (currentTransaction != null) {
-                       // to avoid exceptions on aborting transactions with some pending records
-                       flush(currentTransaction);
-
-                       // normal abort for AT_LEAST_ONCE and NONE do not clean up resources because of producer reusing, thus
-                       // we need to close it manually
-                       switch (semantic) {
-                               case EXACTLY_ONCE:
-                                       break;
-                               case AT_LEAST_ONCE:
-                               case NONE:
-                                       currentTransaction.producer.close();
-                                       break;
-                       }
-               }
+               // First close the producer for current transaction.
                try {
+                       final KafkaTransactionState currentTransaction = currentTransaction();
+                       if (currentTransaction != null) {
+                               // to avoid exceptions on aborting transactions with some pending records
+                               flush(currentTransaction);
+
+                               // normal abort for AT_LEAST_ONCE and NONE do not clean up resources because of producer reusing, thus
+                               // we need to close it manually
+                               switch (semantic) {
+                                       case EXACTLY_ONCE:
+                                               break;
+                                       case AT_LEAST_ONCE:
+                                       case NONE:
+                                               currentTransaction.producer.close();
+                                               break;
+                               }
+                       }
                        super.close();
-               }
-               catch (Exception e) {
+               } catch (Exception e) {
                        asyncException = ExceptionUtils.firstOrSuppressed(e, asyncException);
+               } finally {
+                       // We may have to close producer of the current transaction in case some exception was thrown before
+                       // the normal close routine finishes.
+                       if (currentTransaction() != null) {
+                               IOUtils.closeQuietly(currentTransaction().producer);
+                       }
+                       // Make sure all the producers for pending transactions are closed.
+                       pendingTransactions().forEach(transaction ->
+                                       IOUtils.closeQuietly(transaction.getValue().producer)
+                       );
+                       // make sure we propagate pending errors
+                       checkErroneous();
                }
-               // make sure we propagate pending errors
-               checkErroneous();
-               pendingTransactions().forEach(transaction ->
-                       IOUtils.closeQuietly(transaction.getValue().producer)
-               );
        }
 
        // ------------------- Logic for handling checkpoint flushing -------------------------- //
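
Both versions funnel failures through ExceptionUtils.firstOrSuppressed, which (assuming Flink's documented semantics) keeps the first exception as the primary one and attaches later failures via addSuppressed. A plain-Java sketch of the idiom:

// Plain-Java equivalent of ExceptionUtils.firstOrSuppressed(newException, previous):
// the first failure stays primary; later ones ride along as suppressed.
public final class Suppress {
    static Exception firstOrSuppressed(Exception newException, Exception previous) {
        if (previous == null) {
            return newException;
        }
        previous.addSuppressed(newException);
        return previous;
    }

    public static void main(String[] args) {
        Exception acc = null;
        acc = firstOrSuppressed(new IllegalStateException("flush failed"), acc);
        acc = firstOrSuppressed(new IllegalStateException("close failed"), acc);
        acc.printStackTrace(); // prints "flush failed" with "close failed" under Suppressed:
    }
}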
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java
index 80e62c2..1097fd6 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java
@@ -112,6 +112,7 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
                                initialActiveThreads = Optional.of(Thread.activeCount());
                        }
                }
+               checkProducerLeak();
        }
 
        /**
@@ -159,6 +160,7 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
                                throw ex;
                        }
                }
+               checkProducerLeak();
        }
 
        @Test
@@ -201,6 +203,7 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
                assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42, 43));
 
                deleteTestTopic(topic);
+               checkProducerLeak();
        }
 
        /**
@@ -212,32 +215,32 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
        public void testFailBeforeNotifyAndResumeWorkAfterwards() throws Exception {
                String topic = "flink-kafka-producer-fail-before-notify";
 
-               OneInputStreamOperatorTestHarness<Integer, Object> testHarness = createTestHarness(topic);
+               OneInputStreamOperatorTestHarness<Integer, Object> testHarness1 = createTestHarness(topic);
+               checkProducerLeak();
+               testHarness1.setup();
+               testHarness1.open();
+               testHarness1.processElement(42, 0);
+               testHarness1.snapshot(0, 1);
+               testHarness1.processElement(43, 2);
+               OperatorSubtaskState snapshot1 = testHarness1.snapshot(1, 3);
 
-               testHarness.setup();
-               testHarness.open();
-               testHarness.processElement(42, 0);
-               testHarness.snapshot(0, 1);
-               testHarness.processElement(43, 2);
-               OperatorSubtaskState snapshot1 = testHarness.snapshot(1, 3);
-
-               testHarness.processElement(44, 4);
-               testHarness.snapshot(2, 5);
-               testHarness.processElement(45, 6);
+               testHarness1.processElement(44, 4);
+               testHarness1.snapshot(2, 5);
+               testHarness1.processElement(45, 6);
 
                // do not close previous testHarness to make sure that closing do not clean up something (in case of failure
                // there might not be any close)
-               testHarness = createTestHarness(topic);
-               testHarness.setup();
+               OneInputStreamOperatorTestHarness<Integer, Object> testHarness2 = createTestHarness(topic);
+               testHarness2.setup();
                // restore from snapshot1, transactions with records 44 and 45 should be aborted
-               testHarness.initializeState(snapshot1);
-               testHarness.open();
+               testHarness2.initializeState(snapshot1);
+               testHarness2.open();
 
                // write and commit more records, after potentially lingering transactions
-               testHarness.processElement(46, 7);
-               testHarness.snapshot(4, 8);
-               testHarness.processElement(47, 9);
-               testHarness.notifyOfCompletedCheckpoint(4);
+               testHarness2.processElement(46, 7);
+               testHarness2.snapshot(4, 8);
+               testHarness2.processElement(47, 9);
+               testHarness2.notifyOfCompletedCheckpoint(4);
 
                //now we should have:
                // - records 42 and 43 in committed transactions
@@ -246,8 +249,18 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
                // - pending transaction with record 47
                assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42, 43, 46));
 
-               testHarness.close();
+               try {
+                       testHarness1.close();
+               } catch (Exception e) {
+                       // The only acceptable exception is ProducerFencedException because testHarness2 uses the same
+                       // transactional ID.
+                       if (!(e.getCause() instanceof ProducerFencedException)) {
+                               fail("Received unexpected exception " + e);
+                       }
+               }
+               testHarness2.close();
                deleteTestTopic(topic);
+               checkProducerLeak();
        }
 
        @Test
@@ -295,6 +308,7 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
                // - aborted transactions with records 44 and 45
                assertExactlyOnceForTopic(createProperties(), topic, 0, 
Arrays.asList(42, 43));
                deleteTestTopic(topic);
+               checkProducerLeak();
        }
 
        /**
@@ -353,6 +367,7 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
                        closeIgnoringProducerFenced(operatorToClose);
                }
                deleteTestTopic(topic);
+               checkProducerLeak();
        }
 
        /**
@@ -420,6 +435,7 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
                        0,
                        IntStream.range(0, parallelism1 + parallelism2 + parallelism3).boxed().collect(Collectors.toList()));
                deleteTestTopic(topic);
+               checkProducerLeak();
        }
 
        private OperatorSubtaskState repartitionAndExecute(
@@ -494,6 +510,7 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
                assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42));
 
                deleteTestTopic(topic);
+               checkProducerLeak();
        }
 
        @Test
@@ -516,6 +533,7 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
                        }
                }
                deleteTestTopic(topic);
+               checkProducerLeak();
        }
 
        @Test
@@ -560,6 +578,7 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
                        testHarness.notifyOfCompletedCheckpoint(2);
                        testHarness.processElement(47, 9);
                }
+               checkProducerLeak();
        }
 
        // -----------------------------------------------------------------------------------------------------------------
@@ -650,4 +669,12 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
                return false;
        }
 
+       private void checkProducerLeak() {
+               for (Thread t : Thread.getAllStackTraces().keySet()) {
+                       if (t.getName().contains("kafka-producer-network-thread")) {
+                               fail("Detected producer leak. Thread name: " + t.getName());
+                       }
+               }
+       }
+
 }
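
A side note on checkProducerLeak(): the tests invoke it explicitly, sometimes mid-test (e.g. before testHarness1.setup()), so it cannot simply be replaced by a teardown hook. For the end-of-test calls only, a hypothetical JUnit 4 variant would look like this (sketch, not part of the commit):

import static org.junit.Assert.fail;

import org.junit.After;

// Hypothetical base class: runs the leak check automatically after each test.
public abstract class ProducerLeakCheckingTestBase {
    @After
    public void assertNoProducerLeak() {
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            if (t.getName().contains("kafka-producer-network-thread")) {
                fail("Detected producer leak. Thread name: " + t.getName());
            }
        }
    }
}

The explicit calls in the diff keep the check visible at the exact point where no producer threads are expected, which is arguably clearer for these scenarios.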
diff --git a/flink-connectors/flink-connector-kafka/src/test/resources/log4j-test.properties b/flink-connectors/flink-connector-kafka/src/test/resources/log4j-test.properties
index fbeb110..2ce32f0 100644
--- a/flink-connectors/flink-connector-kafka/src/test/resources/log4j-test.properties
+++ b/flink-connectors/flink-connector-kafka/src/test/resources/log4j-test.properties
@@ -16,7 +16,7 @@
 # limitations under the License.
 ################################################################################
 
-log4j.rootLogger=INFO, testlogger
+log4j.rootLogger=OFF, testlogger
 
 log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
 log4j.appender.testlogger.target = System.err
@@ -24,7 +24,6 @@ log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
 log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
 
 # suppress the irrelevant (wrong) warnings from the netty channel handler
-log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
 log4j.logger.org.apache.zookeeper=OFF, testlogger
 log4j.logger.state.change.logger=OFF, testlogger
 log4j.logger.kafka=OFF, testlogger
