kennknowles commented on code in PR #31682:
URL: https://github.com/apache/beam/pull/31682#discussion_r1736273979


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCommitOffset.java:
##########
@@ -99,13 +105,78 @@ private Map<String, Object> overrideBootstrapServersConfig(
     }
   }
 
+  private static final class MaxOffsetFn<K, V>
+      extends DoFn<KV<KafkaSourceDescriptor, KafkaRecord<K, V>>, 
KV<KafkaSourceDescriptor, Long>> {
+    private static class OffsetAndTimestamp {
+      OffsetAndTimestamp(long offset, Instant timestamp) {
+        this.offset = offset;
+        this.timestamp = timestamp;
+      }
+
+      void merge(long offset, Instant timestamp) {
+        if (this.offset < offset) {
+          this.offset = offset;
+          this.timestamp = timestamp;
+        }
+      }
+
+      long offset;
+      Instant timestamp;
+    }
+
+    private transient @MonotonicNonNull Map<KafkaSourceDescriptor, 
OffsetAndTimestamp> maxObserved;
+
+    @StartBundle
+    public void startBundle() {
+      if (maxObserved == null) {
+        maxObserved = new HashMap<>();
+      } else {
+        maxObserved.clear();
+      }
+    }
+
+    @RequiresStableInput
+    @ProcessElement
+    @SuppressWarnings("nullness") // startBundle guaranteed to initialize

Review Comment:
   nit: don't suppress. Even though startbundle is guaranteed to initialize, it 
is not guaranteed that whoever is calling this class obeys the contract of 
calling methods in a particular order. In fact, it is incredibly common to get 
it wrong. (this style of class is bad, but it is too late now)



##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java:
##########
@@ -2682,33 +2686,61 @@ public PCollection<KafkaRecord<K, V>> 
expand(PCollection<KafkaSourceDescriptor>
                             .getSchemaRegistry()
                             .getSchemaCoder(KafkaSourceDescriptor.class),
                         recordCoder));
-        if (isCommitOffsetEnabled() && !configuredKafkaCommit() && 
!isRedistribute()) {
-          outputWithDescriptor =
-              outputWithDescriptor
-                  .apply(Reshuffle.viaRandomKey())
-                  .setCoder(
-                      KvCoder.of(
-                          input
-                              .getPipeline()
-                              .getSchemaRegistry()
-                              .getSchemaCoder(KafkaSourceDescriptor.class),
-                          recordCoder));
-
-          PCollection<Void> unused = outputWithDescriptor.apply(new 
KafkaCommitOffset<K, V>(this));
-          unused.setCoder(VoidCoder.of());
+
+        boolean applyCommitOffsets =
+            isCommitOffsetEnabled() && !configuredKafkaCommit() && 
!isRedistribute();
+        if (!applyCommitOffsets) {
+          return outputWithDescriptor
+              .apply(MapElements.into(new TypeDescriptor<KafkaRecord<K, V>>() 
{}).via(KV::getValue))
+              .setCoder(recordCoder);
+        }
+
+        // Add transform for committing offsets to Kafka with consistency with 
beam pipeline data
+        // processing.
+        String requestedVersionString =
+            input
+                .getPipeline()
+                .getOptions()
+                .as(StreamingOptions.class)
+                .getUpdateCompatibilityVersion();
+        if (requestedVersionString != null) {
+          List<String> requestedVersion = 
Arrays.asList(requestedVersionString.split("\\."));
+          List<String> targetVersion = Arrays.asList("2", "60", "0");
+
+          if (Comparators.lexicographical(Comparator.<String>naturalOrder())
+                  .compare(requestedVersion, targetVersion)
+              < 0) {
+            return expand259Commits(

Review Comment:
   nit: would be preferable to have the parallel constructs appear as parallel 
in the code, e.g.
   
   ```
   if (...259) {
     expand259commits
   } else {
     expandcommits
   }
   ```
   
   whereas now we have some inline and some factored even though they are 
analogous



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to