m-trieu commented on code in PR #32905:
URL: https://github.com/apache/beam/pull/32905#discussion_r1829886077
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/WeightBoundedQueueTest.java:
##########
@@ -194,4 +206,37 @@ public void testTake() throws InterruptedException {
assertEquals(MAX_WEIGHT, value.get());
}
+
+ @Test
+ public void testPut_sharedWeigher() throws InterruptedException {
+ WeightedSemaphore<Integer> weigher =
+ WeightedSemaphore.create(MAX_WEIGHT, i -> Math.min(MAX_WEIGHT, i));
+ WeightedBoundedQueue<Integer> queue1 = WeightedBoundedQueue.create(weigher);
+ WeightedBoundedQueue<Integer> queue2 = WeightedBoundedQueue.create(weigher);
+
+ // Insert value that takes all the weight into the queue1.
+ queue1.put(MAX_WEIGHT);
+
+ // Try to insert a value into the queue2. This will block since there is no capacity in the
+ // weigher.
+ Thread putThread = new Thread(() -> queue2.put(MAX_WEIGHT));
+ putThread.start();
+ // Should only see the first value in the queue, since the queue is at capacity. putThread
+ // should be blocked. The weight should be the same however, since queue1 and queue2 are sharing
+ // the weigher.
+ Thread.sleep(100);
+ assertEquals(MAX_WEIGHT, weigher.currentWeight());
+ assertEquals(1, queue1.size());
+ assertEquals(MAX_WEIGHT, weigher.currentWeight());
+ assertEquals(0, queue2.size());
+
+ // Poll queue1, pulling off the only value inside and freeing up the capacity in the weigher.
+ queue1.poll();
+
+ // Wait for the putThread which was previously blocked due to the weigher being at capacity.
+ putThread.join();
+
+ assertEquals(MAX_WEIGHT, weigher.currentWeight());
+ assertEquals(1, queue2.size());
Review Comment:
done
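
For anyone following the thread, the behaviour the test above checks (two queues drawing on a single weight budget) can be sketched roughly as follows. This is only an illustration under stated assumptions, not the PR's implementation: `SharedWeigher` and `BoundedQueue` are hypothetical stand-ins for `WeightedSemaphore` and `WeightedBoundedQueue`, built on the standard `java.util.concurrent.Semaphore`, and the uninterruptible acquire is an assumption made to keep the sketch short.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.function.Function;

final class SharedWeigherSketch {

  /** Hypothetical stand-in for WeightedSemaphore: one permit per unit of weight. */
  static final class SharedWeigher<V> {
    private final int maxWeight;
    private final Semaphore permits;
    private final Function<V, Integer> weigher;

    SharedWeigher(int maxWeight, Function<V, Integer> weigher) {
      this.maxWeight = maxWeight;
      this.permits = new Semaphore(maxWeight, /* fair= */ true);
      this.weigher = weigher;
    }

    /** Blocks until the value's weight fits in the shared budget. */
    void acquire(V value) {
      // Assumption: uninterruptible acquire, so callers need no checked exception.
      permits.acquireUninterruptibly(weigher.apply(value));
    }

    /** Returns the value's weight to the shared budget. */
    void release(V value) {
      permits.release(weigher.apply(value));
    }

    /** Total weight currently held across every queue using this weigher. */
    int currentWeight() {
      return maxWeight - permits.availablePermits();
    }
  }

  /** Hypothetical stand-in for WeightedBoundedQueue: bounded only by the weigher. */
  static final class BoundedQueue<V> {
    private final LinkedBlockingQueue<V> queue = new LinkedBlockingQueue<>();
    private final SharedWeigher<V> weigher;

    BoundedQueue(SharedWeigher<V> weigher) {
      this.weigher = weigher;
    }

    /** Reserves weight first, then enqueues; blocks while the budget is exhausted. */
    void put(V value) {
      weigher.acquire(value);
      queue.add(value);
    }

    /** Removes the head if present and frees its weight; returns null when empty. */
    V poll() {
      V value = queue.poll();
      if (value != null) {
        weigher.release(value);
      }
      return value;
    }

    int size() {
      return queue.size();
    }
  }

  public static void main(String[] args) {
    SharedWeigher<Integer> weigher = new SharedWeigher<>(10, i -> Math.min(10, i));
    BoundedQueue<Integer> queue1 = new BoundedQueue<>(weigher);
    BoundedQueue<Integer> queue2 = new BoundedQueue<>(weigher);

    queue1.put(4);
    queue2.put(6);
    System.out.println(weigher.currentWeight()); // prints 10

    queue1.poll();
    System.out.println(weigher.currentWeight()); // prints 6
  }
}
```

Because both queues hold the same `SharedWeigher`, a `put` on either one blocks once their combined weight reaches the limit, and a `poll` on either one can unblock a producer waiting on the other, which is the situation `testPut_sharedWeigher` exercises.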