scwhittle commented on code in PR #30764:
URL: https://github.com/apache/beam/pull/30764#discussion_r1551205078
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkProgressUpdater.java:
##########
@@ -67,7 +67,7 @@ public DataflowWorkProgressUpdater(
super(worker, Integer.MAX_VALUE);
this.workItemStatusClient = workItemStatusClient;
this.workItem = workItem;
- this.hotKeyLogger = new HotKeyLogger();
+ this.hotKeyLogger = HotKeyLogger.ofSystemClock();
Review Comment:
Can the cleanups be moved to separate PRs? Less churn if things are reverted,
and it's easier to review and summarize in the commit description.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClient.java:
##########
@@ -64,7 +64,7 @@
@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
-class DataflowWorkUnitClient implements WorkUnitClient {
+public class DataflowWorkUnitClient implements WorkUnitClient {
Review Comment:
Mark the classes you are making public as @Internal.
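A minimal sketch of what I mean, reusing the org.apache.beam.sdk.annotations.Internal
annotation this PR already uses elsewhere (only the annotation is new here):
```java
import org.apache.beam.sdk.annotations.Internal;

/** Widened to public for the new harness; not part of the supported API surface. */
@Internal
@SuppressWarnings({
  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
public class DataflowWorkUnitClient implements WorkUnitClient {
  // ... existing implementation unchanged ...
}
```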
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/HotKeyLogger.java:
##########
@@ -18,30 +18,38 @@
package org.apache.beam.runners.dataflow.worker;
import com.google.api.client.util.Clock;
+import javax.annotation.concurrent.NotThreadSafe;
import org.apache.beam.runners.dataflow.util.TimeUtil;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+@NotThreadSafe
Review Comment:
Mark this @Internal as well.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ReaderCache.java:
##########
@@ -39,7 +39,7 @@
@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
-class ReaderCache {
+public class ReaderCache {
Review Comment:
Mark @Internal here too.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -599,8 +601,10 @@ public static void main(String[] args) throws Exception {
StreamingDataflowWorker.class.getSimpleName());
LOG.debug("Creating StreamingDataflowWorker from options: {}", options);
- StreamingDataflowWorker worker = StreamingDataflowWorker.fromOptions(options);
-
+ StreamingWorkerHarness worker =
+ isDirectPathPipeline(options)
+ ? StreamingEngineDirectPathWorkerHarness.fromOptions(options)
+ : StreamingDataflowWorker.fromOptions(options);
Review Comment:
how about moving the harness portions of StreamingDataflowWorker to a new
StreamingSingleEndpointWorkerHarness class? (open to better name ideas).
I think that is clearer which parts of this file are non-direct path
specific and which are shared.
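A sketch (the class name is the proposal above; fromOptions is assumed to
move over with the harness code):
```java
StreamingWorkerHarness worker =
    isDirectPathPipeline(options)
        ? StreamingEngineDirectPathWorkerHarness.fromOptions(options)
        : StreamingSingleEndpointWorkerHarness.fromOptions(options);
```
Then main() stays free of non-direct-path specifics.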
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/computations/ActiveWorkState.java:
##########
@@ -124,11 +128,40 @@ private static Stream<HeartbeatRequest> toHeartbeatRequestStream(
.setWorkToken(work.getWorkItem().getWorkToken())
.setCacheToken(work.getWorkItem().getCacheToken())
.addAllLatencyAttribution(
- work.getLatencyAttributions(
- /* isHeartbeat= */ true, work.getLatencyTrackingId(), sampler))
+ work.getLatencyAttributions(/* isHeartbeat= */ true, sampler))
.build());
}
+ private static Stream<DirectHeartbeatRequest> toHeartbeatRequestStreamDirectPath(
+ Entry<ShardedKey, Deque<Work>> shardedKeyAndWorkQueue,
+ Instant refreshDeadline,
+ DataflowExecutionStateSampler sampler) {
+ ShardedKey shardedKey = shardedKeyAndWorkQueue.getKey();
+ Deque<Work> workQueue = shardedKeyAndWorkQueue.getValue();
+
+ return workQueue.stream()
+ .filter(work -> work.getStartTime().isBefore(refreshDeadline))
+ .peek(
+ work -> {
+ if (work.getProcessWorkItemClient().getDataStream().isClosed()) {
+ work.setFailed();
+ }
+ })
+ // Don't send heartbeats for queued work we already know is failed.
+ .filter(work -> !work.isFailed())
+ .map(
+ work ->
+ DirectHeartbeatRequest.create(
Review Comment:
It seems like it would be more convenient for the caller to get back a
Map<DataStream, heartbeats>; then we could avoid the separate
DirectHeartbeatRequest class.
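Sketch of the shape I have in mind (the GetDataStream key type and the
createHeartbeatRequest helper are placeholders; groupingBy/mapping are from
java.util.stream.Collectors):
```java
private static Map<GetDataStream, List<HeartbeatRequest>> toHeartbeatsByStream(
    Entry<ShardedKey, Deque<Work>> shardedKeyAndWorkQueue,
    Instant refreshDeadline,
    DataflowExecutionStateSampler sampler) {
  return shardedKeyAndWorkQueue.getValue().stream()
      .filter(work -> work.getStartTime().isBefore(refreshDeadline))
      // Mark work on already-closed streams as failed so it is skipped below.
      .peek(
          work -> {
            if (work.getProcessWorkItemClient().getDataStream().isClosed()) {
              work.setFailed();
            }
          })
      .filter(work -> !work.isFailed())
      .collect(
          Collectors.groupingBy(
              work -> work.getProcessWorkItemClient().getDataStream(),
              Collectors.mapping(
                  work -> createHeartbeatRequest(work, sampler),
                  Collectors.toList())));
}
```
The caller can then iterate the map and send each batch on its own stream directly.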
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingApplianceConfigLoader.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming.config;
+
+import java.util.Optional;
+import java.util.function.Consumer;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetConfigResponse;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub;
+import org.apache.beam.sdk.annotations.Internal;
+
+@Internal
+public final class StreamingApplianceConfigLoader
+ implements StreamingConfigLoader<GetConfigResponse> {
+
+ private final WindmillServerStub windmillServer;
+ private final Consumer<GetConfigResponse> onConfigResponse;
+
+ public StreamingApplianceConfigLoader(
+ WindmillServerStub windmillServer, Consumer<GetConfigResponse> onConfigResponse) {
+ this.windmillServer = windmillServer;
+ this.onConfigResponse = onConfigResponse;
+ }
+
+ @Override
+ public void start() {
+ // no-op. Does not perform any asynchronous processing internally.
+ }
+
+ @Override
+ public Optional<GetConfigResponse> getComputationConfig(String computationId) {
+ GetConfigResponse getConfigResponse =
Review Comment:
Verify that computationId is non-empty.
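Something like this (a sketch using the vendored Guava Preconditions already
common in this worker code):
```java
@Override
public Optional<GetConfigResponse> getComputationConfig(String computationId) {
  Preconditions.checkArgument(
      !computationId.isEmpty(), "computationId must be non-empty.");
  // ... existing GetConfig call unchanged ...
}
```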
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/computations/ActiveWorkState.java:
##########
@@ -124,11 +128,40 @@ private static Stream<HeartbeatRequest> toHeartbeatRequestStream(
.setWorkToken(work.getWorkItem().getWorkToken())
.setCacheToken(work.getWorkItem().getCacheToken())
.addAllLatencyAttribution(
- work.getLatencyAttributions(
- /* isHeartbeat= */ true, work.getLatencyTrackingId(), sampler))
+ work.getLatencyAttributions(/* isHeartbeat= */ true, sampler))
.build());
}
+ private static Stream<DirectHeartbeatRequest> toHeartbeatRequestStreamDirectPath(
Review Comment:
It would be nice to have a single helper used by both the direct and
non-direct methods, since they are largely the same and could otherwise drift.
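For example (a sketch; the helper name is a placeholder), the shared
filtering could live in one place so the two variants cannot drift:
```java
// Shared by the direct-path and non-direct-path heartbeat builders: select
// the open work items that still need a heartbeat.
private static Stream<Work> openWorkNeedingHeartbeats(
    Deque<Work> workQueue, Instant refreshDeadline) {
  return workQueue.stream()
      .filter(work -> work.getStartTime().isBefore(refreshDeadline))
      // Don't send heartbeats for queued work we already know is failed.
      .filter(work -> !work.isFailed());
}
```
The direct-path variant can mark work whose stream has closed as failed
before calling this.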
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/computations/ActiveWorkState.java:
##########
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.runners.dataflow.worker.streaming;
+package org.apache.beam.runners.dataflow.worker.streaming.computations;
Review Comment:
computations is a confusing package name to me: we are grouping by
computation, but the objects have more to do with processing.
Instead of renaming the package, how about just leaving these in streaming?
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingEngineDirectPathWorkerHarness.java:
##########
@@ -0,0 +1,1131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming.harness;
+
+import static org.apache.beam.runners.dataflow.DataflowRunner.hasExperiment;
+import static org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannelFactory.remoteChannel;
+
+import com.google.api.services.dataflow.model.MapTask;
+import com.google.auto.value.AutoValue;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import org.apache.beam.runners.dataflow.internal.CustomSources;
+import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
+import org.apache.beam.runners.dataflow.util.CloudObject;
+import org.apache.beam.runners.dataflow.util.CloudObjects;
+import org.apache.beam.runners.dataflow.worker.DataflowExecutionContext;
+import org.apache.beam.runners.dataflow.worker.DataflowExecutionStateSampler;
+import org.apache.beam.runners.dataflow.worker.DataflowMapTaskExecutor;
+import org.apache.beam.runners.dataflow.worker.DataflowMapTaskExecutorFactory;
+import org.apache.beam.runners.dataflow.worker.DataflowWorkUnitClient;
+import org.apache.beam.runners.dataflow.worker.HotKeyLogger;
+import org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory;
+import org.apache.beam.runners.dataflow.worker.ReaderCache;
+import org.apache.beam.runners.dataflow.worker.ReaderRegistry;
+import org.apache.beam.runners.dataflow.worker.SinkRegistry;
+import org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext;
+import org.apache.beam.runners.dataflow.worker.WindmillComputationKey;
+import org.apache.beam.runners.dataflow.worker.WindmillKeyedWorkItem;
+import org.apache.beam.runners.dataflow.worker.WindmillTimeUtils;
+import org.apache.beam.runners.dataflow.worker.WorkItemCancelledException;
+import org.apache.beam.runners.dataflow.worker.WorkUnitClient;
+import org.apache.beam.runners.dataflow.worker.counters.NameContext;
+import org.apache.beam.runners.dataflow.worker.graph.Edges;
+import org.apache.beam.runners.dataflow.worker.graph.Networks;
+import org.apache.beam.runners.dataflow.worker.graph.Nodes;
+import org.apache.beam.runners.dataflow.worker.graph.Nodes.ParallelInstructionNode;
+import org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingMDC;
+import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler;
+import org.apache.beam.runners.dataflow.worker.status.DebugCapture;
+import org.apache.beam.runners.dataflow.worker.status.WorkerStatusPages;
+import org.apache.beam.runners.dataflow.worker.streaming.ExecutionState;
+import org.apache.beam.runners.dataflow.worker.streaming.KeyCommitTooLargeException;
+import org.apache.beam.runners.dataflow.worker.streaming.ShardedKey;
+import org.apache.beam.runners.dataflow.worker.streaming.StageInfo;
+import org.apache.beam.runners.dataflow.worker.streaming.Work;
+import org.apache.beam.runners.dataflow.worker.streaming.WorkHeartbeatResponseProcessor;
+import org.apache.beam.runners.dataflow.worker.streaming.computations.ComputationState;
+import org.apache.beam.runners.dataflow.worker.streaming.computations.ComputationStateCache;
+import org.apache.beam.runners.dataflow.worker.streaming.computations.StreamingEngineComputationStateCacheLoader;
+import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingConfigLoader;
+import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingEngineConfigLoader;
+import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingEnginePipelineConfig;
+import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcher;
+import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
+import org.apache.beam.runners.dataflow.worker.util.MemoryMonitor;
+import org.apache.beam.runners.dataflow.worker.util.common.worker.ElementCounter;
+import org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor;
+import org.apache.beam.runners.dataflow.worker.util.common.worker.OutputObjectAndByteCounter;
+import org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill.JobHeader;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress;
+import org.apache.beam.runners.dataflow.worker.windmill.client.CloseableStream;
+import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.CommitWorkStream;
+import org.apache.beam.runners.dataflow.worker.windmill.client.commits.Commit;
+import org.apache.beam.runners.dataflow.worker.windmill.client.commits.CompleteCommit;
+import org.apache.beam.runners.dataflow.worker.windmill.client.commits.StreamingEngineWorkCommitter;
+import org.apache.beam.runners.dataflow.worker.windmill.client.commits.WorkCommitter;
+import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.ChannelzServlet;
+import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcDispatcherClient;
+import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcWindmillStreamFactory;
+import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.StreamingEngineClient;
+import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCache;
+import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCachingRemoteStubFactory;
+import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCachingStubFactory;
+import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.IsolationChannel;
+import org.apache.beam.runners.dataflow.worker.windmill.state.GetDataClient;
+import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache;
+import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateReader;
+import org.apache.beam.runners.dataflow.worker.windmill.work.ProcessWorkItemClient;
+import org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudget;
+import org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudgetDistributors;
+import org.apache.beam.runners.dataflow.worker.windmill.work.processing.failures.FailureTracker;
+import org.apache.beam.runners.dataflow.worker.windmill.work.processing.failures.StreamingEngineFailureTracker;
+import org.apache.beam.runners.dataflow.worker.windmill.work.processing.failures.WorkFailureProcessor;
+import org.apache.beam.runners.dataflow.worker.windmill.work.refresh.ActiveWorkRefresher;
+import org.apache.beam.runners.dataflow.worker.windmill.work.refresh.DirectActiveWorkRefresher;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheLoader;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.graph.MutableNetwork;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings({
+ "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+public final class StreamingEngineDirectPathWorkerHarness implements StreamingWorkerHarness {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(StreamingEngineDirectPathWorkerHarness.class);
+ // Controls processing parallelism. Maximum number of threads for processing. Currently, each
+ // thread processes one key at a time.
+ private static final int MAX_PROCESSING_THREADS = 300;
Review Comment:
There is too much duplication between this and the other harness, which will
make it difficult to add new features (such as the current PR that makes
processing threads dynamic).
It seems like a lot of the pieces (executor, metrics, reporting, caches,
etc.) are not affected by how work is obtained, committed, or how state is
fetched. It would be better to keep that logic in one place and just inject
a different work obtainer, committer, and state fetcher; see the rough
sketch below.
Alternatively, we could make everything work with direct path by always
plumbing in something to use for GetData/CommitWork, and in the
non-direct-path case just having a single one.
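Rough shape of the injection idea (WorkProvider is a placeholder name;
WorkCommitter and GetDataClient already exist in this PR):
```java
// One harness keeps the shared machinery (executor, metrics, reporting,
// caches); only the transport-specific pieces differ between direct path
// and single endpoint.
final class StreamingWorkerHarnessImpl implements StreamingWorkerHarness {
  private final BoundedQueueExecutor workExecutor;
  private final WorkProvider workProvider;   // how work is obtained
  private final WorkCommitter workCommitter; // how work is committed
  private final GetDataClient getDataClient; // how state is fetched

  StreamingWorkerHarnessImpl(
      BoundedQueueExecutor workExecutor,
      WorkProvider workProvider,
      WorkCommitter workCommitter,
      GetDataClient getDataClient) {
    this.workExecutor = workExecutor;
    this.workProvider = workProvider;
    this.workCommitter = workCommitter;
    this.getDataClient = getDataClient;
  }
}
```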
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/computations/ComputationState.java:
##########
@@ -74,8 +83,12 @@ public ImmutableMap<String, String> getTransformUserNameToStateFamily() {
return transformUserNameToStateFamily;
}
- public ConcurrentLinkedQueue<ExecutionState> getExecutionStateQueue() {
- return executionStateQueue;
+ public void releaseExecutionState(ExecutionState executionState) {
+ executionStateQueue.offer(executionState);
+ }
+
+ public Optional<ExecutionState> getExecutionState() {
Review Comment:
Name this acquireExecutionState or pollExecutionState? "get" makes it sound
like a simple accessor.
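e.g. (sketch):
```java
/** Returns a cached ExecutionState if one is available. */
public Optional<ExecutionState> acquireExecutionState() {
  return Optional.ofNullable(executionStateQueue.poll());
}
```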
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java:
##########
@@ -64,27 +75,92 @@ private Work(Windmill.WorkItem workItem, Supplier<Instant> clock, Consumer<Work>
.setCacheToken(workItem.getCacheToken())
.setWorkToken(workItem.getWorkToken())
.build();
+ this.latencyTrackingId = buildLatencyTrackingId(workItem);
Review Comment:
Remove the method and just inline it?
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/computations/StreamingApplianceComputationStateCacheLoader.java:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming.computations;
+
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap.toImmutableMap;
+
+import com.google.api.services.dataflow.model.MapTask;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingConfigLoader;
+import org.apache.beam.runners.dataflow.worker.streaming.harness.StreamingEnvironment;
+import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.extensions.gcp.util.Transport;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheLoader;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Internal
+public final class StreamingApplianceComputationStateCacheLoader
Review Comment:
Let's put the config-loading changes in a separate PR.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]