[ 
https://issues.apache.org/jira/browse/BEAM-13015?focusedWorklogId=696935&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-696935
 ]

ASF GitHub Bot logged work on BEAM-13015:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 15/Dec/21 23:05
            Start Date: 15/Dec/21 23:05
    Worklog Time Spent: 10m 
      Work Description: lukecwik commented on a change in pull request #16230:
URL: https://github.com/apache/beam/pull/16230#discussion_r770105066



##########
File path: 
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java
##########
@@ -31,25 +36,82 @@
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest.RequestCase;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateResponse;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
 
 /** A fake implementation of a {@link BeamFnStateClient} to aid with testing. 
*/
 public class FakeBeamFnStateClient implements BeamFnStateClient {
-  private final Map<StateKey, ByteString> data;
-  private final int chunkSize;
+  private static final int DEFAULT_CHUNK_SIZE = 6;
+  private final Map<StateKey, List<ByteString>> data;
   private int currentId;
 
-  public FakeBeamFnStateClient(Map<StateKey, ByteString> initialData) {
-    this(initialData, 6);
+  public <V> FakeBeamFnStateClient(Coder<V> valueCoder, Map<StateKey, List<V>> 
initialData) {
+    this(valueCoder, initialData, DEFAULT_CHUNK_SIZE);
   }
 
-  public FakeBeamFnStateClient(Map<StateKey, ByteString> initialData, int 
chunkSize) {
-    this.data = new ConcurrentHashMap<>(initialData);
-    this.chunkSize = chunkSize;
+  public <V> FakeBeamFnStateClient(
+      Coder<V> valueCoder, Map<StateKey, List<V>> initialData, int chunkSize) {
+    this(Maps.transformValues(initialData, (value) -> KV.of(valueCoder, 
value)), chunkSize);
+  }
+
+  public FakeBeamFnStateClient(Map<StateKey, KV<Coder<?>, List<?>>> 
initialData) {
+    this(initialData, DEFAULT_CHUNK_SIZE);
+  }
+
+  public FakeBeamFnStateClient(Map<StateKey, KV<Coder<?>, List<?>>> 
initialData, int chunkSize) {
+    Map<StateKey, List<ByteString>> encodedData =
+        new HashMap<>(
+            Maps.transformValues(
+                initialData,
+                (KV<Coder<?>, List<?>> coderAndValues) -> {
+                  List<ByteString> chunks = new ArrayList<>();
+                  ByteString.Output output = ByteString.newOutput();
+                  for (Object value : coderAndValues.getValue()) {
+                    try {
+                      ((Coder<Object>) coderAndValues.getKey()).encode(value, 
output);
+                    } catch (IOException e) {
+                      throw new RuntimeException(e);
+                    }
+                    if (output.size() >= chunkSize) {
+                      ByteString chunk = output.toByteString();
+                      int i = 0;
+                      for (; i + chunkSize <= chunk.size(); i += chunkSize) {
+                        // We specifically use a copy of the bytes instead of 
a proper substring
+                        // so that debugging is easier since we don't have to 
worry about the
+                        // substring being a view over the original string.
+                        chunks.add(
+                            ByteString.copyFrom(chunk.substring(i, i + 
chunkSize).toByteArray()));
+                      }
+                      if (i < chunk.size()) {
+                        chunks.add(
+                            ByteString.copyFrom(chunk.substring(i, 
chunk.size()).toByteArray()));
+                      }
+                      output.reset();
+                    }
+                  }
+                  // Add the last chunk
+                  if (output.size() > 0) {
+                    chunks.add(output.toByteString());
+                  }
+                  return chunks;
+                }));
+    this.data =
+        new ConcurrentHashMap<>(
+            Maps.filterValues(encodedData, byteStrings -> 
!byteStrings.isEmpty()));

Review comment:
       Yup.

##########
File path: 
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
##########
@@ -301,14 +303,17 @@ public void testUsingUserState() throws Exception {
       assertThat(mainOutputValues, empty());
 
       assertEquals(
-          ImmutableMap.<StateKey, ByteString>builder()
-              .put(bagUserStateKey("value", "X"), encode("X2"))
-              .put(bagUserStateKey("bag", "X"), encode("X0", "X1", "X2"))
-              .put(bagUserStateKey("combine", "X"), encode("X0X1X2"))
-              .put(bagUserStateKey("value", "Y"), encode("Y2"))
-              .put(bagUserStateKey("bag", "Y"), encode("Y1", "Y2"))
-              .put(bagUserStateKey("combine", "Y"), encode("Y1Y2"))
-              .build(),
+          new FakeBeamFnStateClient(
+                  StringUtf8Coder.of(),
+                  ImmutableMap.<StateKey, List<String>>builder()
+                      .put(bagUserStateKey("value", "X"), asList("X2"))
+                      .put(bagUserStateKey("bag", "X"), asList("X0", "X1", 
"X2"))
+                      .put(bagUserStateKey("combine", "X"), asList("X0X1X2"))
+                      .put(bagUserStateKey("value", "Y"), asList("Y2"))
+                      .put(bagUserStateKey("bag", "Y"), asList("Y1", "Y2"))
+                      .put(bagUserStateKey("combine", "Y"), asList("Y1Y2"))
+                      .build())
+              .getData(),

Review comment:
       I wanted the comparison to be based on the internal representation 
that FakeBeamFnStateClient produces. That makes future changes simpler overall, 
instead of relying on the caller to know the exact ordering and ByteString 
layout.

##########
File path: 
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java
##########
@@ -31,25 +36,82 @@
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest.RequestCase;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateResponse;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
 
 /** A fake implementation of a {@link BeamFnStateClient} to aid with testing. 
*/
 public class FakeBeamFnStateClient implements BeamFnStateClient {
-  private final Map<StateKey, ByteString> data;
-  private final int chunkSize;
+  private static final int DEFAULT_CHUNK_SIZE = 6;
+  private final Map<StateKey, List<ByteString>> data;
   private int currentId;
 
-  public FakeBeamFnStateClient(Map<StateKey, ByteString> initialData) {
-    this(initialData, 6);
+  public <V> FakeBeamFnStateClient(Coder<V> valueCoder, Map<StateKey, List<V>> 
initialData) {
+    this(valueCoder, initialData, DEFAULT_CHUNK_SIZE);
   }
 
-  public FakeBeamFnStateClient(Map<StateKey, ByteString> initialData, int 
chunkSize) {
-    this.data = new ConcurrentHashMap<>(initialData);
-    this.chunkSize = chunkSize;
+  public <V> FakeBeamFnStateClient(
+      Coder<V> valueCoder, Map<StateKey, List<V>> initialData, int chunkSize) {
+    this(Maps.transformValues(initialData, (value) -> KV.of(valueCoder, 
value)), chunkSize);
+  }
+
+  public FakeBeamFnStateClient(Map<StateKey, KV<Coder<?>, List<?>>> 
initialData) {

Review comment:
       The factory method idea is much cleaner; I will use it in a future PR.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 696935)
    Time Spent: 23h 20m  (was: 23h 10m)

> Optimize Java SDK harness
> -------------------------
>
>                 Key: BEAM-13015
>                 URL: https://issues.apache.org/jira/browse/BEAM-13015
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-java-harness
>            Reporter: Luke Cwik
>            Assignee: Luke Cwik
>            Priority: P2
>          Time Spent: 23h 20m
>  Remaining Estimate: 0h
>
> Use profiling tools to remove bundle processing overhead in the SDK harness.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to