This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 5a4cbe2074f Additional test for windmill OrderedListState
implementation. (#29970)
5a4cbe2074f is described below
commit 5a4cbe2074f8d0eccdec620d63bda6ecc709b7c4
Author: Sam Whittle <[email protected]>
AuthorDate: Wed Jan 17 10:13:08 2024 +0100
Additional test for windmill OrderedListState implementation. (#29970)
These were attempting to reproduce a bug that ended up being in the test
pipeline,
but they seem worthwhile keeping to improve coverage.
---
.../windmill/state/WindmillStateInternalsTest.java | 108 ++++++++++++++++++++-
1 file changed, 107 insertions(+), 1 deletion(-)
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java
index d2590ceb846..e8eeff3b1d1 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java
@@ -47,6 +47,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
@@ -1871,19 +1872,64 @@ public class WindmillStateInternalsTest {
assertThat(orderedList.read(), Matchers.contains(goodbyeValue, worldValue,
helloValue));
}
+ @Test
+ public void testOrderedListAddBeforeRangeRead() throws Exception {
+ StateTag<OrderedListState<String>> addr =
+ StateTags.orderedList("orderedList", StringUtf8Coder.of());
+ OrderedListState<String> orderedList = underTest.state(NAMESPACE, addr);
+
+ SettableFuture<Iterable<TimestampedValue<String>>> future =
SettableFuture.create();
+ Range<Long> readSubrange = Range.closedOpen(70 * 1000L, 100 * 1000L);
+ when(mockReader.orderedListFuture(
+ readSubrange, key(NAMESPACE, "orderedList"), STATE_FAMILY,
StringUtf8Coder.of()))
+ .thenReturn(future);
+
+ orderedList.readRangeLater(Instant.ofEpochMilli(70),
Instant.ofEpochMilli(100));
+
+ final TimestampedValue<String> helloValue =
+ TimestampedValue.of("hello", Instant.ofEpochMilli(100));
+ final TimestampedValue<String> worldValue =
+ TimestampedValue.of("world", Instant.ofEpochMilli(75));
+ final TimestampedValue<String> goodbyeValue =
+ TimestampedValue.of("goodbye", Instant.ofEpochMilli(50));
+
+ orderedList.add(helloValue);
+ waitAndSet(future, Collections.singletonList(worldValue), 200);
+ orderedList.add(goodbyeValue);
+
+ assertThat(
+ orderedList.readRange(Instant.ofEpochMilli(70),
Instant.ofEpochMilli(100)),
+ Matchers.contains(worldValue));
+ }
+
@Test
public void testOrderedListClearBeforeRead() throws Exception {
StateTag<OrderedListState<String>> addr =
StateTags.orderedList("orderedList", StringUtf8Coder.of());
OrderedListState<String> orderedListState = underTest.state(NAMESPACE,
addr);
- final TimestampedValue<String> helloElement = TimestampedValue.of("hello",
Instant.EPOCH);
+ final TimestampedValue<String> helloElement =
+ TimestampedValue.of("hello", Instant.ofEpochSecond(1));
orderedListState.clear();
orderedListState.add(helloElement);
assertThat(orderedListState.read(),
Matchers.containsInAnyOrder(helloElement));
+ // Shouldn't need to read from windmill for this.
+ Mockito.verifyZeroInteractions(mockReader);
+ assertThat(
+ orderedListState.readRange(Instant.ofEpochSecond(1),
Instant.ofEpochSecond(2)),
+ Matchers.containsInAnyOrder(helloElement));
// Shouldn't need to read from windmill for this.
Mockito.verifyZeroInteractions(mockReader);
+
+ // Shouldn't need to read from windmill for this.
+ assertThat(
+ orderedListState.readRange(Instant.ofEpochSecond(100),
Instant.ofEpochSecond(200)),
+ Matchers.emptyIterable());
+ assertThat(
+ orderedListState.readRange(Instant.EPOCH, Instant.ofEpochSecond(1)),
+ Matchers.emptyIterable());
+ Mockito.verifyZeroInteractions(mockReader);
}
@Test
@@ -2201,6 +2247,66 @@ public class WindmillStateInternalsTest {
assertArrayEquals(expected, read);
}
+ @Test
+ public void testOrderedListInterleavedLocalAddClearReadRange() {
+ Future<Map<Range<Instant>, RangeSet<Long>>> orderedListFuture =
Futures.immediateFuture(null);
+ Future<Map<Range<Instant>, RangeSet<Instant>>> deletionsFuture =
Futures.immediateFuture(null);
+ when(mockReader.valueFuture(
+ systemKey(NAMESPACE, "orderedList" + IdTracker.IDS_AVAILABLE_STR),
+ STATE_FAMILY,
+ IdTracker.IDS_AVAILABLE_CODER))
+ .thenReturn(orderedListFuture);
+ when(mockReader.valueFuture(
+ systemKey(NAMESPACE, "orderedList" + IdTracker.DELETIONS_STR),
+ STATE_FAMILY,
+ IdTracker.SUBRANGE_DELETIONS_CODER))
+ .thenReturn(deletionsFuture);
+
+ SettableFuture<Iterable<TimestampedValue<String>>> fromStorage =
SettableFuture.create();
+
+ Range<Long> readSubrange = Range.closedOpen(1 * 1000000L, 8 * 1000000L);
+ when(mockReader.orderedListFuture(
+ readSubrange, key(NAMESPACE, "orderedList"), STATE_FAMILY,
StringUtf8Coder.of()))
+ .thenReturn(fromStorage);
+
+ StateTag<OrderedListState<String>> addr =
+ StateTags.orderedList("orderedList", StringUtf8Coder.of());
+ OrderedListState<String> orderedListState = underTest.state(NAMESPACE,
addr);
+
+ orderedListState.add(TimestampedValue.of("1", Instant.ofEpochSecond(1)));
+ orderedListState.add(TimestampedValue.of("2", Instant.ofEpochSecond(2)));
+ orderedListState.add(TimestampedValue.of("3", Instant.ofEpochSecond(3)));
+ orderedListState.add(TimestampedValue.of("4", Instant.ofEpochSecond(4)));
+
+ orderedListState.clearRange(Instant.ofEpochSecond(1),
Instant.ofEpochSecond(4));
+
+ orderedListState.add(TimestampedValue.of("5", Instant.ofEpochSecond(5)));
+ orderedListState.add(TimestampedValue.of("6", Instant.ofEpochSecond(6)));
+
+ orderedListState.add(TimestampedValue.of("3_again",
Instant.ofEpochSecond(3)));
+
+ orderedListState.add(TimestampedValue.of("7", Instant.ofEpochSecond(7)));
+ orderedListState.add(TimestampedValue.of("8", Instant.ofEpochSecond(8)));
+
+ fromStorage.set(ImmutableList.<TimestampedValue<String>>of());
+
+ TimestampedValue[] expected =
+ Iterables.toArray(
+ ImmutableList.of(
+ TimestampedValue.of("3_again", Instant.ofEpochSecond(3)),
+ TimestampedValue.of("4", Instant.ofEpochSecond(4)),
+ TimestampedValue.of("5", Instant.ofEpochSecond(5)),
+ TimestampedValue.of("6", Instant.ofEpochSecond(6)),
+ TimestampedValue.of("7", Instant.ofEpochSecond(7))),
+ TimestampedValue.class);
+
+ TimestampedValue[] read =
+ Iterables.toArray(
+ orderedListState.readRange(Instant.ofEpochSecond(1),
Instant.ofEpochSecond(8)),
+ TimestampedValue.class);
+ assertArrayEquals(expected, read);
+ }
+
@Test
public void testOrderedListPersistEmpty() throws Exception {
StateTag<OrderedListState<String>> addr =