This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.8 by this push:
new 765eda1  [FLINK-10455][Connectors/Kafka] Ensure all the KafkaProducers are closed on an exception
765eda1 is described below
commit 765eda1ec412192ace4635892c7b459e5dcd7bfb
Author: Jiangjie (Becket) Qin <[email protected]>
AuthorDate: Tue Jun 11 16:47:37 2019 +0800
[FLINK-10455][Connectors/Kafka] Ensure all the KafkaProducers are closed on an exception
The patch fixes the bug reported in FLINK-10455 by making sure all the
KafkaProducers are closed when the FlinkKafkaProducer is closed. The same
fix is applied to both the universal FlinkKafkaProducer and
FlinkKafkaProducer011.
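
For context, the shape of the fix as a minimal self-contained sketch (all
names below are hypothetical simplifications, not the connector's actual
API): both the flush and the superclass close may throw, so every producer
is closed best-effort in a finally block, and the primary exception is
rethrown afterwards instead of being masked by a failing close.

    import java.util.Arrays;
    import java.util.List;

    // Minimal sketch of the close pattern applied by this patch (hypothetical names).
    class ProducerCloser {
        private final List<AutoCloseable> allProducers;

        ProducerCloser(AutoCloseable... producers) {
            this.allProducers = Arrays.asList(producers);
        }

        void close() throws Exception {
            Exception error = null;
            try {
                flush(); // may throw, e.g. when the broker is unreachable
            } catch (Exception e) {
                error = e; // remember the primary failure
            } finally {
                // Best-effort close of every producer, even after a failure above.
                for (AutoCloseable producer : allProducers) {
                    try {
                        producer.close();
                    } catch (Exception ignored) {
                        // a failing close must not mask the primary error
                    }
                }
            }
            if (error != null) {
                throw error;
            }
        }

        private void flush() throws Exception {
            // stand-in for flushing pending records
        }
    }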
---
.../connectors/kafka/FlinkKafkaProducer011.java | 52 +++++++++--------
.../kafka/FlinkKafkaProducer011ITCase.java | 67 +++++++++++++++-------
.../src/test/resources/log4j-test.properties | 3 +-
.../connectors/kafka/FlinkKafkaProducer.java | 53 +++++++++--------
.../connectors/kafka/FlinkKafkaProducerITCase.java | 67 +++++++++++++++-------
.../src/test/resources/log4j-test.properties | 3 +-
6 files changed, 155 insertions(+), 90 deletions(-)
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
index 8b3cccd..5d22cc5 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
@@ -650,33 +650,39 @@ public class FlinkKafkaProducer011<IN>
 	@Override
 	public void close() throws FlinkKafka011Exception {
-		final KafkaTransactionState currentTransaction = currentTransaction();
-		if (currentTransaction != null) {
-			// to avoid exceptions on aborting transactions with some pending records
-			flush(currentTransaction);
-
-			// normal abort for AT_LEAST_ONCE and NONE do not clean up resources because of producer reusing, thus
-			// we need to close it manually
-			switch (semantic) {
-				case EXACTLY_ONCE:
-					break;
-				case AT_LEAST_ONCE:
-				case NONE:
-					currentTransaction.producer.close();
-					break;
-			}
-		}
+		// First close the producer for current transaction.
 		try {
+			final KafkaTransactionState currentTransaction = currentTransaction();
+			if (currentTransaction != null) {
+				// to avoid exceptions on aborting transactions with some pending records
+				flush(currentTransaction);
+
+				// normal abort for AT_LEAST_ONCE and NONE do not clean up resources because of producer reusing, thus
+				// we need to close it manually
+				switch (semantic) {
+					case EXACTLY_ONCE:
+						break;
+					case AT_LEAST_ONCE:
+					case NONE:
+						currentTransaction.producer.close();
+						break;
+				}
+			}
 			super.close();
-		}
-		catch (Exception e) {
+		} catch (Exception e) {
 			asyncException = ExceptionUtils.firstOrSuppressed(e, asyncException);
+		} finally {
+			// We may have to close producer of the current transaction in case some exception was thrown before
+			// the normal close routine finishes.
+			if (currentTransaction() != null) {
+				IOUtils.closeQuietly(currentTransaction().producer);
+			}
+			// Make sure all the producers for pending transactions are closed.
+			pendingTransactions().forEach(transaction ->
+				IOUtils.closeQuietly(transaction.getValue().producer)
+			);
+			// make sure we propagate pending errors
+			checkErroneous();
 		}
-		// make sure we propagate pending errors
-		checkErroneous();
-		pendingTransactions().forEach(transaction ->
-			IOUtils.closeQuietly(transaction.getValue().producer)
-		);
 	}

 	// ------------------- Logic for handling checkpoint flushing -------------------------- //
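
The finally block leans on IOUtils.closeQuietly so that a failure while
closing one producer neither aborts the loop nor replaces the error already
recorded in asyncException. A sketch of the assumed semantics (simplified;
org.apache.flink.util.IOUtils holds the real implementation):

    // Assumed behavior of a closeQuietly-style helper (simplified sketch).
    static void closeQuietly(AutoCloseable closeable) {
        if (closeable == null) {
            return;
        }
        try {
            closeable.close();
        } catch (Exception ignored) {
            // swallow: cleanup in a finally block must never hide the primary exception
        }
    }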
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
index 5e4b0d5..0932d42 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
@@ -115,6 +115,7 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBaseWithFlink {
 			else {
 				initialActiveThreads = Optional.of(Thread.activeCount());
 			}
+			checkProducerLeak();
 		}
 	}

@@ -163,6 +164,7 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBaseWithFlink {
 				throw ex;
 			}
 		}
+		checkProducerLeak();
 	}

 	@Test
@@ -205,6 +207,7 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBaseWithFlink {
 		assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42, 43));
 		deleteTestTopic(topic);
+		checkProducerLeak();
 	}

 	/**
@@ -216,32 +219,32 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBaseWithFlink {
 	public void testFailBeforeNotifyAndResumeWorkAfterwards() throws Exception {
 		String topic = "flink-kafka-producer-fail-before-notify";
-		OneInputStreamOperatorTestHarness<Integer, Object> testHarness = createTestHarness(topic);
+		OneInputStreamOperatorTestHarness<Integer, Object> testHarness1 = createTestHarness(topic);
+		checkProducerLeak();
+		testHarness1.setup();
+		testHarness1.open();
+		testHarness1.processElement(42, 0);
+		testHarness1.snapshot(0, 1);
+		testHarness1.processElement(43, 2);
+		OperatorSubtaskState snapshot1 = testHarness1.snapshot(1, 3);

-		testHarness.setup();
-		testHarness.open();
-		testHarness.processElement(42, 0);
-		testHarness.snapshot(0, 1);
-		testHarness.processElement(43, 2);
-		OperatorSubtaskState snapshot1 = testHarness.snapshot(1, 3);
-
-		testHarness.processElement(44, 4);
-		testHarness.snapshot(2, 5);
-		testHarness.processElement(45, 6);
+		testHarness1.processElement(44, 4);
+		testHarness1.snapshot(2, 5);
+		testHarness1.processElement(45, 6);

 		// do not close previous testHarness to make sure that closing do not clean up something (in case of failure
 		// there might not be any close)
-		testHarness = createTestHarness(topic);
-		testHarness.setup();
+		OneInputStreamOperatorTestHarness<Integer, Object> testHarness2 = createTestHarness(topic);
+		testHarness2.setup();
 		// restore from snapshot1, transactions with records 44 and 45 should be aborted
-		testHarness.initializeState(snapshot1);
-		testHarness.open();
+		testHarness2.initializeState(snapshot1);
+		testHarness2.open();

 		// write and commit more records, after potentially lingering transactions
-		testHarness.processElement(46, 7);
-		testHarness.snapshot(4, 8);
-		testHarness.processElement(47, 9);
-		testHarness.notifyOfCompletedCheckpoint(4);
+		testHarness2.processElement(46, 7);
+		testHarness2.snapshot(4, 8);
+		testHarness2.processElement(47, 9);
+		testHarness2.notifyOfCompletedCheckpoint(4);

 		//now we should have:
 		// - records 42 and 43 in committed transactions
@@ -250,8 +253,18 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBaseWithFlink {
 		// - pending transaction with record 47
 		assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42, 43, 46));

-		testHarness.close();
+		try {
+			testHarness1.close();
+		} catch (Exception e) {
+			// The only acceptable exception is ProducerFencedException because testHarness2 uses the same
+			// transactional ID.
+			if (!(e.getCause() instanceof ProducerFencedException)) {
+				fail("Received unexpected exception " + e);
+			}
+		}
+		testHarness2.close();
 		deleteTestTopic(topic);
+		checkProducerLeak();
 	}

 	@Test
@@ -299,6 +312,7 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBaseWithFlink {
 		// - aborted transactions with records 44 and 45
 		assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42, 43));
 		deleteTestTopic(topic);
+		checkProducerLeak();
 	}

 	/**
@@ -357,6 +371,7 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBaseWithFlink {
 			closeIgnoringProducerFenced(operatorToClose);
 		}
 		deleteTestTopic(topic);
+		checkProducerLeak();
 	}

 	/**
@@ -424,6 +439,7 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBaseWithFlink {
 			0,
 			IntStream.range(0, parallelism1 + parallelism2 + parallelism3).boxed().collect(Collectors.toList()));
 		deleteTestTopic(topic);
+		checkProducerLeak();
 	}

 	private OperatorSubtaskState repartitionAndExecute(
@@ -498,6 +514,7 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBaseWithFlink {
 		assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42));
 		deleteTestTopic(topic);
+		checkProducerLeak();
 	}

 	@Test
@@ -520,6 +537,7 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBaseWithFlink {
 			}
 		}
 		deleteTestTopic(topic);
+		checkProducerLeak();
 	}

 	@Test
@@ -564,6 +582,7 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBaseWithFlink {
 			testHarness.notifyOfCompletedCheckpoint(2);
 			testHarness.processElement(47, 9);
 		}
+		checkProducerLeak();
 	}

 	// shut down a Kafka broker
@@ -651,4 +670,12 @@ public class FlinkKafkaProducer011ITCase extends KafkaTestBaseWithFlink {
 		}
 		return false;
 	}
+
+	private void checkProducerLeak() {
+		for (Thread t : Thread.getAllStackTraces().keySet()) {
+			if (t.getName().contains("kafka-producer-network-thread")) {
+				fail("Detected producer leak. Thread name: " + t.getName());
+			}
+		}
+	}
 }
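
The new checkProducerLeak() helper fails a test whenever a Kafka producer
network thread (whose name contains "kafka-producer-network-thread") is
still alive. A possible reusable variant, not part of this commit, would
run the same scan automatically as a JUnit 4 teardown:

    import org.junit.After;
    import static org.junit.Assert.fail;

    // Hypothetical base class: runs the leak scan after every test
    // instead of relying on explicit calls at the end of each method.
    public abstract class ProducerLeakCheckBase {
        @After
        public void verifyNoProducerNetworkThreads() {
            for (Thread t : Thread.getAllStackTraces().keySet()) {
                if (t.getName().contains("kafka-producer-network-thread")) {
                    fail("Detected producer leak. Thread name: " + t.getName());
                }
            }
        }
    }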
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/resources/log4j-test.properties b/flink-connectors/flink-connector-kafka-0.11/src/test/resources/log4j-test.properties
index fbeb110..2ce32f0 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/resources/log4j-test.properties
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/resources/log4j-test.properties
@@ -16,7 +16,7 @@
# limitations under the License.
################################################################################
-log4j.rootLogger=INFO, testlogger
+log4j.rootLogger=OFF, testlogger
log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
log4j.appender.testlogger.target = System.err
@@ -24,7 +24,6 @@
log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
# suppress the irrelevant (wrong) warnings from the netty channel handler
-log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
log4j.logger.org.apache.zookeeper=OFF, testlogger
log4j.logger.state.change.logger=OFF, testlogger
log4j.logger.kafka=OFF, testlogger
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
index 9eb2df8..d7c8129 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
@@ -652,33 +652,40 @@ public class FlinkKafkaProducer<IN>
 	@Override
 	public void close() throws FlinkKafkaException {
-		final FlinkKafkaProducer.KafkaTransactionState currentTransaction = currentTransaction();
-		if (currentTransaction != null) {
-			// to avoid exceptions on aborting transactions with some pending records
-			flush(currentTransaction);
-
-			// normal abort for AT_LEAST_ONCE and NONE do not clean up resources because of producer reusing, thus
-			// we need to close it manually
-			switch (semantic) {
-				case EXACTLY_ONCE:
-					break;
-				case AT_LEAST_ONCE:
-				case NONE:
-					currentTransaction.producer.close();
-					break;
-			}
-		}
+		// First close the producer for current transaction.
 		try {
+			final KafkaTransactionState currentTransaction = currentTransaction();
+			if (currentTransaction != null) {
+				// to avoid exceptions on aborting transactions with some pending records
+				flush(currentTransaction);
+
+				// normal abort for AT_LEAST_ONCE and NONE do not clean up resources because of producer reusing, thus
+				// we need to close it manually
+				switch (semantic) {
+					case EXACTLY_ONCE:
+						break;
+					case AT_LEAST_ONCE:
+					case NONE:
+						currentTransaction.producer.close();
+						break;
+				}
+			}
 			super.close();
-		}
-		catch (Exception e) {
+		} catch (Exception e) {
 			asyncException = ExceptionUtils.firstOrSuppressed(e, asyncException);
+		} finally {
+			// We may have to close producer of the current transaction in case some exception was thrown before
+			// the normal close routine finishes.
+			if (currentTransaction() != null) {
+				IOUtils.closeQuietly(currentTransaction().producer);
+			}
+			// Make sure all the producers for pending transactions are closed.
+			pendingTransactions().forEach(transaction ->
+				IOUtils.closeQuietly(transaction.getValue().producer)
+			);
+			// make sure we propagate pending errors
+			checkErroneous();
 		}
-		// make sure we propagate pending errors
-		checkErroneous();
-		pendingTransactions().forEach(transaction ->
-			IOUtils.closeQuietly(transaction.getValue().producer)
-		);
 	}

 	// ------------------- Logic for handling checkpoint flushing -------------------------- //
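
The catch branch folds every failure into asyncException via
ExceptionUtils.firstOrSuppressed, so the first error stays primary and later
ones ride along as suppressed exceptions; checkErroneous() in the finally
block then rethrows the result. A sketch of the assumed firstOrSuppressed
semantics (simplified from org.apache.flink.util.ExceptionUtils):

    // Assumed behavior of firstOrSuppressed (simplified sketch).
    static <T extends Throwable> T firstOrSuppressed(T newException, T previous) {
        if (previous == null) {
            return newException; // the first failure becomes the primary exception
        }
        previous.addSuppressed(newException); // later failures are attached, not lost
        return previous;
    }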
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java
index 80e62c2..1097fd6 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java
@@ -112,6 +112,7 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
 				initialActiveThreads = Optional.of(Thread.activeCount());
 			}
 		}
+		checkProducerLeak();
 	}

 	/**
@@ -159,6 +160,7 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
 				throw ex;
 			}
 		}
+		checkProducerLeak();
 	}

 	@Test
@@ -201,6 +203,7 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
 		assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42, 43));
 		deleteTestTopic(topic);
+		checkProducerLeak();
 	}

 	/**
@@ -212,32 +215,32 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
 	public void testFailBeforeNotifyAndResumeWorkAfterwards() throws Exception {
 		String topic = "flink-kafka-producer-fail-before-notify";
-		OneInputStreamOperatorTestHarness<Integer, Object> testHarness = createTestHarness(topic);
+		OneInputStreamOperatorTestHarness<Integer, Object> testHarness1 = createTestHarness(topic);
+		checkProducerLeak();
+		testHarness1.setup();
+		testHarness1.open();
+		testHarness1.processElement(42, 0);
+		testHarness1.snapshot(0, 1);
+		testHarness1.processElement(43, 2);
+		OperatorSubtaskState snapshot1 = testHarness1.snapshot(1, 3);

-		testHarness.setup();
-		testHarness.open();
-		testHarness.processElement(42, 0);
-		testHarness.snapshot(0, 1);
-		testHarness.processElement(43, 2);
-		OperatorSubtaskState snapshot1 = testHarness.snapshot(1, 3);
-
-		testHarness.processElement(44, 4);
-		testHarness.snapshot(2, 5);
-		testHarness.processElement(45, 6);
+		testHarness1.processElement(44, 4);
+		testHarness1.snapshot(2, 5);
+		testHarness1.processElement(45, 6);

 		// do not close previous testHarness to make sure that closing do not clean up something (in case of failure
 		// there might not be any close)
-		testHarness = createTestHarness(topic);
-		testHarness.setup();
+		OneInputStreamOperatorTestHarness<Integer, Object> testHarness2 = createTestHarness(topic);
+		testHarness2.setup();
 		// restore from snapshot1, transactions with records 44 and 45 should be aborted
-		testHarness.initializeState(snapshot1);
-		testHarness.open();
+		testHarness2.initializeState(snapshot1);
+		testHarness2.open();

 		// write and commit more records, after potentially lingering transactions
-		testHarness.processElement(46, 7);
-		testHarness.snapshot(4, 8);
-		testHarness.processElement(47, 9);
-		testHarness.notifyOfCompletedCheckpoint(4);
+		testHarness2.processElement(46, 7);
+		testHarness2.snapshot(4, 8);
+		testHarness2.processElement(47, 9);
+		testHarness2.notifyOfCompletedCheckpoint(4);

 		//now we should have:
 		// - records 42 and 43 in committed transactions
@@ -246,8 +249,18 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
 		// - pending transaction with record 47
 		assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42, 43, 46));

-		testHarness.close();
+		try {
+			testHarness1.close();
+		} catch (Exception e) {
+			// The only acceptable exception is ProducerFencedException because testHarness2 uses the same
+			// transactional ID.
+			if (!(e.getCause() instanceof ProducerFencedException)) {
+				fail("Received unexpected exception " + e);
+			}
+		}
+		testHarness2.close();
 		deleteTestTopic(topic);
+		checkProducerLeak();
 	}

 	@Test
@@ -295,6 +308,7 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
 		// - aborted transactions with records 44 and 45
 		assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42, 43));
 		deleteTestTopic(topic);
+		checkProducerLeak();
 	}

 	/**
@@ -353,6 +367,7 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
 			closeIgnoringProducerFenced(operatorToClose);
 		}
 		deleteTestTopic(topic);
+		checkProducerLeak();
 	}

 	/**
@@ -420,6 +435,7 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
 			0,
 			IntStream.range(0, parallelism1 + parallelism2 + parallelism3).boxed().collect(Collectors.toList()));
 		deleteTestTopic(topic);
+		checkProducerLeak();
 	}

 	private OperatorSubtaskState repartitionAndExecute(
@@ -494,6 +510,7 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
 		assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42));
 		deleteTestTopic(topic);
+		checkProducerLeak();
 	}

 	@Test
@@ -516,6 +533,7 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
 			}
 		}
 		deleteTestTopic(topic);
+		checkProducerLeak();
 	}

 	@Test
@@ -560,6 +578,7 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
 			testHarness.notifyOfCompletedCheckpoint(2);
 			testHarness.processElement(47, 9);
 		}
+		checkProducerLeak();
 	}

 	// -----------------------------------------------------------------------------------------------------------------
@@ -650,4 +669,12 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
 		return false;
 	}
+	private void checkProducerLeak() {
+		for (Thread t : Thread.getAllStackTraces().keySet()) {
+			if (t.getName().contains("kafka-producer-network-thread")) {
+				fail("Detected producer leak. Thread name: " + t.getName());
+			}
+		}
+	}
+
 }
diff --git a/flink-connectors/flink-connector-kafka/src/test/resources/log4j-test.properties b/flink-connectors/flink-connector-kafka/src/test/resources/log4j-test.properties
index fbeb110..2ce32f0 100644
--- a/flink-connectors/flink-connector-kafka/src/test/resources/log4j-test.properties
+++ b/flink-connectors/flink-connector-kafka/src/test/resources/log4j-test.properties
@@ -16,7 +16,7 @@
# limitations under the License.
################################################################################
-log4j.rootLogger=INFO, testlogger
+log4j.rootLogger=OFF, testlogger
log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
log4j.appender.testlogger.target = System.err
@@ -24,7 +24,6 @@
log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
# suppress the irrelevant (wrong) warnings from the netty channel handler
-log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
log4j.logger.org.apache.zookeeper=OFF, testlogger
log4j.logger.state.change.logger=OFF, testlogger
log4j.logger.kafka=OFF, testlogger