This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.7 by this push:
new ff4c143  [FLINK-10455][Connectors/Kafka] Ensure all the KafkaProducers closed on an exception
ff4c143 is described below
commit ff4c14331db69d5142c03555ac900e4eaffd32b4
Author: Jiangjie (Becket) Qin <[email protected]>
AuthorDate: Tue Jun 11 16:47:37 2019 +0800
[FLINK-10455][Connectors/Kafka] Ensure all the KafkaProducers closed on an exception
The patch fixes the bugs reported in FLINK-10455 by making sure that all the
KafkaProducers are closed when FlinkKafkaProducer is closed. The same fix is
applied to both the universal FlinkKafkaProducer and FlinkKafkaProducer011.
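
For context, the essence of the fix is moving producer cleanup into a
finally block, so that it runs even when flushing or the superclass
close() throws. A minimal, self-contained sketch of that pattern
(illustrative only; Producer, pendingProducers and closeQuietly are
stand-in names, not the actual Flink internals in the diff below):

    import java.util.ArrayList;
    import java.util.List;

    public class CloseInFinallyPattern {

        interface Producer extends AutoCloseable {
            void flush() throws Exception;
        }

        private final List<Producer> pendingProducers = new ArrayList<>();
        private Exception asyncException;

        public void close() throws Exception {
            try {
                // Normal shutdown path; any step here may throw.
                for (Producer producer : pendingProducers) {
                    producer.flush();
                }
            } catch (Exception e) {
                asyncException = (asyncException == null) ? e : asyncException;
            } finally {
                // Cleanup runs on every exit path, so no producer (and no
                // kafka-producer-network-thread) can be leaked.
                for (Producer producer : pendingProducers) {
                    closeQuietly(producer);
                }
                if (asyncException != null) {
                    throw asyncException;
                }
            }
        }

        private static void closeQuietly(AutoCloseable resource) {
            try {
                resource.close();
            } catch (Exception ignored) {
                // Best-effort cleanup, mirroring IOUtils.closeQuietly.
            }
        }
    }
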
---
.../connectors/kafka/FlinkKafkaProducer011.java | 52 +++++++++--------
.../kafka/FlinkKafkaProducer011ITCase.java | 67 +++++++++++++++-------
.../src/test/resources/log4j-test.properties | 3 +-
.../connectors/kafka/FlinkKafkaProducer.java | 53 +++++++++--------
.../connectors/kafka/FlinkKafkaProducerITCase.java | 67 +++++++++++++++-------
.../src/test/resources/log4j-test.properties | 3 +-
6 files changed, 155 insertions(+), 90 deletions(-)
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
index 3e7cf2b..e401164 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
@@ -648,33 +648,39 @@ public class FlinkKafkaProducer011<IN>
 	@Override
 	public void close() throws FlinkKafka011Exception {
-		final KafkaTransactionState currentTransaction = currentTransaction();
-		if (currentTransaction != null) {
-			// to avoid exceptions on aborting transactions with some pending records
-			flush(currentTransaction);
-
-			// normal abort for AT_LEAST_ONCE and NONE do not clean up resources because of producer reusing, thus
-			// we need to close it manually
-			switch (semantic) {
-				case EXACTLY_ONCE:
-					break;
-				case AT_LEAST_ONCE:
-				case NONE:
-					currentTransaction.producer.close();
-					break;
-			}
-		}
+		// First close the producer for current transaction.
 		try {
+			final KafkaTransactionState currentTransaction = currentTransaction();
+			if (currentTransaction != null) {
+				// to avoid exceptions on aborting transactions with some pending records
+				flush(currentTransaction);
+
+				// normal abort for AT_LEAST_ONCE and NONE do not clean up resources because of producer reusing, thus
+				// we need to close it manually
+				switch (semantic) {
+					case EXACTLY_ONCE:
+						break;
+					case AT_LEAST_ONCE:
+					case NONE:
+						currentTransaction.producer.close();
+						break;
+				}
+			}
 			super.close();
-		}
-		catch (Exception e) {
+		} catch (Exception e) {
 			asyncException = ExceptionUtils.firstOrSuppressed(e, asyncException);
+		} finally {
+			// We may have to close producer of the current transaction in case some exception was thrown before
+			// the normal close routine finishes.
+			if (currentTransaction() != null) {
+				IOUtils.closeQuietly(currentTransaction().producer);
+			}
+			// Make sure all the producers for pending transactions are closed.
+			pendingTransactions().forEach(transaction ->
+				IOUtils.closeQuietly(transaction.getValue().producer)
+			);
+			// make sure we propagate pending errors
+			checkErroneous();
 		}
-		// make sure we propagate pending errors
-		checkErroneous();
-		pendingTransactions().forEach(transaction ->
-			IOUtils.closeQuietly(transaction.getValue().producer)
-		);
 	}
 
 	// ------------------- Logic for handling checkpoint flushing -------------------------- //
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
index ca75b1a..13f8dd2 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
@@ -116,6 +116,7 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBase {
 			else {
 				initialActiveThreads = Optional.of(Thread.activeCount());
 			}
+			checkProducerLeak();
 		}
 	}
@@ -164,6 +165,7 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBase {
 				throw ex;
 			}
 		}
+		checkProducerLeak();
 	}
 
 	@Test
@@ -206,6 +208,7 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBase {
 		assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42, 43));
 		deleteTestTopic(topic);
+		checkProducerLeak();
 	}
 
 	@Test
@@ -266,32 +269,32 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBase {
 	public void testFailBeforeNotifyAndResumeWorkAfterwards() throws Exception {
 		String topic = "flink-kafka-producer-fail-before-notify";
-		OneInputStreamOperatorTestHarness<Integer, Object> testHarness = createTestHarness(topic);
-
-		testHarness.setup();
-		testHarness.open();
-		testHarness.processElement(42, 0);
-		testHarness.snapshot(0, 1);
-		testHarness.processElement(43, 2);
-		OperatorSubtaskState snapshot1 = testHarness.snapshot(1, 3);
+		OneInputStreamOperatorTestHarness<Integer, Object> testHarness1 = createTestHarness(topic);
+		checkProducerLeak();
+		testHarness1.setup();
+		testHarness1.open();
+		testHarness1.processElement(42, 0);
+		testHarness1.snapshot(0, 1);
+		testHarness1.processElement(43, 2);
+		OperatorSubtaskState snapshot1 = testHarness1.snapshot(1, 3);
 
-		testHarness.processElement(44, 4);
-		testHarness.snapshot(2, 5);
-		testHarness.processElement(45, 6);
+		testHarness1.processElement(44, 4);
+		testHarness1.snapshot(2, 5);
+		testHarness1.processElement(45, 6);
 
 		// do not close previous testHarness to make sure that closing do not clean up something (in case of failure
 		// there might not be any close)
-		testHarness = createTestHarness(topic);
-		testHarness.setup();
+		OneInputStreamOperatorTestHarness<Integer, Object> testHarness2 = createTestHarness(topic);
+		testHarness2.setup();
 		// restore from snapshot1, transactions with records 44 and 45 should be aborted
-		testHarness.initializeState(snapshot1);
-		testHarness.open();
+		testHarness2.initializeState(snapshot1);
+		testHarness2.open();
 
 		// write and commit more records, after potentially lingering transactions
-		testHarness.processElement(46, 7);
-		testHarness.snapshot(4, 8);
-		testHarness.processElement(47, 9);
-		testHarness.notifyOfCompletedCheckpoint(4);
+		testHarness2.processElement(46, 7);
+		testHarness2.snapshot(4, 8);
+		testHarness2.processElement(47, 9);
+		testHarness2.notifyOfCompletedCheckpoint(4);
 
 		//now we should have:
 		// - records 42 and 43 in committed transactions
@@ -300,8 +303,18 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBase {
 		// - pending transaction with record 47
 		assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42, 43, 46));
 
-		testHarness.close();
+		try {
+			testHarness1.close();
+		} catch (Exception e) {
+			// The only acceptable exception is ProducerFencedException because testHarness2 uses the same
+			// transactional ID.
+			if (!(e.getCause() instanceof ProducerFencedException)) {
+				fail("Received unexpected exception " + e);
+			}
+		}
+		testHarness2.close();
 		deleteTestTopic(topic);
+		checkProducerLeak();
 	}
 
 	@Test
@@ -349,6 +362,7 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBase {
 		// - aborted transactions with records 44 and 45
 		assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42, 43));
 		deleteTestTopic(topic);
+		checkProducerLeak();
 	}
 
 	/**
@@ -407,6 +421,7 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBase {
 			closeIgnoringProducerFenced(operatorToClose);
 		}
 		deleteTestTopic(topic);
+		checkProducerLeak();
 	}
 
 	/**
@@ -470,6 +485,7 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBase {
 			0,
 			IntStream.range(0, parallelism1 + parallelism2 + parallelism3).boxed().collect(Collectors.toList()));
 		deleteTestTopic(topic);
+		checkProducerLeak();
 	}
 
 	private List<OperatorStateHandle> repartitionAndExecute(
@@ -543,6 +559,7 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBase {
 		assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42));
 		deleteTestTopic(topic);
+		checkProducerLeak();
 	}
 
 	@Test
@@ -565,6 +582,7 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBase {
 			}
 		}
 		deleteTestTopic(topic);
+		checkProducerLeak();
 	}
 
 	@Test
@@ -609,6 +627,7 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBase {
 			testHarness.notifyOfCompletedCheckpoint(2);
 			testHarness.processElement(47, 9);
 		}
+		checkProducerLeak();
 	}
 
 	// shut down a Kafka broker
@@ -696,4 +715,12 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBase {
 		}
 		return false;
 	}
+
+	private void checkProducerLeak() {
+		for (Thread t : Thread.getAllStackTraces().keySet()) {
+			if (t.getName().contains("kafka-producer-network-thread")) {
+				fail("Detected producer leak. Thread name: " + t.getName());
+			}
+		}
+	}
 }
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/resources/log4j-test.properties b/flink-connectors/flink-connector-kafka-0.11/src/test/resources/log4j-test.properties
index fbeb110..2ce32f0 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/resources/log4j-test.properties
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/resources/log4j-test.properties
@@ -16,7 +16,7 @@
 # limitations under the License.
 ################################################################################
 
-log4j.rootLogger=INFO, testlogger
+log4j.rootLogger=OFF, testlogger
 
 log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
 log4j.appender.testlogger.target = System.err
@@ -24,7 +24,6 @@
 log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
 log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
 # suppress the irrelevant (wrong) warnings from the netty channel handler
-log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
 log4j.logger.org.apache.zookeeper=OFF, testlogger
 log4j.logger.state.change.logger=OFF, testlogger
 log4j.logger.kafka=OFF, testlogger
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
index 10e8ef1..97ecc6f 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
@@ -650,33 +650,40 @@ public class FlinkKafkaProducer<IN>
 	@Override
 	public void close() throws FlinkKafkaException {
-		final FlinkKafkaProducer.KafkaTransactionState currentTransaction = currentTransaction();
-		if (currentTransaction != null) {
-			// to avoid exceptions on aborting transactions with some pending records
-			flush(currentTransaction);
-
-			// normal abort for AT_LEAST_ONCE and NONE do not clean up resources because of producer reusing, thus
-			// we need to close it manually
-			switch (semantic) {
-				case EXACTLY_ONCE:
-					break;
-				case AT_LEAST_ONCE:
-				case NONE:
-					currentTransaction.producer.close();
-					break;
-			}
-		}
+		// First close the producer for current transaction.
 		try {
+			final KafkaTransactionState currentTransaction = currentTransaction();
+			if (currentTransaction != null) {
+				// to avoid exceptions on aborting transactions with some pending records
+				flush(currentTransaction);
+
+				// normal abort for AT_LEAST_ONCE and NONE do not clean up resources because of producer reusing, thus
+				// we need to close it manually
+				switch (semantic) {
+					case EXACTLY_ONCE:
+						break;
+					case AT_LEAST_ONCE:
+					case NONE:
+						currentTransaction.producer.close();
+						break;
+				}
+			}
 			super.close();
-		}
-		catch (Exception e) {
+		} catch (Exception e) {
 			asyncException = ExceptionUtils.firstOrSuppressed(e, asyncException);
+		} finally {
+			// We may have to close producer of the current transaction in case some exception was thrown before
+			// the normal close routine finishes.
+			if (currentTransaction() != null) {
+				IOUtils.closeQuietly(currentTransaction().producer);
+			}
+			// Make sure all the producers for pending transactions are closed.
+			pendingTransactions().forEach(transaction ->
+				IOUtils.closeQuietly(transaction.getValue().producer)
+			);
+			// make sure we propagate pending errors
+			checkErroneous();
 		}
-		// make sure we propagate pending errors
-		checkErroneous();
-		pendingTransactions().forEach(transaction ->
-			IOUtils.closeQuietly(transaction.getValue().producer)
-		);
 	}
 
 	// ------------------- Logic for handling checkpoint flushing -------------------------- //
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java
index 29f157f..e4c9da3 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java
@@ -113,6 +113,7 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
 				initialActiveThreads = Optional.of(Thread.activeCount());
 			}
 		}
+		checkProducerLeak();
 	}
 
 	/**
@@ -160,6 +161,7 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
 				throw ex;
 			}
 		}
+		checkProducerLeak();
 	}
 
 	@Test
@@ -202,6 +204,7 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
 		assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42, 43));
 		deleteTestTopic(topic);
+		checkProducerLeak();
 	}
 
 	@Test
@@ -262,32 +265,32 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
 	public void testFailBeforeNotifyAndResumeWorkAfterwards() throws Exception {
 		String topic = "flink-kafka-producer-fail-before-notify";
-		OneInputStreamOperatorTestHarness<Integer, Object> testHarness = createTestHarness(topic);
-
-		testHarness.setup();
-		testHarness.open();
-		testHarness.processElement(42, 0);
-		testHarness.snapshot(0, 1);
-		testHarness.processElement(43, 2);
-		OperatorSubtaskState snapshot1 = testHarness.snapshot(1, 3);
+		OneInputStreamOperatorTestHarness<Integer, Object> testHarness1 = createTestHarness(topic);
+		checkProducerLeak();
+		testHarness1.setup();
+		testHarness1.open();
+		testHarness1.processElement(42, 0);
+		testHarness1.snapshot(0, 1);
+		testHarness1.processElement(43, 2);
+		OperatorSubtaskState snapshot1 = testHarness1.snapshot(1, 3);
 
-		testHarness.processElement(44, 4);
-		testHarness.snapshot(2, 5);
-		testHarness.processElement(45, 6);
+		testHarness1.processElement(44, 4);
+		testHarness1.snapshot(2, 5);
+		testHarness1.processElement(45, 6);
 
 		// do not close previous testHarness to make sure that closing do not clean up something (in case of failure
 		// there might not be any close)
-		testHarness = createTestHarness(topic);
-		testHarness.setup();
+		OneInputStreamOperatorTestHarness<Integer, Object> testHarness2 = createTestHarness(topic);
+		testHarness2.setup();
 		// restore from snapshot1, transactions with records 44 and 45 should be aborted
-		testHarness.initializeState(snapshot1);
-		testHarness.open();
+		testHarness2.initializeState(snapshot1);
+		testHarness2.open();
 
 		// write and commit more records, after potentially lingering transactions
-		testHarness.processElement(46, 7);
-		testHarness.snapshot(4, 8);
-		testHarness.processElement(47, 9);
-		testHarness.notifyOfCompletedCheckpoint(4);
+		testHarness2.processElement(46, 7);
+		testHarness2.snapshot(4, 8);
+		testHarness2.processElement(47, 9);
+		testHarness2.notifyOfCompletedCheckpoint(4);
 
 		//now we should have:
 		// - records 42 and 43 in committed transactions
@@ -296,8 +299,18 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
 		// - pending transaction with record 47
 		assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42, 43, 46));
 
-		testHarness.close();
+		try {
+			testHarness1.close();
+		} catch (Exception e) {
+			// The only acceptable exception is ProducerFencedException because testHarness2 uses the same
+			// transactional ID.
+			if (!(e.getCause() instanceof ProducerFencedException)) {
+				fail("Received unexpected exception " + e);
+			}
+		}
+		testHarness2.close();
 		deleteTestTopic(topic);
+		checkProducerLeak();
 	}
 
 	@Test
@@ -345,6 +358,7 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
 		// - aborted transactions with records 44 and 45
 		assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42, 43));
 		deleteTestTopic(topic);
+		checkProducerLeak();
 	}
 
 	/**
@@ -403,6 +417,7 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
 			closeIgnoringProducerFenced(operatorToClose);
 		}
 		deleteTestTopic(topic);
+		checkProducerLeak();
 	}
 
 	/**
@@ -466,6 +481,7 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
 			0,
 			IntStream.range(0, parallelism1 + parallelism2 + parallelism3).boxed().collect(Collectors.toList()));
 		deleteTestTopic(topic);
+		checkProducerLeak();
 	}
 
 	private List<OperatorStateHandle> repartitionAndExecute(
@@ -539,6 +555,7 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
 		assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42));
 		deleteTestTopic(topic);
+		checkProducerLeak();
 	}
 
 	@Test
@@ -561,6 +578,7 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
 			}
 		}
 		deleteTestTopic(topic);
+		checkProducerLeak();
 	}
 
 	@Test
@@ -605,6 +623,7 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
 			testHarness.notifyOfCompletedCheckpoint(2);
 			testHarness.processElement(47, 9);
 		}
+		checkProducerLeak();
 	}
 
 	// -----------------------------------------------------------------------------------------------------------------
@@ -695,4 +714,12 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
 		}
 		return false;
 	}
+	private void checkProducerLeak() {
+		for (Thread t : Thread.getAllStackTraces().keySet()) {
+			if (t.getName().contains("kafka-producer-network-thread")) {
+				fail("Detected producer leak. Thread name: " + t.getName());
+			}
+		}
+	}
+
 }
diff --git a/flink-connectors/flink-connector-kafka/src/test/resources/log4j-test.properties b/flink-connectors/flink-connector-kafka/src/test/resources/log4j-test.properties
index fbeb110..2ce32f0 100644
--- a/flink-connectors/flink-connector-kafka/src/test/resources/log4j-test.properties
+++ b/flink-connectors/flink-connector-kafka/src/test/resources/log4j-test.properties
@@ -16,7 +16,7 @@
 # limitations under the License.
 ################################################################################
 
-log4j.rootLogger=INFO, testlogger
+log4j.rootLogger=OFF, testlogger
 
 log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
 log4j.appender.testlogger.target = System.err
@@ -24,7 +24,6 @@
 log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
 log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
 # suppress the irrelevant (wrong) warnings from the netty channel handler
-log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
 log4j.logger.org.apache.zookeeper=OFF, testlogger
 log4j.logger.state.change.logger=OFF, testlogger
 log4j.logger.kafka=OFF, testlogger