AHeise commented on a change in pull request #17019:
URL: https://github.com/apache/flink/pull/17019#discussion_r698508026
##########
File path:
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/GlobalBatchCommitterHandlerTest.java
##########
@@ -44,16 +45,20 @@ public void throwExceptionWithoutCommitter() throws Exception {
testHarness.initializeEmptyState();
}
- @Test(expected = UnsupportedOperationException.class)
- public void doNotSupportRetry() throws Exception {
+ @Test
+ public void supportRetry() throws Exception {
Review comment:
For the global committer, the timer is not really used. For the streaming committer
handler, we can use `TestProcessingTimeService` and manually advance the time,
which triggers all respective callbacks. I have also added a more fine-grained
unit test with `ManualClock`.
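Roughly the pattern I mean for the streaming handler test (a sketch that would live inside a test method; `retryPendingCommittables()` is just a placeholder for whatever callback the handler actually schedules):

```java
TestProcessingTimeService timeService = new TestProcessingTimeService();
timeService.setCurrentTime(0L);

// the handler would register its retry callback as a processing-time timer
timeService.registerTimer(100L, timestamp -> retryPendingCommittables());

// advancing the clock synchronously fires every timer due at or before the new time
timeService.setCurrentTime(100L);
```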
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java
##########
@@ -47,6 +48,7 @@
private final Properties kafkaProducerConfig;
@Nullable private final String transactionalId;
+ private boolean inTransaction;
Review comment:
Good point. I was assuming that all accesses happen on the task thread, which
probably holds until we reach cancellation (where aborting really matters). So
I will probably switch to volatile here: it's not on the hot path and it gives us safety.
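Concretely, something like the following (a sketch; the lifecycle overrides are simplified to show where the flag is flipped):

```java
// volatile gives the cancellation path visibility of the flag
// without adding synchronization to the per-record write path
private volatile boolean inTransaction;

@Override
public void beginTransaction() {
    super.beginTransaction();
    inTransaction = true;
}

@Override
public void commitTransaction() {
    super.commitTransaction();
    inTransaction = false;
}
```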
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
##########
@@ -174,6 +174,9 @@ public void write(IN element, Context context) throws IOException {
@Override
public void close() throws Exception {
+ if (currentProducer.isInTransaction()) {
Review comment:
I added a test. 👍
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java
##########
@@ -86,6 +87,11 @@ public boolean isInTransaction() {
return inTransaction;
}
+ @Override
+ public void close() {
+ super.close(Duration.ZERO);
+ }
Review comment:
I added a `flush` in `KafkaWriter#close` to be safe. All other paths are
effectively covered by `commitTransaction`.
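Roughly what I have in mind for `KafkaWriter#close` (a sketch, not necessarily the literal code in the PR):

```java
@Override
public void close() throws Exception {
    // flush buffered records so nothing is silently dropped on close,
    // then make sure no transaction is left dangling before releasing resources
    currentProducer.flush();
    if (currentProducer.isInTransaction()) {
        currentProducer.abortTransaction();
    }
    closer.close();
}
```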
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java
##########
@@ -36,25 +34,28 @@
import java.time.Duration;
import java.util.Properties;
+import static org.apache.flink.util.Preconditions.checkState;
+
/**
 * A {@link KafkaProducer} that exposes private fields to allow resume producing from a given state.
*/
-class FlinkKafkaInternalProducer<K, V> extends KafkaProducer<K, V> {
+public class FlinkKafkaInternalProducer<K, V> extends KafkaProducer<K, V> {
Review comment:
I think this is an oversight on my end.
##########
File path: flink-connectors/flink-connector-kafka/pom.xml
##########
@@ -124,6 +124,12 @@ under the License.
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>junit-jupiter</artifactId>
+ <scope>test</scope>
+ </dependency>
+
Review comment:
You are right, that belongs in the commit with the JUnit5 Testcontainers test.
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
##########
@@ -164,13 +163,12 @@ public void write(IN element, Context context) throws IOException {
@Override
public List<KafkaCommittable> prepareCommit(boolean flush) {
flushRecords(flush);
- List<KafkaCommittable> committables = precommit();
- currentProducer = createProducer();
- return committables;
+ return precommit();
}
@Override
- public List<KafkaWriterState> snapshotState() throws IOException {
+ public List<KafkaWriterState> snapshotState(long checkpointId) throws IOException {
+ currentProducer = createProducer(checkpointId);
Review comment:
I now explicitly null the `currentProducer`. That still leaves some room for
future errors, but at least no correctness issues.
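In other words (a sketch; the exact place where the field is nulled may differ):

```java
@Override
public List<KafkaCommittable> prepareCommit(boolean flush) {
    flushRecords(flush);
    List<KafkaCommittable> committables = precommit();
    // ownership of the producer moves to the committer; nulling the field makes any
    // accidental use before the next snapshotState() fail fast instead of silently
    // writing into a transaction that is about to be committed
    currentProducer = null;
    return committables;
}
```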
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
##########
@@ -183,62 +181,83 @@ public void close() throws Exception {
closer.close();
}
- private KafkaWriterState recoverAndInitializeState(List<KafkaWriterState> recoveredStates) {
- final int subtaskId = kafkaSinkContext.getParallelInstanceId();
- if (recoveredStates.isEmpty()) {
- final KafkaWriterState state =
- new KafkaWriterState(transactionalIdPrefix, subtaskId, 0);
- abortTransactions(getTransactionsToAbort(state, new ArrayList<>()));
- return state;
+ private void abortLingeringTransactions(
+ List<KafkaWriterState> recoveredStates, long startCheckpointId) {
+ List<String> prefixesToAbort = Lists.newArrayList(transactionalIdPrefix);
+
+ if (!recoveredStates.isEmpty()) {
+ KafkaWriterState lastState = recoveredStates.get(0);
+ if (!lastState.getTransactionalIdPrefix().equals(transactionalIdPrefix)) {
Review comment:
Isn't this exactly what's done here?
We usually only try to abort transactions with the current prefix. When we
detect a change, we additionally check for transactions with the old prefix.
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
##########
@@ -183,62 +181,83 @@ public void close() throws Exception {
closer.close();
}
- private KafkaWriterState recoverAndInitializeState(List<KafkaWriterState> recoveredStates) {
- final int subtaskId = kafkaSinkContext.getParallelInstanceId();
- if (recoveredStates.isEmpty()) {
- final KafkaWriterState state =
- new KafkaWriterState(transactionalIdPrefix, subtaskId, 0);
- abortTransactions(getTransactionsToAbort(state, new ArrayList<>()));
- return state;
+ private void abortLingeringTransactions(
+ List<KafkaWriterState> recoveredStates, long startCheckpointId) {
+ List<String> prefixesToAbort = Lists.newArrayList(transactionalIdPrefix);
+
+ if (!recoveredStates.isEmpty()) {
+ KafkaWriterState lastState = recoveredStates.get(0);
+ if (!lastState.getTransactionalIdPrefix().equals(transactionalIdPrefix)) {
+ prefixesToAbort.add(lastState.getTransactionalIdPrefix());
+ LOG.warn(
+ "Transactional id prefix from previous execution {} has changed to {}.",
+ lastState.getTransactionalIdPrefix(),
+ transactionalIdPrefix);
+ }
}
- final Map<Integer, KafkaWriterState> taskOffsetMapping =
- recoveredStates.stream()
- .collect(
- Collectors.toMap(
- KafkaWriterState::getSubtaskId, Function.identity()));
- checkState(
- taskOffsetMapping.containsKey(subtaskId),
- "Internal error: It is expected that state from previous executions is distributed to the same subtask id.");
- final KafkaWriterState lastState = taskOffsetMapping.get(subtaskId);
- taskOffsetMapping.remove(subtaskId);
- abortTransactions(
- getTransactionsToAbort(lastState, new ArrayList<>(taskOffsetMapping.values())));
- if (!lastState.getTransactionalIdPrefix().equals(transactionalIdPrefix)) {
- LOG.warn(
- "Transactional id prefix from previous execution {} has changed to {}.",
- lastState.getTransactionalIdPrefix(),
- transactionalIdPrefix);
- return new KafkaWriterState(transactionalIdPrefix, subtaskId, 0);
+
+ final Properties properties = new Properties();
+ properties.putAll(kafkaProducerConfig);
+ properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "dummy");
Review comment:
Yes, but we can solve it in a better way so that this is not needed.
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
##########
@@ -183,62 +181,83 @@ public void close() throws Exception {
closer.close();
}
- private KafkaWriterState recoverAndInitializeState(List<KafkaWriterState> recoveredStates) {
- final int subtaskId = kafkaSinkContext.getParallelInstanceId();
- if (recoveredStates.isEmpty()) {
- final KafkaWriterState state =
- new KafkaWriterState(transactionalIdPrefix, subtaskId, 0);
- abortTransactions(getTransactionsToAbort(state, new ArrayList<>()));
- return state;
+ private void abortLingeringTransactions(
+ List<KafkaWriterState> recoveredStates, long startCheckpointId) {
+ List<String> prefixesToAbort = Lists.newArrayList(transactionalIdPrefix);
+
+ if (!recoveredStates.isEmpty()) {
+ KafkaWriterState lastState = recoveredStates.get(0);
+ if (!lastState.getTransactionalIdPrefix().equals(transactionalIdPrefix)) {
+ prefixesToAbort.add(lastState.getTransactionalIdPrefix());
+ LOG.warn(
+ "Transactional id prefix from previous execution {} has changed to {}.",
+ lastState.getTransactionalIdPrefix(),
+ transactionalIdPrefix);
+ }
}
- final Map<Integer, KafkaWriterState> taskOffsetMapping =
- recoveredStates.stream()
- .collect(
- Collectors.toMap(
- KafkaWriterState::getSubtaskId, Function.identity()));
- checkState(
- taskOffsetMapping.containsKey(subtaskId),
- "Internal error: It is expected that state from previous executions is distributed to the same subtask id.");
- final KafkaWriterState lastState = taskOffsetMapping.get(subtaskId);
- taskOffsetMapping.remove(subtaskId);
- abortTransactions(
- getTransactionsToAbort(lastState, new ArrayList<>(taskOffsetMapping.values())));
- if (!lastState.getTransactionalIdPrefix().equals(transactionalIdPrefix)) {
- LOG.warn(
- "Transactional id prefix from previous execution {} has changed to {}.",
- lastState.getTransactionalIdPrefix(),
- transactionalIdPrefix);
- return new KafkaWriterState(transactionalIdPrefix, subtaskId, 0);
+
+ final Properties properties = new Properties();
+ properties.putAll(kafkaProducerConfig);
+ properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "dummy");
+ try (FlinkKafkaInternalProducer<byte[], byte[]> producer =
+ new FlinkKafkaInternalProducer<>(properties)) {
+ for (String prefix : prefixesToAbort) {
+ abortTransactionsWithPrefix(producer, prefix, startCheckpointId);
+ }
}
- return new KafkaWriterState(
- transactionalIdPrefix, subtaskId, lastState.getTransactionalIdOffset());
}
- private void abortTransactions(List<String> transactionsToAbort) {
- transactionsToAbort.forEach(
- transaction -> {
- // don't mess with the original configuration or any other
- // properties of the
- // original object
- // -> create an internal kafka producer on our own and do not rely
- // on
- // initTransactionalProducer().
- final Properties myConfig = new Properties();
- myConfig.putAll(kafkaProducerConfig);
- myConfig.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transaction);
- LOG.info("Aborting Kafka transaction {}.", transaction);
- FlinkKafkaInternalProducer<byte[], byte[]> kafkaProducer = null;
- try {
- kafkaProducer = new FlinkKafkaInternalProducer<>(myConfig);
- // it suffices to call initTransactions - this will abort any
- // lingering transactions
- kafkaProducer.initTransactions();
- } finally {
- if (kafkaProducer != null) {
- kafkaProducer.close(Duration.ofSeconds(0));
- }
- }
- });
+ /**
+ * Aborts all transactions that have been created by this subtask in a previous run.
+ *
+ * <p>It also aborts transactions from subtasks that may have been removed because of
+ * downscaling.
+ *
+ * <p>When Flink downscales X subtasks to Y subtasks, then subtask i is responsible for cleaning
+ * all subtasks j in [0; X), where j % Y = i. For example, if we downscale to 2, then subtask 0
+ * is responsible for all even and subtask 1 for all odd subtasks.
+ */
+ private void abortTransactionsWithPrefix(
Review comment:
But this is exactly the same thing.
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
##########
@@ -327,41 +346,43 @@ private void flushRecords(boolean finalFlush) {
* For each checkpoint we create new {@link FlinkKafkaInternalProducer} so that new transactions
* will not clash with transactions created during previous checkpoints ({@code
* producer.initTransactions()} assures that we obtain new producerId and epoch counters).
+ *
+ * <p>Ensures that all transaction ids in between lastCheckpointId and checkpointId are
+ * initialized.
*/
- private FlinkKafkaInternalProducer<byte[], byte[]> createTransactionalProducer() {
- final long transactionalIdOffset = kafkaWriterState.getTransactionalIdOffset() + 1;
+ private FlinkKafkaInternalProducer<byte[], byte[]> createTransactionalProducer(
+ long checkpointId) {
+ checkState(
+ checkpointId > lastCheckpointId,
+ "Expected %s > %s",
+ checkpointId,
+ lastCheckpointId);
final Properties copiedProducerConfig = new Properties();
copiedProducerConfig.putAll(kafkaProducerConfig);
- initTransactionalProducerConfig(
- copiedProducerConfig,
- transactionalIdOffset,
- transactionalIdPrefix,
- kafkaSinkContext.getParallelInstanceId());
+ copiedProducerConfig.put(
+ ProducerConfig.TRANSACTIONAL_ID_CONFIG,
+ TransactionalIdFactory.buildTransactionalId(
+ transactionalIdPrefix,
+ kafkaSinkContext.getParallelInstanceId(),
+ lastCheckpointId + 1));
final FlinkKafkaInternalProducer<byte[], byte[]> producer =
new FlinkKafkaInternalProducer<>(copiedProducerConfig);
producer.initTransactions();
- kafkaWriterState =
- new KafkaWriterState(
- transactionalIdPrefix,
- kafkaSinkContext.getParallelInstanceId(),
- transactionalIdOffset);
- LOG.info(
- "Created new transactional producer {}",
- copiedProducerConfig.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG));
+ // in case checkpoints have been aborted, Flink would create non-consecutive transaction ids
+ // this loop ensures that all gaps are filled with initialized (empty) transactions
+ for (long id = lastCheckpointId + 2; id <= checkpointId; id++) {
+ producer.setTransactionalId(
+ TransactionalIdFactory.buildTransactionalId(
+ transactionalIdPrefix,
+ kafkaSinkContext.getParallelInstanceId(),
+ lastCheckpointId + 1));
+ producer.initTransactions();
+ }
Review comment:
I refactored the code to make it more obvious. I'm also adding an
example in the comments.
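To make the example concrete (a sketch of the intent, reusing the factory from the diff; not the exact refactored code): if the last checkpoint was 3 and the next snapshot is for checkpoint 6, the ids for 4 and 5 must be initialized as well so that no lingering transaction with those ids can block consumers later.

```java
// initialize (and thereby abort) every transactional id in the gap, then leave
// the producer on the id that belongs to `checkpointId` for the next epoch
for (long id = lastCheckpointId + 1; id <= checkpointId; id++) {
    producer.setTransactionalId(
            TransactionalIdFactory.buildTransactionalId(
                    transactionalIdPrefix, kafkaSinkContext.getParallelInstanceId(), id));
    producer.initTransactions();
}
```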
##########
File path:
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaTransactionLogITCase.java
##########
@@ -17,7 +17,8 @@
package org.apache.flink.connector.kafka.sink;
-import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
+import org.apache.flink.connector.kafka.sink.KafkaTransactionLog.TransactionRecord;
Review comment:
Yes, if it's non-trivial.
##########
File path:
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaTransactionLogITCase.java
##########
@@ -17,7 +17,8 @@
package org.apache.flink.connector.kafka.sink;
-import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
+import org.apache.flink.connector.kafka.sink.KafkaTransactionLog.TransactionRecord;
Review comment:
Yes, if it's non-trivial and meant to be used as a util for other tests.
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitRetrier.java
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.clock.Clock;
+import org.apache.flink.util.clock.SystemClock;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Retries the committables of a {@link CommitterHandler} until all committables are eventually
+ * committed.
+ */
+public class CommitRetrier {
Review comment:
Made non-public instead.
##########
File path:
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
##########
@@ -226,11 +236,12 @@ public void testAbortTransactionsOfPendingCheckpointsAfterFailure() throws Excep
new FailingCheckpointMapper(failed, lastCheckpointedRecord),
config, "newPrefix");
final List<ConsumerRecord<byte[], byte[]>> collectedRecords =
drainAllRecordsFromTopic(topic);
- assertEquals(
+ assertThat(
deserializeValues(collectedRecords),
- LongStream.range(1, lastCheckpointedRecord.get().get() + 1)
- .boxed()
- .collect(Collectors.toList()));
+ containsInAnyOrder(
Review comment:
Yes, you are right. I was playing around with MiniCluster in some intermediate
state where things were executed in parallel. I'll switch to `contains`, but I'll
leave the change in since expected and actual were swapped originally.
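For reference, the order-sensitive variant would look roughly like this (sketch; assumes `deserializeValues` returns a `List<Long>` as in the original assertion):

```java
List<Long> expected =
        LongStream.range(1, lastCheckpointedRecord.get().get() + 1)
                .boxed()
                .collect(Collectors.toList());
// note the assertThat(actual, matcher) argument order; `contains` checks content and order
assertThat(deserializeValues(collectedRecords), contains(expected.toArray(new Long[0])));
```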
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
##########
@@ -141,13 +138,15 @@
} catch (Exception e) {
throw new FlinkRuntimeException("Cannot initialize schema.", e);
}
- this.kafkaWriterState =
- recoverAndInitializeState(checkNotNull(recoveredStates, "recoveredStates"));
disableMetrics =
kafkaProducerConfig.containsKey(KEY_DISABLE_METRICS)
&& Boolean.parseBoolean(
kafkaProducerConfig.getProperty(KEY_DISABLE_METRICS));
- this.currentProducer = createProducer();
+ lastCheckpointId = sinkInitContext.getRestoredCheckpointId().orElse(-1) + 1;
Review comment:
Good catch!
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
##########
@@ -183,62 +181,83 @@ public void close() throws Exception {
closer.close();
}
- private KafkaWriterState recoverAndInitializeState(List<KafkaWriterState> recoveredStates) {
- final int subtaskId = kafkaSinkContext.getParallelInstanceId();
- if (recoveredStates.isEmpty()) {
- final KafkaWriterState state =
- new KafkaWriterState(transactionalIdPrefix, subtaskId, 0);
- abortTransactions(getTransactionsToAbort(state, new ArrayList<>()));
- return state;
+ private void abortLingeringTransactions(
+ List<KafkaWriterState> recoveredStates, long startCheckpointId) {
+ List<String> prefixesToAbort = Lists.newArrayList(transactionalIdPrefix);
+
+ if (!recoveredStates.isEmpty()) {
+ KafkaWriterState lastState = recoveredStates.get(0);
+ if (!lastState.getTransactionalIdPrefix().equals(transactionalIdPrefix)) {
Review comment:
The list is initialized with the new prefix; the old prefix is only added when it
has changed. So we always abort with one prefix and sometimes with two.