scwhittle commented on code in PR #28537: URL: https://github.com/apache/beam/pull/28537#discussion_r1334173244
########## runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/WeightBoundedQueueTest.java: ########## @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.streaming; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class WeightBoundedQueueTest { + private static final int MAX_WEIGHT = 10; + + @Test + public void testPut_hasCapacity() { + WeightedBoundedQueue<Integer> queue = + WeightedBoundedQueue.create(MAX_WEIGHT, i -> Math.min(MAX_WEIGHT, i)); + + int insertedValue = 1; + + queue.put(insertedValue); + + assertEquals(insertedValue, queue.queuedElementsWeight()); + assertEquals(1, queue.size()); + assertEquals(insertedValue, (int) queue.poll()); + } + + @Test + public void testPut_noCapacity() throws InterruptedException { + WeightedBoundedQueue<Integer> queue = + WeightedBoundedQueue.create(MAX_WEIGHT, i -> Math.min(MAX_WEIGHT, i)); + + // Insert value that takes all the 
capacity into the queue. + Thread thread1 = new Thread(() -> queue.put(MAX_WEIGHT)); + thread1.start(); + thread1.join(); + + // Try to insert another value into the queue. This will block since there is no capacity in the + // queue. + Thread thread2 = new Thread(() -> queue.put(MAX_WEIGHT)); + thread2.start(); + + // Should only see the first value in the queue, since the queue is at capacity. thread2 + // should be blocked. + assertEquals(MAX_WEIGHT, queue.queuedElementsWeight()); + assertEquals(1, queue.size()); + + // Have another thread poll the queue, pulling off the only value inside and freeing up the Review Comment: You can poll directly from the test thread instead of spawning a new thread; that also makes it easier to verify the polled result. ########## runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/WeightBoundedQueueTest.java: ########## @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.beam.runners.dataflow.worker.streaming; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class WeightBoundedQueueTest { + private static final int MAX_WEIGHT = 10; + + @Test + public void testPut_hasCapacity() { + WeightedBoundedQueue<Integer> queue = + WeightedBoundedQueue.create(MAX_WEIGHT, i -> Math.min(MAX_WEIGHT, i)); + + int insertedValue = 1; + + queue.put(insertedValue); + + assertEquals(insertedValue, queue.queuedElementsWeight()); + assertEquals(1, queue.size()); + assertEquals(insertedValue, (int) queue.poll()); + } + + @Test + public void testPut_noCapacity() throws InterruptedException { + WeightedBoundedQueue<Integer> queue = + WeightedBoundedQueue.create(MAX_WEIGHT, i -> Math.min(MAX_WEIGHT, i)); + + // Insert value that takes all the capacity into the queue. + Thread thread1 = new Thread(() -> queue.put(MAX_WEIGHT)); Review Comment: You can just call put directly from the test thread. ########## runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/WeightBoundedQueueTest.java: ########## @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.streaming; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class WeightBoundedQueueTest { + private static final int MAX_WEIGHT = 10; + + @Test + public void testPut_hasCapacity() { + WeightedBoundedQueue<Integer> queue = + WeightedBoundedQueue.create(MAX_WEIGHT, i -> Math.min(MAX_WEIGHT, i)); + + int insertedValue = 1; + + queue.put(insertedValue); + + assertEquals(insertedValue, queue.queuedElementsWeight()); + assertEquals(1, queue.size()); + assertEquals(insertedValue, (int) queue.poll()); + } + + @Test + public void testPut_noCapacity() throws InterruptedException { + WeightedBoundedQueue<Integer> queue = + WeightedBoundedQueue.create(MAX_WEIGHT, i -> Math.min(MAX_WEIGHT, i)); + + // Insert value that takes all the capacity into the queue. + Thread thread1 = new Thread(() -> queue.put(MAX_WEIGHT)); + thread1.start(); + thread1.join(); + + // Try to insert another value into the queue. This will block since there is no capacity in the + // queue. + Thread thread2 = new Thread(() -> queue.put(MAX_WEIGHT)); + thread2.start(); + + // Should only see the first value in the queue, since the queue is at capacity. thread2 + // should be blocked. 
+ assertEquals(MAX_WEIGHT, queue.queuedElementsWeight()); + assertEquals(1, queue.size()); + + // Have another thread poll the queue, pulling off the only value inside and freeing up the + // capacity in the queue. + Thread thread3 = new Thread(queue::poll); + thread3.start(); + thread3.join(); + + // Wait for the thread2 which was previously blocked due to the queue being at capacity. + thread2.join(); + + assertEquals(MAX_WEIGHT, queue.queuedElementsWeight()); + assertEquals(1, queue.size()); + } + + @Test + public void testPoll() { + WeightedBoundedQueue<Integer> queue = + WeightedBoundedQueue.create(MAX_WEIGHT, i -> Math.min(MAX_WEIGHT, i)); + + int insertedValue1 = 1; + int insertedValue2 = 2; + + queue.put(insertedValue1); + queue.put(insertedValue2); + + assertEquals(insertedValue1 + insertedValue2, queue.queuedElementsWeight()); + assertEquals(2, queue.size()); + assertEquals(insertedValue1, (int) queue.poll()); + assertEquals(1, queue.size()); + } + + @Test + public void testPoll_emptyQueue() { + WeightedBoundedQueue<Integer> queue = + WeightedBoundedQueue.create(MAX_WEIGHT, i -> Math.min(MAX_WEIGHT, i)); + + assertNull(queue.poll()); + } + + @Test + public void testTake() throws InterruptedException { + WeightedBoundedQueue<Integer> queue = + WeightedBoundedQueue.create(MAX_WEIGHT, i -> Math.min(MAX_WEIGHT, i)); + + AtomicInteger value = new AtomicInteger(); + // Should block until value is available + Thread takeThread = + new Thread( + () -> { + try { + value.set(queue.take()); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + takeThread.start(); Review Comment: add a sleep to give chance for it to start blocking ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java: ########## @@ -170,106 +191,35 @@ public class StreamingDataflowWorker { private static final Function<MapTask, MutableNetwork<Node, Edge>> mapTaskToBaseNetwork = new 
MapTaskToNetworkFunction(idGenerator); - private static Random clientIdGenerator = new Random(); - - // Maximum number of threads for processing. Currently each thread processes one key at a time. - static final int MAX_PROCESSING_THREADS = 300; - static final long THREAD_EXPIRATION_TIME_SEC = 60; - static final long TARGET_COMMIT_BUNDLE_BYTES = 32 << 20; - static final int MAX_COMMIT_QUEUE_BYTES = 500 << 20; // 500MB - static final int NUM_COMMIT_STREAMS = 1; - static final int GET_WORK_STREAM_TIMEOUT_MINUTES = 3; - static final Duration COMMIT_STREAM_TIMEOUT = Duration.standardMinutes(1); - private static final int DEFAULT_STATUS_PORT = 8081; - // Maximum size of the result of a GetWork request. private static final long MAX_GET_WORK_FETCH_BYTES = 64L << 20; // 64m - // Reserved ID for counter updates. // Matches kWindmillCounterUpdate in workflow_worker_service_multi_hubs.cc. private static final String WINDMILL_COUNTER_UPDATE_WORK_ID = "3"; - /** Maximum number of failure stacktraces to report in each update sent to backend. */ private static final int MAX_FAILURES_TO_REPORT_IN_UPDATE = 1000; - // TODO(https://github.com/apache/beam/issues/19632): Update throttling counters to use generic - // throttling-msecs metric. - public static final MetricName BIGQUERY_STREAMING_INSERT_THROTTLE_TIME = - MetricName.named( - "org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl", - "throttling-msecs"); - private static final Duration MAX_LOCAL_PROCESSING_RETRY_DURATION = Duration.standardMinutes(5); - - /** Returns whether an exception was caused by a {@link OutOfMemoryError}. 
*/ - private static boolean isOutOfMemoryError(Throwable t) { - while (t != null) { - if (t instanceof OutOfMemoryError) { - return true; - } - t = t.getCause(); - } - return false; - } - - private static MapTask parseMapTask(String input) throws IOException { - return Transport.getJsonFactory().fromString(input, MapTask.class); - } - - public static void main(String[] args) throws Exception { - JvmInitializers.runOnStartup(); - - DataflowWorkerHarnessHelper.initializeLogging(StreamingDataflowWorker.class); - DataflowWorkerHarnessOptions options = - DataflowWorkerHarnessHelper.initializeGlobalStateAndPipelineOptions( - StreamingDataflowWorker.class); - DataflowWorkerHarnessHelper.configureLogging(options); - checkArgument( - options.isStreaming(), - "%s instantiated with options indicating batch use", - StreamingDataflowWorker.class.getName()); - - checkArgument( - !DataflowRunner.hasExperiment(options, "beam_fn_api"), - "%s cannot be main() class with beam_fn_api enabled", - StreamingDataflowWorker.class.getSimpleName()); - - StreamingDataflowWorker worker = - StreamingDataflowWorker.fromDataflowWorkerHarnessOptions(options); - - // Use the MetricsLogger container which is used by BigQueryIO to periodically log process-wide - // metrics. - MetricsEnvironment.setProcessWideContainer(new MetricsLogger(null)); - - JvmInitializers.runBeforeProcessing(options); - worker.startStatusPages(); - worker.start(); - } - + private static final Random clientIdGenerator = new Random(); + final WindmillStateCache stateCache; Review Comment: @kennknowles I'm not very aware of Java ecosystem and didn't originally write these files. I'm not sure if either of those frameworks are used in Beam but I don't know if that was a conscious choice. 
########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationState.java: ########## @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.streaming; + +import com.google.api.services.dataflow.model.MapTask; +import java.io.PrintWriter; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Deque; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import javax.annotation.Nullable; +import org.apache.beam.runners.dataflow.worker.WindmillStateCache; +import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Class representing the state of a computation. 
+ * + * <p>This class is synchronized, but only used from the dispatch and commit threads, so should not + * be heavily contended. Still, blocking work should not be done by it. + */ +public class ComputationState implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(ComputationState.class); + + // The max number of keys in COMMITTING or COMMIT_QUEUED status to be shown. + private static final int MAX_PRINTABLE_COMMIT_PENDING_KEYS = 50; + + private final String computationId; + private final MapTask mapTask; + private final ImmutableMap<String, String> transformUserNameToStateFamily; + // Map from key to work for the key. The first item in the queue is + // actively processing. Synchronized by itself. + private final Map<ShardedKey, Deque<Work>> activeWork = new HashMap<>(); + private final BoundedQueueExecutor executor; + private final ConcurrentLinkedQueue<ExecutionState> executionStateQueue = + new ConcurrentLinkedQueue<>(); + private final WindmillStateCache.ForComputation computationStateCache; + + public ComputationState( + String computationId, + MapTask mapTask, + BoundedQueueExecutor executor, + Map<String, String> transformUserNameToStateFamily, + WindmillStateCache.ForComputation computationStateCache) { + this.computationId = computationId; + this.mapTask = mapTask; + this.executor = executor; + this.transformUserNameToStateFamily = + transformUserNameToStateFamily != null + ? 
ImmutableMap.copyOf(transformUserNameToStateFamily) + : ImmutableMap.of(); + this.computationStateCache = computationStateCache; + Preconditions.checkNotNull(mapTask.getStageName()); + Preconditions.checkNotNull(mapTask.getSystemName()); + } + + public String getComputationId() { + return computationId; + } + + public MapTask getMapTask() { + return mapTask; + } + + public ImmutableMap<String, String> getTransformUserNameToStateFamily() { + return transformUserNameToStateFamily; + } + + public ConcurrentLinkedQueue<ExecutionState> getExecutionStateQueue() { + return executionStateQueue; + } + + /** Mark the given shardedKey and work as active. */ + public boolean activateWork(ShardedKey shardedKey, Work work) { + synchronized (activeWork) { + Deque<Work> queue = activeWork.get(shardedKey); + if (queue != null) { + Preconditions.checkState(!queue.isEmpty()); + // Ensure we don't already have this work token queueud. + for (Work queuedWork : queue) { + if (queuedWork.getWorkItem().getWorkToken() == work.getWorkItem().getWorkToken()) { + return false; + } + } + // Queue the work for later processing. + queue.addLast(work); + return true; + } else { + queue = new ArrayDeque<>(); + queue.addLast(work); + activeWork.put(shardedKey, queue); + // Fall through to execute without the lock held. + } + } + executor.execute(work, work.getWorkItem().getSerializedSize()); + return true; + } + + /** + * Marks the work for the given shardedKey as complete. Schedules queued work for the key if any. + */ + public void completeWork(ShardedKey shardedKey, long workToken) { + Work nextWork; Review Comment: ping on this and some other uncompleted comments ########## runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/WeightBoundedQueueTest.java: ########## @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.streaming; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class WeightBoundedQueueTest { + private static final int MAX_WEIGHT = 10; + + @Test + public void testPut_hasCapacity() { + WeightedBoundedQueue<Integer> queue = + WeightedBoundedQueue.create(MAX_WEIGHT, i -> Math.min(MAX_WEIGHT, i)); + + int insertedValue = 1; + + queue.put(insertedValue); + + assertEquals(insertedValue, queue.queuedElementsWeight()); + assertEquals(1, queue.size()); + assertEquals(insertedValue, (int) queue.poll()); + } + + @Test + public void testPut_noCapacity() throws InterruptedException { + WeightedBoundedQueue<Integer> queue = + WeightedBoundedQueue.create(MAX_WEIGHT, i -> Math.min(MAX_WEIGHT, i)); + + // Insert value that takes all the capacity into the queue. + Thread thread1 = new Thread(() -> queue.put(MAX_WEIGHT)); + thread1.start(); + thread1.join(); + + // Try to insert another value into the queue. This will block since there is no capacity in the + // queue. 
+ Thread thread2 = new Thread(() -> queue.put(MAX_WEIGHT)); + thread2.start(); Review Comment: Sleep for 100ms here? That gives the thread some time to get into the blocking put (the test will pass either way, so it shouldn't add flakiness). ########## runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/WeightBoundedQueueTest.java: ########## @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.beam.runners.dataflow.worker.streaming; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class WeightBoundedQueueTest { + private static final int MAX_WEIGHT = 10; + + @Test + public void testPut_hasCapacity() { + WeightedBoundedQueue<Integer> queue = + WeightedBoundedQueue.create(MAX_WEIGHT, i -> Math.min(MAX_WEIGHT, i)); + + int insertedValue = 1; + + queue.put(insertedValue); + + assertEquals(insertedValue, queue.queuedElementsWeight()); + assertEquals(1, queue.size()); + assertEquals(insertedValue, (int) queue.poll()); + } + + @Test + public void testPut_noCapacity() throws InterruptedException { + WeightedBoundedQueue<Integer> queue = + WeightedBoundedQueue.create(MAX_WEIGHT, i -> Math.min(MAX_WEIGHT, i)); + + // Insert value that takes all the capacity into the queue. + Thread thread1 = new Thread(() -> queue.put(MAX_WEIGHT)); + thread1.start(); + thread1.join(); + + // Try to insert another value into the queue. This will block since there is no capacity in the + // queue. + Thread thread2 = new Thread(() -> queue.put(MAX_WEIGHT)); + thread2.start(); + + // Should only see the first value in the queue, since the queue is at capacity. thread2 + // should be blocked. + assertEquals(MAX_WEIGHT, queue.queuedElementsWeight()); + assertEquals(1, queue.size()); + + // Have another thread poll the queue, pulling off the only value inside and freeing up the + // capacity in the queue. + Thread thread3 = new Thread(queue::poll); + thread3.start(); + thread3.join(); + + // Wait for the thread2 which was previously blocked due to the queue being at capacity. 
+ thread2.join(); + + assertEquals(MAX_WEIGHT, queue.queuedElementsWeight()); + assertEquals(1, queue.size()); + } + + @Test + public void testPoll() { + WeightedBoundedQueue<Integer> queue = + WeightedBoundedQueue.create(MAX_WEIGHT, i -> Math.min(MAX_WEIGHT, i)); + + int insertedValue1 = 1; + int insertedValue2 = 2; + + queue.put(insertedValue1); + queue.put(insertedValue2); + + assertEquals(insertedValue1 + insertedValue2, queue.queuedElementsWeight()); + assertEquals(2, queue.size()); + assertEquals(insertedValue1, (int) queue.poll()); + assertEquals(1, queue.size()); + } + + @Test + public void testPoll_emptyQueue() { + WeightedBoundedQueue<Integer> queue = + WeightedBoundedQueue.create(MAX_WEIGHT, i -> Math.min(MAX_WEIGHT, i)); + + assertNull(queue.poll()); Review Comment: add test of poll with timeout method ########## runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/WeightBoundedQueueTest.java: ########## @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.dataflow.worker.streaming; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class WeightBoundedQueueTest { + private static final int MAX_WEIGHT = 10; + + @Test + public void testPut_hasCapacity() { + WeightedBoundedQueue<Integer> queue = + WeightedBoundedQueue.create(MAX_WEIGHT, i -> Math.min(MAX_WEIGHT, i)); + + int insertedValue = 1; + + queue.put(insertedValue); + + assertEquals(insertedValue, queue.queuedElementsWeight()); + assertEquals(1, queue.size()); + assertEquals(insertedValue, (int) queue.poll()); + } + + @Test + public void testPut_noCapacity() throws InterruptedException { + WeightedBoundedQueue<Integer> queue = + WeightedBoundedQueue.create(MAX_WEIGHT, i -> Math.min(MAX_WEIGHT, i)); + + // Insert value that takes all the capacity into the queue. + Thread thread1 = new Thread(() -> queue.put(MAX_WEIGHT)); + thread1.start(); + thread1.join(); + + // Try to insert another value into the queue. This will block since there is no capacity in the + // queue. + Thread thread2 = new Thread(() -> queue.put(MAX_WEIGHT)); + thread2.start(); + + // Should only see the first value in the queue, since the queue is at capacity. thread2 + // should be blocked. + assertEquals(MAX_WEIGHT, queue.queuedElementsWeight()); + assertEquals(1, queue.size()); + + // Have another thread poll the queue, pulling off the only value inside and freeing up the + // capacity in the queue. + Thread thread3 = new Thread(queue::poll); + thread3.start(); + thread3.join(); + + // Wait for the thread2 which was previously blocked due to the queue being at capacity. 
+ thread2.join(); + + assertEquals(MAX_WEIGHT, queue.queuedElementsWeight()); + assertEquals(1, queue.size()); + } + + @Test + public void testPoll() { + WeightedBoundedQueue<Integer> queue = + WeightedBoundedQueue.create(MAX_WEIGHT, i -> Math.min(MAX_WEIGHT, i)); + + int insertedValue1 = 1; + int insertedValue2 = 2; + + queue.put(insertedValue1); + queue.put(insertedValue2); + + assertEquals(insertedValue1 + insertedValue2, queue.queuedElementsWeight()); + assertEquals(2, queue.size()); + assertEquals(insertedValue1, (int) queue.poll()); + assertEquals(1, queue.size()); + } + + @Test + public void testPoll_emptyQueue() { + WeightedBoundedQueue<Integer> queue = + WeightedBoundedQueue.create(MAX_WEIGHT, i -> Math.min(MAX_WEIGHT, i)); + + assertNull(queue.poll()); + } + + @Test + public void testTake() throws InterruptedException { Review Comment: Add a variant where you poll with a very large timeout and ensure it gets the element. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
