scwhittle commented on code in PR #32905:
URL: https://github.com/apache/beam/pull/32905#discussion_r1822315996


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -118,19 +118,17 @@ public final class StreamingDataflowWorker {
    */
   public static final int MAX_SINK_BYTES = 10_000_000;
 
+  public static final String STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_HEARTBEAT_POOL =

Review Comment:
   Revert this file.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/WeightedSemaphore.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming;
+
+import java.util.concurrent.Semaphore;
+import java.util.function.Function;
+
+public final class WeightedSemaphore<V> {
+  private final int maxWeight;
+  private final Semaphore limit;
+  private final Function<V, Integer> weigher;
+
+  private WeightedSemaphore(int maxWeight, Semaphore limit, Function<V, Integer> weigher) {
+    this.maxWeight = maxWeight;
+    this.limit = limit;
+    this.weigher = weigher;
+  }
+
+  public static <V> WeightedSemaphore<V> create(int maxWeight, Function<V, Integer> weigherFn) {
+    return new WeightedSemaphore<>(maxWeight, new Semaphore(maxWeight, true), weigherFn);
+  }
+
+  void acquire(V value) {

Review Comment:
   Make acquire and release public? They seem like basic functionality.
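A minimal sketch of the class with its core operations exposed as public API. The method bodies are not shown in the diff, so this assumes they simply convert the value's weight into permits on the fair `java.util.concurrent.Semaphore`; the `acquireUninterruptibly` name follows the reviewer's naming suggestion, and `currentWeight()` is a hypothetical accessor added for illustration.

```java
import java.util.concurrent.Semaphore;
import java.util.function.Function;

/** Sketch of WeightedSemaphore with acquire/release exposed as public API. */
final class WeightedSemaphore<V> {
  private final int maxWeight;
  private final Semaphore limit;
  private final Function<V, Integer> weigher;

  private WeightedSemaphore(int maxWeight, Semaphore limit, Function<V, Integer> weigher) {
    this.maxWeight = maxWeight;
    this.limit = limit;
    this.weigher = weigher;
  }

  public static <V> WeightedSemaphore<V> create(int maxWeight, Function<V, Integer> weigherFn) {
    // Fair semaphore: waiters are granted permits in FIFO order.
    return new WeightedSemaphore<>(maxWeight, new Semaphore(maxWeight, true), weigherFn);
  }

  /**
   * Blocks until the value's weight is available, ignoring interrupts. Assumes the weigher
   * returns a value in [0, maxWeight], as the test's Math.min-based weigher does.
   */
  public void acquireUninterruptibly(V value) {
    limit.acquireUninterruptibly(weigher.apply(value));
  }

  /** Returns the value's weight to the semaphore. */
  public void release(V value) {
    limit.release(weigher.apply(value));
  }

  /** Hypothetical accessor: weight currently held by all users sharing this semaphore. */
  public int currentWeight() {
    return maxWeight - limit.availablePermits();
  }
}
```

With these public, other components (not only WeightedBoundedQueue in the same package) can reserve and return weight against the same limit.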



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingEngineWorkCommitter.java:
##########
@@ -86,16 +86,19 @@ public final class StreamingEngineWorkCommitter implements WorkCommitter {
   public static Builder builder() {
     return new AutoBuilder_StreamingEngineWorkCommitter_Builder()
         .setBackendWorkerToken(NO_BACKEND_WORKER_TOKEN)
+        .setWeigher(
+            WeightedSemaphore.create(

Review Comment:
   Maybe require the caller to set this instead of hiding it in a builder default? It would be nice for callers to know about it, since they could then display it on their status page.
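A sketch of the shape the reviewer seems to be asking for: the caller constructs the limiter, passes it in, and keeps its own reference so a status page can read it. `CommitterSketch` and `StatusPageSketch` are hypothetical names, and a plain `Semaphore` of bytes stands in for `WeightedSemaphore<Commit>` to keep the example self-contained.

```java
import java.util.Objects;
import java.util.concurrent.Semaphore;

/**
 * Hypothetical committer that takes its byte limiter from the caller instead of creating a
 * hidden default. A plain Semaphore stands in for WeightedSemaphore<Commit> here.
 */
final class CommitterSketch {
  private final Semaphore commitByteSemaphore;

  CommitterSketch(Semaphore commitByteSemaphore) {
    // No default: every caller has to decide on the limit, and keeps a reference to it.
    this.commitByteSemaphore = Objects.requireNonNull(commitByteSemaphore);
  }

  /** Reserves bytes for a commit, blocking while too many bytes are outstanding. */
  void enqueue(int commitBytes) {
    commitByteSemaphore.acquireUninterruptibly(commitBytes);
  }

  /** Returns the bytes once the commit completes. */
  void onCommitComplete(int commitBytes) {
    commitByteSemaphore.release(commitBytes);
  }
}

/** Hypothetical status-page helper that reads the same semaphore the caller passed in. */
final class StatusPageSketch {
  static String render(Semaphore commitByteSemaphore, int maxCommitBytes) {
    int outstanding = maxCommitBytes - commitByteSemaphore.availablePermits();
    return "Outstanding commit bytes: " + outstanding + " / " + maxCommitBytes;
  }
}
```

Making the argument mandatory trades a little builder convenience for visibility: the limit becomes part of the caller's wiring rather than an internal detail of the committer.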



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/WeightedBoundedQueue.java:
##########
@@ -78,24 +69,26 @@ public void put(V value) {
   public @Nullable V poll(long timeout, TimeUnit unit) throws InterruptedException {
     V result = queue.poll(timeout, unit);
     if (result != null) {
-      limit.release(weigher.apply(result));
+      weigher.release(result);
     }
     return result;
   }
 
   /** Returns and removes the next value, or blocks until one is available. */
   public @Nullable V take() throws InterruptedException {
     V result = queue.take();
-    limit.release(weigher.apply(result));
+    weigher.release(result);
     return result;
   }
 
   /** Returns the current weight of the queue. */

Review Comment:
   Can this be removed, with callers looking at the weighted semaphore instead? It is confusing, since it doesn't count only the bytes in this queue.
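A sketch of why a per-queue weight is misleading once the limit is shared, assuming the accessor derives its value from the semaphore's available permits (as `maxWeight - availablePermits()` would). `SharedLimitQueue` is a hypothetical stand-in for `WeightedBoundedQueue`, and each element's weight is the element itself.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;

/** Hypothetical queue whose capacity comes from a semaphore that other queues may share. */
final class SharedLimitQueue {
  private final LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
  private final Semaphore sharedLimit;
  private final int maxWeight;

  SharedLimitQueue(Semaphore sharedLimit, int maxWeight) {
    this.sharedLimit = sharedLimit;
    this.maxWeight = maxWeight;
  }

  /** Each element's weight is the element itself, to keep the sketch short. */
  void put(int weight) {
    sharedLimit.acquireUninterruptibly(weight);
    queue.add(weight);
  }

  /** Weight derived from the semaphore: includes elements held by every sharing queue. */
  int weightFromSemaphore() {
    return maxWeight - sharedLimit.availablePermits();
  }

  /** Weight of the elements actually sitting in this queue. */
  int weightOfThisQueue() {
    int total = 0;
    for (int weight : queue) {
      total += weight;
    }
    return total;
  }
}
```

With two queues sharing a limit of 10, putting 3 into the first and 4 into the second makes the first queue's semaphore-derived weight 7 while it holds only 3, which is the confusion the comment describes; reading the total from the shared semaphore directly avoids implying otherwise.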



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingEngineWorkCommitter.java:
##########
@@ -61,11 +62,10 @@ public final class StreamingEngineWorkCommitter implements WorkCommitter {
       Supplier<CloseableStream<CommitWorkStream>> commitWorkStreamFactory,
       int numCommitSenders,
       Consumer<CompleteCommit> onCommitComplete,
-      String backendWorkerToken) {
+      String backendWorkerToken,
+      WeightedSemaphore<Commit> weigher) {

Review Comment:
   Rename weigher throughout to commitByteSemaphore, or something else that captures that it is the limiter and not just a weighing function?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/WeightedSemaphore.java:
##########
@@ -0,0 +1,49 @@
+  void acquire(V value) {

Review Comment:
   Rename to acquireUninterruptibly?
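The suggested name mirrors `java.util.concurrent.Semaphore`, where `acquire(int)` throws `InterruptedException` if the thread is interrupted, while `acquireUninterruptibly(int)` keeps waiting and returns with the interrupt status still set. The PR's `void acquire(V value)` declares no `InterruptedException`, which suggests (the body isn't shown) that it behaves like the uninterruptible variant. The hypothetical class below only checks those JDK semantics; it is not code from the PR.

```java
import java.util.concurrent.Semaphore;

/** Illustrates the JDK semantics that the acquireUninterruptibly name would signal. */
final class AcquireSemantics {
  /** Returns true if acquire(int) throws because the thread was already interrupted. */
  static boolean interruptibleAcquireThrows(Semaphore s, int permits) {
    Thread.currentThread().interrupt();
    try {
      s.acquire(permits);
      s.release(permits);
      return false;
    } catch (InterruptedException e) {
      // acquire(int) clears the interrupt status when it throws.
      return true;
    }
  }

  /** Returns true if acquireUninterruptibly(int) succeeds and leaves the interrupt set. */
  static boolean uninterruptibleAcquireSucceeds(Semaphore s, int permits) {
    Thread.currentThread().interrupt();
    int before = s.availablePermits();
    s.acquireUninterruptibly(permits);
    boolean acquired = s.availablePermits() == before - permits;
    // The interrupt status is preserved; Thread.interrupted() reads and clears it.
    boolean stillInterrupted = Thread.interrupted();
    s.release(permits);
    return acquired && stillInterrupted;
  }
}
```

A name matching the JDK method tells callers that an interrupt will not abort the wait, which matters for shutdown paths that rely on interrupting worker threads.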



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/WeightBoundedQueueTest.java:
##########
@@ -194,4 +201,46 @@ public void testTake() throws InterruptedException {
 
     assertEquals(MAX_WEIGHT, value.get());
   }
+
+  @Test
+  public void testPut_sharedWeigher() throws InterruptedException {
+    WeightedSemaphore<Integer> weigher =
+        WeightedSemaphore.create(MAX_WEIGHT, i -> Math.min(MAX_WEIGHT, i));
+    WeightedBoundedQueue<Integer> queue1 = WeightedBoundedQueue.create(weigher);
+    WeightedBoundedQueue<Integer> queue2 = WeightedBoundedQueue.create(weigher);
+
+    // Insert value that takes all the weight into the queue1.
+    queue1.put(MAX_WEIGHT);
+
+    // Try to insert a value into the queue2. This will block since there is no capacity in the
+    // weigher.
+    Thread putThread =
+        new Thread(
+            () -> {
+              try {

Review Comment:
   Remove the sleep? The element has already been added to queue1 above, so the sleep doesn't seem necessary.
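A sketch of the reviewer's reasoning, using a plain `Semaphore` and two `LinkedBlockingQueue`s as stand-ins for the shared `WeightedSemaphore` and the two `WeightedBoundedQueue`s; the class name and `MAX_WEIGHT = 10` are assumptions. Because the first put completes on the test thread before `putThread` is started, the second put blocks whenever the thread runs, so a sleep inside the thread adds nothing.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for testPut_sharedWeigher, without a sleep inside putThread. */
final class SharedLimitNoSleepSketch {
  static final int MAX_WEIGHT = 10;

  /** Returns the value that reached queue2, or null if putThread never got capacity. */
  static Integer run() throws InterruptedException {
    Semaphore sharedLimit = new Semaphore(MAX_WEIGHT, true);
    LinkedBlockingQueue<Integer> queue1 = new LinkedBlockingQueue<>();
    LinkedBlockingQueue<Integer> queue2 = new LinkedBlockingQueue<>();

    // Equivalent of queue1.put(MAX_WEIGHT): finishes on this thread before putThread starts,
    // so all capacity is already taken by the time putThread runs.
    sharedLimit.acquireUninterruptibly(MAX_WEIGHT);
    queue1.add(MAX_WEIGHT);

    // Equivalent of queue2.put(MAX_WEIGHT): blocks on the shared limit, no sleep needed.
    Thread putThread =
        new Thread(
            () -> {
              sharedLimit.acquireUninterruptibly(MAX_WEIGHT);
              queue2.add(MAX_WEIGHT);
            });
    putThread.start();

    // queue2 cannot have received anything yet, however far putThread has progressed.
    if (!queue2.isEmpty()) {
      throw new IllegalStateException("queue2 should still be empty");
    }

    // Equivalent of queue1.poll(): returning the weight unblocks putThread.
    Integer polled = queue1.poll(1, TimeUnit.SECONDS);
    sharedLimit.release(polled);

    putThread.join(TimeUnit.SECONDS.toMillis(5));
    return queue2.poll();
  }
}
```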



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/WeightedBoundedQueue.java:
##########
@@ -18,49 +18,40 @@
 package org.apache.beam.runners.dataflow.worker.streaming;
 
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import org.checkerframework.checker.nullness.qual.Nullable;
 
-/** Bounded set of queues, with a maximum total weight. */
+/** Queue bounded by a {@link WeightedSemaphore}. */
 public final class WeightedBoundedQueue<V> {
 
   private final LinkedBlockingQueue<V> queue;
-  private final int maxWeight;
-  private final Semaphore limit;
-  private final Function<V, Integer> weigher;
+  private final WeightedSemaphore<V> weigher;

Review Comment:
   Name it weightedSemaphore? The name weigher sounds like it is just the weighing function.



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/WeightBoundedQueueTest.java:
##########
@@ -194,4 +201,46 @@ public void testTake() throws InterruptedException {
 
     assertEquals(MAX_WEIGHT, value.get());
   }
+
+  @Test
+  public void testPut_sharedWeigher() throws InterruptedException {
+    WeightedSemaphore<Integer> weigher =
+        WeightedSemaphore.create(MAX_WEIGHT, i -> Math.min(MAX_WEIGHT, i));
+    WeightedBoundedQueue<Integer> queue1 = WeightedBoundedQueue.create(weigher);
+    WeightedBoundedQueue<Integer> queue2 = WeightedBoundedQueue.create(weigher);
+
+    // Insert value that takes all the weight into the queue1.
+    queue1.put(MAX_WEIGHT);
+
+    // Try to insert a value into the queue2. This will block since there is no capacity in the
+    // weigher.
+    Thread putThread =
+        new Thread(
+            () -> {
+              try {
+                Thread.sleep(100);
+              } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+              }
+              queue2.put(MAX_WEIGHT);
+            });
+    putThread.start();
+
+    // Should only see the first value in the queue, since the queue is at capacity. putThread

Review Comment:
   Could put the sleep here if you want to give putThread a little more time to run and become blocked. It doesn't matter whether it blocks or not, so the test isn't racy, but it is a better test if it does block.
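If the only purpose of the wait at that point is to give `putThread` a chance to block, one alternative to a fixed `Thread.sleep` is a bounded poll of `Thread.getState()`. This is a swapped-in technique rather than what the reviewer proposed, and `BlockedThreadWait.awaitParked` is a hypothetical helper; since the test stays correct whether or not the thread has blocked, its result should be treated as informational rather than asserted in the real test.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical test helper: bounded wait for a thread to park, in place of a fixed sleep. */
final class BlockedThreadWait {
  /**
   * Polls t's state for up to timeoutMillis and returns true once it is WAITING,
   * TIMED_WAITING or BLOCKED. A false result is not an error: the test's assertions hold
   * either way, and the wait only makes it more likely that the blocking path is exercised.
   */
  static boolean awaitParked(Thread t, long timeoutMillis) throws InterruptedException {
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
    // Overflow-safe comparison against the deadline.
    while (System.nanoTime() - deadline < 0) {
      Thread.State state = t.getState();
      if (state == Thread.State.WAITING
          || state == Thread.State.TIMED_WAITING
          || state == Thread.State.BLOCKED) {
        return true;
      }
      Thread.sleep(5);
    }
    return false;
  }
}
```

In the test it would sit where the comment suggests the sleep, for example `BlockedThreadWait.awaitParked(putThread, 100);` just before checking that only the first value is visible. Unlike a fixed sleep, it returns as soon as the thread parks.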



-- 
This is an automated message from the Apache Git Service.