kennknowles commented on code in PR #31682:
URL: https://github.com/apache/beam/pull/31682#discussion_r1736273979
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCommitOffset.java:
##########
@@ -99,13 +105,78 @@ private Map<String, Object> overrideBootstrapServersConfig(
}
}
+  private static final class MaxOffsetFn<K, V>
+      extends DoFn<KV<KafkaSourceDescriptor, KafkaRecord<K, V>>, KV<KafkaSourceDescriptor, Long>> {
+    private static class OffsetAndTimestamp {
+      OffsetAndTimestamp(long offset, Instant timestamp) {
+        this.offset = offset;
+        this.timestamp = timestamp;
+      }
+
+      void merge(long offset, Instant timestamp) {
+        if (this.offset < offset) {
+          this.offset = offset;
+          this.timestamp = timestamp;
+        }
+      }
+
+      long offset;
+      Instant timestamp;
+    }
+
+    private transient @MonotonicNonNull Map<KafkaSourceDescriptor, OffsetAndTimestamp> maxObserved;
+
+    @StartBundle
+    public void startBundle() {
+      if (maxObserved == null) {
+        maxObserved = new HashMap<>();
+      } else {
+        maxObserved.clear();
+      }
+    }
+
+    @RequiresStableInput
+    @ProcessElement
+    @SuppressWarnings("nullness") // startBundle guaranteed to initialize
Review Comment:
nit: don't suppress. Even though startBundle is guaranteed to initialize the map, there is no guarantee that whoever calls this class obeys the contract of invoking the methods in a particular order. In fact, it is incredibly common to get that wrong. (This style of class is bad, but it is too late now.)
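
For illustration, a minimal sketch of the alternative the reviewer is suggesting: keep the field nullable and fail fast at the call site instead of suppressing the checker. The `MaxPerKeyFn` class below is hypothetical, not the PR's code; it only demonstrates the null-check pattern:
```
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.checkerframework.checker.nullness.qual.Nullable;

/** Hypothetical DoFn showing a runtime lifecycle check instead of @SuppressWarnings("nullness"). */
class MaxPerKeyFn<K> extends DoFn<KV<K, Long>, KV<K, Long>> {
  private transient @Nullable Map<K, Long> maxObserved;

  @StartBundle
  public void startBundle() {
    if (maxObserved == null) {
      maxObserved = new HashMap<>();
    } else {
      maxObserved.clear();
    }
  }

  @ProcessElement
  public void processElement(@Element KV<K, Long> element) {
    Map<K, Long> observed = maxObserved;
    if (observed == null) {
      // Enforce the lifecycle contract explicitly: a caller that skips
      // @StartBundle fails loudly here instead of tripping a suppressed
      // nullness hole.
      throw new IllegalStateException("processElement called before startBundle");
    }
    observed.merge(element.getKey(), element.getValue(), Math::max);
  }
}
```
Reading the field into a local before the check also satisfies the nullness checker for the rest of the method body without any suppression.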
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java:
##########
@@ -2682,33 +2686,61 @@ public PCollection<KafkaRecord<K, V>> expand(PCollection<KafkaSourceDescriptor>
                       .getSchemaRegistry()
                       .getSchemaCoder(KafkaSourceDescriptor.class),
                   recordCoder));
-      if (isCommitOffsetEnabled() && !configuredKafkaCommit() && !isRedistribute()) {
-        outputWithDescriptor =
-            outputWithDescriptor
-                .apply(Reshuffle.viaRandomKey())
-                .setCoder(
-                    KvCoder.of(
-                        input
-                            .getPipeline()
-                            .getSchemaRegistry()
-                            .getSchemaCoder(KafkaSourceDescriptor.class),
-                        recordCoder));
-
-        PCollection<Void> unused = outputWithDescriptor.apply(new KafkaCommitOffset<K, V>(this));
-        unused.setCoder(VoidCoder.of());
+
+      boolean applyCommitOffsets =
+          isCommitOffsetEnabled() && !configuredKafkaCommit() && !isRedistribute();
+      if (!applyCommitOffsets) {
+        return outputWithDescriptor
+            .apply(MapElements.into(new TypeDescriptor<KafkaRecord<K, V>>() {}).via(KV::getValue))
+            .setCoder(recordCoder);
+      }
+
+      // Add transform for committing offsets to Kafka with consistency with beam pipeline data
+      // processing.
+      String requestedVersionString =
+          input
+              .getPipeline()
+              .getOptions()
+              .as(StreamingOptions.class)
+              .getUpdateCompatibilityVersion();
+      if (requestedVersionString != null) {
+        List<String> requestedVersion = Arrays.asList(requestedVersionString.split("\\."));
+        List<String> targetVersion = Arrays.asList("2", "60", "0");
+
+        if (Comparators.lexicographical(Comparator.<String>naturalOrder())
+                .compare(requestedVersion, targetVersion)
+            < 0) {
+          return expand259Commits(
Review Comment:
nit: it would be preferable for the parallel constructs to appear as parallel in the code, e.g.
```
if (...259) {
  expand259commits
} else {
  expandcommits
}
```
whereas now one branch is inline and the other is factored out, even though they are analogous.
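
As a hedged sketch of that shape: the helper name `expandCommits` and both argument lists are hypothetical (the real call is truncated in the diff above); the version comparison is carried over from the diff unchanged.
```
// Hypothetical refactoring: compute the compatibility decision first, then keep
// the two analogous expansions as parallel branches of one if/else.
boolean use259Commits = false;
if (requestedVersionString != null) {
  List<String> requestedVersion = Arrays.asList(requestedVersionString.split("\\."));
  List<String> targetVersion = Arrays.asList("2", "60", "0");
  use259Commits =
      Comparators.lexicographical(Comparator.<String>naturalOrder())
              .compare(requestedVersion, targetVersion)
          < 0;
}

if (use259Commits) {
  // Illustrative arguments only; the real signature is not shown in the diff.
  return expand259Commits(outputWithDescriptor, recordCoder);
} else {
  return expandCommits(outputWithDescriptor, recordCoder);
}
```
Factoring the fall-through path into a named helper makes the update-compatibility fork read as two peers rather than an early return plus inline code.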