AHeise commented on a change in pull request #17019:
URL: https://github.com/apache/flink/pull/17019#discussion_r699250923



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitRetrier.java
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.clock.Clock;
+import org.apache.flink.util.clock.SystemClock;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Retries the committables of a {@link CommitterHandler} until all 
committables are eventually
+ * committed.
+ */
+public class CommitRetrier {

Review comment:
       Made non-public instead.

##########
File path: 
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
##########
@@ -226,11 +236,12 @@ public void 
testAbortTransactionsOfPendingCheckpointsAfterFailure() throws Excep
                 new FailingCheckpointMapper(failed, lastCheckpointedRecord), 
config, "newPrefix");
         final List<ConsumerRecord<byte[], byte[]>> collectedRecords =
                 drainAllRecordsFromTopic(topic);
-        assertEquals(
+        assertThat(
                 deserializeValues(collectedRecords),
-                LongStream.range(1, lastCheckpointedRecord.get().get() + 1)
-                        .boxed()
-                        .collect(Collectors.toList()));
+                containsInAnyOrder(

Review comment:
       Yes you are right. I was playing around with MiniCluster at some 
intermediate state and things were executed in parallel. I'd switch to 
`contains` but would leave the change in as expected and actual was swapped 
originally.

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
##########
@@ -141,13 +138,15 @@
         } catch (Exception e) {
             throw new FlinkRuntimeException("Cannot initialize schema.", e);
         }
-        this.kafkaWriterState =
-                recoverAndInitializeState(checkNotNull(recoveredStates, 
"recoveredStates"));
         disableMetrics =
                 kafkaProducerConfig.containsKey(KEY_DISABLE_METRICS)
                         && Boolean.parseBoolean(
                                 
kafkaProducerConfig.getProperty(KEY_DISABLE_METRICS));
-        this.currentProducer = createProducer();
+        lastCheckpointId = 
sinkInitContext.getRestoredCheckpointId().orElse(-1) + 1;

Review comment:
       Good catch!

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java
##########
@@ -183,62 +181,83 @@ public void close() throws Exception {
         closer.close();
     }
 
-    private KafkaWriterState recoverAndInitializeState(List<KafkaWriterState> 
recoveredStates) {
-        final int subtaskId = kafkaSinkContext.getParallelInstanceId();
-        if (recoveredStates.isEmpty()) {
-            final KafkaWriterState state =
-                    new KafkaWriterState(transactionalIdPrefix, subtaskId, 0);
-            abortTransactions(getTransactionsToAbort(state, new 
ArrayList<>()));
-            return state;
+    private void abortLingeringTransactions(
+            List<KafkaWriterState> recoveredStates, long startCheckpointId) {
+        List<String> prefixesToAbort = 
Lists.newArrayList(transactionalIdPrefix);
+
+        if (!recoveredStates.isEmpty()) {
+            KafkaWriterState lastState = recoveredStates.get(0);
+            if 
(!lastState.getTransactionalIdPrefix().equals(transactionalIdPrefix)) {

Review comment:
       The list is initialized with the new prefix. The old prefix is only 
added when changed. So we always abort with 1 and sometimes with 2 prefixes.

##########
File path: 
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
##########
@@ -406,8 +406,14 @@ private void deleteTestTopic(String topic)
     }
 
     private List<ConsumerRecord<byte[], byte[]>> 
drainAllRecordsFromTopic(String topic) {
+        Properties properties = getKafkaClientConfiguration();
+        return drainAllRecordsFromTopic(topic, properties);
+    }
+
+    static List<ConsumerRecord<byte[], byte[]>> drainAllRecordsFromTopic(

Review comment:
       This is used in a different test `KafkaWriterITCase`. Should be 
extracted into a util if we reuse it in even more classes.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to