AHeise commented on a change in pull request #17019:
URL: https://github.com/apache/flink/pull/17019#discussion_r698508026
##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/GlobalBatchCommitterHandlerTest.java
##########
@@ -44,16 +45,20 @@ public void throwExceptionWithoutCommitter() throws Exception {
         testHarness.initializeEmptyState();
     }
-    @Test(expected = UnsupportedOperationException.class)
-    public void doNotSupportRetry() throws Exception {
+    @Test
+    public void supportRetry() throws Exception {

Review comment:
For the global committer, the timer is not really used. For the streaming committer handler, we can use `TestProcessingTimeService` and manually advance the time, which triggers all respective callbacks. I have also added a more fine-grained unit test with `ManualClock`.

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java
##########
@@ -47,6 +48,7 @@
     private final Properties kafkaProducerConfig;
     @Nullable private final String transactionalId;
+    private boolean inTransaction;

Review comment:
Good point. I was assuming that all accesses use the task thread, which probably holds until we reach cancellation (where aborting really matters). So I'll probably switch to `volatile` here: it's not on the hot path and gives safety.

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
##########
@@ -174,6 +174,9 @@ public void write(IN element, Context context) throws IOException {
     @Override
     public void close() throws Exception {
+        if (currentProducer.isInTransaction()) {

Review comment:
I added a test. 👍

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java
##########
@@ -86,6 +87,11 @@ public boolean isInTransaction() {
         return inTransaction;
     }
+    @Override
+    public void close() {
+        super.close(Duration.ZERO);
+    }

Review comment:
I added a `flush` in `KafkaWriter#close` to be safe.
All other paths are pretty much secured through `commitTransaction`.

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java
##########
@@ -36,25 +34,28 @@
 import java.time.Duration;
 import java.util.Properties;
+import static org.apache.flink.util.Preconditions.checkState;
+
 /**
  * A {@link KafkaProducer} that exposes private fields to allow resume producing from a given state.
  */
-class FlinkKafkaInternalProducer<K, V> extends KafkaProducer<K, V> {
+public class FlinkKafkaInternalProducer<K, V> extends KafkaProducer<K, V> {

Review comment:
I think this is an oversight on my end.

##########
File path: flink-connectors/flink-connector-kafka/pom.xml
##########
@@ -124,6 +124,12 @@ under the License.
 			<scope>test</scope>
 		</dependency>
+		<dependency>
+			<groupId>org.testcontainers</groupId>
+			<artifactId>junit-jupiter</artifactId>
+			<scope>test</scope>
+		</dependency>
+

Review comment:
You are right, that belongs in the commit with the JUnit5 test container test.

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
##########
@@ -164,13 +163,12 @@ public void write(IN element, Context context) throws IOException {
     @Override
     public List<KafkaCommittable> prepareCommit(boolean flush) {
         flushRecords(flush);
-        List<KafkaCommittable> committables = precommit();
-        currentProducer = createProducer();
-        return committables;
+        return precommit();
     }
     @Override
-    public List<KafkaWriterState> snapshotState() throws IOException {
+    public List<KafkaWriterState> snapshotState(long checkpointId) throws IOException {
+        currentProducer = createProducer(checkpointId);

Review comment:
I now explicitly null the `currentProducer`. That still leaves room for errors in the future, but at least not for correctness issues.
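
On the `inTransaction` flag discussed earlier in this thread, here is a minimal sketch of the `volatile` option: the flag is written on one thread and read on another during close. `TransactionFlagSketch` and its methods are hypothetical stand-ins for illustration only, not the actual `FlinkKafkaInternalProducer` code.

```java
/** Hypothetical stand-in for a producer that remembers whether a transaction is open. */
final class TransactionFlagSketch {
    // volatile: a write made on the task thread becomes visible to any other
    // thread (e.g. one that handles cancellation) without extra locking.
    private volatile boolean inTransaction;

    void beginTransaction() {
        inTransaction = true;
    }

    void commitTransaction() {
        inTransaction = false;
    }

    boolean isInTransaction() {
        return inTransaction;
    }

    /** Aborts only when a transaction is still open; returns whether it did. */
    boolean abortIfOpen() {
        if (!inTransaction) {
            return false;
        }
        // A real producer would call abortTransaction() here (assumption).
        inTransaction = false;
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        TransactionFlagSketch producer = new TransactionFlagSketch();
        Thread taskThread = new Thread(producer::beginTransaction, "task-thread");
        taskThread.start();
        // join() already synchronizes in this demo; volatile matters when the
        // reading thread has no such synchronization with the writer.
        taskThread.join();
        System.out.println("aborted on close: " + producer.abortIfOpen());
    }
}
```

The check-then-act in `abortIfOpen` is not atomic; `volatile` only addresses visibility, which matches the stated goal of a cheap safeguard off the hot path.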
##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
##########
@@ -183,62 +181,83 @@ public void close() throws Exception {
         closer.close();
     }
-    private KafkaWriterState recoverAndInitializeState(List<KafkaWriterState> recoveredStates) {
-        final int subtaskId = kafkaSinkContext.getParallelInstanceId();
-        if (recoveredStates.isEmpty()) {
-            final KafkaWriterState state =
-                    new KafkaWriterState(transactionalIdPrefix, subtaskId, 0);
-            abortTransactions(getTransactionsToAbort(state, new ArrayList<>()));
-            return state;
+    private void abortLingeringTransactions(
+            List<KafkaWriterState> recoveredStates, long startCheckpointId) {
+        List<String> prefixesToAbort = Lists.newArrayList(transactionalIdPrefix);
+
+        if (!recoveredStates.isEmpty()) {
+            KafkaWriterState lastState = recoveredStates.get(0);
+            if (!lastState.getTransactionalIdPrefix().equals(transactionalIdPrefix)) {

Review comment:
Isn't this exactly what's done here? We usually only try to abort transactions with the current prefix. When we detect a change, we additionally check for transactions with the old prefix.
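
The prefix rule described in this comment can be sketched in isolation. This is an illustration under assumptions: `PrefixSelectionSketch` and `prefixesToAbort` are hypothetical names, and the recovered prefix is modeled as an `Optional<String>` rather than a `KafkaWriterState`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/** Hypothetical sketch: which transactional id prefixes need a cleanup pass. */
final class PrefixSelectionSketch {

    /**
     * Always returns the current prefix; adds the recovered prefix only when
     * one exists and differs from the current one.
     */
    static List<String> prefixesToAbort(String currentPrefix, Optional<String> recoveredPrefix) {
        List<String> prefixes = new ArrayList<>();
        prefixes.add(currentPrefix);
        recoveredPrefix
                .filter(oldPrefix -> !oldPrefix.equals(currentPrefix))
                .ifPresent(prefixes::add);
        return prefixes;
    }

    public static void main(String[] args) {
        // No recovered state, unchanged prefix, changed prefix.
        System.out.println(prefixesToAbort("newPrefix", Optional.empty()));
        System.out.println(prefixesToAbort("newPrefix", Optional.of("newPrefix")));
        System.out.println(prefixesToAbort("newPrefix", Optional.of("oldPrefix")));
    }
}
```

The result always has one entry and has two only after a prefix change, so the extra cleanup cost is paid only in that case.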
########## File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java ########## @@ -183,62 +181,83 @@ public void close() throws Exception { closer.close(); } - private KafkaWriterState recoverAndInitializeState(List<KafkaWriterState> recoveredStates) { - final int subtaskId = kafkaSinkContext.getParallelInstanceId(); - if (recoveredStates.isEmpty()) { - final KafkaWriterState state = - new KafkaWriterState(transactionalIdPrefix, subtaskId, 0); - abortTransactions(getTransactionsToAbort(state, new ArrayList<>())); - return state; + private void abortLingeringTransactions( + List<KafkaWriterState> recoveredStates, long startCheckpointId) { + List<String> prefixesToAbort = Lists.newArrayList(transactionalIdPrefix); + + if (!recoveredStates.isEmpty()) { + KafkaWriterState lastState = recoveredStates.get(0); + if (!lastState.getTransactionalIdPrefix().equals(transactionalIdPrefix)) { + prefixesToAbort.add(lastState.getTransactionalIdPrefix()); + LOG.warn( + "Transactional id prefix from previous execution {} has changed to {}.", + lastState.getTransactionalIdPrefix(), + transactionalIdPrefix); + } } - final Map<Integer, KafkaWriterState> taskOffsetMapping = - recoveredStates.stream() - .collect( - Collectors.toMap( - KafkaWriterState::getSubtaskId, Function.identity())); - checkState( - taskOffsetMapping.containsKey(subtaskId), - "Internal error: It is expected that state from previous executions is distributed to the same subtask id."); - final KafkaWriterState lastState = taskOffsetMapping.get(subtaskId); - taskOffsetMapping.remove(subtaskId); - abortTransactions( - getTransactionsToAbort(lastState, new ArrayList<>(taskOffsetMapping.values()))); - if (!lastState.getTransactionalIdPrefix().equals(transactionalIdPrefix)) { - LOG.warn( - "Transactional id prefix from previous execution {} has changed to {}.", - lastState.getTransactionalIdPrefix(), - transactionalIdPrefix); - return new 
KafkaWriterState(transactionalIdPrefix, subtaskId, 0); + + final Properties properties = new Properties(); + properties.putAll(kafkaProducerConfig); + properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "dummy"); Review comment: Yes, but we can solve it in a better way where this is not needed. ########## File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java ########## @@ -183,62 +181,83 @@ public void close() throws Exception { closer.close(); } - private KafkaWriterState recoverAndInitializeState(List<KafkaWriterState> recoveredStates) { - final int subtaskId = kafkaSinkContext.getParallelInstanceId(); - if (recoveredStates.isEmpty()) { - final KafkaWriterState state = - new KafkaWriterState(transactionalIdPrefix, subtaskId, 0); - abortTransactions(getTransactionsToAbort(state, new ArrayList<>())); - return state; + private void abortLingeringTransactions( + List<KafkaWriterState> recoveredStates, long startCheckpointId) { + List<String> prefixesToAbort = Lists.newArrayList(transactionalIdPrefix); + + if (!recoveredStates.isEmpty()) { + KafkaWriterState lastState = recoveredStates.get(0); + if (!lastState.getTransactionalIdPrefix().equals(transactionalIdPrefix)) { + prefixesToAbort.add(lastState.getTransactionalIdPrefix()); + LOG.warn( + "Transactional id prefix from previous execution {} has changed to {}.", + lastState.getTransactionalIdPrefix(), + transactionalIdPrefix); + } } - final Map<Integer, KafkaWriterState> taskOffsetMapping = - recoveredStates.stream() - .collect( - Collectors.toMap( - KafkaWriterState::getSubtaskId, Function.identity())); - checkState( - taskOffsetMapping.containsKey(subtaskId), - "Internal error: It is expected that state from previous executions is distributed to the same subtask id."); - final KafkaWriterState lastState = taskOffsetMapping.get(subtaskId); - taskOffsetMapping.remove(subtaskId); - abortTransactions( - getTransactionsToAbort(lastState, new 
ArrayList<>(taskOffsetMapping.values()))); - if (!lastState.getTransactionalIdPrefix().equals(transactionalIdPrefix)) { - LOG.warn( - "Transactional id prefix from previous execution {} has changed to {}.", - lastState.getTransactionalIdPrefix(), - transactionalIdPrefix); - return new KafkaWriterState(transactionalIdPrefix, subtaskId, 0); + + final Properties properties = new Properties(); + properties.putAll(kafkaProducerConfig); + properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "dummy"); + try (FlinkKafkaInternalProducer<byte[], byte[]> producer = + new FlinkKafkaInternalProducer<>(properties)) { + for (String prefix : prefixesToAbort) { + abortTransactionsWithPrefix(producer, prefix, startCheckpointId); + } } - return new KafkaWriterState( - transactionalIdPrefix, subtaskId, lastState.getTransactionalIdOffset()); } - private void abortTransactions(List<String> transactionsToAbort) { - transactionsToAbort.forEach( - transaction -> { - // don't mess with the original configuration or any other - // properties of the - // original object - // -> create an internal kafka producer on our own and do not rely - // on - // initTransactionalProducer(). - final Properties myConfig = new Properties(); - myConfig.putAll(kafkaProducerConfig); - myConfig.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transaction); - LOG.info("Aborting Kafka transaction {}.", transaction); - FlinkKafkaInternalProducer<byte[], byte[]> kafkaProducer = null; - try { - kafkaProducer = new FlinkKafkaInternalProducer<>(myConfig); - // it suffices to call initTransactions - this will abort any - // lingering transactions - kafkaProducer.initTransactions(); - } finally { - if (kafkaProducer != null) { - kafkaProducer.close(Duration.ofSeconds(0)); - } - } - }); + /** + * Aborts all transactions that have been created by this subtask in a previous run. + * + * <p>It also aborts transactions from subtasks that may have been removed because of + * downscaling. 
+ * + * <p>When Flink downscales X subtasks to Y subtasks, then subtask i is responsible for cleaning + * all subtasks j in [0; X), where j % Y = i. For example, if we downscale to 2, then subtask 0 + * is responsible for all even and subtask 1 for all odd subtasks. + */ + private void abortTransactionsWithPrefix( Review comment: But this is exactly the same thing. ########## File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java ########## @@ -327,41 +346,43 @@ private void flushRecords(boolean finalFlush) { * For each checkpoint we create new {@link FlinkKafkaInternalProducer} so that new transactions * will not clash with transactions created during previous checkpoints ({@code * producer.initTransactions()} assures that we obtain new producerId and epoch counters). + * + * <p>Ensures that all transaction ids in between lastCheckpointId and checkpointId are + * initialized. */ - private FlinkKafkaInternalProducer<byte[], byte[]> createTransactionalProducer() { - final long transactionalIdOffset = kafkaWriterState.getTransactionalIdOffset() + 1; + private FlinkKafkaInternalProducer<byte[], byte[]> createTransactionalProducer( + long checkpointId) { + checkState( + checkpointId > lastCheckpointId, + "Expected %s > %s", + checkpointId, + lastCheckpointId); final Properties copiedProducerConfig = new Properties(); copiedProducerConfig.putAll(kafkaProducerConfig); - initTransactionalProducerConfig( - copiedProducerConfig, - transactionalIdOffset, - transactionalIdPrefix, - kafkaSinkContext.getParallelInstanceId()); + copiedProducerConfig.put( + ProducerConfig.TRANSACTIONAL_ID_CONFIG, + TransactionalIdFactory.buildTransactionalId( + transactionalIdPrefix, + kafkaSinkContext.getParallelInstanceId(), + lastCheckpointId + 1)); final FlinkKafkaInternalProducer<byte[], byte[]> producer = new FlinkKafkaInternalProducer<>(copiedProducerConfig); producer.initTransactions(); - kafkaWriterState = - new 
KafkaWriterState( - transactionalIdPrefix, - kafkaSinkContext.getParallelInstanceId(), - transactionalIdOffset); - LOG.info( - "Created new transactional producer {}", - copiedProducerConfig.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG)); + // in case checkpoints have been aborted, Flink would create non-consecutive transaction ids + // this loop ensures that all gaps are filled with initialized (empty) transactions + for (long id = lastCheckpointId + 2; id <= checkpointId; id++) { + producer.setTransactionalId( + TransactionalIdFactory.buildTransactionalId( + transactionalIdPrefix, + kafkaSinkContext.getParallelInstanceId(), + lastCheckpointId + 1)); + producer.initTransactions(); + } Review comment: I refactored the code to make it more obvious. I'm also adding an example in the comments. ########## File path: flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaTransactionLogITCase.java ########## @@ -17,7 +17,8 @@ package org.apache.flink.connector.kafka.sink; -import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList; +import org.apache.flink.connector.kafka.sink.KafkaTransactionLog.TransactionRecord; Review comment: Yes, if it's non-trivial. ########## File path: flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaTransactionLogITCase.java ########## @@ -17,7 +17,8 @@ package org.apache.flink.connector.kafka.sink; -import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList; +import org.apache.flink.connector.kafka.sink.KafkaTransactionLog.TransactionRecord; Review comment: Yes, if it's non-trivial and meant to be used as a util for other tests. ########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitRetrier.java ########## @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.operators.sink; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.util.clock.Clock; +import org.apache.flink.util.clock.SystemClock; + +import java.io.IOException; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Retries the committables of a {@link CommitterHandler} until all committables are eventually + * committed. + */ +public class CommitRetrier { Review comment: Made non-public instead. ########## File path: flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java ########## @@ -226,11 +236,12 @@ public void testAbortTransactionsOfPendingCheckpointsAfterFailure() throws Excep new FailingCheckpointMapper(failed, lastCheckpointedRecord), config, "newPrefix"); final List<ConsumerRecord<byte[], byte[]>> collectedRecords = drainAllRecordsFromTopic(topic); - assertEquals( + assertThat( deserializeValues(collectedRecords), - LongStream.range(1, lastCheckpointedRecord.get().get() + 1) - .boxed() - .collect(Collectors.toList())); + containsInAnyOrder( Review comment: Yes you are right. 
I was playing around with MiniCluster at some intermediate state and things were executed in parallel. I'd switch to `contains` but would leave the change in, as expected and actual were swapped originally.

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
##########
@@ -141,13 +138,15 @@
         } catch (Exception e) {
             throw new FlinkRuntimeException("Cannot initialize schema.", e);
         }
-        this.kafkaWriterState =
-                recoverAndInitializeState(checkNotNull(recoveredStates, "recoveredStates"));
         disableMetrics =
                 kafkaProducerConfig.containsKey(KEY_DISABLE_METRICS)
                         && Boolean.parseBoolean(
                                 kafkaProducerConfig.getProperty(KEY_DISABLE_METRICS));
-        this.currentProducer = createProducer();
+        lastCheckpointId = sinkInitContext.getRestoredCheckpointId().orElse(-1) + 1;

Review comment:
Good catch!

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
##########
@@ -183,62 +181,83 @@ public void close() throws Exception {
         closer.close();
     }
-    private KafkaWriterState recoverAndInitializeState(List<KafkaWriterState> recoveredStates) {
-        final int subtaskId = kafkaSinkContext.getParallelInstanceId();
-        if (recoveredStates.isEmpty()) {
-            final KafkaWriterState state =
-                    new KafkaWriterState(transactionalIdPrefix, subtaskId, 0);
-            abortTransactions(getTransactionsToAbort(state, new ArrayList<>()));
-            return state;
+    private void abortLingeringTransactions(
+            List<KafkaWriterState> recoveredStates, long startCheckpointId) {
+        List<String> prefixesToAbort = Lists.newArrayList(transactionalIdPrefix);
+
+        if (!recoveredStates.isEmpty()) {
+            KafkaWriterState lastState = recoveredStates.get(0);
+            if (!lastState.getTransactionalIdPrefix().equals(transactionalIdPrefix)) {

Review comment:
The list is initialized with the new prefix. The old prefix is only added when it has changed. So we always abort with one prefix and sometimes with two.
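
For the checkpoint-id handling touched on in this thread (transaction ids derived from checkpoint ids, with gaps filled after aborted checkpoints), a minimal sketch of the id range follows. It assumes the intent is that every id in `(lastCheckpointId, checkpointId]` gets initialized. The `prefix-subtask-checkpoint` id format and the names `GapFillingSketch`, `buildTransactionalId`, and `idsToInitialize` are hypothetical; the real format produced by `TransactionalIdFactory` is not shown in this thread.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: transactional ids to initialize when checkpoints were skipped. */
final class GapFillingSketch {

    // Assumed id layout, chosen for illustration only.
    static String buildTransactionalId(String prefix, int subtaskId, long checkpointId) {
        return prefix + "-" + subtaskId + "-" + checkpointId;
    }

    /** Returns one id per checkpoint in (lastCheckpointId, checkpointId], in order. */
    static List<String> idsToInitialize(
            String prefix, int subtaskId, long lastCheckpointId, long checkpointId) {
        if (checkpointId <= lastCheckpointId) {
            throw new IllegalStateException(
                    "Expected " + checkpointId + " > " + lastCheckpointId);
        }
        List<String> ids = new ArrayList<>();
        // Each iteration uses the loop variable, so no id in the range is skipped.
        for (long id = lastCheckpointId + 1; id <= checkpointId; id++) {
            ids.add(buildTransactionalId(prefix, subtaskId, id));
        }
        return ids;
    }

    public static void main(String[] args) {
        // Checkpoints 3 and 4 never completed; checkpoint 5 is the next snapshot.
        System.out.println(idsToInitialize("prefix", 0, 2, 5));
    }
}
```

Deriving ids from checkpoint ids means a restarted job can compute which ids may still hold open transactions without storing a separate offset in state.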