zhengbuqian commented on code in PR #23492:
URL: https://github.com/apache/beam/pull/23492#discussion_r1176202763
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillStateInternalsTest.java:
##########
@@ -1812,6 +1755,62 @@ true, key(NAMESPACE, tag), STATE_FAMILY,
VarIntCoder.of()))
assertTrue(entryUpdate.getDeleteAll());
}
+ @Test
+ public void testMultimapFuzzTest() {
+ final String tag = "multimap";
+ StateTag<MultimapState<String, Integer>> addr =
+ StateTags.multimap(tag, StringUtf8Coder.of(), VarIntCoder.of());
+ MultimapState<String, Integer> multimapState = underTest.state(NAMESPACE,
addr);
+
+ SettableFuture<Iterable<Map.Entry<ByteString, Iterable<Integer>>>>
entriesFuture =
+ SettableFuture.create();
+ when(mockReader.multimapFetchAllFuture(
+ false, key(NAMESPACE, tag), STATE_FAMILY, VarIntCoder.of()))
+ .thenReturn(entriesFuture);
+
+ // to set up the multimap as cache complete
+ waitAndSet(entriesFuture, Collections.emptyList(), 30);
+ multimapState.entries().read();
+
+ Multimap<String, Integer> mirror = ArrayListMultimap.create();
+
+ Random rand = new Random();
+
+ final int ROUNDS = 100;
+ final int OPS_PER_ROUND = 2000;
+ final int NUM_KEY = 20;
+ for (int i = 0; i < ROUNDS; i++) {
+ for (int j = 0; j < OPS_PER_ROUND; j++) {
+ int op = rand.nextInt(100);
+ String key = "key" + rand.nextInt(NUM_KEY);
+ if (op < 50) {
+ // 50% add operation
+ Integer value = rand.nextInt();
+ multimapState.put(key, value);
+ mirror.put(key, value);
+ } else if (op < 95) {
+ // 45% remove key operation
+ multimapState.remove(key);
+ mirror.removeAll(key);
+ } else {
+ // 5% clear operation
+ multimapState.clear();
+ mirror.clear();
+ }
+ }
+ Iterable<String> read = multimapState.keys().read();
+ Set<String> bytes = mirror.keySet();
+ assertThat(read, Matchers.containsInAnyOrder(bytes.toArray()));
+ for (String key : multimapState.keys().read()) {
+ assertThat(
+ multimapState.get(key).read(),
Matchers.containsInAnyOrder(mirror.get(key).toArray()));
+ }
+ Windmill.WorkItemCommitRequest.Builder commitBuilder =
+ Windmill.WorkItemCommitRequest.newBuilder();
+ underTest.persist(commitBuilder);
Review Comment:
Done. The test now clears the cache after 100 rounds and then runs another
100 rounds. In between, it calls `keys()` and `get()` to rebuild the cache.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]