dpcollins-google commented on a change in pull request #17004:
URL: https://github.com/apache/beam/pull/17004#discussion_r819664655



##########
File path: 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
##########
@@ -1462,37 +1463,80 @@ private void issueBatchedRequest(Map<Long, 
PendingRequest> requests) {
       }
     }
 
-    private void issueMultiChunkRequest(final long id, PendingRequest 
pendingRequest) {
-      Preconditions.checkNotNull(pendingRequest.computation);
-      final ByteString serializedCommit = 
pendingRequest.request.toByteString();
+    // An OutputStream which splits the output into chunks of no more than 
COMMIT_STREAM_CHUNK_SIZE
+    // before calling the chunkWriter on each.
+    //
+    // This avoids materializing the whole serialized request in the case it 
is large.
+    private class ChunkingByteStream extends OutputStream {
+      private final ByteString.Output output = 
ByteString.newOutput(COMMIT_STREAM_CHUNK_SIZE);
+      private final Consumer<ByteString> chunkWriter;
 
-      synchronized (this) {
-        pending.put(id, pendingRequest);
-        for (int i = 0; i < serializedCommit.size(); i += 
COMMIT_STREAM_CHUNK_SIZE) {
-          int end = i + COMMIT_STREAM_CHUNK_SIZE;
-          ByteString chunk = serializedCommit.substring(i, Math.min(end, 
serializedCommit.size()));
-
-          StreamingCommitRequestChunk.Builder chunkBuilder =
-              StreamingCommitRequestChunk.newBuilder()
-                  .setRequestId(id)
-                  .setSerializedWorkItemCommit(chunk)
-                  .setComputationId(pendingRequest.computation)
-                  .setShardingKey(pendingRequest.request.getShardingKey());
-          int remaining = serializedCommit.size() - end;
-          if (remaining > 0) {
-            chunkBuilder.setRemainingBytesForWorkItem(remaining);
-          }
+      ChunkingByteStream(Consumer<ByteString> chunkWriter) {
+        this.chunkWriter = chunkWriter;
+      }
 
-          StreamingCommitWorkRequest requestChunk =
-              
StreamingCommitWorkRequest.newBuilder().addCommitChunk(chunkBuilder).build();
-          try {
-            send(requestChunk);
-          } catch (IllegalStateException e) {
-            // Stream was broken, request will be retried when stream is 
reopened.
-            break;
-          }
+      @Override
+      public void close() {
+        flushBytes();
+      }
+
+      @Override
+      public void write(int b) throws IOException {
+        output.write(b);
+        if (output.size() == COMMIT_STREAM_CHUNK_SIZE) {
+          flushBytes();
         }
       }
+
+      @Override
+      public void write(byte b[], int off, int len) throws IOException {
+        // Fast path for larger writes that don't make the chunk too large.
+        if (len + output.size() < COMMIT_STREAM_CHUNK_SIZE) {
+          output.write(b, off, len);
+          return;
+        }
+        for (int i = 0; i < len; i++) {

Review comment:
       Done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to