Shekharrajak commented on code in PR #271:
URL: 
https://github.com/apache/flink-connector-kafka/pull/271#discussion_r3482740487


##########
flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaShareEosPipelineITCase.java:
##########
@@ -764,128 +623,41 @@ public Committer<KafkaShareEosCommittable> 
createCommitter(CommitterInitContext
         }
 
         @Override
-        public SimpleVersionedSerializer<KafkaShareEosCommittable> 
getCommittableSerializer() {
-            return new KafkaShareEosCommittableSerializer();
+        public SimpleVersionedSerializer<KafkaCommittable> 
getCommittableSerializer() {
+            return new KafkaCommittableSerializer();
         }
 
         @Override
         public SimpleVersionedSerializer<KafkaWriterState> 
getWriterStateSerializer() {
             return new KafkaWriterStateSerializer();
         }
-
-        @Override
-        public void addPostCommitTopology(
-                org.apache.flink.streaming.api.datastream.DataStream<
-                                CommittableMessage<KafkaShareEosCommittable>>
-                        committables) {
-            StandardSinkTopologies.addGlobalCommitter(
-                    committables,
-                    context -> new ShareAckPostCommitter(),
-                    KafkaShareEosCommittableSerializer::new);
-        }
     }
 
-    private static final class ShareAwareKafkaWriter
-            implements 
TwoPhaseCommittingStatefulSink.PrecommittingStatefulSinkWriter<
-                    ShareSourceRecord, KafkaWriterState, 
KafkaShareEosCommittable> {
-
-        private final ExactlyOnceKafkaWriter<ShareSourceRecord> delegate;
-        private final Set<ShareAckCommittable> currentShareAckCommittables =
-                new LinkedHashSet<>();
-
-        private 
ShareAwareKafkaWriter(ExactlyOnceKafkaWriter<ShareSourceRecord> delegate) {
-            this.delegate = delegate;
-        }
-
-        private void initialize() {
-            delegate.initialize();
-        }
-
-        @Override
-        public void write(ShareSourceRecord element, Context context)
-                throws IOException, InterruptedException {
-            delegate.write(element, context);
-            currentShareAckCommittables.add(element.shareAckCommittable);
-        }
-
-        @Override
-        public void flush(boolean endOfInput) throws IOException, 
InterruptedException {
-            delegate.flush(endOfInput);
-        }
-
-        @Override
-        public Collection<KafkaShareEosCommittable> prepareCommit()
-                throws IOException, InterruptedException {
-            Collection<KafkaCommittable> kafkaCommittables = 
delegate.prepareCommit();
-            if (kafkaCommittables.isEmpty()) {
-                currentShareAckCommittables.clear();
-                return Collections.emptyList();
-            }
-            KafkaShareEosCommittable committable =
-                    KafkaShareEosCommittable.ready(
-                            NO_CHECKPOINT, kafkaCommittables, 
currentShareAckCommittables);
-            currentShareAckCommittables.clear();
-            return List.of(committable);
-        }
-
-        @Override
-        public List<KafkaWriterState> snapshotState(long checkpointId) throws 
IOException {
-            return delegate.snapshotState(checkpointId);
-        }
-
-        @Override
-        public void close() throws Exception {
-            delegate.close();
-        }
-    }
-
-    private static final class SinkOnlyKafkaCommitter
-            implements Committer<KafkaShareEosCommittable> {
+    private static final class RecordingKafkaCommitter implements 
Committer<KafkaCommittable> {
 
         private final KafkaCommitter kafkaCommitter;
 
-        private SinkOnlyKafkaCommitter(KafkaCommitter kafkaCommitter) {
+        private RecordingKafkaCommitter(KafkaCommitter kafkaCommitter) {
             this.kafkaCommitter = kafkaCommitter;
         }
 
         @Override
-        public void commit(Collection<CommitRequest<KafkaShareEosCommittable>> 
requests)
+        public void commit(Collection<CommitRequest<KafkaCommittable>> 
requests)
                 throws IOException, InterruptedException {
-            for (CommitRequest<KafkaShareEosCommittable> request : requests) {
-                KafkaShareEosCommittable committable = 
request.getCommittable();
-                List<ForwardingKafkaCommitRequest> kafkaRequests =
-                        committable.getKafkaCommittables().stream()
-                                .map(ForwardingKafkaCommitRequest::new)
-                                .collect(Collectors.toList());
-                List<Committer.CommitRequest<KafkaCommittable>> 
kafkaCommitRequests =
-                        new ArrayList<>(kafkaRequests);
-                kafkaCommitter.commit(kafkaCommitRequests);
-                Optional<ForwardingKafkaCommitRequest> retry =
-                        kafkaRequests.stream()
-                                .filter(kafkaRequest -> kafkaRequest.retry)
-                                .findFirst();
-                if (retry.isPresent()) {
+            for (CommitRequest<KafkaCommittable> request : requests) {

Review Comment:
    sink records and share acks are part of the same Kafka transaction, so one 
Kafka commit makes both visible/committed atomically.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to