This is an automated email from the ASF dual-hosted git repository.

heejong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 048cd2e  [BEAM-13015] Simplify fake state API testing client and make it consistent with chunk sizing even after appends.
     new 431ee4d  Merge pull request #16131 from lukecwik/beam13015.4
048cd2e is described below

commit 048cd2ee05db2fcdeb4d9e9dad598a7aea488aaf
Author: Luke Cwik <[email protected]>
AuthorDate: Fri Dec 3 11:05:51 2021 -0800

    [BEAM-13015] Simplify fake state API testing client and make it consistent with chunk sizing even after appends.
---
 .../fn/harness/state/FakeBeamFnStateClient.java    | 59 ++++++----------------
 1 file changed, 15 insertions(+), 44 deletions(-)

diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java
index 04f05ae..1a99b7e 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java
@@ -20,9 +20,6 @@ package org.apache.beam.fn.harness.state;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
@@ -35,11 +32,11 @@ import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest.RequestCase;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateResponse;
 import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
 
 /** A fake implementation of a {@link BeamFnStateClient} to aid with testing. 
*/
 public class FakeBeamFnStateClient implements BeamFnStateClient {
-  private final Map<StateKey, List<ByteString>> data;
+  private final Map<StateKey, ByteString> data;
+  private final int chunkSize;
   private int currentId;
 
   public FakeBeamFnStateClient(Map<StateKey, ByteString> initialData) {
@@ -47,34 +44,16 @@ public class FakeBeamFnStateClient implements BeamFnStateClient {
   }
 
   public FakeBeamFnStateClient(Map<StateKey, ByteString> initialData, int 
chunkSize) {
-    this.data =
-        new ConcurrentHashMap<>(
-            Maps.transformValues(
-                initialData,
-                (ByteString all) -> {
-                  List<ByteString> chunks = new ArrayList<>();
-                  for (int i = 0; i < Math.max(1, all.size()); i += chunkSize) 
{
-                    chunks.add(all.substring(i, Math.min(all.size(), i + 
chunkSize)));
-                  }
-                  return chunks;
-                }));
+    this.data = new ConcurrentHashMap<>(initialData);
+    this.chunkSize = chunkSize;
   }
 
   public Map<StateKey, ByteString> getData() {
-    return Maps.transformValues(
-        data,
-        bs -> {
-          ByteString all = ByteString.EMPTY;
-          for (ByteString b : bs) {
-            all = all.concat(b);
-          }
-          return all;
-        });
+    return data;
   }
 
   @Override
   public CompletableFuture<StateResponse> handle(StateRequest.Builder 
requestBuilder) {
-
     // The id should never be filled out
     assertEquals("", requestBuilder.getId());
     requestBuilder.setId(generateId());
@@ -92,16 +71,17 @@ public class FakeBeamFnStateClient implements BeamFnStateClient {
 
     switch (request.getRequestCase()) {
       case GET:
-        // Chunk gets into 6 byte return blocks
-        List<ByteString> byteStrings =
-            data.getOrDefault(request.getStateKey(), 
Collections.singletonList(ByteString.EMPTY));
+        // Chunk gets into chunkSize blocks
+        ByteString byteString = data.getOrDefault(request.getStateKey(), 
ByteString.EMPTY);
         int block = 0;
         if (request.getGet().getContinuationToken().size() > 0) {
           block = 
Integer.parseInt(request.getGet().getContinuationToken().toStringUtf8());
         }
-        ByteString returnBlock = byteStrings.get(block);
+        ByteString returnBlock =
+            byteString.substring(
+                block * chunkSize, Math.min(byteString.size(), (block + 1) * 
chunkSize));
         ByteString continuationToken = ByteString.EMPTY;
-        if (byteStrings.size() > block + 1) {
+        if ((block + 1) * chunkSize < byteString.size()) {
           continuationToken = ByteString.copyFromUtf8(Integer.toString(block + 
1));
         }
         response =
@@ -118,18 +98,10 @@ public class FakeBeamFnStateClient implements BeamFnStateClient {
         break;
 
       case APPEND:
-        List<ByteString> previousValue =
-            data.getOrDefault(request.getStateKey(), 
Collections.singletonList(ByteString.EMPTY));
-        List<ByteString> newValue = new ArrayList<>();
-        newValue.addAll(previousValue);
-        ByteString newData = request.getAppend().getData();
-        if (newData.size() % 2 == 0) {
-          newValue.remove(newValue.size() - 1);
-          newValue.add(previousValue.get(previousValue.size() - 
1).concat(newData));
-        } else {
-          newValue.add(newData);
-        }
-        data.put(request.getStateKey(), newValue);
+        data.put(
+            request.getStateKey(),
+            data.getOrDefault(request.getStateKey(), ByteString.EMPTY)
+                .concat(request.getAppend().getData()));
         response = 
StateResponse.newBuilder().setAppend(StateAppendResponse.getDefaultInstance());
         break;
 
@@ -138,7 +110,6 @@ public class FakeBeamFnStateClient implements BeamFnStateClient {
             String.format("Unknown request type %s", 
request.getRequestCase()));
     }
 
-    CompletableFuture<StateResponse> responseFuture = new 
CompletableFuture<>();
     return 
CompletableFuture.completedFuture(response.setId(requestBuilder.getId()).build());
   }
 

Reply via email to