This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 5a4cbe2074f Additional test for windmill OrderedListState implementation. (#29970)
5a4cbe2074f is described below

commit 5a4cbe2074f8d0eccdec620d63bda6ecc709b7c4
Author: Sam Whittle <[email protected]>
AuthorDate: Wed Jan 17 10:13:08 2024 +0100

    Additional test for windmill OrderedListState implementation. (#29970)
    
    These were attempting to reproduce a bug that ended up being in the test pipeline,
    but they seem worthwhile keeping to improve coverage.
---
 .../windmill/state/WindmillStateInternalsTest.java | 108 ++++++++++++++++++++-
 1 file changed, 107 insertions(+), 1 deletion(-)

diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java
index d2590ceb846..e8eeff3b1d1 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java
@@ -47,6 +47,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
@@ -1871,19 +1872,64 @@ public class WindmillStateInternalsTest {
     assertThat(orderedList.read(), Matchers.contains(goodbyeValue, worldValue, 
helloValue));
   }
 
+  @Test
+  public void testOrderedListAddBeforeRangeRead() throws Exception {
+    StateTag<OrderedListState<String>> addr =
+        StateTags.orderedList("orderedList", StringUtf8Coder.of());
+    OrderedListState<String> orderedList = underTest.state(NAMESPACE, addr);
+
+    SettableFuture<Iterable<TimestampedValue<String>>> future = 
SettableFuture.create();
+    Range<Long> readSubrange = Range.closedOpen(70 * 1000L, 100 * 1000L);
+    when(mockReader.orderedListFuture(
+            readSubrange, key(NAMESPACE, "orderedList"), STATE_FAMILY, 
StringUtf8Coder.of()))
+        .thenReturn(future);
+
+    orderedList.readRangeLater(Instant.ofEpochMilli(70), 
Instant.ofEpochMilli(100));
+
+    final TimestampedValue<String> helloValue =
+        TimestampedValue.of("hello", Instant.ofEpochMilli(100));
+    final TimestampedValue<String> worldValue =
+        TimestampedValue.of("world", Instant.ofEpochMilli(75));
+    final TimestampedValue<String> goodbyeValue =
+        TimestampedValue.of("goodbye", Instant.ofEpochMilli(50));
+
+    orderedList.add(helloValue);
+    waitAndSet(future, Collections.singletonList(worldValue), 200);
+    orderedList.add(goodbyeValue);
+
+    assertThat(
+        orderedList.readRange(Instant.ofEpochMilli(70), 
Instant.ofEpochMilli(100)),
+        Matchers.contains(worldValue));
+  }
+
   @Test
   public void testOrderedListClearBeforeRead() throws Exception {
     StateTag<OrderedListState<String>> addr =
         StateTags.orderedList("orderedList", StringUtf8Coder.of());
     OrderedListState<String> orderedListState = underTest.state(NAMESPACE, 
addr);
 
-    final TimestampedValue<String> helloElement = TimestampedValue.of("hello", 
Instant.EPOCH);
+    final TimestampedValue<String> helloElement =
+        TimestampedValue.of("hello", Instant.ofEpochSecond(1));
     orderedListState.clear();
     orderedListState.add(helloElement);
     assertThat(orderedListState.read(), 
Matchers.containsInAnyOrder(helloElement));
+    // Shouldn't need to read from windmill for this.
+    Mockito.verifyZeroInteractions(mockReader);
 
+    assertThat(
+        orderedListState.readRange(Instant.ofEpochSecond(1), 
Instant.ofEpochSecond(2)),
+        Matchers.containsInAnyOrder(helloElement));
     // Shouldn't need to read from windmill for this.
     Mockito.verifyZeroInteractions(mockReader);
+
+    // Shouldn't need to read from windmill for this.
+    assertThat(
+        orderedListState.readRange(Instant.ofEpochSecond(100), 
Instant.ofEpochSecond(200)),
+        Matchers.emptyIterable());
+    assertThat(
+        orderedListState.readRange(Instant.EPOCH, Instant.ofEpochSecond(1)),
+        Matchers.emptyIterable());
+    Mockito.verifyZeroInteractions(mockReader);
   }
 
   @Test
@@ -2201,6 +2247,66 @@ public class WindmillStateInternalsTest {
     assertArrayEquals(expected, read);
   }
 
+  @Test
+  public void testOrderedListInterleavedLocalAddClearReadRange() {
+    Future<Map<Range<Instant>, RangeSet<Long>>> orderedListFuture = 
Futures.immediateFuture(null);
+    Future<Map<Range<Instant>, RangeSet<Instant>>> deletionsFuture = 
Futures.immediateFuture(null);
+    when(mockReader.valueFuture(
+            systemKey(NAMESPACE, "orderedList" + IdTracker.IDS_AVAILABLE_STR),
+            STATE_FAMILY,
+            IdTracker.IDS_AVAILABLE_CODER))
+        .thenReturn(orderedListFuture);
+    when(mockReader.valueFuture(
+            systemKey(NAMESPACE, "orderedList" + IdTracker.DELETIONS_STR),
+            STATE_FAMILY,
+            IdTracker.SUBRANGE_DELETIONS_CODER))
+        .thenReturn(deletionsFuture);
+
+    SettableFuture<Iterable<TimestampedValue<String>>> fromStorage = 
SettableFuture.create();
+
+    Range<Long> readSubrange = Range.closedOpen(1 * 1000000L, 8 * 1000000L);
+    when(mockReader.orderedListFuture(
+            readSubrange, key(NAMESPACE, "orderedList"), STATE_FAMILY, 
StringUtf8Coder.of()))
+        .thenReturn(fromStorage);
+
+    StateTag<OrderedListState<String>> addr =
+        StateTags.orderedList("orderedList", StringUtf8Coder.of());
+    OrderedListState<String> orderedListState = underTest.state(NAMESPACE, 
addr);
+
+    orderedListState.add(TimestampedValue.of("1", Instant.ofEpochSecond(1)));
+    orderedListState.add(TimestampedValue.of("2", Instant.ofEpochSecond(2)));
+    orderedListState.add(TimestampedValue.of("3", Instant.ofEpochSecond(3)));
+    orderedListState.add(TimestampedValue.of("4", Instant.ofEpochSecond(4)));
+
+    orderedListState.clearRange(Instant.ofEpochSecond(1), 
Instant.ofEpochSecond(4));
+
+    orderedListState.add(TimestampedValue.of("5", Instant.ofEpochSecond(5)));
+    orderedListState.add(TimestampedValue.of("6", Instant.ofEpochSecond(6)));
+
+    orderedListState.add(TimestampedValue.of("3_again", 
Instant.ofEpochSecond(3)));
+
+    orderedListState.add(TimestampedValue.of("7", Instant.ofEpochSecond(7)));
+    orderedListState.add(TimestampedValue.of("8", Instant.ofEpochSecond(8)));
+
+    fromStorage.set(ImmutableList.<TimestampedValue<String>>of());
+
+    TimestampedValue[] expected =
+        Iterables.toArray(
+            ImmutableList.of(
+                TimestampedValue.of("3_again", Instant.ofEpochSecond(3)),
+                TimestampedValue.of("4", Instant.ofEpochSecond(4)),
+                TimestampedValue.of("5", Instant.ofEpochSecond(5)),
+                TimestampedValue.of("6", Instant.ofEpochSecond(6)),
+                TimestampedValue.of("7", Instant.ofEpochSecond(7))),
+            TimestampedValue.class);
+
+    TimestampedValue[] read =
+        Iterables.toArray(
+            orderedListState.readRange(Instant.ofEpochSecond(1), 
Instant.ofEpochSecond(8)),
+            TimestampedValue.class);
+    assertArrayEquals(expected, read);
+  }
+
   @Test
   public void testOrderedListPersistEmpty() throws Exception {
     StateTag<OrderedListState<String>> addr =

Reply via email to