TheNeuralBit commented on a change in pull request #16230:
URL: https://github.com/apache/beam/pull/16230#discussion_r770063300
##########
File path:
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java
##########
@@ -31,25 +36,82 @@
import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest.RequestCase;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateResponse;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
/** A fake implementation of a {@link BeamFnStateClient} to aid with testing.
*/
public class FakeBeamFnStateClient implements BeamFnStateClient {
- private final Map<StateKey, ByteString> data;
- private final int chunkSize;
+ private static final int DEFAULT_CHUNK_SIZE = 6;
+ private final Map<StateKey, List<ByteString>> data;
private int currentId;
- public FakeBeamFnStateClient(Map<StateKey, ByteString> initialData) {
- this(initialData, 6);
+ public <V> FakeBeamFnStateClient(Coder<V> valueCoder, Map<StateKey, List<V>>
initialData) {
+ this(valueCoder, initialData, DEFAULT_CHUNK_SIZE);
}
- public FakeBeamFnStateClient(Map<StateKey, ByteString> initialData, int
chunkSize) {
- this.data = new ConcurrentHashMap<>(initialData);
- this.chunkSize = chunkSize;
+ public <V> FakeBeamFnStateClient(
+ Coder<V> valueCoder, Map<StateKey, List<V>> initialData, int chunkSize) {
+ this(Maps.transformValues(initialData, (value) -> KV.of(valueCoder,
value)), chunkSize);
+ }
+
+ public FakeBeamFnStateClient(Map<StateKey, KV<Coder<?>, List<?>>>
initialData) {
Review comment:
It's confusing to use a `KV` here; could you just make a quick
`@AutoValue CoderAndData<T>` instead?
##########
File path:
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java
##########
@@ -31,25 +36,82 @@
import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest.RequestCase;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateResponse;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
/** A fake implementation of a {@link BeamFnStateClient} to aid with testing.
*/
public class FakeBeamFnStateClient implements BeamFnStateClient {
- private final Map<StateKey, ByteString> data;
- private final int chunkSize;
+ private static final int DEFAULT_CHUNK_SIZE = 6;
+ private final Map<StateKey, List<ByteString>> data;
private int currentId;
- public FakeBeamFnStateClient(Map<StateKey, ByteString> initialData) {
- this(initialData, 6);
+ public <V> FakeBeamFnStateClient(Coder<V> valueCoder, Map<StateKey, List<V>>
initialData) {
+ this(valueCoder, initialData, DEFAULT_CHUNK_SIZE);
}
- public FakeBeamFnStateClient(Map<StateKey, ByteString> initialData, int
chunkSize) {
- this.data = new ConcurrentHashMap<>(initialData);
- this.chunkSize = chunkSize;
+ public <V> FakeBeamFnStateClient(
+ Coder<V> valueCoder, Map<StateKey, List<V>> initialData, int chunkSize) {
+ this(Maps.transformValues(initialData, (value) -> KV.of(valueCoder,
value)), chunkSize);
+ }
+
+ public FakeBeamFnStateClient(Map<StateKey, KV<Coder<?>, List<?>>>
initialData) {
+ this(initialData, DEFAULT_CHUNK_SIZE);
+ }
+
+ public FakeBeamFnStateClient(Map<StateKey, KV<Coder<?>, List<?>>>
initialData, int chunkSize) {
+ Map<StateKey, List<ByteString>> encodedData =
+ new HashMap<>(
+ Maps.transformValues(
+ initialData,
+ (KV<Coder<?>, List<?>> coderAndValues) -> {
+ List<ByteString> chunks = new ArrayList<>();
+ ByteString.Output output = ByteString.newOutput();
+ for (Object value : coderAndValues.getValue()) {
+ try {
+ ((Coder<Object>) coderAndValues.getKey()).encode(value,
output);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ if (output.size() >= chunkSize) {
+ ByteString chunk = output.toByteString();
+ int i = 0;
+ for (; i + chunkSize <= chunk.size(); i += chunkSize) {
+ // We specifically use a copy of the bytes instead of
a proper substring
+ // so that debugging is easier since we don't have to
worry about the
+ // substring being a view over the original string.
+ chunks.add(
+ ByteString.copyFrom(chunk.substring(i, i +
chunkSize).toByteArray()));
+ }
+ if (i < chunk.size()) {
+ chunks.add(
+ ByteString.copyFrom(chunk.substring(i,
chunk.size()).toByteArray()));
+ }
+ output.reset();
+ }
+ }
+ // Add the last chunk
+ if (output.size() > 0) {
+ chunks.add(output.toByteString());
+ }
+ return chunks;
+ }));
+ this.data =
+ new ConcurrentHashMap<>(
+ Maps.filterValues(encodedData, byteStrings ->
!byteStrings.isEmpty()));
Review comment:
nit: consider doing this work in a factory method rather than in a
constructor.
I think the reasoning behind this has to do with testability, though. Since
this is a test utility, maybe that's a moot point.
##########
File path:
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
##########
@@ -301,14 +303,17 @@ public void testUsingUserState() throws Exception {
assertThat(mainOutputValues, empty());
assertEquals(
- ImmutableMap.<StateKey, ByteString>builder()
- .put(bagUserStateKey("value", "X"), encode("X2"))
- .put(bagUserStateKey("bag", "X"), encode("X0", "X1", "X2"))
- .put(bagUserStateKey("combine", "X"), encode("X0X1X2"))
- .put(bagUserStateKey("value", "Y"), encode("Y2"))
- .put(bagUserStateKey("bag", "Y"), encode("Y1", "Y2"))
- .put(bagUserStateKey("combine", "Y"), encode("Y1Y2"))
- .build(),
+ new FakeBeamFnStateClient(
+ StringUtf8Coder.of(),
+ ImmutableMap.<StateKey, List<String>>builder()
+ .put(bagUserStateKey("value", "X"), asList("X2"))
+ .put(bagUserStateKey("bag", "X"), asList("X0", "X1",
"X2"))
+ .put(bagUserStateKey("combine", "X"), asList("X0X1X2"))
+ .put(bagUserStateKey("value", "Y"), asList("Y2"))
+ .put(bagUserStateKey("bag", "Y"), asList("Y1", "Y2"))
+ .put(bagUserStateKey("combine", "Y"), asList("Y1Y2"))
+ .build())
+ .getData(),
Review comment:
Wouldn't it be preferable to keep the old assertion, since it's
explicitly constructing the expected value?
##########
File path:
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java
##########
@@ -31,25 +36,82 @@
import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest.RequestCase;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateResponse;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
/** A fake implementation of a {@link BeamFnStateClient} to aid with testing.
*/
public class FakeBeamFnStateClient implements BeamFnStateClient {
- private final Map<StateKey, ByteString> data;
- private final int chunkSize;
+ private static final int DEFAULT_CHUNK_SIZE = 6;
+ private final Map<StateKey, List<ByteString>> data;
private int currentId;
- public FakeBeamFnStateClient(Map<StateKey, ByteString> initialData) {
- this(initialData, 6);
+ public <V> FakeBeamFnStateClient(Coder<V> valueCoder, Map<StateKey, List<V>>
initialData) {
+ this(valueCoder, initialData, DEFAULT_CHUNK_SIZE);
}
- public FakeBeamFnStateClient(Map<StateKey, ByteString> initialData, int
chunkSize) {
- this.data = new ConcurrentHashMap<>(initialData);
- this.chunkSize = chunkSize;
+ public <V> FakeBeamFnStateClient(
+ Coder<V> valueCoder, Map<StateKey, List<V>> initialData, int chunkSize) {
+ this(Maps.transformValues(initialData, (value) -> KV.of(valueCoder,
value)), chunkSize);
+ }
+
+ public FakeBeamFnStateClient(Map<StateKey, KV<Coder<?>, List<?>>>
initialData) {
+ this(initialData, DEFAULT_CHUNK_SIZE);
+ }
+
+ public FakeBeamFnStateClient(Map<StateKey, KV<Coder<?>, List<?>>>
initialData, int chunkSize) {
+ Map<StateKey, List<ByteString>> encodedData =
+ new HashMap<>(
+ Maps.transformValues(
+ initialData,
+ (KV<Coder<?>, List<?>> coderAndValues) -> {
+ List<ByteString> chunks = new ArrayList<>();
+ ByteString.Output output = ByteString.newOutput();
+ for (Object value : coderAndValues.getValue()) {
+ try {
+ ((Coder<Object>) coderAndValues.getKey()).encode(value,
output);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ if (output.size() >= chunkSize) {
+ ByteString chunk = output.toByteString();
+ int i = 0;
+ for (; i + chunkSize <= chunk.size(); i += chunkSize) {
+ // We specifically use a copy of the bytes instead of
a proper substring
+ // so that debugging is easier since we don't have to
worry about the
+ // substring being a view over the original string.
+ chunks.add(
+ ByteString.copyFrom(chunk.substring(i, i +
chunkSize).toByteArray()));
+ }
+ if (i < chunk.size()) {
+ chunks.add(
+ ByteString.copyFrom(chunk.substring(i,
chunk.size()).toByteArray()));
+ }
+ output.reset();
+ }
+ }
+ // Add the last chunk
+ if (output.size() > 0) {
+ chunks.add(output.toByteString());
+ }
+ return chunks;
+ }));
+ this.data =
+ new ConcurrentHashMap<>(
+ Maps.filterValues(encodedData, byteStrings ->
!byteStrings.isEmpty()));
Review comment:
I think it would improve readability, though, to have a set of different
factory methods rather than a mix of calls to `new FakeBeamFnStateClient`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]