scwhittle commented on code in PR #30439:
URL: https://github.com/apache/beam/pull/30439#discussion_r1510932832


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -500,8 +503,10 @@ static StreamingDataflowWorker forTesting(
       boolean publishCounters,
       HotKeyLogger hotKeyLogger,
       Supplier<Instant> clock,
-      Function<String, ScheduledExecutorService> executorSupplier) {
-    BoundedQueueExecutor boundedQueueExecutor = createWorkUnitExecutor(options);
+      Function<String, ScheduledExecutorService> executorSupplier,
+      BoundedQueueExecutor executor) {

Review Comment:
   How about naming this parameter workUnitExecutor? Then the same variable can
   be reused below, which avoids a proliferation of names:
   
   if (workUnitExecutor == null) workUnitExecutor = createWorkUnitExecutor(options);
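
A minimal sketch of that single-name pattern, assuming a plain ExecutorService in place of BoundedQueueExecutor. The class ExecutorDefaulting and the helper createDefaultExecutor are hypothetical stand-ins for illustration, not code from the PR:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ExecutorDefaulting {
  // Hypothetical stand-in for createWorkUnitExecutor(options).
  static ExecutorService createDefaultExecutor() {
    return Executors.newFixedThreadPool(2);
  }

  // The parameter doubles as the working variable: when the caller passes
  // null, it is replaced by a default, so no second name is needed.
  static ExecutorService resolveExecutor(ExecutorService workUnitExecutor) {
    if (workUnitExecutor == null) {
      workUnitExecutor = createDefaultExecutor();
    }
    return workUnitExecutor;
  }
}
```

A caller that supplies its own executor gets the same instance back; passing null yields a freshly created default.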



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -1918,6 +1926,26 @@ private Optional<WorkerMessage> createWorkerMessageForPerWorkerMetrics() {
     return Optional.of(workUnitClient.createWorkerMessageFromPerWorkerMetrics(perWorkerMetrics));
   }
 
+  private void readAndSaveWorkerMessageResponseForStreamingScalingReportResponse(
+      List<WorkerMessageResponse> responses) {
+    for (WorkerMessageResponse response : responses) {
+      if (response.getStreamingScalingReportResponse() != null) {

Review Comment:
   Would it be possible to get multiple responses? If so, it would perhaps be
   best to find the last value in the loop and then set it once, instead of
   possibly setting it multiple times.
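
A rough sketch of the "find the last value, then set once" idea. Response and Target are hypothetical stand-ins for WorkerMessageResponse and whatever object receives the setting; the real types and accessors may differ:

```java
import java.util.List;

class LastValueWins {
  // Hypothetical stand-in for a response whose payload may be absent (null).
  static final class Response {
    private final Integer limit;

    Response(Integer limit) {
      this.limit = limit;
    }

    Integer getLimit() {
      return limit;
    }
  }

  // Hypothetical receiver that counts how often it is updated.
  static final class Target {
    int limit = -1;
    int updates = 0;

    void setLimit(int value) {
      limit = value;
      updates++;
    }
  }

  // Scans every response, remembers the last non-null value, and applies it
  // at most once after the loop.
  static void applyLatest(List<Response> responses, Target target) {
    Integer latest = null;
    for (Response response : responses) {
      if (response.getLimit() != null) {
        latest = response.getLimit();
      }
    }
    if (latest != null) {
      target.setLimit(latest);
    }
  }
}
```

With this shape the setter runs zero or one time per batch, regardless of how many responses carry a value.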



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java:
##########
@@ -59,8 +69,8 @@ public BoundedQueueExecutor(
           @Override
           protected void beforeExecute(Thread t, Runnable r) {
             super.beforeExecute(t, r);
-            synchronized (this) {
-              if (activeCount.getAndIncrement() >= maximumPoolSize - 1) {
+            synchronized (BoundedQueueExecutor.this) {
+              if (activeCount++ >= maximumThreadCount - 1) {
                 startTimeMaxActiveThreadsUsed = System.currentTimeMillis();

Review Comment:
   only reset if zero? I think it could be racy since 



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java:
##########
@@ -37,8 +37,17 @@ public class BoundedQueueExecutor {
   private final Monitor monitor = new Monitor();
   private int elementsOutstanding = 0;

Review Comment:
   Please add a comment stating that elementsOutstanding and bytesOutstanding
   are guarded by the monitor.
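
Something along these lines is what I have in mind. This sketch uses java.util.concurrent.locks.ReentrantLock as a stand-in for the Guava Monitor so that it is self-contained; the class OutstandingWork and its methods are hypothetical, not the actual BoundedQueueExecutor code:

```java
import java.util.concurrent.locks.ReentrantLock;

class OutstandingWork {
  private final ReentrantLock lock = new ReentrantLock();

  // Guarded by lock.
  private int elementsOutstanding = 0;

  // Guarded by lock.
  private long bytesOutstanding = 0;

  // Records one more outstanding element of the given size.
  void add(long bytes) {
    lock.lock();
    try {
      elementsOutstanding++;
      bytesOutstanding += bytes;
    } finally {
      lock.unlock();
    }
  }

  // Records that one element of the given size has completed.
  void remove(long bytes) {
    lock.lock();
    try {
      elementsOutstanding--;
      bytesOutstanding -= bytes;
    } finally {
      lock.unlock();
    }
  }

  int elementsOutstanding() {
    lock.lock();
    try {
      return elementsOutstanding;
    } finally {
      lock.unlock();
    }
  }

  long bytesOutstanding() {
    lock.lock();
    try {
      return bytesOutstanding;
    } finally {
      lock.unlock();
    }
  }
}
```

The comments on the fields make the locking discipline explicit, so a later reader knows that every read and write must happen while holding the lock.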



##########
runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/BoundedQueueExecutorTest.java:
##########
@@ -0,0 +1,189 @@
+/*

Review Comment:
   I think this file should be in the directory path matching other util files,
   like:
   
   runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/MemoryMonitorTest.java
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
