m-trieu commented on code in PR #32905:
URL: https://github.com/apache/beam/pull/32905#discussion_r1829876550


##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/WeightBoundedQueueTest.java:
##########
@@ -194,4 +206,37 @@ public void testTake() throws InterruptedException {
 
     assertEquals(MAX_WEIGHT, value.get());
   }
+
+  @Test
+  public void testPut_sharedWeigher() throws InterruptedException {
+    WeightedSemaphore<Integer> weigher =
+        WeightedSemaphore.create(MAX_WEIGHT, i -> Math.min(MAX_WEIGHT, i));
+    WeightedBoundedQueue<Integer> queue1 = 
WeightedBoundedQueue.create(weigher);
+    WeightedBoundedQueue<Integer> queue2 = 
WeightedBoundedQueue.create(weigher);
+
+    // Insert value that takes all the weight into the queue1.
+    queue1.put(MAX_WEIGHT);
+
+    // Try to insert a value into the queue2. This will block since there is 
no capacity in the
+    // weigher.
+    Thread putThread = new Thread(() -> queue2.put(MAX_WEIGHT));
+    putThread.start();
+    // Should only see the first value in the queue, since the queue is at 
capacity. putThread
+    // should be blocked. The weight should be the same however, since queue1 
and queue2 are sharing
+    // the weigher.
+    Thread.sleep(100);
+    assertEquals(MAX_WEIGHT, weigher.currentWeight());
+    assertEquals(1, queue1.size());
+    assertEquals(MAX_WEIGHT, weigher.currentWeight());

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to