scwhittle commented on code in PR #28537:
URL: https://github.com/apache/beam/pull/28537#discussion_r1335618840
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationState.java:
##########
@@ -125,23 +126,28 @@ public boolean activateWork(ShardedKey shardedKey, Work
work) {
/**
* Marks the work for the given shardedKey as complete. Schedules queued
work for the key if any.
*/
- public void completeWork(ShardedKey shardedKey, long workToken) {
- Work nextWork;
+ public void completeWorkAndScheduleNextWork(ShardedKey shardedKey, long
workToken) {
Review Comment:
nit: maybe rename this to completeWorkAndScheduleNextWorkForKey
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/WeightBoundedQueueTest.java:
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class WeightBoundedQueueTest {
+ private static final int MAX_WEIGHT = 10;
+
+ @Test
+ public void testPut_hasCapacity() {
+ WeightedBoundedQueue<Integer> queue =
+ WeightedBoundedQueue.create(MAX_WEIGHT, i -> Math.min(MAX_WEIGHT, i));
+
+ int insertedValue = 1;
+
+ queue.put(insertedValue);
+
+ assertEquals(insertedValue, queue.queuedElementsWeight());
+ assertEquals(1, queue.size());
+ assertEquals(insertedValue, (int) queue.poll());
+ }
+
+ @Test
+ public void testPut_noCapacity() throws InterruptedException {
+ WeightedBoundedQueue<Integer> queue =
+ WeightedBoundedQueue.create(MAX_WEIGHT, i -> Math.min(MAX_WEIGHT, i));
+
+ // Insert value that takes all the capacity into the queue.
+ queue.put(MAX_WEIGHT);
+
+ // Try to insert another value into the queue. This will block since there
is no capacity in the
+ // queue.
+ Thread putThread =
+ new Thread(
+ () -> {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ queue.put(MAX_WEIGHT);
+ });
+ putThread.start();
+
+ // Should only see the first value in the queue, since the queue is at
capacity. thread2
+ // should be blocked.
+ assertEquals(MAX_WEIGHT, queue.queuedElementsWeight());
+ assertEquals(1, queue.size());
+
+ // Poll the queue, pulling off the only value inside and freeing up the
capacity in the queue.
+ queue.poll();
+
+ // Wait for the putThread which was previously blocked due to the queue
being at capacity.
+ putThread.join();
+
+ assertEquals(MAX_WEIGHT, queue.queuedElementsWeight());
+ assertEquals(1, queue.size());
+ }
+
+ @Test
+ public void testPoll() {
+ WeightedBoundedQueue<Integer> queue =
+ WeightedBoundedQueue.create(MAX_WEIGHT, i -> Math.min(MAX_WEIGHT, i));
+
+ int insertedValue1 = 1;
+ int insertedValue2 = 2;
+
+ queue.put(insertedValue1);
+ queue.put(insertedValue2);
+
+ assertEquals(insertedValue1 + insertedValue2,
queue.queuedElementsWeight());
+ assertEquals(2, queue.size());
+ assertEquals(insertedValue1, (int) queue.poll());
+ assertEquals(1, queue.size());
+ }
+
+ @Test
+ public void testPoll_withTimeout() throws InterruptedException {
+ WeightedBoundedQueue<Integer> queue =
+ WeightedBoundedQueue.create(MAX_WEIGHT, i -> Math.min(MAX_WEIGHT, i));
+ int pollWaitTimeMillis = 10000;
+ int insertedValue1 = 1;
+
+ AtomicInteger pollResult = new AtomicInteger();
+ Thread pollThread =
+ new Thread(
+ () -> {
+ int polled;
+ try {
+ polled = queue.poll(pollWaitTimeMillis, TimeUnit.MILLISECONDS);
+ pollResult.set(polled);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ pollThread.start();
+ Thread.sleep(pollWaitTimeMillis / 100);
+ queue.put(insertedValue1);
+ pollThread.join();
+
+ assertEquals(insertedValue1, pollResult.get());
+ }
+
+ @Test
+ public void testPoll_withLargeTimeout() throws InterruptedException {
+ WeightedBoundedQueue<Integer> queue =
+ WeightedBoundedQueue.create(MAX_WEIGHT, i -> Math.min(MAX_WEIGHT, i));
+ int insertedValue1 = 1;
+
+ AtomicInteger pollResult = new AtomicInteger();
+ Thread pollThread =
+ new Thread(
+ () -> {
+ int polled;
+ try {
+ polled = queue.poll(1, TimeUnit.MINUTES);
+ pollResult.set(polled);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ pollThread.start();
+ Thread.sleep(30000); // 30 sec.
+ queue.put(insertedValue1);
+ pollThread.join();
+
+ assertEquals(insertedValue1, pollResult.get());
+ }
+
+ @Test
+ public void testPoll_withTimeout_timesOut() throws InterruptedException {
+ WeightedBoundedQueue<Integer> queue =
+ WeightedBoundedQueue.create(MAX_WEIGHT, i -> Math.min(MAX_WEIGHT, i));
+ int defaultPollResult = -10;
+ int pollWaitTimeMillis = 100;
+ int insertedValue1 = 1;
+
+ // AtomicInteger default isn't null, so set it to a negative value and
verify that it doesn't
+ // change.
+ AtomicInteger pollResult = new AtomicInteger(defaultPollResult);
+
+ Thread pollThread =
+ new Thread(
+ () -> {
+ int polled;
+ try {
+ polled = queue.poll(pollWaitTimeMillis, TimeUnit.MILLISECONDS);
+ pollResult.set(polled);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ pollThread.start();
+ Thread.sleep(pollWaitTimeMillis * 100);
+ queue.put(insertedValue1);
+ pollThread.join();
+
+ assertEquals(defaultPollResult, pollResult.get());
+ }
+
+ @Test
+ public void testPoll_emptyQueue() {
+ WeightedBoundedQueue<Integer> queue =
+ WeightedBoundedQueue.create(MAX_WEIGHT, i -> Math.min(MAX_WEIGHT, i));
+
+ assertNull(queue.poll());
+ }
+
+ @Test
+ public void testTake() throws InterruptedException {
+ WeightedBoundedQueue<Integer> queue =
+ WeightedBoundedQueue.create(MAX_WEIGHT, i -> Math.min(MAX_WEIGHT, i));
+
+ AtomicInteger value = new AtomicInteger();
+ // Should block until value is available
+ Thread takeThread =
+ new Thread(
+ () -> {
+ try {
+ value.set(queue.take());
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ takeThread.start();
+
+ Thread putThread =
Review Comment:
Remove putThread; the put call can just be inlined into the test.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ShardedKey.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.TextFormat;
+
+@AutoValue
+public abstract class ShardedKey {
+
+ public static ShardedKey create(ByteString key, long shardingKey) {
+ return new AutoValue_ShardedKey(key, shardingKey);
+ }
+
+ public abstract ByteString key();
+
+ public abstract long shardingKey();
+
+ @Override
+ public final String toString() {
+ ByteString keyToDisplay = key();
+ if (keyToDisplay.size() > 100) {
+ keyToDisplay = keyToDisplay.substring(0, 100);
+ }
+ return String.format("%016x-%s", shardingKey(),
TextFormat.escapeBytes(keyToDisplay));
Review Comment:
ping
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/StageInfo.java:
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming;
+
+import static
org.apache.beam.runners.dataflow.worker.DataflowSystemMetrics.THROTTLING_MSECS_METRIC_NAME;
+
+import com.google.api.services.dataflow.model.CounterStructuredName;
+import com.google.api.services.dataflow.model.CounterUpdate;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.runners.dataflow.worker.DataflowSystemMetrics;
+import org.apache.beam.runners.dataflow.worker.MetricsContainerRegistry;
+import org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker;
+import org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext;
+import org.apache.beam.runners.dataflow.worker.StreamingStepMetricsContainer;
+import org.apache.beam.runners.dataflow.worker.counters.Counter;
+import org.apache.beam.runners.dataflow.worker.counters.CounterSet;
+import
org.apache.beam.runners.dataflow.worker.counters.DataflowCounterUpdateExtractor;
+import org.apache.beam.runners.dataflow.worker.counters.NameContext;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+
+/** Contains a few of the stage specific fields. E.g. metrics container
registry, counters etc. */
+public class StageInfo {
+
+ private final String stageName;
+ private final String systemName;
+ private final MetricsContainerRegistry<StreamingStepMetricsContainer>
metricsContainerRegistry;
+ private final
StreamingModeExecutionContext.StreamingModeExecutionStateRegistry
+ executionStateRegistry;
+ private final CounterSet deltaCounters;
+ private final Counter<Long, Long> throttledMsecs;
+ private final Counter<Long, Long> totalProcessingMsecs;
+ private final Counter<Long, Long> timerProcessingMsecs;
+
+ public StageInfo(String stageName, String systemName,
StreamingDataflowWorker worker) {
+ this.stageName = stageName;
+ this.systemName = systemName;
+ metricsContainerRegistry = StreamingStepMetricsContainer.createRegistry();
+ executionStateRegistry =
+ new
StreamingModeExecutionContext.StreamingModeExecutionStateRegistry(worker);
+ NameContext nameContext = NameContext.create(stageName, null, systemName,
null);
+ deltaCounters = new CounterSet();
+ throttledMsecs =
+ deltaCounters.longSum(
+
DataflowSystemMetrics.StreamingPerStageSystemCounterNames.THROTTLED_MSECS.counterName(
+ nameContext));
+ totalProcessingMsecs =
+ deltaCounters.longSum(
+
DataflowSystemMetrics.StreamingPerStageSystemCounterNames.TOTAL_PROCESSING_MSECS
+ .counterName(nameContext));
+ timerProcessingMsecs =
+ deltaCounters.longSum(
+
DataflowSystemMetrics.StreamingPerStageSystemCounterNames.TIMER_PROCESSING_MSECS
+ .counterName(nameContext));
+ }
+
+ public List<CounterUpdate> extractCounterUpdates() {
+ List<CounterUpdate> counterUpdates = new ArrayList<>();
+ Iterables.addAll(
+ counterUpdates,
+
StreamingStepMetricsContainer.extractMetricUpdates(metricsContainerRegistry));
+ Iterables.addAll(counterUpdates,
executionStateRegistry.extractUpdates(false));
+ for (CounterUpdate counterUpdate : counterUpdates) {
+ translateKnownStepCounters(counterUpdate);
+ }
+ counterUpdates.addAll(
+
deltaCounters.extractModifiedDeltaUpdates(DataflowCounterUpdateExtractor.INSTANCE));
+ return counterUpdates;
+ }
+
+ public String getStageName() {
Review Comment:
ping
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/WeightBoundedQueueTest.java:
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class WeightBoundedQueueTest {
+ private static final int MAX_WEIGHT = 10;
+
+ @Test
+ public void testPut_hasCapacity() {
+ WeightedBoundedQueue<Integer> queue =
+ WeightedBoundedQueue.create(MAX_WEIGHT, i -> Math.min(MAX_WEIGHT, i));
+
+ int insertedValue = 1;
+
+ queue.put(insertedValue);
+
+ assertEquals(insertedValue, queue.queuedElementsWeight());
+ assertEquals(1, queue.size());
+ assertEquals(insertedValue, (int) queue.poll());
+ }
+
+ @Test
+ public void testPut_noCapacity() throws InterruptedException {
+ WeightedBoundedQueue<Integer> queue =
+ WeightedBoundedQueue.create(MAX_WEIGHT, i -> Math.min(MAX_WEIGHT, i));
+
+ // Insert value that takes all the capacity into the queue.
+ Thread thread1 = new Thread(() -> queue.put(MAX_WEIGHT));
+ thread1.start();
+ thread1.join();
+
+ // Try to insert another value into the queue. This will block since there
is no capacity in the
+ // queue.
+ Thread thread2 = new Thread(() -> queue.put(MAX_WEIGHT));
+ thread2.start();
+
+ // Should only see the first value in the queue, since the queue is at
capacity. thread2
+ // should be blocked.
+ assertEquals(MAX_WEIGHT, queue.queuedElementsWeight());
+ assertEquals(1, queue.size());
+
+ // Have another thread poll the queue, pulling off the only value inside
and freeing up the
+ // capacity in the queue.
+ Thread thread3 = new Thread(queue::poll);
+ thread3.start();
+ thread3.join();
+
+ // Wait for the thread2 which was previously blocked due to the queue
being at capacity.
+ thread2.join();
+
+ assertEquals(MAX_WEIGHT, queue.queuedElementsWeight());
+ assertEquals(1, queue.size());
+ }
+
+ @Test
+ public void testPoll() {
+ WeightedBoundedQueue<Integer> queue =
+ WeightedBoundedQueue.create(MAX_WEIGHT, i -> Math.min(MAX_WEIGHT, i));
+
+ int insertedValue1 = 1;
+ int insertedValue2 = 2;
+
+ queue.put(insertedValue1);
+ queue.put(insertedValue2);
+
+ assertEquals(insertedValue1 + insertedValue2,
queue.queuedElementsWeight());
+ assertEquals(2, queue.size());
+ assertEquals(insertedValue1, (int) queue.poll());
+ assertEquals(1, queue.size());
+ }
+
+ @Test
+ public void testPoll_emptyQueue() {
+ WeightedBoundedQueue<Integer> queue =
+ WeightedBoundedQueue.create(MAX_WEIGHT, i -> Math.min(MAX_WEIGHT, i));
+
+ assertNull(queue.poll());
+ }
+
+ @Test
+ public void testTake() throws InterruptedException {
Review Comment:
Thanks, I just meant something like testPoll_withTimeout with a large
timeout that we never expect to be hit. You could remove
testPoll_withLargeTimeout, since I don't think it adds much beyond
testPoll_withTimeout; these tests don't seem to run with much parallelism, and
we don't want to slow the build down too much.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/KeyCommitTooLargeException.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming;
+
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+
+public class KeyCommitTooLargeException extends Exception {
Review Comment:
ping
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/counters/NameContext.java:
##########
@@ -31,7 +31,10 @@ public abstract class NameContext {
* systemName} and a {@code userName}.
*/
public static NameContext create(
- String stageName, String originalName, String systemName, String
userName) {
+ String stageName,
Review Comment:
Ping. I realized this is just a method and not the members generated by
AutoValue below. But can stageName be made non-null here?
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationState.java:
##########
@@ -125,23 +126,28 @@ public boolean activateWork(ShardedKey shardedKey, Work
work) {
/**
* Marks the work for the given shardedKey as complete. Schedules queued
work for the key if any.
*/
- public void completeWork(ShardedKey shardedKey, long workToken) {
- Work nextWork;
+ public void completeWorkAndScheduleNextWork(ShardedKey shardedKey, long
workToken) {
+ Optional<Work> nextWork;
synchronized (activeWork) {
- Queue<Work> queue = activeWork.get(shardedKey);
- if (queue == null) {
+ Queue<Work> workQueue = activeWork.get(shardedKey);
Review Comment:
Use a @Nullable annotation or an Optional here.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationState.java:
##########
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming;
+
+import com.google.api.services.dataflow.model.MapTask;
+import java.io.PrintWriter;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.dataflow.worker.WindmillStateCache;
+import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class representing the state of a computation.
+ *
+ * <p>This class is synchronized, but only used from the dispatch and commit
threads, so should not
+ * be heavily contended. Still, blocking work should not be done by it.
+ */
+public class ComputationState implements AutoCloseable {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ComputationState.class);
+
+ // The max number of keys in COMMITTING or COMMIT_QUEUED status to be shown.
+ private static final int MAX_PRINTABLE_COMMIT_PENDING_KEYS = 50;
+
+ private final String computationId;
+ private final MapTask mapTask;
+ private final ImmutableMap<String, String> transformUserNameToStateFamily;
+ // Map from key to work for the key. The first item in the queue is
+ // actively processing. Synchronized by itself.
+ private final Map<ShardedKey, Deque<Work>> activeWork = new HashMap<>();
+ private final BoundedQueueExecutor executor;
+ private final ConcurrentLinkedQueue<ExecutionState> executionStateQueue =
+ new ConcurrentLinkedQueue<>();
+ private final WindmillStateCache.ForComputation computationStateCache;
+
+ public ComputationState(
+ String computationId,
+ MapTask mapTask,
+ BoundedQueueExecutor executor,
+ Map<String, String> transformUserNameToStateFamily,
+ WindmillStateCache.ForComputation computationStateCache) {
+ this.computationId = computationId;
+ this.mapTask = mapTask;
+ this.executor = executor;
+ this.transformUserNameToStateFamily =
+ transformUserNameToStateFamily != null
+ ? ImmutableMap.copyOf(transformUserNameToStateFamily)
+ : ImmutableMap.of();
+ this.computationStateCache = computationStateCache;
+ Preconditions.checkNotNull(mapTask.getStageName());
+ Preconditions.checkNotNull(mapTask.getSystemName());
+ }
+
+ public String getComputationId() {
+ return computationId;
+ }
+
+ public MapTask getMapTask() {
+ return mapTask;
+ }
+
+ public ImmutableMap<String, String> getTransformUserNameToStateFamily() {
+ return transformUserNameToStateFamily;
+ }
+
+ public ConcurrentLinkedQueue<ExecutionState> getExecutionStateQueue() {
+ return executionStateQueue;
+ }
+
+ /** Mark the given shardedKey and work as active. */
+ public boolean activateWork(ShardedKey shardedKey, Work work) {
+ synchronized (activeWork) {
+ Deque<Work> queue = activeWork.get(shardedKey);
+ if (queue != null) {
+ Preconditions.checkState(!queue.isEmpty());
+ // Ensure we don't already have this work token queueud.
+ for (Work queuedWork : queue) {
+ if (queuedWork.getWorkItem().getWorkToken() ==
work.getWorkItem().getWorkToken()) {
+ return false;
+ }
+ }
+ // Queue the work for later processing.
+ queue.addLast(work);
+ return true;
+ } else {
+ queue = new ArrayDeque<>();
+ queue.addLast(work);
+ activeWork.put(shardedKey, queue);
+ // Fall through to execute without the lock held.
+ }
+ }
+ executor.execute(work, work.getWorkItem().getSerializedSize());
+ return true;
+ }
+
+ /**
+ * Marks the work for the given shardedKey as complete. Schedules queued
work for the key if any.
+ */
+ public void completeWork(ShardedKey shardedKey, long workToken) {
+ Work nextWork;
+ synchronized (activeWork) {
+ Queue<Work> queue = activeWork.get(shardedKey);
+ if (queue == null) {
+ // Work may have been completed due to clearing of stuck commits.
+ LOG.warn(
+ "Unable to complete inactive work for key {} and token {}.",
shardedKey, workToken);
+ return;
+ }
+ Work completedWork = queue.peek();
+ // avoid Preconditions.checkState here to prevent eagerly evaluating the
+ // format string parameters for the error message.
+ if (completedWork == null) {
+ throw new IllegalStateException(
+ String.format("Active key %s without work, expected token %d",
shardedKey, workToken));
+ }
+ if (completedWork.getWorkItem().getWorkToken() != workToken) {
+ // Work may have been completed due to clearing of stuck commits.
+ LOG.warn(
+ "Unable to complete due to token mismatch for key {} and token {},
actual token was {}.",
+ shardedKey,
+ workToken,
+ completedWork.getWorkItem().getWorkToken());
+ return;
+ }
+ queue.remove(); // We consumed the matching work item.
+ nextWork = queue.peek();
+ if (nextWork == null) {
+ Preconditions.checkState(queue == activeWork.remove(shardedKey));
+ }
+ }
+ if (nextWork != null) {
+ executor.forceExecute(nextWork,
nextWork.getWorkItem().getSerializedSize());
+ }
+ }
+
+ public void invalidateStuckCommits(Instant stuckCommitDeadline) {
+ synchronized (activeWork) {
+ // Determine the stuck commit keys but complete them outside the loop
iterating over
+ // activeWork as completeWork may delete the entry from activeWork.
+ Map<ShardedKey, Long> stuckCommits = new HashMap<>();
+ for (Map.Entry<ShardedKey, Deque<Work>> entry : activeWork.entrySet()) {
+ ShardedKey shardedKey = entry.getKey();
+ @Nullable Work work = entry.getValue().peek();
+ if (work != null) {
+ if (work.getState() == Work.State.COMMITTING
+ && work.getStateStartTime().isBefore(stuckCommitDeadline)) {
+ LOG.error(
+ "Detected key {} stuck in COMMITTING state since {},
completing it with error.",
+ shardedKey,
+ work.getStateStartTime());
+ stuckCommits.put(shardedKey, work.getWorkItem().getWorkToken());
+ }
+ }
+ }
+ for (Map.Entry<ShardedKey, Long> stuckCommit : stuckCommits.entrySet()) {
+ computationStateCache.invalidate(
+ stuckCommit.getKey().key(), stuckCommit.getKey().shardingKey());
+ completeWork(stuckCommit.getKey(), stuckCommit.getValue());
+ }
+ }
+ }
+
+ /** Adds any work started before the refreshDeadline to the GetDataRequest
builder. */
+ public List<Windmill.KeyedGetDataRequest> getKeysToRefresh(Instant
refreshDeadline) {
+ List<Windmill.KeyedGetDataRequest> result = new ArrayList<>();
+ synchronized (activeWork) {
+ for (Map.Entry<ShardedKey, Deque<Work>> entry : activeWork.entrySet()) {
+ ShardedKey shardedKey = entry.getKey();
+ for (Work work : entry.getValue()) {
+ if (work.getStartTime().isBefore(refreshDeadline)) {
+ result.add(
+ Windmill.KeyedGetDataRequest.newBuilder()
+ .setKey(shardedKey.key())
+ .setShardingKey(shardedKey.shardingKey())
+ .setWorkToken(work.getWorkItem().getWorkToken())
+ .addAllLatencyAttribution(work.getLatencyAttributions())
+ .build());
+ }
+ }
+ }
+ }
+ return result;
+ }
+
+ private String elapsedString(Instant start, Instant end) {
+ Duration activeFor = new Duration(start, end);
+ // Duration's toString always starts with "PT"; remove that here.
+ return activeFor.toString().substring(2);
+ }
+
+ public void printActiveWork(PrintWriter writer) {
+ writer.println(
+ "<table border=\"1\" "
+ +
"style=\"border-collapse:collapse;padding:5px;border-spacing:5px;border:1px\">");
+ writer.println(
+ "<tr><th>Key</th><th>Token</th><th>Queued</th><th>Active
For</th><th>State</th><th>State Active For</th></tr>");
+
+ // We use a StringBuilder in the synchronized section to buffer writes
since the provided
+ // PrintWriter may block when flushing.
+ StringBuilder builder = new StringBuilder();
+ final Instant now = Instant.now();
+ int commitPendingCount = 0;
+ synchronized (activeWork) {
+ for (Map.Entry<ShardedKey, Deque<Work>> entry : activeWork.entrySet()) {
+ Queue<Work> queue = Preconditions.checkNotNull(entry.getValue());
+ Work work = Preconditions.checkNotNull(queue.peek());
+ Windmill.WorkItem workItem = work.getWorkItem();
+ if (work.isCommitPending()) {
+ if (++commitPendingCount >= MAX_PRINTABLE_COMMIT_PENDING_KEYS) {
+ continue;
+ }
+ }
+ builder.append("<tr>");
+ builder.append("<td>");
+ builder.append(String.format("%016x", workItem.getShardingKey()));
+ builder.append("</td><td>");
+ builder.append(String.format("%016x", workItem.getWorkToken()));
+ builder.append("</td><td>");
+ builder.append(queue.size() - 1);
+ builder.append("</td><td>");
+ builder.append(elapsedString(work.getStartTime(), now));
+ builder.append("</td><td>");
+ builder.append(work.getState());
+ builder.append("</td><td>");
+ builder.append(elapsedString(work.getStateStartTime(), now));
+ builder.append("</td></tr>\n");
+ }
+ }
+ writer.print(builder);
+ writer.println("</table>");
+ if (commitPendingCount >= MAX_PRINTABLE_COMMIT_PENDING_KEYS) {
+ writer.println("<br>");
+ writer.print("Skipped keys in COMMITTING/COMMIT_QUEUED: ");
+ writer.println(commitPendingCount - MAX_PRINTABLE_COMMIT_PENDING_KEYS);
+ writer.println("<br>");
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ ExecutionState executionState;
Review Comment:
ping
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]