fapaul commented on a change in pull request #17019:
URL: https://github.com/apache/flink/pull/17019#discussion_r697362882
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedInitializationContext.java
##########
@@ -33,11 +35,16 @@
*/
public interface ManagedInitializationContext {
+ /** Returns true, if state was restored from the snapshot of a previous execution. */
+ default boolean isRestored() {
+ return getRestoredCheckpointId() != null;
Review comment:
Shouldn't this be `getRestoredCheckpointId().isPresent()`?
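For illustration, a minimal sketch of the suggested default, assuming `getRestoredCheckpointId()` returns an `OptionalLong` rather than a nullable value (that return type is an assumption on my side):

```java
// Sketch only: assumes getRestoredCheckpointId() returns OptionalLong.
default boolean isRestored() {
    return getRestoredCheckpointId().isPresent();
}
```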
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateContext.java
##########
@@ -31,9 +31,17 @@
public interface StreamOperatorStateContext {
/**
- * Returns true, the states provided by this context are restored from a checkpoint/savepoint.
+ * Returns true if the states provided by this context are restored from a checkpoint/savepoint.
*/
- boolean isRestored();
+ default boolean isRestored() {
Review comment:
Can we pull out a common interface for these two methods and use it for this class and `ManagedInitializationContext`?
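Something along these lines, purely as a sketch; the interface name and the `OptionalLong` return type are assumptions, not part of this PR:

```java
import java.util.OptionalLong;

/** Hypothetical shared super-interface for ManagedInitializationContext and StreamOperatorStateContext. */
public interface RestoredCheckpointAware {

    /** Returns the id of the restored checkpoint, if state was restored. */
    OptionalLong getRestoredCheckpointId();

    /** Returns true if state was restored from the snapshot of a previous execution. */
    default boolean isRestored() {
        return getRestoredCheckpointId().isPresent();
    }
}
```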
##########
File path:
flink-core/src/main/java/org/apache/flink/api/connector/sink/SinkWriter.java
##########
@@ -53,26 +55,43 @@
* <p>This method is intended for advanced sinks that propagate watermarks.
*
* @param watermark The watermark.
+ *
* @throws IOException if fail to add a watermark.
*/
- default void writeWatermark(Watermark watermark) throws IOException, InterruptedException {}
+ default void writeWatermark(Watermark watermark) throws IOException, InterruptedException {
+ }
/**
* Prepare for a commit.
*
* <p>This will be called before we checkpoint the Writer's state in Streaming execution mode.
*
* @param flush Whether flushing the un-staged data or not
+ *
* @return The data is ready to commit.
+ *
* @throws IOException if fail to prepare for a commit.
*/
List<CommT> prepareCommit(boolean flush) throws IOException, InterruptedException;
/**
* @return The writer's state.
+ *
+ * @throws IOException if fail to snapshot writer's state.
+ * @deprecated implement {@link #snapshotState(long)}
+ */
+ default List<WriterStateT> snapshotState() throws IOException {
+ return Collections.emptyList();
+ }
+
+ /**
+ * @return The writer's state.
+ *
* @throws IOException if fail to snapshot writer's state.
*/
- List<WriterStateT> snapshotState() throws IOException;
+ default List<WriterStateT> snapshotState(long checkpointId) throws IOException {
Review comment:
Should we implement a test to ensure the `checkpointId` is correctly
passed to the writer?
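Roughly what I have in mind, sketched with a hypothetical recording writer (the class name and wiring are made up; the real test would drive it through the sink operator test harness):

```java
/** Hypothetical test double that records the checkpoint id passed to snapshotState. */
class RecordingSinkWriter implements SinkWriter<String, Void, Void> {
    long lastSnapshotCheckpointId = -1L;

    @Override
    public void write(String element, Context context) {}

    @Override
    public List<Void> prepareCommit(boolean flush) {
        return Collections.emptyList();
    }

    @Override
    public List<Void> snapshotState(long checkpointId) {
        lastSnapshotCheckpointId = checkpointId; // recorded for the assertion
        return Collections.emptyList();
    }

    @Override
    public void close() {}
}
```

The test would then trigger a checkpoint with a known id and assert that `lastSnapshotCheckpointId` matches it.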
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
##########
@@ -91,12 +106,14 @@ public void snapshotState(StateSnapshotContext context) throws Exception {
@Override
public void endInput() throws Exception {
emitCommittables(committerHandler.endOfInput());
+ retryer.retryIndefinitely();
}
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
super.notifyCheckpointComplete(checkpointId);
emitCommittables(committerHandler.notifyCheckpointCompleted(checkpointId));
+ retryer.retryWithDelay();
Review comment:
Not sure I like this approach; it means that on every checkpoint we enqueue a trigger in the mailbox. In the case of a very small checkpoint interval and long commit durations, this overhead is significant.
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitRetryer.java
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+
+import java.io.IOException;
+
+/**
+ * Retries the committables of a {@link CommitterHandler} until all committables are eventually
+ * committed.
+ */
+public class CommitRetryer<InputT, OutputT> {
+ private final ProcessingTimeService processingTimeService;
+ private final CommitterHandler<InputT, OutputT> committerHandler;
+ public static final int RETRY_DELAY = 1000;
+
+ public CommitRetryer(
+ ProcessingTimeService processingTimeService,
+ CommitterHandler<InputT, OutputT> committerHandler) {
+ this.processingTimeService = processingTimeService;
+ this.committerHandler = committerHandler;
+ }
+
+ public void retryWithDelay() {
+ if (committerHandler.needsRetry()) {
+ processingTimeService.registerTimer(
+ RETRY_DELAY,
+ ts -> {
+ if (retry(1)) {
+ retryWithDelay();
+ }
+ });
+ }
+ }
+
+ public void retryIndefinitely() throws IOException, InterruptedException {
+ retry(Long.MAX_VALUE);
+ }
+
+ private boolean retry(long tries) throws IOException, InterruptedException {
+ for (long i = 0; i < tries; i++) {
+ if (!committerHandler.needsRetry()) {
+ return false;
+ }
+ committerHandler.retry();
+ }
+ return true;
Review comment:
Nit: return `!committerHandler.needsRetry()`
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/GlobalStreamingCommitterHandler.java
##########
@@ -60,27 +53,21 @@ public GlobalStreamingCommitterHandler(
super(committableSerializer);
this.globalCommitter = checkNotNull(globalCommitter);
- this.recoveredGlobalCommittables = new ArrayList<>();
this.endOfInput = false;
}
@Override
- void recoveredCommittables(List<GlobalCommT> committables) throws IOException {
- final List<GlobalCommT> recovered =
- globalCommitter.filterRecoveredCommittables(checkNotNull(committables));
- recoveredGlobalCommittables.addAll(recovered);
+ protected void recoveredCommittables(List<GlobalCommT> committables) throws IOException {
+ super.recoveredCommittables(
+ globalCommitter.filterRecoveredCommittables(checkNotNull(committables)));
}
@Override
List<GlobalCommT> prepareCommit(List<CommT> input) throws IOException {
- checkNotNull(input);
Review comment:
Nit: this check was removed
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
##########
@@ -174,6 +174,9 @@ public void write(IN element, Context context) throws IOException {
@Override
public void close() throws Exception {
+ if (currentProducer.isInTransaction()) {
Review comment:
Can we test this behavior?
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java
##########
@@ -37,39 +36,41 @@
*/
class KafkaCommitter implements Committer<KafkaCommittable> {
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaCommitter.class);
+
private final Properties kafkaProducerConfig;
KafkaCommitter(Properties kafkaProducerConfig) {
this.kafkaProducerConfig = kafkaProducerConfig;
}
- private static final Logger LOG = LoggerFactory.getLogger(KafkaCommitter.class);
-
@Override
public List<KafkaCommittable> commit(List<KafkaCommittable> committables) throws IOException {
- committables.forEach(this::commitTransaction);
- return Collections.emptyList();
+ List<KafkaCommittable> retryableCommittables = new ArrayList<>();
+ for (KafkaCommittable committable : committables) {
+ final String transactionalId = committable.getTransactionalId();
+ LOG.debug("Committing Kafka transaction {}", transactionalId);
+ try (FlinkKafkaInternalProducer<?, ?> producer =
+ committable.getProducer().orElseGet(() -> createProducer(committable))) {
+ producer.commitTransaction();
+ } catch (ProducerFencedException e) {
Review comment:
Do we also want to retry the `InvalidTxnStateException`? If the
transaction is already in a different state we can probably also abandon it.
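If we do want to treat it the same way, a hedged sketch of the widened catch, assuming `org.apache.kafka.common.errors.InvalidTxnStateException` is the exception in question (import not shown in the hunk):

```java
// Sketch only: also treat InvalidTxnStateException as non-retriable and abandon the committable.
try (FlinkKafkaInternalProducer<?, ?> producer =
        committable.getProducer().orElseGet(() -> createProducer(committable))) {
    producer.commitTransaction();
} catch (ProducerFencedException | InvalidTxnStateException e) {
    // In both cases the transaction cannot be committed in its current state,
    // so retrying it later would not help.
    LOG.warn("Unable to commit transaction {}", transactionalId, e);
}
```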
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
##########
@@ -164,13 +163,12 @@ public void write(IN element, Context context) throws IOException {
@Override
public List<KafkaCommittable> prepareCommit(boolean flush) {
flushRecords(flush);
- List<KafkaCommittable> committables = precommit();
- currentProducer = createProducer();
- return committables;
+ return precommit();
}
@Override
- public List<KafkaWriterState> snapshotState() throws IOException {
+ public List<KafkaWriterState> snapshotState(long checkpointId) throws IOException {
+ currentProducer = createProducer(checkpointId);
Review comment:
It is unfortunate that we have to create the new producer in snapshotState. It is probably not possible that `SinkWriter#write` is invoked between `snapshotState` and `preCommit`, but it leaves room for silent errors that are hard to detect in the future.
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
##########
@@ -183,62 +181,83 @@ public void close() throws Exception {
closer.close();
}
- private KafkaWriterState recoverAndInitializeState(List<KafkaWriterState> recoveredStates) {
- final int subtaskId = kafkaSinkContext.getParallelInstanceId();
- if (recoveredStates.isEmpty()) {
- final KafkaWriterState state =
- new KafkaWriterState(transactionalIdPrefix, subtaskId, 0);
- abortTransactions(getTransactionsToAbort(state, new ArrayList<>()));
- return state;
+ private void abortLingeringTransactions(
+ List<KafkaWriterState> recoveredStates, long startCheckpointId) {
+ List<String> prefixesToAbort = Lists.newArrayList(transactionalIdPrefix);
+
+ if (!recoveredStates.isEmpty()) {
+ KafkaWriterState lastState = recoveredStates.get(0);
+ if (!lastState.getTransactionalIdPrefix().equals(transactionalIdPrefix)) {
Review comment:
Shouldn't the condition be inverted? If the transactional id prefix has changed, we need to abort something.
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
##########
@@ -183,62 +181,83 @@ public void close() throws Exception {
closer.close();
}
- private KafkaWriterState recoverAndInitializeState(List<KafkaWriterState> recoveredStates) {
- final int subtaskId = kafkaSinkContext.getParallelInstanceId();
- if (recoveredStates.isEmpty()) {
- final KafkaWriterState state =
- new KafkaWriterState(transactionalIdPrefix, subtaskId, 0);
- abortTransactions(getTransactionsToAbort(state, new ArrayList<>()));
- return state;
+ private void abortLingeringTransactions(
+ List<KafkaWriterState> recoveredStates, long startCheckpointId) {
+ List<String> prefixesToAbort = Lists.newArrayList(transactionalIdPrefix);
+
+ if (!recoveredStates.isEmpty()) {
+ KafkaWriterState lastState = recoveredStates.get(0);
+ if (!lastState.getTransactionalIdPrefix().equals(transactionalIdPrefix)) {
+ prefixesToAbort.add(lastState.getTransactionalIdPrefix());
+ LOG.warn(
+ "Transactional id prefix from previous execution {} has changed to {}.",
+ lastState.getTransactionalIdPrefix(),
+ transactionalIdPrefix);
+ }
}
- final Map<Integer, KafkaWriterState> taskOffsetMapping =
- recoveredStates.stream()
- .collect(
- Collectors.toMap(
- KafkaWriterState::getSubtaskId, Function.identity()));
- checkState(
- taskOffsetMapping.containsKey(subtaskId),
- "Internal error: It is expected that state from previous executions is distributed to the same subtask id.");
- final KafkaWriterState lastState = taskOffsetMapping.get(subtaskId);
- taskOffsetMapping.remove(subtaskId);
- abortTransactions(
- getTransactionsToAbort(lastState, new ArrayList<>(taskOffsetMapping.values())));
- if (!lastState.getTransactionalIdPrefix().equals(transactionalIdPrefix)) {
- LOG.warn(
- "Transactional id prefix from previous execution {} has changed to {}.",
- lastState.getTransactionalIdPrefix(),
- transactionalIdPrefix);
- return new KafkaWriterState(transactionalIdPrefix, subtaskId, 0);
+
+ final Properties properties = new Properties();
+ properties.putAll(kafkaProducerConfig);
+ properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "dummy");
+ try (FlinkKafkaInternalProducer<byte[], byte[]> producer =
+ new FlinkKafkaInternalProducer<>(properties)) {
+ for (String prefix : prefixesToAbort) {
+ abortTransactionsWithPrefix(producer, prefix, startCheckpointId);
+ }
+ }
- return new KafkaWriterState(
- transactionalIdPrefix, subtaskId, lastState.getTransactionalIdOffset());
}
- private void abortTransactions(List<String> transactionsToAbort) {
- transactionsToAbort.forEach(
- transaction -> {
- // don't mess with the original configuration or any other
- // properties of the
- // original object
- // -> create an internal kafka producer on our own and do not rely
- // on
- // initTransactionalProducer().
- final Properties myConfig = new Properties();
- myConfig.putAll(kafkaProducerConfig);
- myConfig.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transaction);
- LOG.info("Aborting Kafka transaction {}.", transaction);
- FlinkKafkaInternalProducer<byte[], byte[]> kafkaProducer = null;
- try {
- kafkaProducer = new FlinkKafkaInternalProducer<>(myConfig);
- // it suffices to call initTransactions - this will abort any
- // lingering transactions
- kafkaProducer.initTransactions();
- } finally {
- if (kafkaProducer != null) {
- kafkaProducer.close(Duration.ofSeconds(0));
- }
- }
- });
+ /**
+ * Aborts all transactions that have been created by this subtask in a previous run.
+ *
+ * <p>It also aborts transactions from subtasks that may have been removed because of
+ * downscaling.
+ *
+ * <p>When Flink downscales X subtasks to Y subtasks, then subtask i is responsible for cleaning
+ * all subtasks j in [0; X), where j % Y = i. For example, if we downscale to 2, then subtask 0
+ * is responsible for all even and subtask 1 for all odd subtasks.
+ */
+ private void abortTransactionsWithPrefix(
+ FlinkKafkaInternalProducer<byte[], byte[]> producer,
+ String prefix,
+ long startCheckpointId) {
+ final int p = kafkaSinkContext.getNumberOfParallelInstances();
+ for (int subtaskId = kafkaSinkContext.getParallelInstanceId(); ; subtaskId += p) {
+ if (abortTransactionOfSubtask(producer, prefix, startCheckpointId, subtaskId) == 0) {
+ // If Flink didn't abort any transaction for current subtask, then we assume that no
+ // such subtask existed and no subtask with a higher number as well.
+ break;
+ }
+ }
+ }
+
+ /**
+ * Aborts all transactions that have been created by a subtask in a previous run after the given
+ * checkpoint id.
+ *
+ * <p>We assume that transaction ids are consecutively used and thus Flink can stop aborting as
+ * soon as Flink notices that a particular transaction id was unused.
+ */
+ private int abortTransactionOfSubtask(
Review comment:
WDYT about moving the aborting logic to a separate class to make it
easier to test?
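Sketch of what such a class could look like (the name and constructor parameters are my own invention), mainly so the looping logic becomes unit-testable with a mocked producer:

```java
/** Hypothetical helper that owns the abort loop currently living in KafkaWriter. */
class TransactionAborter {
    private final FlinkKafkaInternalProducer<byte[], byte[]> producer;
    private final int subtaskId;
    private final int parallelism;

    TransactionAborter(
            FlinkKafkaInternalProducer<byte[], byte[]> producer, int subtaskId, int parallelism) {
        this.producer = producer;
        this.subtaskId = subtaskId;
        this.parallelism = parallelism;
    }

    /** Would host the abortTransactionsWithPrefix/abortTransactionOfSubtask loop unchanged. */
    void abortLingeringTransactions(String prefix, long startCheckpointId) {
        // same logic as in the PR, just reachable from a plain unit test
    }
}
```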
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkOperator.java
##########
@@ -121,10 +127,12 @@
}
@Override
- public void open() throws Exception {
- super.open();
-
- this.currentWatermark = Long.MIN_VALUE;
+ public void setup(
+ StreamTask<?, ?> containingTask,
+ StreamConfig config,
+ Output<StreamRecord<byte[]>> output) {
+ super.setup(containingTask, config, output);
+ retryer = new CommitRetryer<>(processingTimeService, committerHandler);
Review comment:
Nit: `retryer` -> `commitRetrier` to make the context clear
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java
##########
@@ -47,6 +48,7 @@
private final Properties kafkaProducerConfig;
@Nullable private final String transactionalId;
+ private boolean inTransaction;
Review comment:
Do we require this variable to be thread-safe? I wonder what happens if
the committer calls `commitTransaction` while the writer checks the transaction
state.
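If they really can run on different threads, the cheapest mitigation would probably be to make the flag volatile; sketched here only to illustrate the concern, not claiming it is actually required:

```java
// Sketch: makes updates of the flag visible across threads, assuming writer and
// committer can in fact touch the same producer instance concurrently.
private volatile boolean inTransaction;
```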
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/AbstractCommitterHandler.java
##########
@@ -19,18 +19,58 @@
import org.apache.flink.util.function.SupplierWithException;
+import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
-abstract class AbstractCommitterHandler<InputT, OutputT>
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+abstract class AbstractCommitterHandler<InputT, OutputT, RecoverT>
implements CommitterHandler<InputT, OutputT> {
/** Record all the committables until commit. */
private final Deque<InputT> committables = new ArrayDeque<>();
+ /** The committables that need to be committed again after recovering from a failover. */
+ private final List<RecoverT> recoveredCommittables = new ArrayList<>();
Review comment:
I see the risk that this list might grow constantly and eventually cause an OOM error. If a lot of committables fail, the periodic commit can be too slow.
I think we either need to guard against this somehow and warn the user, or additionally commit the retried committables as part of the checkpoint.
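A possible guard, sketched only to illustrate the idea (the threshold, the logger field, and the exact hook are made up):

```java
// Illustrative only: warn when the retry backlog keeps growing instead of shrinking.
private static final int RETRY_BACKLOG_WARN_THRESHOLD = 10_000; // arbitrary assumption

protected void recoveredCommittables(List<RecoverT> committables) throws IOException {
    recoveredCommittables.addAll(checkNotNull(committables));
    if (recoveredCommittables.size() > RETRY_BACKLOG_WARN_THRESHOLD) {
        LOG.warn(
                "{} committables are waiting to be retried; commits cannot keep up with checkpoints.",
                recoveredCommittables.size());
    }
}
```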
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitRetryer.java
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+
+import java.io.IOException;
+
+/**
+ * Retries the committables of a {@link CommitterHandler} until all committables are eventually
+ * committed.
+ */
+public class CommitRetryer<InputT, OutputT> {
Review comment:
`CommitRetrier` ;)
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java
##########
@@ -71,6 +70,18 @@ private void commitTransaction(KafkaCommittable committable) {
}
}
+ /**
+ * Creates a producer that can commit into the same transaction as the upstream producer that
+ * was serialized into {@link KafkaCommitter}.
+ */
+ private FlinkKafkaInternalProducer<?, ?> createProducer(KafkaCommittable committable) {
+ FlinkKafkaInternalProducer<Object, Object> producer =
Review comment:
Nit: use `?, ?` instead of `Object, Object`
##########
File path:
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/GlobalBatchCommitterHandlerTest.java
##########
@@ -44,16 +45,20 @@ public void throwExceptionWithoutCommitter() throws Exception {
testHarness.initializeEmptyState();
}
- @Test(expected = UnsupportedOperationException.class)
- public void doNotSupportRetry() throws Exception {
+ @Test
+ public void supportRetry() throws Exception {
Review comment:
How do the tests work now that the retry is done by the timer service?
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java
##########
@@ -36,25 +34,28 @@
import java.time.Duration;
import java.util.Properties;
+import static org.apache.flink.util.Preconditions.checkState;
+
/**
* A {@link KafkaProducer} that exposes private fields to allow resume producing from a given state.
*/
-class FlinkKafkaInternalProducer<K, V> extends KafkaProducer<K, V> {
+public class FlinkKafkaInternalProducer<K, V> extends KafkaProducer<K, V> {
Review comment:
Mark as `@Internal` if it needs to be public
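I.e. something like this fragment (`@Internal` from `org.apache.flink.annotation`, the rest of the declaration unchanged):

```java
@Internal
public class FlinkKafkaInternalProducer<K, V> extends KafkaProducer<K, V> {
```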
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java
##########
@@ -86,6 +87,11 @@ public boolean isInTransaction() {
return inTransaction;
}
+ @Override
+ public void close() {
+ super.close(Duration.ZERO);
+ }
Review comment:
Hmm, I did not want to implement this because it might cause problems for the `KafkaCommitter`: closing with ZERO does **not** ensure all pending messages for the producer are emitted.
For the committer, this means that if the network connection is slow it can never commit, because we do not call `flush()` explicitly and rather use the close mechanism to do so. Before this change, it waited indefinitely.
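If the bounded close should stay, one option (a sketch, not verified against the committer's threading) would be to flush explicitly before closing so pending requests still go out:

```java
@Override
public void close() {
    // flush() blocks until all in-flight records are acknowledged, so the
    // subsequent bounded close() no longer silently drops pending messages.
    flush();
    super.close(Duration.ZERO);
}
```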
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
##########
@@ -183,62 +181,83 @@ public void close() throws Exception {
closer.close();
}
- private KafkaWriterState recoverAndInitializeState(List<KafkaWriterState> recoveredStates) {
- final int subtaskId = kafkaSinkContext.getParallelInstanceId();
- if (recoveredStates.isEmpty()) {
- final KafkaWriterState state =
- new KafkaWriterState(transactionalIdPrefix, subtaskId, 0);
- abortTransactions(getTransactionsToAbort(state, new ArrayList<>()));
- return state;
+ private void abortLingeringTransactions(
+ List<KafkaWriterState> recoveredStates, long startCheckpointId) {
+ List<String> prefixesToAbort = Lists.newArrayList(transactionalIdPrefix);
+
+ if (!recoveredStates.isEmpty()) {
+ KafkaWriterState lastState = recoveredStates.get(0);
+ if (!lastState.getTransactionalIdPrefix().equals(transactionalIdPrefix)) {
+ prefixesToAbort.add(lastState.getTransactionalIdPrefix());
+ LOG.warn(
+ "Transactional id prefix from previous execution {} has changed to {}.",
+ lastState.getTransactionalIdPrefix(),
+ transactionalIdPrefix);
+ }
}
- final Map<Integer, KafkaWriterState> taskOffsetMapping =
- recoveredStates.stream()
- .collect(
- Collectors.toMap(
- KafkaWriterState::getSubtaskId, Function.identity()));
- checkState(
- taskOffsetMapping.containsKey(subtaskId),
- "Internal error: It is expected that state from previous executions is distributed to the same subtask id.");
- final KafkaWriterState lastState = taskOffsetMapping.get(subtaskId);
- taskOffsetMapping.remove(subtaskId);
- abortTransactions(
- getTransactionsToAbort(lastState, new ArrayList<>(taskOffsetMapping.values())));
- if (!lastState.getTransactionalIdPrefix().equals(transactionalIdPrefix)) {
- LOG.warn(
- "Transactional id prefix from previous execution {} has changed to {}.",
- lastState.getTransactionalIdPrefix(),
- transactionalIdPrefix);
- return new KafkaWriterState(transactionalIdPrefix, subtaskId, 0);
+
+ final Properties properties = new Properties();
+ properties.putAll(kafkaProducerConfig);
+ properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "dummy");
Review comment:
Is this correct?
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
##########
@@ -144,6 +145,10 @@
this.kafkaWriterState =
recoverAndInitializeState(checkNotNull(recoveredStates, "recoveredStates"));
this.currentProducer = createProducer();
+ disableMetrics =
Review comment:
👍
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
##########
@@ -183,62 +181,83 @@ public void close() throws Exception {
closer.close();
}
- private KafkaWriterState recoverAndInitializeState(List<KafkaWriterState> recoveredStates) {
- final int subtaskId = kafkaSinkContext.getParallelInstanceId();
- if (recoveredStates.isEmpty()) {
- final KafkaWriterState state =
- new KafkaWriterState(transactionalIdPrefix, subtaskId, 0);
- abortTransactions(getTransactionsToAbort(state, new ArrayList<>()));
- return state;
+ private void abortLingeringTransactions(
+ List<KafkaWriterState> recoveredStates, long startCheckpointId) {
+ List<String> prefixesToAbort = Lists.newArrayList(transactionalIdPrefix);
+
+ if (!recoveredStates.isEmpty()) {
+ KafkaWriterState lastState = recoveredStates.get(0);
+ if (!lastState.getTransactionalIdPrefix().equals(transactionalIdPrefix)) {
+ prefixesToAbort.add(lastState.getTransactionalIdPrefix());
+ LOG.warn(
+ "Transactional id prefix from previous execution {} has changed to {}.",
+ lastState.getTransactionalIdPrefix(),
+ transactionalIdPrefix);
+ }
}
- final Map<Integer, KafkaWriterState> taskOffsetMapping =
- recoveredStates.stream()
- .collect(
- Collectors.toMap(
- KafkaWriterState::getSubtaskId, Function.identity()));
- checkState(
- taskOffsetMapping.containsKey(subtaskId),
- "Internal error: It is expected that state from previous executions is distributed to the same subtask id.");
- final KafkaWriterState lastState = taskOffsetMapping.get(subtaskId);
- taskOffsetMapping.remove(subtaskId);
- abortTransactions(
- getTransactionsToAbort(lastState, new ArrayList<>(taskOffsetMapping.values())));
- if (!lastState.getTransactionalIdPrefix().equals(transactionalIdPrefix)) {
- LOG.warn(
- "Transactional id prefix from previous execution {} has changed to {}.",
- lastState.getTransactionalIdPrefix(),
- transactionalIdPrefix);
- return new KafkaWriterState(transactionalIdPrefix, subtaskId, 0);
+
+ final Properties properties = new Properties();
+ properties.putAll(kafkaProducerConfig);
+ properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "dummy");
+ try (FlinkKafkaInternalProducer<byte[], byte[]> producer =
+ new FlinkKafkaInternalProducer<>(properties)) {
+ for (String prefix : prefixesToAbort) {
+ abortTransactionsWithPrefix(producer, prefix, startCheckpointId);
+ }
}
- return new KafkaWriterState(
- transactionalIdPrefix, subtaskId, lastState.getTransactionalIdOffset());
}
- private void abortTransactions(List<String> transactionsToAbort) {
- transactionsToAbort.forEach(
- transaction -> {
- // don't mess with the original configuration or any other
- // properties of the
- // original object
- // -> create an internal kafka producer on our own and do not rely
- // on
- // initTransactionalProducer().
- final Properties myConfig = new Properties();
- myConfig.putAll(kafkaProducerConfig);
- myConfig.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transaction);
- LOG.info("Aborting Kafka transaction {}.", transaction);
- FlinkKafkaInternalProducer<byte[], byte[]> kafkaProducer = null;
- try {
- kafkaProducer = new FlinkKafkaInternalProducer<>(myConfig);
- // it suffices to call initTransactions - this will abort any
- // lingering transactions
- kafkaProducer.initTransactions();
- } finally {
- if (kafkaProducer != null) {
- kafkaProducer.close(Duration.ofSeconds(0));
- }
- }
- });
+ /**
+ * Aborts all transactions that have been created by this subtask in a previous run.
+ *
+ * <p>It also aborts transactions from subtasks that may have been removed because of
+ * downscaling.
+ *
+ * <p>When Flink downscales X subtasks to Y subtasks, then subtask i is responsible for cleaning
+ * all subtasks j in [0; X), where j % Y = i. For example, if we downscale to 2, then subtask 0
+ * is responsible for all even and subtask 1 for all odd subtasks.
+ */
+ private void abortTransactionsWithPrefix(
Review comment:
The word `prefix` seems to be a bit overloaded because we already have
the `transactionalIdPrefix`
##########
File path: flink-connectors/flink-connector-kafka/pom.xml
##########
@@ -124,6 +124,12 @@ under the License.
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>junit-jupiter</artifactId>
+ <scope>test</scope>
+ </dependency>
+
Review comment:
This change probably does not belong here?
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitRetryer.java
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+
+import java.io.IOException;
+
+/**
+ * Retries the committables of a {@link CommitterHandler} until all committables are eventually
+ * committed.
+ */
+public class CommitRetryer<InputT, OutputT> {
Review comment:
The generics seem unnecessary; you can pass `CommitterHandler<?, ?>` to the constructor.
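I.e. roughly:

```java
// Sketch of the non-generic variant; only needsRetry()/retry() are used, so wildcards suffice.
public class CommitRetrier {
    private final ProcessingTimeService processingTimeService;
    private final CommitterHandler<?, ?> committerHandler;

    public CommitRetrier(
            ProcessingTimeService processingTimeService, CommitterHandler<?, ?> committerHandler) {
        this.processingTimeService = processingTimeService;
        this.committerHandler = committerHandler;
    }

    // retryWithDelay(), retryIndefinitely() and retry(long) stay as in the PR
}
```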
##########
File path:
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaTransactionLogITCase.java
##########
@@ -17,7 +17,8 @@
package org.apache.flink.connector.kafka.sink;
-import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
+import org.apache.flink.connector.kafka.sink.KafkaTransactionLog.TransactionRecord;
Review comment:
Nit: Do we need a test for a test class?
##########
File path:
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/TransactionTest.java
##########
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.source;
+
+import org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer;
+
Review comment:
Should this class be part of the final PR?
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
##########
@@ -327,41 +346,43 @@ private void flushRecords(boolean finalFlush) {
* For each checkpoint we create new {@link FlinkKafkaInternalProducer} so that new transactions
* will not clash with transactions created during previous checkpoints ({@code
* producer.initTransactions()} assures that we obtain new producerId and epoch counters).
+ *
+ * <p>Ensures that all transaction ids in between lastCheckpointId and checkpointId are
+ * initialized.
*/
- private FlinkKafkaInternalProducer<byte[], byte[]> createTransactionalProducer() {
- final long transactionalIdOffset = kafkaWriterState.getTransactionalIdOffset() + 1;
+ private FlinkKafkaInternalProducer<byte[], byte[]> createTransactionalProducer(
+ long checkpointId) {
+ checkState(
+ checkpointId > lastCheckpointId,
+ "Expected %s > %s",
+ checkpointId,
+ lastCheckpointId);
final Properties copiedProducerConfig = new Properties();
copiedProducerConfig.putAll(kafkaProducerConfig);
- initTransactionalProducerConfig(
- copiedProducerConfig,
- transactionalIdOffset,
- transactionalIdPrefix,
- kafkaSinkContext.getParallelInstanceId());
+ copiedProducerConfig.put(
+ ProducerConfig.TRANSACTIONAL_ID_CONFIG,
+ TransactionalIdFactory.buildTransactionalId(
+ transactionalIdPrefix,
+ kafkaSinkContext.getParallelInstanceId(),
+ lastCheckpointId + 1));
final FlinkKafkaInternalProducer<byte[], byte[]> producer =
new FlinkKafkaInternalProducer<>(copiedProducerConfig);
producer.initTransactions();
- kafkaWriterState =
- new KafkaWriterState(
- transactionalIdPrefix,
- kafkaSinkContext.getParallelInstanceId(),
- transactionalIdOffset);
- LOG.info(
- "Created new transactional producer {}",
- copiedProducerConfig.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG));
+ // in case checkpoints have been aborted, Flink would create non-consecutive transaction ids
+ // this loop ensures that all gaps are filled with initialized (empty) transactions
+ for (long id = lastCheckpointId + 2; id <= checkpointId; id++) {
+ producer.setTransactionalId(
+ TransactionalIdFactory.buildTransactionalId(
+ transactionalIdPrefix,
+ kafkaSinkContext.getParallelInstanceId(),
+ lastCheckpointId + 1));
+ producer.initTransactions();
+ }
Review comment:
I do not understand why we have to do this every time we create a new
producer. Can you give a concrete example?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]