This is an automated email from the ASF dual-hosted git repository.
heejong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 048cd2e [BEAM-13015] Simplify fake state API testing client and make
it consistent with chunk sizing even after appends.
new 431ee4d Merge pull request #16131 from lukecwik/beam13015.4
048cd2e is described below
commit 048cd2ee05db2fcdeb4d9e9dad598a7aea488aaf
Author: Luke Cwik <[email protected]>
AuthorDate: Fri Dec 3 11:05:51 2021 -0800
[BEAM-13015] Simplify fake state API testing client and make it consistent
with chunk sizing even after appends.
---
.../fn/harness/state/FakeBeamFnStateClient.java | 59 ++++++----------------
1 file changed, 15 insertions(+), 44 deletions(-)
diff --git
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java
index 04f05ae..1a99b7e 100644
---
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java
+++
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java
@@ -20,9 +20,6 @@ package org.apache.beam.fn.harness.state;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -35,11 +32,11 @@ import
org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest.RequestCase;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateResponse;
import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
/** A fake implementation of a {@link BeamFnStateClient} to aid with testing.
*/
public class FakeBeamFnStateClient implements BeamFnStateClient {
- private final Map<StateKey, List<ByteString>> data;
+ private final Map<StateKey, ByteString> data;
+ private final int chunkSize;
private int currentId;
public FakeBeamFnStateClient(Map<StateKey, ByteString> initialData) {
@@ -47,34 +44,16 @@ public class FakeBeamFnStateClient implements
BeamFnStateClient {
}
public FakeBeamFnStateClient(Map<StateKey, ByteString> initialData, int
chunkSize) {
- this.data =
- new ConcurrentHashMap<>(
- Maps.transformValues(
- initialData,
- (ByteString all) -> {
- List<ByteString> chunks = new ArrayList<>();
- for (int i = 0; i < Math.max(1, all.size()); i += chunkSize)
{
- chunks.add(all.substring(i, Math.min(all.size(), i +
chunkSize)));
- }
- return chunks;
- }));
+ this.data = new ConcurrentHashMap<>(initialData);
+ this.chunkSize = chunkSize;
}
public Map<StateKey, ByteString> getData() {
- return Maps.transformValues(
- data,
- bs -> {
- ByteString all = ByteString.EMPTY;
- for (ByteString b : bs) {
- all = all.concat(b);
- }
- return all;
- });
+ return data;
}
@Override
public CompletableFuture<StateResponse> handle(StateRequest.Builder
requestBuilder) {
-
// The id should never be filled out
assertEquals("", requestBuilder.getId());
requestBuilder.setId(generateId());
@@ -92,16 +71,17 @@ public class FakeBeamFnStateClient implements
BeamFnStateClient {
switch (request.getRequestCase()) {
case GET:
- // Chunk gets into 6 byte return blocks
- List<ByteString> byteStrings =
- data.getOrDefault(request.getStateKey(),
Collections.singletonList(ByteString.EMPTY));
+ // Chunk gets into chunkSize blocks
+ ByteString byteString = data.getOrDefault(request.getStateKey(),
ByteString.EMPTY);
int block = 0;
if (request.getGet().getContinuationToken().size() > 0) {
block =
Integer.parseInt(request.getGet().getContinuationToken().toStringUtf8());
}
- ByteString returnBlock = byteStrings.get(block);
+ ByteString returnBlock =
+ byteString.substring(
+ block * chunkSize, Math.min(byteString.size(), (block + 1) *
chunkSize));
ByteString continuationToken = ByteString.EMPTY;
- if (byteStrings.size() > block + 1) {
+ if ((block + 1) * chunkSize < byteString.size()) {
continuationToken = ByteString.copyFromUtf8(Integer.toString(block +
1));
}
response =
@@ -118,18 +98,10 @@ public class FakeBeamFnStateClient implements
BeamFnStateClient {
break;
case APPEND:
- List<ByteString> previousValue =
- data.getOrDefault(request.getStateKey(),
Collections.singletonList(ByteString.EMPTY));
- List<ByteString> newValue = new ArrayList<>();
- newValue.addAll(previousValue);
- ByteString newData = request.getAppend().getData();
- if (newData.size() % 2 == 0) {
- newValue.remove(newValue.size() - 1);
- newValue.add(previousValue.get(previousValue.size() -
1).concat(newData));
- } else {
- newValue.add(newData);
- }
- data.put(request.getStateKey(), newValue);
+ data.put(
+ request.getStateKey(),
+ data.getOrDefault(request.getStateKey(), ByteString.EMPTY)
+ .concat(request.getAppend().getData()));
response =
StateResponse.newBuilder().setAppend(StateAppendResponse.getDefaultInstance());
break;
@@ -138,7 +110,6 @@ public class FakeBeamFnStateClient implements
BeamFnStateClient {
String.format("Unknown request type %s",
request.getRequestCase()));
}
- CompletableFuture<StateResponse> responseFuture = new
CompletableFuture<>();
return
CompletableFuture.completedFuture(response.setId(requestBuilder.getId()).build());
}