This is an automated email from the ASF dual-hosted git repository.

frankchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 1d648968a1d Fix negative Kafka partition lag caused by inconsistent 
current/latest offsets (#18750)
1d648968a1d is described below

commit 1d648968a1d3604958aa26dd51b00e2013add8e2
Author: Guowei Wu <[email protected]>
AuthorDate: Tue Mar 17 21:45:43 2026 +0800

    Fix negative Kafka partition lag caused by inconsistent current/latest 
offsets (#18750)
    
    * Fix negative Kafka partition lag caused by inconsistent current/latest 
offsets
    
    * Address review comments
    
    * Fix stale Javadoc and brittle tests after removing 
latestSequenceFromStream
---
 .../indexing/kafka/supervisor/KafkaSupervisor.java | 116 +++++++++++-------
 .../seekablestream/supervisor/OffsetSnapshot.java  |  89 ++++++++++++++
 .../supervisor/OffsetSnapshotTest.java             | 136 +++++++++++++++++++++
 3 files changed, 296 insertions(+), 45 deletions(-)

diff --git 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
 
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index e226db37676..e2f62ed8d75 100644
--- 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ 
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -54,6 +54,7 @@ import 
org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
 import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
 import org.apache.druid.indexing.seekablestream.common.StreamException;
 import org.apache.druid.indexing.seekablestream.common.StreamPartition;
+import org.apache.druid.indexing.seekablestream.supervisor.OffsetSnapshot;
 import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
 import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
 import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorReportPayload;
@@ -71,9 +72,10 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Optional;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
@@ -95,11 +97,13 @@ public class KafkaSupervisor extends 
SeekableStreamSupervisor<KafkaTopicPartitio
   private static final Long END_OF_PARTITION = Long.MAX_VALUE;
 
   private final Pattern pattern;
-  private volatile Map<KafkaTopicPartition, Long> latestSequenceFromStream;
   private volatile Map<KafkaTopicPartition, Long> partitionToTimeLag;
 
   private final KafkaSupervisorSpec spec;
 
+  private final AtomicReference<OffsetSnapshot<KafkaTopicPartition, Long>> 
offsetSnapshotRef = new AtomicReference<>(
+      OffsetSnapshot.of(Map.of(), Map.of()));
+
   public KafkaSupervisor(
       final TaskStorage taskStorage,
       final TaskMaster taskMaster,
@@ -126,7 +130,6 @@ public class KafkaSupervisor extends 
SeekableStreamSupervisor<KafkaTopicPartitio
     this.pattern = getIoConfig().isMultiTopic() ? 
Pattern.compile(getIoConfig().getStream()) : null;
   }
 
-
   @Override
   protected RecordSupplier<KafkaTopicPartition, Long, KafkaRecordEntity> 
setupRecordSupplier()
   {
@@ -173,7 +176,8 @@ public class KafkaSupervisor extends 
SeekableStreamSupervisor<KafkaTopicPartitio
   )
   {
     KafkaSupervisorIOConfig ioConfig = spec.getIoConfig();
-    Map<KafkaTopicPartition, Long> partitionLag = 
getRecordLagPerPartitionInLatestSequences(getHighestCurrentOffsets());
+    OffsetSnapshot<KafkaTopicPartition, Long> offsetSnapshot = 
offsetSnapshotRef.get();
+    Map<KafkaTopicPartition, Long> partitionLag = 
getRecordLagPerPartitionInLatestSequences(offsetSnapshot);
     return new KafkaSupervisorReportPayload(
         spec.getId(),
         spec.getDataSchema().getDataSource(),
@@ -181,7 +185,7 @@ public class KafkaSupervisor extends 
SeekableStreamSupervisor<KafkaTopicPartitio
         numPartitions,
         ioConfig.getReplicas(),
         ioConfig.getTaskDuration().getMillis() / 1000,
-        includeOffsets ? latestSequenceFromStream : null,
+        includeOffsets ? offsetSnapshot.getLatestOffsetsFromStream() : null,
         includeOffsets ? partitionLag : null,
         includeOffsets ? getPartitionTimeLag() : null,
         includeOffsets ? aggregatePartitionLags(partitionLag).getTotalLag() : 
null,
@@ -264,14 +268,16 @@ public class KafkaSupervisor extends 
SeekableStreamSupervisor<KafkaTopicPartitio
   @Override
   protected Map<KafkaTopicPartition, Long> getPartitionRecordLag()
   {
-    Map<KafkaTopicPartition, Long> highestCurrentOffsets = 
getHighestCurrentOffsets();
+    OffsetSnapshot<KafkaTopicPartition, Long> offsetSnapshot = 
offsetSnapshotRef.get();
+    Map<KafkaTopicPartition, Long> latestSequencesFromStream = 
offsetSnapshot.getLatestOffsetsFromStream();
+    Map<KafkaTopicPartition, Long> highestIngestedOffsets = 
offsetSnapshot.getHighestIngestedOffsets();
 
-    if (latestSequenceFromStream == null) {
+    if (latestSequencesFromStream.isEmpty()) {
       return null;
     }
 
-    Set<KafkaTopicPartition> kafkaPartitions = 
latestSequenceFromStream.keySet();
-    Set<KafkaTopicPartition> taskPartitions = highestCurrentOffsets.keySet();
+    Set<KafkaTopicPartition> kafkaPartitions = 
latestSequencesFromStream.keySet();
+    Set<KafkaTopicPartition> taskPartitions = highestIngestedOffsets.keySet();
     if (!kafkaPartitions.equals(taskPartitions)) {
       try {
         log.warn("Mismatched kafka and task partitions: Missing Task 
Partitions %s, Missing Kafka Partitions %s",
@@ -284,7 +290,7 @@ public class KafkaSupervisor extends 
SeekableStreamSupervisor<KafkaTopicPartitio
       }
     }
 
-    return getRecordLagPerPartitionInLatestSequences(highestCurrentOffsets);
+    return getRecordLagPerPartitionInLatestSequences(offsetSnapshot);
   }
 
   @Nullable
@@ -297,44 +303,48 @@ public class KafkaSupervisor extends 
SeekableStreamSupervisor<KafkaTopicPartitio
   // suppress use of CollectionUtils.mapValues() since the valueMapper 
function is dependent on map key here
   @SuppressWarnings("SSBasedInspection")
  // Used while calculating cumulative lag for entire stream
-  private Map<KafkaTopicPartition, Long> 
getRecordLagPerPartitionInLatestSequences(Map<KafkaTopicPartition, Long> 
currentOffsets)
+  private Map<KafkaTopicPartition, Long> 
getRecordLagPerPartitionInLatestSequences(OffsetSnapshot<KafkaTopicPartition, 
Long> offsetSnapshot)
   {
-    if (latestSequenceFromStream == null) {
-      return Collections.emptyMap();
-    }
-
-    return latestSequenceFromStream
-        .entrySet()
-        .stream()
-        .collect(
-            Collectors.toMap(
-                Entry::getKey,
-                e -> e.getValue() != null
-                     ? e.getValue() - 
Optional.ofNullable(currentOffsets.get(e.getKey())).orElse(0L)
-                     : 0
-            )
-        );
+    return offsetSnapshot.getLatestOffsetsFromStream()
+                         .entrySet()
+                         .stream()
+                         .collect(Collectors.toMap(
+                             Entry::getKey,
+                             e -> e.getValue() - 
offsetSnapshot.getHighestIngestedOffsets().getOrDefault(e.getKey(), 0L)
+                         ));
   }
 
+  /**
+   * Computes the record lag for the specified partitions.
+   * <p>
+   * This method is called by the parent class to calculate lag only for the 
partitions present in
+   * {@code currentOffsets}. The values in {@code currentOffsets} are ignored; 
only the keys (partitions)
+   * are used to determine which partitions to compute lag for.
+   * <p>
+   * Lag is calculated as {@code latestOffsetFromStream - 
highestIngestedOffset} using the current atomic
+   * {@link OffsetSnapshot} to ensure consistency between the two offset maps.
+   *
+   * @param currentOffsets map whose keys indicate the partitions to compute 
lag for (values are ignored)
+   * @return map of partition to its record lag, or empty map if no valid data 
is available
+   */
   @Override
   protected Map<KafkaTopicPartition, Long> 
getRecordLagPerPartition(Map<KafkaTopicPartition, Long> currentOffsets)
   {
-    if (latestSequenceFromStream == null || currentOffsets == null) {
+    OffsetSnapshot<KafkaTopicPartition, Long> offsetSnapshot = 
offsetSnapshotRef.get();
+    Map<KafkaTopicPartition, Long> latestSequencesFromStream = 
offsetSnapshot.getLatestOffsetsFromStream();
+    Map<KafkaTopicPartition, Long> highestIngestedOffsets = 
offsetSnapshot.getHighestIngestedOffsets();
+
+    if (latestSequencesFromStream.isEmpty() || currentOffsets == null) {
       return Collections.emptyMap();
     }
 
-    return currentOffsets
-        .entrySet()
-        .stream()
-        .filter(e -> latestSequenceFromStream.get(e.getKey()) != null)
-        .collect(
-            Collectors.toMap(
-                Entry::getKey,
-                e -> e.getValue() != null
-                     ? latestSequenceFromStream.get(e.getKey()) - e.getValue()
-                     : 0
-            )
-        );
+    return currentOffsets.keySet().stream()
+                         .filter(latestSequencesFromStream::containsKey)
+                         .collect(Collectors.toMap(
+                             Function.identity(),
+                             // compute the lag using offsets from the 
snapshot.
+                             p -> latestSequencesFromStream.get(p) - 
highestIngestedOffsets.getOrDefault(p, 0L)
+                         ));
   }
 
   @Override
@@ -439,7 +449,7 @@ public class KafkaSupervisor extends 
SeekableStreamSupervisor<KafkaTopicPartitio
       yetToReadPartitions.forEach(p -> lastIngestedTimestamps.put(p, 0L));
 
       recordSupplier.seekToLatest(partitions);
-      latestSequenceFromStream = 
recordSupplier.getLatestSequenceNumbers(partitions);
+      Map<KafkaTopicPartition, Long> latestSequenceFromStream = 
recordSupplier.getLatestSequenceNumbers(partitions);
 
       for (Map.Entry<KafkaTopicPartition, Long> entry : 
latestSequenceFromStream.entrySet()) {
         // if there are no messages .getEndOffset would return 0, but if there 
are n msgs it would return n+1
@@ -457,6 +467,8 @@ public class KafkaSupervisor extends 
SeekableStreamSupervisor<KafkaTopicPartitio
                   e -> e.getValue() - lastIngestedTimestamps.get(e.getKey())
               )
           );
+
+      updateOffsetSnapshot(highestCurrentOffsets, latestSequenceFromStream);
     }
     catch (InterruptedException e) {
       throw new StreamException(e);
@@ -497,9 +509,11 @@ public class KafkaSupervisor extends 
SeekableStreamSupervisor<KafkaTopicPartitio
   }
 
   /**
-   * Fetches the latest offsets from the Kafka stream and updates the map
-   * {@link #latestSequenceFromStream}. The actual lag is computed lazily in
-   * {@link #getPartitionRecordLag}.
+   * Fetches latest stream offsets, combines with the highest ingested offsets 
into a snapshot,
+   * and atomically updates {@link #offsetSnapshotRef}.
+   * <p>
+   * Lag is computed consistently from the snapshot in downstream methods.
+   * </p>
    */
   @Override
   protected void updatePartitionLagFromStream()
@@ -509,6 +523,8 @@ public class KafkaSupervisor extends 
SeekableStreamSupervisor<KafkaTopicPartitio
       return;
     }
 
+    Map<KafkaTopicPartition, Long> highestCurrentOffsets = 
getHighestCurrentOffsets();
+
     getRecordSupplierLock().lock();
     try {
       Set<KafkaTopicPartition> partitionIds;
@@ -527,8 +543,10 @@ public class KafkaSupervisor extends 
SeekableStreamSupervisor<KafkaTopicPartitio
 
       recordSupplier.seekToLatest(partitions);
 
-      latestSequenceFromStream =
+      Map<KafkaTopicPartition, Long> latestSequenceFromStream =
           
partitions.stream().collect(Collectors.toMap(StreamPartition::getPartitionId, 
recordSupplier::getPosition));
+
+      updateOffsetSnapshot(highestCurrentOffsets, latestSequenceFromStream);
     }
     catch (InterruptedException e) {
       throw new StreamException(e);
@@ -538,10 +556,18 @@ public class KafkaSupervisor extends 
SeekableStreamSupervisor<KafkaTopicPartitio
     }
   }
 
+  private void updateOffsetSnapshot(
+      Map<KafkaTopicPartition, Long> highestIngestedOffsets,
+      Map<KafkaTopicPartition, Long> latestOffsetsFromStream
+  )
+  {
+    offsetSnapshotRef.set(OffsetSnapshot.of(highestIngestedOffsets, 
latestOffsetsFromStream));
+  }
+
   @Override
   protected Map<KafkaTopicPartition, Long> getLatestSequencesFromStream()
   {
-    return latestSequenceFromStream != null ? latestSequenceFromStream : new 
HashMap<>();
+    return offsetSnapshotRef.get().getLatestOffsetsFromStream();
   }
 
   @Override
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/OffsetSnapshot.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/OffsetSnapshot.java
new file mode 100644
index 00000000000..ba1df930fbb
--- /dev/null
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/OffsetSnapshot.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.supervisor;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+
+import javax.annotation.Nullable;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Immutable snapshot containing a consistent pair of offset maps: the highest 
ingested offsets
+ * reported by tasks and the latest end offsets fetched from the underlying 
stream.
+ *
+ * <p>
+ * The supervisor fetches task-reported ingested offsets first, then fetches 
end offsets from the stream.
+ * Because these two values are captured at slightly different instants, the 
reported lag
+ * (latestOffsetsFromStream - highestIngestedOffsets) may be slightly larger 
than the actual lag at any
+ * precise moment.
+ *
+ * <p>
+ * By publishing both maps together as a single atomic snapshot (using {@link 
java.util.concurrent.atomic.AtomicReference}),
+ * readers (such as lag metrics and supervisor status) always observe a 
coherent and consistent view.
+ * This produces stable, non-negative lag values over time, avoiding artifacts 
like temporary negative lags.
+ *
+ * <p>
+ * This class is generic and can be reused by all seekable-stream supervisors 
(Kafka, Kinesis, etc.).
+ */
+public final class OffsetSnapshot<PartitionIdType, SequenceOffsetType>
+{
+  private final ImmutableMap<PartitionIdType, SequenceOffsetType> 
highestIngestedOffsets;
+  private final ImmutableMap<PartitionIdType, SequenceOffsetType> 
latestOffsetsFromStream;
+
+  private OffsetSnapshot(
+      @Nullable Map<PartitionIdType, SequenceOffsetType> 
highestIngestedOffsets,
+      @Nullable Map<PartitionIdType, SequenceOffsetType> 
latestOffsetsFromStream
+  )
+  {
+    this.highestIngestedOffsets = toImmutableOffsetMap(highestIngestedOffsets);
+    this.latestOffsetsFromStream = 
toImmutableOffsetMap(latestOffsetsFromStream);
+  }
+
+  public static <PartitionIdType, SequenceOffsetType> 
OffsetSnapshot<PartitionIdType, SequenceOffsetType> of(
+      @Nullable Map<PartitionIdType, SequenceOffsetType> currentOffsets,
+      @Nullable Map<PartitionIdType, SequenceOffsetType> endOffsets
+  )
+  {
+    return new OffsetSnapshot<>(currentOffsets, endOffsets);
+  }
+
+  private ImmutableMap<PartitionIdType, SequenceOffsetType> 
toImmutableOffsetMap(
+      @Nullable Map<PartitionIdType, SequenceOffsetType> input
+  )
+  {
+    if (input == null) {
+      return ImmutableMap.of();
+    }
+
+    return ImmutableMap.copyOf(Maps.filterValues(input, Objects::nonNull));
+  }
+
+  public ImmutableMap<PartitionIdType, SequenceOffsetType> 
getHighestIngestedOffsets()
+  {
+    return highestIngestedOffsets;
+  }
+
+  public ImmutableMap<PartitionIdType, SequenceOffsetType> 
getLatestOffsetsFromStream()
+  {
+    return latestOffsetsFromStream;
+  }
+}
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/OffsetSnapshotTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/OffsetSnapshotTest.java
new file mode 100644
index 00000000000..603d8648ba2
--- /dev/null
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/OffsetSnapshotTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.supervisor;
+
+import com.google.common.collect.ImmutableMap;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+public class OffsetSnapshotTest
+{
+  @Test
+  public void testOffsetSnapshot_emptyInputReturnsEmptyMap()
+  {
+    OffsetSnapshot<String, Long> snapshot = OffsetSnapshot.of(
+        Collections.emptyMap(),
+        Collections.emptyMap()
+    );
+
+    Assert.assertTrue(snapshot.getHighestIngestedOffsets().isEmpty());
+    Assert.assertEquals(ImmutableMap.of(), 
snapshot.getHighestIngestedOffsets());
+    Assert.assertTrue(snapshot.getLatestOffsetsFromStream().isEmpty());
+    Assert.assertEquals(ImmutableMap.of(), 
snapshot.getLatestOffsetsFromStream());
+  }
+
+  @Test
+  public void testOffsetSnapshot_nullInputsReturnEmptyMaps()
+  {
+    OffsetSnapshot<Integer, Long> snapshot = OffsetSnapshot.of(null, null);
+
+    Assert.assertTrue(snapshot.getHighestIngestedOffsets().isEmpty());
+    Assert.assertEquals(ImmutableMap.of(), 
snapshot.getHighestIngestedOffsets());
+    Assert.assertTrue(snapshot.getLatestOffsetsFromStream().isEmpty());
+    Assert.assertEquals(ImmutableMap.of(), 
snapshot.getLatestOffsetsFromStream());
+  }
+
+  @Test
+  public void testOffsetSnapshot_nullCurrentOffsetsReturnsEmptyCurrentMap()
+  {
+    Map<Integer, Long> endOffsets = ImmutableMap.of(0, 100L, 1, 200L);
+
+    OffsetSnapshot<Integer, Long> snapshot = OffsetSnapshot.of(null, 
endOffsets);
+
+    Assert.assertTrue(snapshot.getHighestIngestedOffsets().isEmpty());
+    Assert.assertEquals(ImmutableMap.of(), 
snapshot.getHighestIngestedOffsets());
+    Assert.assertEquals(endOffsets, snapshot.getLatestOffsetsFromStream());
+  }
+
+  @Test
+  public void testOffsetSnapshot_nullEndOffsetsReturnsEmptyEndMap()
+  {
+    Map<Integer, Long> currentOffsets = ImmutableMap.of(0, 50L, 1, 150L);
+
+    OffsetSnapshot<Integer, Long> snapshot = OffsetSnapshot.of(currentOffsets, 
null);
+
+    Assert.assertEquals(currentOffsets, snapshot.getHighestIngestedOffsets());
+    Assert.assertTrue(snapshot.getLatestOffsetsFromStream().isEmpty());
+    Assert.assertEquals(ImmutableMap.of(), 
snapshot.getLatestOffsetsFromStream());
+  }
+
+  @Test
+  public void testOffsetSnapshot_copiesInputMapsAndReturnsImmutableCopies()
+  {
+    Map<String, String> current = new HashMap<>();
+    current.put("p0", "100");
+    current.put("p1", "200");
+
+    Map<String, String> end = new HashMap<>();
+    end.put("p0", "150");
+    end.put("p2", "300");
+
+    OffsetSnapshot<String, String> snapshot = OffsetSnapshot.of(current, end);
+
+    Assert.assertEquals(Map.of("p0", "100", "p1", "200"), 
snapshot.getHighestIngestedOffsets());
+    Assert.assertEquals(Map.of("p0", "150", "p2", "300"), 
snapshot.getLatestOffsetsFromStream());
+  }
+
+  @Test
+  public void testOffsetSnapshot_filtersOutNullValues()
+  {
+    Map<Integer, Long> current = new HashMap<>();
+    current.put(0, 100L);
+    current.put(1, null);   // should be filtered
+    current.put(2, 200L);
+
+    Map<Integer, Long> end = new HashMap<>();
+    end.put(0, null);       // should be filtered
+    end.put(1, 300L);
+
+    OffsetSnapshot<Integer, Long> snapshot = OffsetSnapshot.of(current, end);
+
+    Assert.assertEquals(Map.of(0, 100L, 2, 200L), 
snapshot.getHighestIngestedOffsets());
+    Assert.assertEquals(2, snapshot.getHighestIngestedOffsets().size());
+
+    Assert.assertEquals(Map.of(1, 300L), 
snapshot.getLatestOffsetsFromStream());
+    Assert.assertEquals(1, snapshot.getLatestOffsetsFromStream().size());
+  }
+
+  @Test
+  public void testOffsetSnapshot_snapshotIndependentFromInputMapMutations()
+  {
+    Map<Integer, Long> inputCurrent = new HashMap<>();
+    inputCurrent.put(1, 10L);
+    Map<Integer, Long> inputEnd = new HashMap<>();
+    inputEnd.put(2, 20L);
+
+    OffsetSnapshot<Integer, Long> snapshot = OffsetSnapshot.of(inputCurrent, 
inputEnd);
+
+    // Mutate inputs after snapshot creation
+    inputCurrent.clear();
+    inputEnd.put(999, 999L);
+
+    Assert.assertEquals(Map.of(1, 10L), snapshot.getHighestIngestedOffsets());
+    Assert.assertEquals(Map.of(2, 20L), snapshot.getLatestOffsetsFromStream());
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to