fapaul commented on a change in pull request #17019:
URL: https://github.com/apache/flink/pull/17019#discussion_r699070362
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PrioritizedOperatorSubtaskState.java
##########
@@ -192,7 +196,17 @@
* snapshots.
*/
public boolean isRestored() {
- return restored;
+ return restoredCheckpointId != null;
+ }
+
+ /**
+ * Returns non-empty if this was created for a restored operator, false otherwise. Restored
Review comment:
Update the comment because it is not optional anymore?
##########
File path:
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
##########
@@ -334,9 +348,9 @@ private void writeRecordsToKafka(
drainAllRecordsFromTopic(topic);
final long recordsCount = expectedRecords.get().get();
assertEquals(collectedRecords.size(), recordsCount);
- assertEquals(
+ assertThat(
deserializeValues(collectedRecords),
- LongStream.range(1, recordsCount + 1).boxed().collect(Collectors.toList()));
+ containsInAnyOrder(LongStream.range(1, recordsCount + 1).boxed().toArray()));
Review comment:
Shouldn't the order be checked here? `containsInAnyOrder` ignores it.
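For context, a minimal plain-Java sketch of the difference (not Hamcrest itself; `OrderCheck` and its two methods are hypothetical names used only for illustration). The old `assertEquals` on lists is order-sensitive, whereas an any-order check only compares the elements:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class OrderCheck {
    /** Order-sensitive comparison, comparable to assertEquals on two lists. */
    static boolean sameOrder(List<Long> actual, List<Long> expected) {
        return actual.equals(expected);
    }

    /** Order-insensitive comparison: same elements with the same multiplicity. */
    static boolean sameElementsAnyOrder(List<Long> actual, List<Long> expected) {
        List<Long> sortedActual = new ArrayList<>(actual);
        List<Long> sortedExpected = new ArrayList<>(expected);
        Collections.sort(sortedActual);
        Collections.sort(sortedExpected);
        return sortedActual.equals(sortedExpected);
    }
}
```

A reordered result passes the second check but fails the first, so the test would no longer notice records arriving out of order.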
##########
File path:
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
##########
@@ -226,11 +236,12 @@ public void testAbortTransactionsOfPendingCheckpointsAfterFailure() throws Excep
new FailingCheckpointMapper(failed, lastCheckpointedRecord),
config, "newPrefix");
final List<ConsumerRecord<byte[], byte[]>> collectedRecords =
drainAllRecordsFromTopic(topic);
- assertEquals(
+ assertThat(
deserializeValues(collectedRecords),
- LongStream.range(1, lastCheckpointedRecord.get().get() + 1)
- .boxed()
- .collect(Collectors.toList()));
+ containsInAnyOrder(
Review comment:
Shouldn't the order be guaranteed?
##########
File path:
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
##########
@@ -254,11 +265,12 @@ public void testAbortTransactionsAfterScaleInBeforeFirstCheckpoint() throws Exce
new FailingCheckpointMapper(failed, lastCheckpointedRecord),
config, null);
final List<ConsumerRecord<byte[], byte[]>> collectedRecords =
drainAllRecordsFromTopic(topic);
- assertEquals(
+ assertThat(
deserializeValues(collectedRecords),
- LongStream.range(1, lastCheckpointedRecord.get().get() + 1)
- .boxed()
- .collect(Collectors.toList()));
+ containsInAnyOrder(
Review comment:
Same here: what about the order?
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/TransactionAborter.java
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.function.Function;
+
+/**
+ * Aborts lingering transactions on restart.
+ *
+ * <p>Transactions are lingering if they are not tracked anywhere. For example, if a job is started
+ * transactions are opened. A restart without checkpoint would not allow Flink to abort old
+ * transactions. Since Kafka's transactions are sequential, newly produced data does not become
+ * visible for read_committed consumers. However, Kafka has no API for querying open transactions,
+ * so they become lingering.
+ *
+ * <p>Flink solves this by assuming consecutive transaction ids. On restart of checkpoint C on
+ * subtask S, it will sequentially cancel transaction C+1, C+2, ... of S until it finds the first
+ * unused transaction.
+ *
+ * <p>Additionally, to cover for weird downscaling cases without checkpoints, it also checks for
+ * transactions of subtask S+P where P is the current parallelism until it finds a subtask without
+ * transactions.
+ */
+class TransactionAborter {
+ private final int subtaskId;
+ private final int parallelism;
+ private final Function<String, FlinkKafkaInternalProducer<byte[], byte[]>> producerFactory;
+ @Nullable FlinkKafkaInternalProducer<byte[], byte[]> producer = null;
+
+ public TransactionAborter(
+ int subtaskId,
+ int parallelism,
+ Function<String, FlinkKafkaInternalProducer<byte[], byte[]>> producerFactory) {
+ this.subtaskId = subtaskId;
+ this.parallelism = parallelism;
+ this.producerFactory = producerFactory;
+ }
+
+ void abortLingeringTransactions(List<String> prefixesToAbort, long startCheckpointId) {
+ for (String prefix : prefixesToAbort) {
+ abortTransactionsWithPrefix(prefix, startCheckpointId);
+ }
+ }
+
+ /**
+ * Aborts all transactions that have been created by this subtask in a previous run.
+ *
+ * <p>It also aborts transactions from subtasks that may have been removed because of
+ * downscaling.
+ *
+ * <p>When Flink downscales X subtasks to Y subtasks, then subtask i is responsible for cleaning
+ * all subtasks j in [0; X), where j % Y = i. For example, if we downscale to 2, then subtask 0
+ * is responsible for all even and subtask 1 for all odd subtasks.
+ */
+ void abortTransactionsWithPrefix(String prefix, long startCheckpointId) {
+ for (int subtaskId = this.subtaskId; ; subtaskId += parallelism) {
+ if (abortTransactionOfSubtask(prefix, startCheckpointId, subtaskId) == 0) {
+ // If Flink didn't abort any transaction for current subtask, then we assume that no
+ // such subtask existed and no subtask with a higher number as well.
+ break;
+ }
+ }
+ }
+
+ /**
+ * Aborts all transactions that have been created by a subtask in a previous run after the given
+ * checkpoint id.
+ *
+ * <p>We assume that transaction ids are consecutively used and thus Flink can stop aborting as
+ * soon as Flink notices that a particular transaction id was unused.
+ */
+ int abortTransactionOfSubtask(String prefix, long startCheckpointId, int subtaskId) {
Review comment:
Why is this package-private?
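To check my reading of the Javadoc, here is a minimal sketch of the per-subtask scan, under loud assumptions: the `openTransactions` set stands in for the broker (Kafka offers no such query API, as the class comment notes), and the `prefix-subtaskId-checkpointId` id layout is hypothetical, since the real format is not part of this hunk.

```java
import java.util.Set;

final class AbortScanSketch {
    /** Hypothetical id layout, for illustration only. */
    static String transactionalId(String prefix, int subtaskId, long checkpointId) {
        return prefix + "-" + subtaskId + "-" + checkpointId;
    }

    /**
     * Aborts consecutive transactions of one subtask, starting at startCheckpointId,
     * and stops at the first id that was never used. Returns the number aborted.
     */
    static int abortTransactionOfSubtask(
            Set<String> openTransactions, String prefix, long startCheckpointId, int subtaskId) {
        int aborted = 0;
        for (long checkpointId = startCheckpointId; ; checkpointId++) {
            String id = transactionalId(prefix, subtaskId, checkpointId);
            // Assumption: removing from the set models "abort"; a miss models "unused id".
            if (!openTransactions.remove(id)) {
                break;
            }
            aborted++;
        }
        return aborted;
    }
}
```

With open transactions for checkpoints 6, 7 and 9 and a start id of 6, the scan aborts 6 and 7 and stops at the gap, so 9 stays open. That is the case the "consecutive ids" assumption rules out.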
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitRetrier.java
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.clock.Clock;
+import org.apache.flink.util.clock.SystemClock;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Retries the committables of a {@link CommitterHandler} until all committables are eventually
+ * committed.
+ */
+public class CommitRetrier {
+ private final ProcessingTimeService processingTimeService;
+ private final CommitterHandler<?, ?> committerHandler;
+ private final Clock clock;
+ public static final int RETRY_DELAY = 1000;
Review comment:
Nit: Does it need to be public?
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java
##########
@@ -47,14 +47,23 @@
"org.apache.kafka.clients.producer.internals.TransactionManager$State";
private static final String PRODUCER_ID_AND_EPOCH_FIELD_NAME =
"producerIdAndEpoch";
- private final Properties kafkaProducerConfig;
- @Nullable private final String transactionalId;
+ @Nullable private String transactionalId;
private volatile boolean inTransaction;
- public FlinkKafkaInternalProducer(Properties properties) {
- super(properties);
- this.kafkaProducerConfig = properties;
- this.transactionalId = properties.getProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
+ public FlinkKafkaInternalProducer(Properties properties, @Nullable String transactionalId) {
+ super(withTransactionId(properties, transactionalId));
+ this.transactionalId = transactionalId;
+ }
+
+ private static Properties withTransactionId(
Review comment:
Rename to `withTransactionalId` to keep the naming consistent.
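The body of the helper is cut off in this hunk, so the following is only a sketch of what I would expect under the suggested name, not the PR's implementation. It assumes the helper copies the properties and sets Kafka's `transactional.id` key (the value of `ProducerConfig.TRANSACTIONAL_ID_CONFIG`) when an id is given; the class name `TransactionalIdProps` is hypothetical.

```java
import java.util.Properties;

final class TransactionalIdProps {
    /** Same string as Kafka's ProducerConfig.TRANSACTIONAL_ID_CONFIG. */
    static final String TRANSACTIONAL_ID_CONFIG = "transactional.id";

    /** Returns the input unchanged for a null id, otherwise a copy with the id set. */
    static Properties withTransactionalId(Properties properties, String transactionalId) {
        if (transactionalId == null) {
            return properties;
        }
        Properties copy = new Properties();
        copy.putAll(properties);
        copy.setProperty(TRANSACTIONAL_ID_CONFIG, transactionalId);
        return copy;
    }
}
```

Copying keeps the caller's `Properties` untouched, which matters if the same config object is reused for several producers.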
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/TransactionAborter.java
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.function.Function;
+
+/**
+ * Aborts lingering transactions on restart.
+ *
+ * <p>Transactions are lingering if they are not tracked anywhere. For example, if a job is started
+ * transactions are opened. A restart without checkpoint would not allow Flink to abort old
+ * transactions. Since Kafka's transactions are sequential, newly produced data does not become
+ * visible for read_committed consumers. However, Kafka has no API for querying open transactions,
+ * so they become lingering.
+ *
+ * <p>Flink solves this by assuming consecutive transaction ids. On restart of checkpoint C on
+ * subtask S, it will sequentially cancel transaction C+1, C+2, ... of S until it finds the first
+ * unused transaction.
+ *
+ * <p>Additionally, to cover for weird downscaling cases without checkpoints, it also checks for
+ * transactions of subtask S+P where P is the current parallelism until it finds a subtask without
+ * transactions.
+ */
+class TransactionAborter {
+ private final int subtaskId;
+ private final int parallelism;
+ private final Function<String, FlinkKafkaInternalProducer<byte[], byte[]>> producerFactory;
+ @Nullable FlinkKafkaInternalProducer<byte[], byte[]> producer = null;
+
+ public TransactionAborter(
+ int subtaskId,
+ int parallelism,
+ Function<String, FlinkKafkaInternalProducer<byte[], byte[]>> producerFactory) {
+ this.subtaskId = subtaskId;
+ this.parallelism = parallelism;
+ this.producerFactory = producerFactory;
+ }
+
+ void abortLingeringTransactions(List<String> prefixesToAbort, long startCheckpointId) {
+ for (String prefix : prefixesToAbort) {
+ abortTransactionsWithPrefix(prefix, startCheckpointId);
+ }
+ }
+
+ /**
+ * Aborts all transactions that have been created by this subtask in a previous run.
+ *
+ * <p>It also aborts transactions from subtasks that may have been removed because of
+ * downscaling.
+ *
+ * <p>When Flink downscales X subtasks to Y subtasks, then subtask i is responsible for cleaning
+ * all subtasks j in [0; X), where j % Y = i. For example, if we downscale to 2, then subtask 0
+ * is responsible for all even and subtask 1 for all odd subtasks.
+ */
+ void abortTransactionsWithPrefix(String prefix, long startCheckpointId) {
Review comment:
Why is this package-private?
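The downscaling rule from the Javadoc can be sketched as follows. Note one deliberate difference: the sketch takes the old parallelism X as a parameter (`oldParallelism`, a hypothetical name) to make the mapping visible, whereas the loop in this PR does not know X and instead stops at the first subtask for which nothing was aborted.

```java
import java.util.ArrayList;
import java.util.List;

final class DownscaleSketch {
    /**
     * Lists the old subtask ids j in [0, oldParallelism) that the new subtask
     * subtaskId cleans up, i.e. all j with j % parallelism == subtaskId.
     */
    static List<Integer> subtasksToClean(int subtaskId, int parallelism, int oldParallelism) {
        List<Integer> result = new ArrayList<>();
        // Stepping by the current parallelism visits exactly the ids with the same remainder.
        for (int j = subtaskId; j < oldParallelism; j += parallelism) {
            result.add(j);
        }
        return result;
    }
}
```

Downscaling from 5 to 2 gives subtask 0 the old subtasks 0, 2, 4 and subtask 1 the old subtasks 1, 3, matching the even/odd example in the comment.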
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
##########
@@ -141,13 +138,15 @@
} catch (Exception e) {
throw new FlinkRuntimeException("Cannot initialize schema.", e);
}
- this.kafkaWriterState =
- recoverAndInitializeState(checkNotNull(recoveredStates, "recoveredStates"));
disableMetrics =
kafkaProducerConfig.containsKey(KEY_DISABLE_METRICS)
&& Boolean.parseBoolean(
kafkaProducerConfig.getProperty(KEY_DISABLE_METRICS));
- this.currentProducer = createProducer();
+ lastCheckpointId = sinkInitContext.getRestoredCheckpointId().orElse(-1) + 1;
Review comment:
I think I have seen different starting checkpoint ids. Does it start
with 0 or 1? 1 feels more natural to me because, as far as I know, Flink
has no checkpoint 0.
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/TransactionAborter.java
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.function.Function;
+
+/**
+ * Aborts lingering transactions on restart.
+ *
+ * <p>Transactions are lingering if they are not tracked anywhere. For example, if a job is started
+ * transactions are opened. A restart without checkpoint would not allow Flink to abort old
+ * transactions. Since Kafka's transactions are sequential, newly produced data does not become
+ * visible for read_committed consumers. However, Kafka has no API for querying open transactions,
+ * so they become lingering.
+ *
+ * <p>Flink solves this by assuming consecutive transaction ids. On restart of checkpoint C on
+ * subtask S, it will sequentially cancel transaction C+1, C+2, ... of S until it finds the first
+ * unused transaction.
+ *
+ * <p>Additionally, to cover for weird downscaling cases without checkpoints, it also checks for
+ * transactions of subtask S+P where P is the current parallelism until it finds a subtask without
+ * transactions.
+ */
+class TransactionAborter {
+ private final int subtaskId;
+ private final int parallelism;
+ private final Function<String, FlinkKafkaInternalProducer<byte[], byte[]>> producerFactory;
+ @Nullable FlinkKafkaInternalProducer<byte[], byte[]> producer = null;
+
+ public TransactionAborter(
+ int subtaskId,
+ int parallelism,
+ Function<String, FlinkKafkaInternalProducer<byte[], byte[]>> producerFactory) {
+ this.subtaskId = subtaskId;
+ this.parallelism = parallelism;
+ this.producerFactory = producerFactory;
Review comment:
Use `checkNotNull` for `producerFactory`?
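A minimal sketch of what I mean, with two stand-ins so that it compiles on its own: `Objects.requireNonNull` takes the place of Flink's `Preconditions.checkNotNull`, and `AutoCloseable` takes the place of `FlinkKafkaInternalProducer<byte[], byte[]>`. The class name `AborterCtorSketch` is hypothetical.

```java
import java.util.Objects;
import java.util.function.Function;

final class AborterCtorSketch {
    private final int subtaskId;
    private final int parallelism;
    private final Function<String, AutoCloseable> producerFactory;

    AborterCtorSketch(
            int subtaskId, int parallelism, Function<String, AutoCloseable> producerFactory) {
        this.subtaskId = subtaskId;
        this.parallelism = parallelism;
        // Fail at construction time instead of on first use of the factory.
        this.producerFactory = Objects.requireNonNull(producerFactory, "producerFactory");
    }

    Function<String, AutoCloseable> getProducerFactory() {
        return producerFactory;
    }
}
```

The point is only that a null factory surfaces in the constructor, close to the faulty call site, rather than later during the abort loop.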
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java
##########
@@ -37,39 +37,41 @@
*/
class KafkaCommitter implements Committer<KafkaCommittable> {
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaCommitter.class);
+
private final Properties kafkaProducerConfig;
KafkaCommitter(Properties kafkaProducerConfig) {
this.kafkaProducerConfig = kafkaProducerConfig;
}
- private static final Logger LOG = LoggerFactory.getLogger(KafkaCommitter.class);
-
@Override
public List<KafkaCommittable> commit(List<KafkaCommittable> committables)
throws IOException {
- committables.forEach(this::commitTransaction);
- return Collections.emptyList();
+ List<KafkaCommittable> retryableCommittables = new ArrayList<>();
+ for (KafkaCommittable committable : committables) {
+ final String transactionalId = committable.getTransactionalId();
+ LOG.debug("Committing Kafka transaction {}", transactionalId);
+ try (FlinkKafkaInternalProducer<?, ?> producer =
+ committable.getProducer().orElseGet(() -> createProducer(committable))) {
+ producer.commitTransaction();
+ } catch (ProducerFencedException | InvalidTxnStateException e) {
+ // That means we have committed this transaction before.
+ LOG.warn(
+ "Encountered error {} while recovering transaction {}. "
+ + "Presumably this transaction has been already committed before",
+ e,
+ committable);
+ } catch (Throwable e) {
+ LOG.info("Cannot commit Kafka transaction, retrying.", e);
Review comment:
Nit: Should this be a warning?
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
##########
@@ -78,16 +79,16 @@
private final Callback deliveryCallback;
private final AtomicLong pendingRecords = new AtomicLong();
private final KafkaRecordSerializationSchema.KafkaSinkContext kafkaSinkContext;
- private final List<FlinkKafkaInternalProducer<byte[], byte[]>> producers = new ArrayList<>();
private final Map<String, KafkaMetricMutableWrapper> previouslyCreatedMetrics = new HashMap<>();
private final SinkWriterMetricGroup metricGroup;
private final Counter numBytesOutCounter;
private final Sink.ProcessingTimeService timeService;
private transient Metric byteOutMetric;
Review comment:
Nit: also remove the transient?
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
##########
@@ -138,15 +144,30 @@
} catch (Exception e) {
throw new FlinkRuntimeException("Cannot initialize schema.", e);
}
- disableMetrics =
+
+ this.kafkaWriterState = new KafkaWriterState(transactionalIdPrefix);
+ this.disableMetrics =
kafkaProducerConfig.containsKey(KEY_DISABLE_METRICS)
&& Boolean.parseBoolean(
kafkaProducerConfig.getProperty(KEY_DISABLE_METRICS));
- lastCheckpointId = sinkInitContext.getRestoredCheckpointId().orElse(-1) + 1;
- abortLingeringTransactions(
- checkNotNull(recoveredStates, "recoveredStates"), lastCheckpointId);
- this.kafkaWriterState = new KafkaWriterState(transactionalIdPrefix);
- this.currentProducer = createProducer(lastCheckpointId);
+
+ this.lastCheckpointId = sinkInitContext.getRestoredCheckpointId().orElse(-1);
+ if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {
+ abortLingeringTransactions(
+ checkNotNull(recoveredStates, "recoveredStates"), lastCheckpointId + 1);
+ }
+ if (this.deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {
+ this.currentProducer = getTransactionalProducer(lastCheckpointId + 1);
+ this.currentProducer.beginTransaction();
Review comment:
Merge the two condition branches? Both check for `EXACTLY_ONCE`.
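Roughly what I have in mind, as a self-contained sketch rather than a patch: the real calls are replaced by entries in a `steps` list so the order is visible, and the nested `DeliveryGuarantee` enum is a stand-in for Flink's. Whatever follows `beginTransaction()` in the real constructor is not visible in this hunk and is left out.

```java
import java.util.ArrayList;
import java.util.List;

final class WriterInitSketch {
    /** Stand-in for Flink's DeliveryGuarantee enum. */
    enum DeliveryGuarantee { NONE, AT_LEAST_ONCE, EXACTLY_ONCE }

    /** Records which initialization steps ran, in order. */
    final List<String> steps = new ArrayList<>();

    WriterInitSketch(DeliveryGuarantee deliveryGuarantee, long lastCheckpointId) {
        // Single branch: abort leftovers first, then open the first transaction.
        if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {
            steps.add("abortLingeringTransactions(" + (lastCheckpointId + 1) + ")");
            steps.add("getTransactionalProducer(" + (lastCheckpointId + 1) + ")");
            steps.add("beginTransaction()");
        }
    }
}
```

Keeping all three steps in one block also makes it obvious that the abort and the new producer use the same checkpoint id.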
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
##########
@@ -183,62 +181,83 @@ public void close() throws Exception {
closer.close();
}
- private KafkaWriterState recoverAndInitializeState(List<KafkaWriterState> recoveredStates) {
- final int subtaskId = kafkaSinkContext.getParallelInstanceId();
- if (recoveredStates.isEmpty()) {
- final KafkaWriterState state =
- new KafkaWriterState(transactionalIdPrefix, subtaskId, 0);
- abortTransactions(getTransactionsToAbort(state, new ArrayList<>()));
- return state;
+ private void abortLingeringTransactions(
+ List<KafkaWriterState> recoveredStates, long startCheckpointId) {
+ List<String> prefixesToAbort = Lists.newArrayList(transactionalIdPrefix);
+
+ if (!recoveredStates.isEmpty()) {
+ KafkaWriterState lastState = recoveredStates.get(0);
+ if (!lastState.getTransactionalIdPrefix().equals(transactionalIdPrefix)) {
Review comment:
Why do we only abort transactions if the prefix has changed?
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitRetrier.java
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.clock.Clock;
+import org.apache.flink.util.clock.SystemClock;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Retries the committables of a {@link CommitterHandler} until all committables are eventually
+ * committed.
+ */
+public class CommitRetrier {
Review comment:
Add `@Internal`?