XComp commented on a change in pull request #17152:
URL: https://github.com/apache/flink/pull/17152#discussion_r702662226
##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java
##########
@@ -350,4 +365,17 @@ private static void transitionTransactionManagerStateTo(
             Object transactionManager, String state) {
         invoke(transactionManager, "transitionTo", getTransactionManagerState(state));
     }
+
+    @Override
+    public String toString() {
+        return "FlinkKafkaInternalProducer{"
+                + "transactionalId='"
+                + transactionalId
+                + '\''
+                + ", inTransaction="
Review comment:
```suggestion
                + "', inTransaction="
```
nit
##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java
##########
@@ -64,19 +65,31 @@
                                 .<FlinkKafkaInternalProducer<?, ?>>map(Recyclable::getObject)
                                 .orElseGet(() -> getRecoveryProducer(committable));
                 producer.commitTransaction();
-                recyclable.ifPresent(Recyclable::close);
-            } catch (ProducerFencedException | InvalidTxnStateException e) {
-                // That means we have committed this transaction before.
+            } catch (RetriableException e) {
                 LOG.warn(
-                        "Encountered error {} while recovering transaction {}. "
-                                + "Presumably this transaction has been already committed before",
-                        e,
-                        committable);
-                recyclable.ifPresent(Recyclable::close);
-            } catch (Throwable e) {
-                LOG.warn("Cannot commit Kafka transaction, retrying.", e);
+                        "Encountered retriable exception while committing {}.", transactionalId, e);
                 retryableCommittables.add(committable);
+                continue;
+            } catch (ProducerFencedException e) {
+                // initTransaction has been called on this transaction before
+                LOG.error(
+                        "Transactions {} timed out or was overridden and data has been potentially lost.",
Review comment:
```suggestion
                        "Transaction {} timed out or was overridden and data has been potentially lost.",
```
nit
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
##########
@@ -125,12 +122,17 @@
this.kafkaProducerConfig = checkNotNull(kafkaProducerConfig,
"kafkaProducerConfig");
this.transactionalIdPrefix = checkNotNull(transactionalIdPrefix,
"transactionalIdPrefix");
this.recordSerializer = checkNotNull(recordSerializer,
"recordSerializer");
+ MailboxExecutor mailboxExecutor = sinkInitContext.getMailboxExecutor();
this.deliveryCallback =
(metadata, exception) -> {
- if (exception != null && producerAsyncException == null) {
- producerAsyncException = exception;
+ if (exception != null) {
+ mailboxExecutor.execute(
Review comment:
Can you briefly explain what throwing a `RuntimeException` on the mailbox executor would trigger?
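For context, my reading is that the new callback ends up doing something like this (a sketch; the lambda body and the mail description string are my assumptions, only the surrounding structure is from the diff):
```java
// The async Kafka callback hands the failure over to the task's mailbox
// thread instead of parking it in a field. Rethrowing inside the mail should
// surface the exception to the mailbox processor, but what exactly that
// triggers for the task is the part I'd like to have spelled out.
this.deliveryCallback =
        (metadata, exception) -> {
            if (exception != null) {
                mailboxExecutor.execute(
                        () -> {
                            throw new FlinkRuntimeException(
                                    "Failed to send record to Kafka", exception);
                        },
                        "Propagate Kafka send failure");
            }
        };
```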
##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
##########
@@ -325,18 +318,17 @@ private void flushRecords(boolean finalFlush) {
             producer = new FlinkKafkaInternalProducer<>(kafkaProducerConfig, transactionalId);
             closer.register(producer);
             producer.initTransactions();
-            initMetrics(producer);
+            initKafkaMetrics(producer);
         } else {
             producer.initTransactionId(transactionalId);
         }
         return producer;
     }
-    private void initMetrics(FlinkKafkaInternalProducer<byte[], byte[]> producer) {
+    private void initKafkaMetrics(FlinkKafkaInternalProducer<byte[], byte[]> producer) {
         byteOutMetric =
                 MetricUtil.getKafkaMetric(
                         producer.metrics(), "producer-metrics", "outgoing-byte-total");
Review comment:
Should we move the metrics labels into constants? At least,
`producer-metrics` is used multiple times within `KafkaWriter`.
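Something along these lines, for instance (the constant names are just placeholders):
```java
private static final String KAFKA_PRODUCER_METRIC_GROUP = "producer-metrics";
private static final String OUTGOING_BYTES_METRIC_NAME = "outgoing-byte-total";

private void initKafkaMetrics(FlinkKafkaInternalProducer<byte[], byte[]> producer) {
    byteOutMetric =
            MetricUtil.getKafkaMetric(
                    producer.metrics(), KAFKA_PRODUCER_METRIC_GROUP, OUTGOING_BYTES_METRIC_NAME);
}
```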
##########
File path: flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaCommitterTest.java
##########
@@ -31,12 +34,37 @@
 /** Tests for {@link KafkaCommitter}. */
 public class KafkaCommitterTest extends TestLogger {
+    private static final int PRODUCER_ID = 0;
+    private static final short EPOCH = 0;
+    private static final String TRANSACTIONAL_ID = "transactionalId";
+
+    /** Causes a network error by inactive broker and tests that a retry will happen. */
     @Test
-    public void testRetryCommittableOnFailure() throws IOException {
-        final KafkaCommitter committer = new KafkaCommitter(new Properties());
-        final short epoch = 0;
+    public void testRetryCommittableOnRetriableError() throws IOException {
+        final KafkaCommitter committer = new KafkaCommitter(getProperties());
         final List<KafkaCommittable> committables =
-                Collections.singletonList(new KafkaCommittable(0, epoch, "transactionalId", null));
+                Collections.singletonList(
+                        new KafkaCommittable(PRODUCER_ID, EPOCH, TRANSACTIONAL_ID, null));
         assertEquals(committables, committer.commit(committables));
     }
+
+    /** Causes a permanent error by misconfiguration. */
+    @Test
+    public void testRetryCommittableOnFatalError() throws IOException {
+        final KafkaCommitter committer = new KafkaCommitter(new Properties());
Review comment:
```suggestion
    @Test
    public void testRetryCommittableOnFatalError() throws IOException {
        // Causes a permanent error by misconfiguration.
        final KafkaCommitter committer = new KafkaCommitter(new Properties());
```
nit: the comment actually describes the behaviour caused by the `new Properties()` parameter value. It doesn't describe the test itself, which is totally fine because the test method name is descriptive enough.
##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
##########
@@ -206,14 +208,26 @@ public void write(IN element, Context context) throws IOException {
     @Override
     public void close() throws Exception {
+        closed = true;
+        closeAll(
+                this::abortCurrentProducer,
+                closer,
+                producerPool::clear,
+                () -> {
+                    checkState(currentProducer.isClosed());
+                    currentProducer = null;
+                });
+    }
+
+    private void abortCurrentProducer() {
         if (currentProducer.isInTransaction()) {
-            currentProducer.abortTransaction();
+            try {
+                currentProducer.abortTransaction();
+            } catch (ProducerFencedException e) {
+                LOG.trace(
Review comment:
Does this happen so often that we shouldn't use `debug` here?
##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
##########
@@ -72,6 +73,7 @@ public CommitterOperator(
     public void initializeState(StateInitializationContext context) throws Exception {
         super.initializeState(context);
         committerHandler.initializeState(context);
+        commitRetrier.retryWithDelay();
Review comment:
Why is it necessary to retry the handler during initialization of the operator? As a reader, that isn't obvious to me. Would it make sense to add a comment here?
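For example (the wording below is only my guess at the intent and would need to be corrected):
```java
// committables restored from state may not have been committed before the
// failover, so schedule a commit retry right after the state is restored
commitRetrier.retryWithDelay();
```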
##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java
##########
@@ -64,19 +65,31 @@
                                 .<FlinkKafkaInternalProducer<?, ?>>map(Recyclable::getObject)
                                 .orElseGet(() -> getRecoveryProducer(committable));
                 producer.commitTransaction();
-                recyclable.ifPresent(Recyclable::close);
-            } catch (ProducerFencedException | InvalidTxnStateException e) {
-                // That means we have committed this transaction before.
+            } catch (RetriableException e) {
                 LOG.warn(
-                        "Encountered error {} while recovering transaction {}. "
-                                + "Presumably this transaction has been already committed before",
-                        e,
-                        committable);
-                recyclable.ifPresent(Recyclable::close);
-            } catch (Throwable e) {
-                LOG.warn("Cannot commit Kafka transaction, retrying.", e);
+                        "Encountered retriable exception while committing {}.", transactionalId, e);
                 retryableCommittables.add(committable);
+                continue;
+            } catch (ProducerFencedException e) {
+                // initTransaction has been called on this transaction before
+                LOG.error(
+                        "Transactions {} timed out or was overridden and data has been potentially lost.",
+                        transactionalId,
+                        e);
+            } catch (InvalidTxnStateException e) {
+                // This exception only occurs when aborting after a commit or vice versa.
+                // It does not appear on double commits or double aborts.
+                LOG.error(
+                        "Transaction {} was previously canceled and data has been lost.",
+                        committable,
+                        e);
+            } catch (Exception e) {
+                LOG.error(
+                        "Transaction {} encountered error and data has been potentially lost.",
+                        committable,
+                        e);
             }
+            recyclable.ifPresent(Recyclable::close);
Review comment:
Why can we close the producer if we might want to retry committing the
Committables? 🤔
##########
File path: flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
##########
@@ -197,12 +212,28 @@ public void testRecoveryWithAtLeastOnceGuarantee() throws Exception {
     public void testRecoveryWithExactlyOnceGuarantee() throws Exception {
         testRecoveryWithAssertion(
                 DeliveryGuarantee.EXACTLY_ONCE,
+                1,
+                (records) ->
+                        assertThat(
+                                records,
+                                contains(
+                                        LongStream.range(1, lastCheckpointedRecord.get().get() + 1)
+                                                .boxed()
+                                                .toArray())));
+    }
+
+    @Test
+    public void testRecoveryWithExactlyOnceGuaranteeAndConcurrentCheckpoints() throws Exception {
Review comment:
I would have expected this test to fail without your changes, considering that we ran into issues when doing the release testing in [FLINK-23850](https://issues.apache.org/jira/browse/FLINK-23850), resulting in [FLINK-24151](https://issues.apache.org/jira/browse/FLINK-24151). But reverting all your changes and only applying the `KafkaSinkITCase` diff didn't result in any failure on my local machine after 80 runs of `testRecoveryWithExactlyOnceGuaranteeAndConcurrentCheckpoints`. Is this expected? 🤔
##########
File path: flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaCommitterTest.java
##########
@@ -31,12 +34,37 @@
 /** Tests for {@link KafkaCommitter}. */
 public class KafkaCommitterTest extends TestLogger {
+    private static final int PRODUCER_ID = 0;
+    private static final short EPOCH = 0;
+    private static final String TRANSACTIONAL_ID = "transactionalId";
+
+    /** Causes a network error by inactive broker and tests that a retry will happen. */
     @Test
-    public void testRetryCommittableOnFailure() throws IOException {
-        final KafkaCommitter committer = new KafkaCommitter(new Properties());
-        final short epoch = 0;
+    public void testRetryCommittableOnRetriableError() throws IOException {
+        final KafkaCommitter committer = new KafkaCommitter(getProperties());
Review comment:
```suggestion
        // causes a network error by inactive broker
        final KafkaCommitter committer = new KafkaCommitter(getProperties());
```
nit: I would remove the test method comment. The method name is descriptive enough to show that we're testing the retry here. The explanation of why the configuration returned by `getProperties()` causes the expected behavior is valuable, though, and should be added to the respective line.
##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
##########
@@ -155,7 +157,7 @@
         }
         this.kafkaWriterState = new KafkaWriterState(transactionalIdPrefix);
-        this.lastCheckpointId = sinkInitContext.getRestoredCheckpointId().orElse(-1);
+        this.lastCheckpointId = sinkInitContext.getRestoredCheckpointId().orElse(0);
Review comment:
Or should we use
`CheckpointConfig.DEFAULT_CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA` instead? 🤔
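That is, something like the following (assuming that constant is semantically the right fit here and accessible from the connector):
```java
this.lastCheckpointId =
        sinkInitContext
                .getRestoredCheckpointId()
                .orElse(CheckpointConfig.DEFAULT_CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA);
```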
##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java
##########
@@ -64,19 +65,31 @@
                                 .<FlinkKafkaInternalProducer<?, ?>>map(Recyclable::getObject)
                                 .orElseGet(() -> getRecoveryProducer(committable));
                 producer.commitTransaction();
-                recyclable.ifPresent(Recyclable::close);
-            } catch (ProducerFencedException | InvalidTxnStateException e) {
-                // That means we have committed this transaction before.
+            } catch (RetriableException e) {
                 LOG.warn(
-                        "Encountered error {} while recovering transaction {}. "
-                                + "Presumably this transaction has been already committed before",
-                        e,
-                        committable);
-                recyclable.ifPresent(Recyclable::close);
-            } catch (Throwable e) {
-                LOG.warn("Cannot commit Kafka transaction, retrying.", e);
+                        "Encountered retriable exception while committing {}.", transactionalId, e);
                 retryableCommittables.add(committable);
+                continue;
+            } catch (ProducerFencedException e) {
+                // initTransaction has been called on this transaction before
+                LOG.error(
+                        "Transactions {} timed out or was overridden and data has been potentially lost.",
+                        transactionalId,
+                        e);
+            } catch (InvalidTxnStateException e) {
+                // This exception only occurs when aborting after a commit or vice versa.
+                // It does not appear on double commits or double aborts.
+                LOG.error(
+                        "Transaction {} was previously canceled and data has been lost.",
+                        committable,
+                        e);
+            } catch (Exception e) {
+                LOG.error(
+                        "Transaction {} encountered error and data has been potentially lost.",
+                        committable,
+                        e);
             }
+            recyclable.ifPresent(Recyclable::close);
Review comment:
Could we add a test for the closing to `KafkaCommitterTest`?
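A rough sketch of what I have in mind (the `Recyclable` constructor and the `createFailingProducer()` helper are assumptions and would need to be adapted to the actual APIs):
```java
@Test
public void testProducerIsRecycledAfterFatalError() throws IOException {
    // misconfigured committer so that the commit fails with a fatal error
    final KafkaCommitter committer = new KafkaCommitter(new Properties());
    final AtomicBoolean recycled = new AtomicBoolean();
    final Recyclable<FlinkKafkaInternalProducer<?, ?>> producer =
            new Recyclable<>(createFailingProducer(), p -> recycled.set(true));
    final List<KafkaCommittable> committables =
            Collections.singletonList(
                    new KafkaCommittable(PRODUCER_ID, EPOCH, TRANSACTIONAL_ID, producer));
    committer.commit(committables);
    // the producer should be handed back even though the commit failed
    assertTrue(recycled.get());
}
```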
##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
##########
@@ -165,20 +167,20 @@
                 || deliveryGuarantee == DeliveryGuarantee.NONE) {
             this.currentProducer = new FlinkKafkaInternalProducer<>(this.kafkaProducerConfig, null);
             closer.register(this.currentProducer);
-            initMetrics(this.currentProducer);
+            initKafkaMetrics(this.currentProducer);
         } else {
             throw new UnsupportedOperationException(
                     "Unsupported Kafka writer semantic " + this.deliveryGuarantee);
         }
+
+        metricGroup.setCurrentSendTimeGauge(() -> computeSendTime());
Review comment:
Would it make sense to have a separate method `initFlinkMetrics` to
cover that line analogously to `initKafkaMetrics`?
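That is, roughly:
```java
private void initFlinkMetrics() {
    metricGroup.setCurrentSendTimeGauge(() -> computeSendTime());
}
```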
##########
File path: flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
##########
@@ -123,6 +130,13 @@
     private SharedReference<AtomicBoolean> failed;
     private SharedReference<AtomicLong> lastCheckpointedRecord;
+    @Parameterized.Parameter public int run;
+
+    @Parameterized.Parameters
+    public static Set<Integer> getConfigurations() {
Review comment:
Would it make sense to add a comment here describing the intention of `run`? `run` is never used in any of the methods. I guess the intention is that we want to repeat the tests for some reason? Also, `getConfigurations` is quite a generic name for that intention. Maybe there's a better name (like `getTestRunCounter()`) to describe what this method returns; see the sketch below.
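Sketch (the method name and the number of runs are only suggestions):
```java
// repeat every test a fixed number of times to increase the chance of
// hitting race conditions; the run index shows up in the test name
@Parameterized.Parameter public int run;

@Parameterized.Parameters(name = "run = {0}")
public static Set<Integer> getTestRunCounter() {
    return IntStream.range(0, 3).boxed().collect(Collectors.toSet());
}
```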
##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
##########
@@ -155,7 +157,7 @@
         }
         this.kafkaWriterState = new KafkaWriterState(transactionalIdPrefix);
-        this.lastCheckpointId = sinkInitContext.getRestoredCheckpointId().orElse(-1);
+        this.lastCheckpointId = sinkInitContext.getRestoredCheckpointId().orElse(0);
Review comment:
We change it to `0` because the checkpoint ID counting starts at `1`. That knowledge was a bit hidden in [StandaloneCheckpointIDCounter:34](https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java#L34), [ZooKeeperCheckpointIDCounter:86](https://github.com/apache/flink/blob/79936be37dff2756f3829f89deec00a676db323d/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java#L86) or [KubernetesCheckpointIDCounter:167](https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointIDCounter.java#L167). Can't we move this value into a single place like `CheckpointConfig.DEFAULT_INITIAL_CHECKPOINT_ID` and use it here as well, i.e. `CheckpointConfig.DEFAULT_INITIAL_CHECKPOINT_ID - 1`? That would make the intention of the literal `0` more obvious; see the sketch below.
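Sketch of the proposal (the constant does not exist yet; its name and its location in `CheckpointConfig` are suggestions):
```java
// in CheckpointConfig: document in one place where checkpoint ID counting starts
public static final long DEFAULT_INITIAL_CHECKPOINT_ID = 1;

// in KafkaWriter: the "nothing restored" fallback then becomes self-explanatory
this.lastCheckpointId =
        sinkInitContext
                .getRestoredCheckpointId()
                .orElse(CheckpointConfig.DEFAULT_INITIAL_CHECKPOINT_ID - 1);
```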
##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkOperator.java
##########
@@ -104,6 +105,9 @@
     private final MailboxExecutor mailboxExecutor;
     private Counter numRecordsOutCounter;
+    // record endOfInput state to avoid duplicate prepareCommit on final notifyCheckpointComplete
+    // once FLIP-147 is fully operational all endOfInput processing needs to be removed
Review comment:
Is there a ticket for the removal which could be mentioned in the
comment?
##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java
##########
@@ -64,19 +65,31 @@
                                 .<FlinkKafkaInternalProducer<?, ?>>map(Recyclable::getObject)
                                 .orElseGet(() -> getRecoveryProducer(committable));
                 producer.commitTransaction();
-                recyclable.ifPresent(Recyclable::close);
-            } catch (ProducerFencedException | InvalidTxnStateException e) {
-                // That means we have committed this transaction before.
+            } catch (RetriableException e) {
                 LOG.warn(
-                        "Encountered error {} while recovering transaction {}. "
-                                + "Presumably this transaction has been already committed before",
-                        e,
-                        committable);
-                recyclable.ifPresent(Recyclable::close);
-            } catch (Throwable e) {
-                LOG.warn("Cannot commit Kafka transaction, retrying.", e);
+                        "Encountered retriable exception while committing {}.", transactionalId, e);
                 retryableCommittables.add(committable);
+                continue;
+            } catch (ProducerFencedException e) {
+                // initTransaction has been called on this transaction before
+                LOG.error(
+                        "Transactions {} timed out or was overridden and data has been potentially lost.",
+                        transactionalId,
+                        e);
+            } catch (InvalidTxnStateException e) {
+                // This exception only occurs when aborting after a commit or vice versa.
+                // It does not appear on double commits or double aborts.
+                LOG.error(
+                        "Transaction {} was previously canceled and data has been lost.",
+                        committable,
+                        e);
+            } catch (Exception e) {
Review comment:
Just to be sure we don't miss anything here: Why don't we handle
`Throwable` here anymore?
##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
##########
@@ -189,7 +191,7 @@ public void write(IN element, Context context) throws IOException {
         final List<KafkaCommittable> committables =
                 Collections.singletonList(
                         KafkaCommittable.of(currentProducer, producerPool::add));
-        LOG.info("Committing {} committables.", committables);
+        LOG.debug("Committing {} committables {}.", committables, flush);
Review comment:
This log message might be confusing. What is the `flush` flag supposed to tell the reader in the end? I would suggest making the log message more descriptive, e.g.:
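For example, something like this (the wording is just a proposal):
```java
LOG.debug("Committing {} committables (triggered by flush={}).", committables, flush);
```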
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]