scwhittle commented on a change in pull request #17004:
URL: https://github.com/apache/beam/pull/17004#discussion_r818968039



##########
File path: 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
##########
@@ -1462,37 +1463,80 @@ private void issueBatchedRequest(Map<Long, 
PendingRequest> requests) {
       }
     }
 
-    private void issueMultiChunkRequest(final long id, PendingRequest 
pendingRequest) {
-      Preconditions.checkNotNull(pendingRequest.computation);
-      final ByteString serializedCommit = 
pendingRequest.request.toByteString();
+    // An OutputStream which splits the output into chunks of no more than 
COMMIT_STREAM_CHUNK_SIZE
+    // before calling the chunkWriter on each.
+    //
+    // This avoids materializing the whole serialized request in the case it 
is large.
+    private class ChunkingByteStream extends OutputStream {
+      private final ByteString.Output output = 
ByteString.newOutput(COMMIT_STREAM_CHUNK_SIZE);
+      private final Consumer<ByteString> chunkWriter;
 
-      synchronized (this) {
-        pending.put(id, pendingRequest);
-        for (int i = 0; i < serializedCommit.size(); i += 
COMMIT_STREAM_CHUNK_SIZE) {
-          int end = i + COMMIT_STREAM_CHUNK_SIZE;
-          ByteString chunk = serializedCommit.substring(i, Math.min(end, 
serializedCommit.size()));
-
-          StreamingCommitRequestChunk.Builder chunkBuilder =
-              StreamingCommitRequestChunk.newBuilder()
-                  .setRequestId(id)
-                  .setSerializedWorkItemCommit(chunk)
-                  .setComputationId(pendingRequest.computation)
-                  .setShardingKey(pendingRequest.request.getShardingKey());
-          int remaining = serializedCommit.size() - end;
-          if (remaining > 0) {
-            chunkBuilder.setRemainingBytesForWorkItem(remaining);
-          }
+      ChunkingByteStream(Consumer<ByteString> chunkWriter) {
+        this.chunkWriter = chunkWriter;
+      }
 
-          StreamingCommitWorkRequest requestChunk =
-              
StreamingCommitWorkRequest.newBuilder().addCommitChunk(chunkBuilder).build();
-          try {
-            send(requestChunk);
-          } catch (IllegalStateException e) {
-            // Stream was broken, request will be retried when stream is 
reopened.
-            break;
-          }
+      @Override
+      public void close() {
+        flushBytes();
+      }
+
+      @Override
+      public void write(int b) throws IOException {
+        output.write(b);
+        if (output.size() == COMMIT_STREAM_CHUNK_SIZE) {
+          flushBytes();
         }
       }
+
+      @Override
+      public void write(byte b[], int off, int len) throws IOException {
+        // Fast path for larger writes that don't make the chunk too large.
+        if (len + output.size() < COMMIT_STREAM_CHUNK_SIZE) {
+          output.write(b, off, len);
+          return;
+        }
+        for (int i = 0; i < len; i++) {
+          write(b[off + i]);
+        }
+      }
+
+      private void flushBytes() {
+        if (output.size() == 0) {
+          return;
+        }
+        chunkWriter.accept(output.toByteString());
+        output.reset();
+      }
+    }
+
+    private void issueMultiChunkRequest(final long id, PendingRequest 
pendingRequest) {
+      Preconditions.checkNotNull(pendingRequest.computation);
+      Consumer<ByteString> chunkWriter =
+          new Consumer<ByteString>() {
+            private long remaining = 
pendingRequest.request.getSerializedSize();
+
+            @Override
+            public void accept(ByteString byteString) {

Review comment:
       Seems like the `byteString` parameter should be named `chunk`.

##########
File path: 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
##########
@@ -1462,37 +1463,80 @@ private void issueBatchedRequest(Map<Long, 
PendingRequest> requests) {
       }
     }
 
-    private void issueMultiChunkRequest(final long id, PendingRequest 
pendingRequest) {
-      Preconditions.checkNotNull(pendingRequest.computation);
-      final ByteString serializedCommit = 
pendingRequest.request.toByteString();
+    // An OutputStream which splits the output into chunks of no more than 
COMMIT_STREAM_CHUNK_SIZE
+    // before calling the chunkWriter on each.
+    //
+    // This avoids materializing the whole serialized request in the case it 
is large.
+    private class ChunkingByteStream extends OutputStream {
+      private final ByteString.Output output = 
ByteString.newOutput(COMMIT_STREAM_CHUNK_SIZE);
+      private final Consumer<ByteString> chunkWriter;
 
-      synchronized (this) {
-        pending.put(id, pendingRequest);
-        for (int i = 0; i < serializedCommit.size(); i += 
COMMIT_STREAM_CHUNK_SIZE) {
-          int end = i + COMMIT_STREAM_CHUNK_SIZE;
-          ByteString chunk = serializedCommit.substring(i, Math.min(end, 
serializedCommit.size()));
-
-          StreamingCommitRequestChunk.Builder chunkBuilder =
-              StreamingCommitRequestChunk.newBuilder()
-                  .setRequestId(id)
-                  .setSerializedWorkItemCommit(chunk)
-                  .setComputationId(pendingRequest.computation)
-                  .setShardingKey(pendingRequest.request.getShardingKey());
-          int remaining = serializedCommit.size() - end;
-          if (remaining > 0) {
-            chunkBuilder.setRemainingBytesForWorkItem(remaining);
-          }
+      ChunkingByteStream(Consumer<ByteString> chunkWriter) {
+        this.chunkWriter = chunkWriter;
+      }
 
-          StreamingCommitWorkRequest requestChunk =
-              
StreamingCommitWorkRequest.newBuilder().addCommitChunk(chunkBuilder).build();
-          try {
-            send(requestChunk);
-          } catch (IllegalStateException e) {
-            // Stream was broken, request will be retried when stream is 
reopened.
-            break;
-          }
+      @Override
+      public void close() {
+        flushBytes();
+      }
+
+      @Override
+      public void write(int b) throws IOException {
+        output.write(b);
+        if (output.size() == COMMIT_STREAM_CHUNK_SIZE) {
+          flushBytes();
         }
       }
+
+      @Override
+      public void write(byte b[], int off, int len) throws IOException {
+        // Fast path for larger writes that don't make the chunk too large.
+        if (len + output.size() < COMMIT_STREAM_CHUNK_SIZE) {
+          output.write(b, off, len);
+          return;
+        }
+        for (int i = 0; i < len; i++) {

Review comment:
       I think this could get called with large things, i.e. individual encoded 
message payloads (which can be up to 80MB). Can you:
   - fill the remaining space in output and flush
   - while the remaining bytes exceed the chunk size, flush directly without 
copying to output
   - put the rest in output




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to