AHeise commented on code in PR #154:
URL: https://github.com/apache/flink-connector-kafka/pull/154#discussion_r2014507776


##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriter.java:
##########
@@ -264,11 +282,51 @@ private TransactionAbortStrategyContextImpl getTransactionAbortStrategyContext(
                     producerPool.recycle(producer);
                     return epoch;
                 };
+        Set<String> ongoingTransactionIds =
+                recoveredStates.stream()
+                        .flatMap(
+                                s ->
+                                        s.getOngoingTransactions().stream()
+                                                .map(CheckpointTransaction::getTransactionalId))
+                        .collect(Collectors.toSet());
         return new TransactionAbortStrategyContextImpl(
+                this::getTopicNames,
                 kafkaSinkContext.getParallelInstanceId(),
                 kafkaSinkContext.getNumberOfParallelInstances(),
                 prefixesToAbort,
                 startCheckpointId,
-                aborter);
+                aborter,
+                this::getAdminClient,
+                ongoingTransactionIds);
+    }
+
+    private Collection<String> getTopicNames() {
+        KafkaDatasetIdentifier identifier =
+                getDatasetIdentifier()
+                        .orElseThrow(
+                                () ->
+                                        new IllegalStateException(
+                                                "The record serializer does not expose a static list of target topics."));
+        if (identifier.getTopics() != null) {
+            return identifier.getTopics();
+        }
+        return AdminUtils.getTopicsByPattern(getAdminClient(), identifier.getTopicPattern());
+    }

Review Comment:
   In that case, the user has to implement the interfaces.
   
   ```
               if (transactionNamingStrategy.requiresKnownTopics()) {
                   checkState(
                           recordSerializer instanceof KafkaDatasetFacetProvider,
                           "For %s naming strategy, the recordSerializer needs to expose the target topics through implementing KafkaDatasetFacetProvider.",
                           transactionNamingStrategy);
               }
   ```
   
   Note that Artem planned to extend the ListTransactions API to look up
transactions by transactional id prefix (which is what we actually want);
that would make this requirement obsolete.
   
   For all internal serializers, especially those used by SQL and those created
through your builder, the interface is already implemented.
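
To make the idea concrete outside the connector, here is a minimal, self-contained sketch of the two steps discussed: failing fast when the naming strategy needs known topics, and resolving topics from either a static list or a pattern. Every type and method in it (`KnownTopicsSketch`, `RecordSerializer`, `TopicFacetProvider`, `checkSerializer`, `resolveTopics`) is a hypothetical stand-in and not the connector's actual API. The cluster topic list is passed in as a plain collection instead of being fetched through an admin client.

```java
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Illustrative only: all names below are assumptions, not connector classes.
final class KnownTopicsSketch {

    /** Stand-in for a record serializer; only used for the instanceof check. */
    interface RecordSerializer {}

    /** Stand-in for a facet provider: exposes a fixed topic list or a pattern. */
    interface TopicFacetProvider {
        Optional<List<String>> topics();

        Optional<Pattern> topicPattern();
    }

    /** Serializer that knows its target topics up front. */
    static final class StaticTopicsSerializer implements RecordSerializer, TopicFacetProvider {
        private final List<String> topics;

        StaticTopicsSerializer(List<String> topics) {
            this.topics = topics;
        }

        public Optional<List<String>> topics() {
            return Optional.of(topics);
        }

        public Optional<Pattern> topicPattern() {
            return Optional.empty();
        }
    }

    /** Serializer that only knows a topic pattern. */
    static final class PatternSerializer implements RecordSerializer, TopicFacetProvider {
        private final Pattern pattern;

        PatternSerializer(Pattern pattern) {
            this.pattern = pattern;
        }

        public Optional<List<String>> topics() {
            return Optional.empty();
        }

        public Optional<Pattern> topicPattern() {
            return Optional.of(pattern);
        }
    }

    /** Fails fast if the strategy needs topics that the serializer cannot expose. */
    static void checkSerializer(boolean requiresKnownTopics, RecordSerializer serializer) {
        if (requiresKnownTopics && !(serializer instanceof TopicFacetProvider)) {
            throw new IllegalStateException(
                    "The naming strategy requires a serializer that exposes its target topics.");
        }
    }

    /** Prefers the static list; otherwise filters the given cluster topics by pattern. */
    static Collection<String> resolveTopics(
            TopicFacetProvider provider, Collection<String> clusterTopics) {
        if (provider.topics().isPresent()) {
            return provider.topics().get();
        }
        Pattern pattern =
                provider.topicPattern()
                        .orElseThrow(
                                () ->
                                        new IllegalStateException(
                                                "Neither topics nor a topic pattern is set."));
        return clusterTopics.stream()
                .filter(topic -> pattern.matcher(topic).matches())
                .collect(Collectors.toList());
    }
}
```

With this shape, a custom serializer that does not implement the provider interface is rejected when the writer is set up, rather than later when transactions are being aborted.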



