scwhittle commented on code in PR #28537:
URL: https://github.com/apache/beam/pull/28537#discussion_r1332811222
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/WeightedBoundedQueue.java:
##########

@@ -0,0 +1,101 @@
+/** Bounded set of queues, with a maximum total weight. */
+public class WeightedBoundedQueue<V> {
+
+  private final LinkedBlockingQueue<V> queue;
+  private final int maxWeight;
+  private final Semaphore limit;
+  private final Function<V, Integer> weigher;
+
+  public static <V> WeightedBoundedQueue<V> create(int maxWeight, Function<V, Integer> weigherFn) {
+    return new WeightedBoundedQueue<>(
+        new LinkedBlockingQueue<>(), maxWeight, new Semaphore(maxWeight, true), weigherFn);
+  }
+
+  /**
+   * Adds the value to the queue, blocking if this would cause the overall weight to exceed the
+   * limit.
+   */
+  public void put(V value) {
+    limit.acquireUninterruptibly(weigher.apply(value));
+    queue.add(value);
+  }
+
+  /** Returns and removes the next value, or null if there is no such value. */
+  public @Nullable V poll() {
+    V result = queue.poll();
+    if (result != null) {
+      limit.release(weigher.apply(result));
+    }
+    return result;
+  }

Review Comment:
   would be nice to add some tests for this now that it's visible
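   A minimal JUnit 4 sketch of the kind of coverage suggested (the identity weigher and the assertions are illustrative; in particular it assumes weight() reports the currently queued weight, and the test lives in the same package as the class under test):

   ```java
   package org.apache.beam.runners.dataflow.worker.streaming;

   import static org.junit.Assert.assertEquals;
   import static org.junit.Assert.assertNull;

   import java.util.concurrent.TimeUnit;
   import org.junit.Test;

   public class WeightedBoundedQueueTest {

     @Test
     public void pollReleasesWeight() throws Exception {
       WeightedBoundedQueue<Integer> queue = WeightedBoundedQueue.create(10, i -> i);
       queue.put(4);
       assertEquals(4, queue.weight());
       assertEquals(Integer.valueOf(4), queue.poll());
       assertEquals(0, queue.weight());
       assertNull(queue.poll()); // Empty queue returns null without blocking.
     }

     @Test
     public void putBlocksUntilWeightIsReleased() throws Exception {
       WeightedBoundedQueue<Integer> queue = WeightedBoundedQueue.create(5, i -> i);
       queue.put(5); // Queue is now at max weight.
       Thread producer = new Thread(() -> queue.put(3)); // Blocks until permits are released.
       producer.start();
       assertEquals(Integer.valueOf(5), queue.poll(1, TimeUnit.SECONDS)); // Releases 5 permits.
       producer.join(); // put(3) can now acquire and finish.
       assertEquals(3, queue.weight());
     }
   }
   ```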
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/WeightedBoundedQueue.java:
##########

@@ -0,0 +1,101 @@
+  /** Returns the current weight of the queue. */
+  public int weight() {

Review Comment:
   IMO this method name and size() aren't particularly obvious; what do you think? Maybe queuedElementsWeight() and queuedElementsCount()?
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationState.java:
##########

@@ -0,0 +1,276 @@
+  public ComputationState(
+      String computationId,
+      MapTask mapTask,
+      BoundedQueueExecutor executor,
+      Map<String, String> transformUserNameToStateFamily,
+      WindmillStateCache.ForComputation computationStateCache) {
+    this.computationId = computationId;
+    this.mapTask = mapTask;
+    this.executor = executor;
+    this.transformUserNameToStateFamily =
+        transformUserNameToStateFamily != null

Review Comment:
   Add a @Nullable annotation, or remove the null check.
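   i.e., roughly (a fragment sketch; javax.annotation.Nullable is already imported in this file):

   ```java
   public ComputationState(
       String computationId,
       MapTask mapTask,
       BoundedQueueExecutor executor,
       @Nullable Map<String, String> transformUserNameToStateFamily, // callers may pass null
       WindmillStateCache.ForComputation computationStateCache) {
     // ... other assignments unchanged ...
     this.transformUserNameToStateFamily =
         transformUserNameToStateFamily == null
             ? ImmutableMap.of()
             : ImmutableMap.copyOf(transformUserNameToStateFamily);
   }
   ```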
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/StageInfo.java:
##########

@@ -0,0 +1,140 @@
+/** Contains a few of the stage specific fields. E.g. metrics container registry, counters etc. */
+public class StageInfo {
+
+  private final String stageName;
+  private final String systemName;
+  private final MetricsContainerRegistry<StreamingStepMetricsContainer> metricsContainerRegistry;
+  private final StreamingModeExecutionContext.StreamingModeExecutionStateRegistry
+      executionStateRegistry;
+  private final CounterSet deltaCounters;
+  private final Counter<Long, Long> throttledMsecs;
+  private final Counter<Long, Long> totalProcessingMsecs;
+  private final Counter<Long, Long> timerProcessingMsecs;
...
+  public String getStageName() {

Review Comment:
   Would it be beneficial to use AutoValue?
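   A rough sketch of what the AutoValue shape could look like, with the counter wiring moved into a static factory (accessor names and the elided arguments are illustrative, not a definitive refactor):

   ```java
   @AutoValue
   public abstract class StageInfo {

     public static StageInfo create(
         String stageName, String systemName, StreamingDataflowWorker worker) {
       NameContext nameContext = NameContext.create(stageName, null, systemName, null);
       CounterSet deltaCounters = new CounterSet();
       Counter<Long, Long> throttledMsecs =
           deltaCounters.longSum(
               DataflowSystemMetrics.StreamingPerStageSystemCounterNames.THROTTLED_MSECS
                   .counterName(nameContext));
       // totalProcessingMsecs and timerProcessingMsecs built the same way as today...
       return new AutoValue_StageInfo(
           stageName,
           systemName,
           StreamingStepMetricsContainer.createRegistry(),
           new StreamingModeExecutionContext.StreamingModeExecutionStateRegistry(worker),
           deltaCounters,
           throttledMsecs /* , totalProcessingMsecs, timerProcessingMsecs */);
     }

     public abstract String stageName();

     public abstract String systemName();

     // ... one abstract accessor per remaining field; extractCounterUpdates() can stay
     // as a concrete method on the abstract class ...
   }
   ```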
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ShardedKey.java:
##########

@@ -0,0 +1,43 @@
+@AutoValue
+public abstract class ShardedKey {
...
+  @Override
+  public final String toString() {
+    ByteString keyToDisplay = key();
+    if (keyToDisplay.size() > 100) {
+      keyToDisplay = keyToDisplay.substring(0, 100);
+    }
+    return String.format("%016x-%s", shardingKey(), TextFormat.escapeBytes(keyToDisplay));

Review Comment:
   let's just show the shardingKey
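   i.e., something like (a sketch of the suggested simplification):

   ```java
   @Override
   public final String toString() {
     // Show only the sharding key, per the suggestion above.
     return String.format("%016x", shardingKey());
   }
   ```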
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########

@@ -301,69 +248,35 @@
+  private ScheduledExecutorService globalConfigRefreshTimer;
+  // Periodic sender of debug information to the debug capture service.
+  private DebugCapture.Manager debugCaptureManager = null;

Review Comment:
   Can remove the `= null` and mark it @Nullable.

##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########

@@ -933,13 +905,13 @@
-      StringBuilder workIdBuilder = new StringBuilder(33);
-      workIdBuilder.append(Long.toHexString(workItem.getShardingKey()));
-      workIdBuilder.append('-');
-      workIdBuilder.append(Long.toHexString(workItem.getWorkToken()));
-      DataflowWorkerLoggingMDC.setWorkId(workIdBuilder.toString());
+      String workIdBuilder =

Review Comment:
   No longer a builder, so the name is stale. Does Java optimize repeated string concatenation better now? IIRC we may have seen CPU or object allocations from this on a profile.
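   For context (general javac behavior, not specific to this PR): a single concatenation expression compiles to one StringBuilder chain, and on JDK 9+ to a single invokedynamic call via StringConcatFactory (JEP 280), so the one-expression form should be allocation-comparable to the explicit builder; it is repeated += across statements or loop iterations that re-allocates builders. A sketch of the renamed one-expression form:

   ```java
   String workId =
       Long.toHexString(workItem.getShardingKey())
           + '-'
           + Long.toHexString(workItem.getWorkToken());
   DataflowWorkerLoggingMDC.setWorkId(workId);
   ```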
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/KeyCommitTooLargeException.java:
##########

@@ -0,0 +1,50 @@
+public class KeyCommitTooLargeException extends Exception {

Review Comment:
   (I'm not really a Java style expert.) Is it beneficial to have these classes in separate files if they are not used anywhere else (and are pretty specific)? Can they be private or package-private instead of public?

##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/counters/NameContext.java:
##########

@@ -31,7 +31,10 @@
   public static NameContext create(
-      String stageName, String originalName, String systemName, String userName) {
+      String stageName,

Review Comment:
   Seems like all of these are @Nullable, based on the factory methods below; systemName is null in the forStage method. Possibly stageName is always non-null, and the accessor could be adjusted accordingly?
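   i.e., roughly (a fragment sketch; which parameters are truly nullable should be confirmed against the factory methods in the file):

   ```java
   public static NameContext create(
       String stageName,
       @Nullable String originalName,
       @Nullable String systemName,
       @Nullable String userName) {
     // ... body unchanged ...
   }
   ```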
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationState.java:
##########

@@ -0,0 +1,276 @@
+  /** Mark the given shardedKey and work as active. */

Review Comment:
   ...and schedules execution of work if there is no active work for the shardedKey already processing.
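   i.e., roughly:

   ```java
   /**
    * Marks the given shardedKey and work as active, and schedules execution of the work if there
    * is no work for the shardedKey already processing.
    */
   public boolean activateWork(ShardedKey shardedKey, Work work) {
   ```

   (The method already behaves this way: it executes immediately only when there was no existing queue for the key, and otherwise appends to the key's queue.)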
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########

@@ -148,8 +149,29 @@
+  // TODO(https://github.com/apache/beam/issues/19632): Update throttling counters to use generic
+  // throttling-msecs metric.
+  public static final MetricName BIGQUERY_STREAMING_INSERT_THROTTLE_TIME =
+      MetricName.named(
+          "org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl",
+          "throttling-msecs");
+  // Maximum number of threads for processing. Currently each thread processes one key at a time.
+  static final int MAX_PROCESSING_THREADS = 300;
+  static final long THREAD_EXPIRATION_TIME_SEC = 60;
+  static final long TARGET_COMMIT_BUNDLE_BYTES = 32 << 20;
+  static final int MAX_COMMIT_QUEUE_BYTES = 500 << 20; // 500MB
+  static final int NUM_COMMIT_STREAMS = 1;
+  static final int GET_WORK_STREAM_TIMEOUT_MINUTES = 3;
+  static final Duration COMMIT_STREAM_TIMEOUT = Duration.standardMinutes(1);

Review Comment:
   Should the other constants be moved to the top here too? (If not, what is the motivation for moving only some of them?)
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationState.java:
##########

@@ -0,0 +1,276 @@
+  public void completeWork(ShardedKey shardedKey, long workToken) {
+    Work nextWork;
+    synchronized (activeWork) {
+      Queue<Work> queue = activeWork.get(shardedKey);
+      if (queue == null) {
+        // Work may have been completed due to clearing of stuck commits.
+        LOG.warn(
+            "Unable to complete inactive work for key {} and token {}.", shardedKey, workToken);
+        return;
+      }
+      Work completedWork = queue.peek();

Review Comment:
   @Nullable annotations
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationState.java:
##########

@@ -0,0 +1,276 @@
+  public void completeWork(ShardedKey shardedKey, long workToken) {
+    Work nextWork;

Review Comment:
   @Nullable annotation
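   i.e., roughly (a fragment sketch covering both locals flagged above; javax.annotation.Nullable is already imported in this file):

   ```java
   public void completeWork(ShardedKey shardedKey, long workToken) {
     @Nullable Work nextWork;
     synchronized (activeWork) {
       @Nullable Queue<Work> queue = activeWork.get(shardedKey);
       // ... null checks as in the current body ...
       @Nullable Work completedWork = queue.peek();
       // ...
     }
     // ...
   }
   ```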
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationState.java:
##########

@@ -0,0 +1,276 @@
+    this.computationStateCache = computationStateCache;
+    Preconditions.checkNotNull(mapTask.getStageName());
+    Preconditions.checkNotNull(mapTask.getSystemName());
+  }

Review Comment:
   Ditto (also, if kept, put the preconditions at the top of the constructor?)
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationState.java:
##########

@@ -0,0 +1,276 @@
+  @Override
+  public void close() throws Exception {
+    ExecutionState executionState;

Review Comment:
   nullable
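   i.e., roughly (the rest of close() is elided in the quote above; presumably the local is assigned from a poll-style drain loop):

   ```java
   @Override
   public void close() throws Exception {
     @Nullable ExecutionState executionState;
     while ((executionState = executionStateQueue.poll()) != null) {
       // ... close each drained ExecutionState ...
     }
   }
   ```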
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########

@@ -170,106 +191,35 @@
+  private static final Random clientIdGenerator = new Random();
+  final WindmillStateCache stateCache;

Review Comment:
   Can this be private final, and stay in the same location?

##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########

@@ -1393,7 +1367,7 @@
-          commit = commitQueue.poll(10 - 2 * commits, TimeUnit.MILLISECONDS);
+          commit = commitQueue.poll(10 - 2L * commits, TimeUnit.MILLISECONDS);

Review Comment:
   Why is the L required?
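   For reference (general Java arithmetic, not specific to this change): without the L the expression is evaluated in int arithmetic and then implicitly widened to long at the call site, which static analysis often flags; the L makes the widening explicit. The results are identical for the values used here:

   ```java
   int commits = 3;
   long viaInt = 10 - 2 * commits;   // int arithmetic, widened to long on assignment
   long viaLong = 10 - 2L * commits; // long arithmetic throughout; same value (4)
   ```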
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
