m-trieu commented on code in PR #32905:
URL: https://github.com/apache/beam/pull/32905#discussion_r1825323708
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/WeightBoundedQueueTest.java:
##########
@@ -194,4 +201,46 @@ public void testTake() throws InterruptedException {
assertEquals(MAX_WEIGHT, value.get());
}
+
+ @Test
+ public void testPut_sharedWeigher() throws InterruptedException {
+ WeightedSemaphore<Integer> weigher =
+ WeightedSemaphore.create(MAX_WEIGHT, i -> Math.min(MAX_WEIGHT, i));
+ WeightedBoundedQueue<Integer> queue1 = WeightedBoundedQueue.create(weigher);
+ WeightedBoundedQueue<Integer> queue2 = WeightedBoundedQueue.create(weigher);
+
+ // Insert value that takes all the weight into the queue1.
+ queue1.put(MAX_WEIGHT);
+
+ // Try to insert a value into the queue2. This will block since there is no capacity in the
+ // weigher.
+ Thread putThread =
+ new Thread(
+ () -> {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ queue2.put(MAX_WEIGHT);
+ });
+ putThread.start();
+
+ // Should only see the first value in the queue, since the queue is at capacity. putThread
Review Comment:
done
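
For readers following the thread, here is a minimal sketch of the behavior the test above exercises: two queues drawing on one shared weight budget. It is not Beam's `WeightedSemaphore` or `WeightedBoundedQueue`. The class names `SharedWeightLimiter` and `SketchQueue` are hypothetical, and the sketch uses a non-blocking `offer` built on `java.util.concurrent.Semaphore` instead of the blocking `put` in the test, so the capacity sharing can be shown without a second thread.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import java.util.function.ToIntFunction;

/** Hypothetical stand-in for a weight limiter shared by several queues. */
final class SharedWeightLimiter<T> {
  private final Semaphore permits;
  private final ToIntFunction<T> weigher;
  private final int maxWeight;

  SharedWeightLimiter(int maxWeight, ToIntFunction<T> weigher) {
    this.permits = new Semaphore(maxWeight);
    this.weigher = weigher;
    this.maxWeight = maxWeight;
  }

  // Clamp the weight to [0, maxWeight] so one oversized element can still be admitted.
  private int weightOf(T value) {
    return Math.max(0, Math.min(maxWeight, weigher.applyAsInt(value)));
  }

  // Non-blocking: returns false if the shared budget cannot cover this element.
  boolean tryAcquire(T value) {
    return permits.tryAcquire(weightOf(value));
  }

  // Returns the element's weight to the shared budget.
  void release(T value) {
    permits.release(weightOf(value));
  }
}

/** Hypothetical queue that charges every element against a shared limiter. */
final class SketchQueue<T> {
  private final ConcurrentLinkedQueue<T> items = new ConcurrentLinkedQueue<>();
  private final SharedWeightLimiter<T> limiter;

  SketchQueue(SharedWeightLimiter<T> limiter) {
    this.limiter = limiter;
  }

  // Adds the element only if the shared budget has room for its weight.
  boolean offer(T value) {
    if (!limiter.tryAcquire(value)) {
      return false;
    }
    items.add(value);
    return true;
  }

  // Removes the head element, if any, and frees its weight for all queues.
  T poll() {
    T value = items.poll();
    if (value != null) {
      limiter.release(value);
    }
    return value;
  }

  int size() {
    return items.size();
  }
}
```

In this sketch, once one queue holds an element weighing the full budget, `offer` on a second queue sharing the same limiter returns `false` until the first queue is drained with `poll`. A blocking variant would call `Semaphore.acquire` instead of `tryAcquire`, which corresponds to the point where `putThread` waits in the test.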