[
https://issues.apache.org/jira/browse/FLINK-10455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16686224#comment-16686224
]
ASF GitHub Bot commented on FLINK-10455:
----------------------------------------
pnowojski closed pull request #6989: [FLINK-10455][Kafka Tx] Close
transactional producers in case of failure and termination
URL: https://github.com/apache/flink/pull/6989
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
index c7f84c36b6a..3e7cf2b1aca 100644
---
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
+++
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
@@ -45,6 +45,7 @@
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.IOUtils;
import org.apache.flink.util.NetUtils;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
@@ -671,6 +672,9 @@ public void close() throws FlinkKafka011Exception {
}
// make sure we propagate pending errors
checkErroneous();
+ pendingTransactions().forEach(transaction ->
+ IOUtils.closeQuietly(transaction.getValue().producer)
+ );
}
// ------------------- Logic for handling checkpoint flushing
-------------------------- //
@@ -713,8 +717,11 @@ protected void preCommit(KafkaTransactionState
transaction) throws FlinkKafka011
@Override
protected void commit(KafkaTransactionState transaction) {
if (transaction.isTransactional()) {
- transaction.producer.commitTransaction();
- recycleTransactionalProducer(transaction.producer);
+ try {
+ transaction.producer.commitTransaction();
+ } finally {
+
recycleTransactionalProducer(transaction.producer);
+ }
}
}
diff --git
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
index df1a4b5727f..10e8ef1713d 100644
---
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
+++
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
@@ -45,6 +45,7 @@
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.IOUtils;
import org.apache.flink.util.NetUtils;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
@@ -673,6 +674,9 @@ public void close() throws FlinkKafkaException {
}
// make sure we propagate pending errors
checkErroneous();
+ pendingTransactions().forEach(transaction ->
+ IOUtils.closeQuietly(transaction.getValue().producer)
+ );
}
// ------------------- Logic for handling checkpoint flushing
-------------------------- //
@@ -715,8 +719,11 @@ protected void
preCommit(FlinkKafkaProducer.KafkaTransactionState transaction) t
@Override
protected void commit(FlinkKafkaProducer.KafkaTransactionState
transaction) {
if (transaction.isTransactional()) {
- transaction.producer.commitTransaction();
- recycleTransactionalProducer(transaction.producer);
+ try {
+ transaction.producer.commitTransaction();
+ } finally {
+
recycleTransactionalProducer(transaction.producer);
+ }
}
}
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
index d2735d566ee..e39335479f3 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java
@@ -39,20 +39,24 @@
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.util.FlinkRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.IOException;
import java.time.Clock;
+import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.stream.Stream;
import static java.util.Objects.requireNonNull;
import static org.apache.flink.util.Preconditions.checkArgument;
@@ -150,6 +154,12 @@ protected TXN currentTransaction() {
return currentTransactionHolder == null ? null :
currentTransactionHolder.handle;
}
+ @Nonnull
+ protected Stream<Map.Entry<Long, TXN>> pendingTransactions() {
+ return pendingCommitTransactions.entrySet().stream()
+ .map(e -> new AbstractMap.SimpleEntry<>(e.getKey(),
e.getValue().handle));
+ }
+
// ------ methods that should be implemented in child class to support
two phase commit algorithm ------
/**
@@ -257,6 +267,7 @@ public final void notifyCheckpointComplete(long
checkpointId) throws Exception {
Iterator<Map.Entry<Long, TransactionHolder<TXN>>>
pendingTransactionIterator = pendingCommitTransactions.entrySet().iterator();
checkState(pendingTransactionIterator.hasNext(), "checkpoint
completed, but no transaction pending");
+ Throwable firstError = null;
while (pendingTransactionIterator.hasNext()) {
Map.Entry<Long, TransactionHolder<TXN>> entry =
pendingTransactionIterator.next();
@@ -270,12 +281,23 @@ public final void notifyCheckpointComplete(long
checkpointId) throws Exception {
name(), checkpointId, pendingTransaction,
pendingTransactionCheckpointId);
logWarningIfTimeoutAlmostReached(pendingTransaction);
- commit(pendingTransaction.handle);
+ try {
+ commit(pendingTransaction.handle);
+ } catch (Throwable t) {
+ if (firstError == null) {
+ firstError = t;
+ }
+ }
LOG.debug("{} - committed checkpoint transaction {}",
name(), pendingTransaction);
pendingTransactionIterator.remove();
}
+
+ if (firstError != null) {
+ throw new FlinkRuntimeException("Committing one of
transactions failed, logging first encountered failure",
+ firstError);
+ }
}
@Override
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
> Potential Kafka producer leak in case of failures
> -------------------------------------------------
>
> Key: FLINK-10455
> URL: https://issues.apache.org/jira/browse/FLINK-10455
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Affects Versions: 1.5.2
> Reporter: Nico Kruber
> Assignee: Andrey Zagrebin
> Priority: Major
> Labels: pull-request-available
>
> If the Kafka brokers' timeout is too low for our checkpoint interval [1], we
> may get a {{ProducerFencedException}}. Documentation around
> {{ProducerFencedException}} explicitly states that we should close the
> producer after encountering it.
> By looking at the code, it doesn't seem like this is actually done in
> {{FlinkKafkaProducer011}}. Also, in case one transaction's commit in
> {{TwoPhaseCommitSinkFunction#notifyCheckpointComplete}} fails with an
> exception, we don't clean up (nor try to commit) any other transaction.
> -> From what I see, {{TwoPhaseCommitSinkFunction#notifyCheckpointComplete}}
> simply iterates over the {{pendingCommitTransactions}}, which is not touched
> during {{close()}}.
> Now if we restart the failing job on the same Flink cluster, any resources
> from the previous attempt will still linger around.
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/connectors/kafka.html#kafka-011
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)