scwhittle commented on code in PR #30312:
URL: https://github.com/apache/beam/pull/30312#discussion_r1522052467


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingEngineWorkCommitter.java:
##########
@@ -113,6 +113,17 @@ public void stop() {
       }
       commitSenders.shutdownNow();
     }
+    drainCommitQueue();
+  }
+
+  private void drainCommitQueue() {
+    commitQueue.stream().forEach(this::failCommit);

Review Comment:
   see other comment, how about loop on poll instead



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingEngineWorkCommitter.java:
##########
@@ -163,13 +180,16 @@ private boolean tryAddToCommitStream(Commit commit, 
CommitWorkStream commitStrea
         commitStream.commitWorkItem(
             commit.computationId(),
             commit.request(),
-            (commitStatus) -> 
onCommitComplete.accept(CompleteCommit.create(commit, commitStatus)));
+            (commitStatus) -> {
+              onCommitComplete.accept(CompleteCommit.create(commit, 
commitStatus));
+              activeCommitBytes.addAndGet(-commit.getSize());
+            });
 
     if (!isCommitSuccessful) {

Review Comment:
   // Since the commit was not accepted, revert the changes made above.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/WeightedBoundedQueue.java:
##########
@@ -98,4 +99,12 @@ public int queuedElementsWeight() {
   public int size() {
     return queue.size();
   }
+
+  public Stream<V> stream() {
+    return queue.stream();
+  }
+
+  public void clear() {
+    queue.clear();

Review Comment:
   this is not updating limit.  To do so properly it seems easiest to poll() 
until null, but actually perhaps a better alternative is to remove stream() and 
clear() and just do that loop on poll() until null in 
StreamingEngineWorkCommitter
   
   That is less API of this class to test and in general if queue is still in 
use, stream()+clear() may process something that is poll()/take() in between 
the calls.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to