scwhittle commented on code in PR #30764: URL: https://github.com/apache/beam/pull/30764#discussion_r1551205078
########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkProgressUpdater.java: ########## @@ -67,7 +67,7 @@ public DataflowWorkProgressUpdater( super(worker, Integer.MAX_VALUE); this.workItemStatusClient = workItemStatusClient; this.workItem = workItem; - this.hotKeyLogger = new HotKeyLogger(); + this.hotKeyLogger = HotKeyLogger.ofSystemClock(); Review Comment: Can cleanups be moved to separate PRs? Less churn if things are reverted and easier to review and summarize with commit description. ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkUnitClient.java: ########## @@ -64,7 +64,7 @@ @SuppressWarnings({ "nullness" // TODO(https://github.com/apache/beam/issues/20497) }) -class DataflowWorkUnitClient implements WorkUnitClient { +public class DataflowWorkUnitClient implements WorkUnitClient { Review Comment: mark these classes you are making public as Internal ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/HotKeyLogger.java: ########## @@ -18,30 +18,38 @@ package org.apache.beam.runners.dataflow.worker; import com.google.api.client.util.Clock; +import javax.annotation.concurrent.NotThreadSafe; import org.apache.beam.runners.dataflow.util.TimeUtil; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.joda.time.Duration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +@NotThreadSafe Review Comment: mark Internal ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ReaderCache.java: ########## @@ -39,7 +39,7 @@ @SuppressWarnings({ "nullness" // TODO(https://github.com/apache/beam/issues/20497) }) -class ReaderCache { +public class ReaderCache { Review Comment: internal ########## 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java: ########## @@ -599,8 +601,10 @@ public static void main(String[] args) throws Exception { StreamingDataflowWorker.class.getSimpleName()); LOG.debug("Creating StreamingDataflowWorker from options: {}", options); - StreamingDataflowWorker worker = StreamingDataflowWorker.fromOptions(options); - + StreamingWorkerHarness worker = + isDirectPathPipeline(options) + ? StreamingEngineDirectPathWorkerHarness.fromOptions(options) + : StreamingDataflowWorker.fromOptions(options); Review Comment: how about moving the harness portions of StreamingDataflowWorker to a new StreamingSingleEndpointWorkerHarness class? (open to better name ideas). I think that is clearer which parts of this file are non-direct path specific and which are shared. ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/computations/ActiveWorkState.java: ########## @@ -124,11 +128,40 @@ private static Stream<HeartbeatRequest> toHeartbeatRequestStream( .setWorkToken(work.getWorkItem().getWorkToken()) .setCacheToken(work.getWorkItem().getCacheToken()) .addAllLatencyAttribution( - work.getLatencyAttributions( - /* isHeartbeat= */ true, work.getLatencyTrackingId(), sampler)) + work.getLatencyAttributions(/* isHeartbeat= */ true, sampler)) .build()); } + private static Stream<DirectHeartbeatRequest> toHeartbeatRequestStreamDirectPath( + Entry<ShardedKey, Deque<Work>> shardedKeyAndWorkQueue, + Instant refreshDeadline, + DataflowExecutionStateSampler sampler) { + ShardedKey shardedKey = shardedKeyAndWorkQueue.getKey(); + Deque<Work> workQueue = shardedKeyAndWorkQueue.getValue(); + + return workQueue.stream() + .filter(work -> work.getStartTime().isBefore(refreshDeadline)) + .peek( + work -> { + if (work.getProcessWorkItemClient().getDataStream().isClosed()) { + work.setFailed(); + } + }) + // Don't send heartbeats for queued 
work we already know is failed. + .filter(work -> !work.isFailed()) + .map( + work -> + DirectHeartbeatRequest.create( Review Comment: it seems like it would be more convenient for the caller to get back a map<DataStream, heartbeats>, and then we could avoid the other DirectHeartbeatRequest class. ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingApplianceConfigLoader.java: ########## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.dataflow.worker.streaming.config; + +import java.util.Optional; +import java.util.function.Consumer; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetConfigResponse; +import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub; +import org.apache.beam.sdk.annotations.Internal; + +@Internal +public final class StreamingApplianceConfigLoader + implements StreamingConfigLoader<GetConfigResponse> { + + private final WindmillServerStub windmillServer; + private final Consumer<GetConfigResponse> onConfigResponse; + + public StreamingApplianceConfigLoader( + WindmillServerStub windmillServer, Consumer<GetConfigResponse> onConfigResponse) { + this.windmillServer = windmillServer; + this.onConfigResponse = onConfigResponse; + } + + @Override + public void start() { + // no-op. Does not perform any asynchronous processing internally. + } + + @Override + public Optional<GetConfigResponse> getComputationConfig(String computationId) { + GetConfigResponse getConfigResponse = Review Comment: verify computationId is non-empty ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/computations/ActiveWorkState.java: ########## @@ -124,11 +128,40 @@ private static Stream<HeartbeatRequest> toHeartbeatRequestStream( .setWorkToken(work.getWorkItem().getWorkToken()) .setCacheToken(work.getWorkItem().getCacheToken()) .addAllLatencyAttribution( - work.getLatencyAttributions( - /* isHeartbeat= */ true, work.getLatencyTrackingId(), sampler)) + work.getLatencyAttributions(/* isHeartbeat= */ true, sampler)) .build()); } + private static Stream<DirectHeartbeatRequest> toHeartbeatRequestStreamDirectPath( Review Comment: It would be nice to have a single helper used by both direct/non-direct methods since they are largely the same and could otherwise drift. 
########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/computations/ActiveWorkState.java: ########## @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.runners.dataflow.worker.streaming; +package org.apache.beam.runners.dataflow.worker.streaming.computations; Review Comment: computations is a confusing package name to me. We are grouping by computation but the objects are more to do with processing. However instead of renaming the package how about just leaving these in streaming? ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingEngineDirectPathWorkerHarness.java: ########## @@ -0,0 +1,1131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.dataflow.worker.streaming.harness; + +import static org.apache.beam.runners.dataflow.DataflowRunner.hasExperiment; +import static org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannelFactory.remoteChannel; + +import com.google.api.services.dataflow.model.MapTask; +import com.google.auto.value.AutoValue; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; +import org.apache.beam.runners.dataflow.internal.CustomSources; +import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions; +import org.apache.beam.runners.dataflow.util.CloudObject; +import org.apache.beam.runners.dataflow.util.CloudObjects; +import org.apache.beam.runners.dataflow.worker.DataflowExecutionContext; +import org.apache.beam.runners.dataflow.worker.DataflowExecutionStateSampler; +import org.apache.beam.runners.dataflow.worker.DataflowMapTaskExecutor; +import org.apache.beam.runners.dataflow.worker.DataflowMapTaskExecutorFactory; +import org.apache.beam.runners.dataflow.worker.DataflowWorkUnitClient; +import org.apache.beam.runners.dataflow.worker.HotKeyLogger; +import org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory; +import org.apache.beam.runners.dataflow.worker.ReaderCache; +import org.apache.beam.runners.dataflow.worker.ReaderRegistry; +import org.apache.beam.runners.dataflow.worker.SinkRegistry; +import org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext; +import 
org.apache.beam.runners.dataflow.worker.WindmillComputationKey; +import org.apache.beam.runners.dataflow.worker.WindmillKeyedWorkItem; +import org.apache.beam.runners.dataflow.worker.WindmillTimeUtils; +import org.apache.beam.runners.dataflow.worker.WorkItemCancelledException; +import org.apache.beam.runners.dataflow.worker.WorkUnitClient; +import org.apache.beam.runners.dataflow.worker.counters.NameContext; +import org.apache.beam.runners.dataflow.worker.graph.Edges; +import org.apache.beam.runners.dataflow.worker.graph.Networks; +import org.apache.beam.runners.dataflow.worker.graph.Nodes; +import org.apache.beam.runners.dataflow.worker.graph.Nodes.ParallelInstructionNode; +import org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingMDC; +import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler; +import org.apache.beam.runners.dataflow.worker.status.DebugCapture; +import org.apache.beam.runners.dataflow.worker.status.WorkerStatusPages; +import org.apache.beam.runners.dataflow.worker.streaming.ExecutionState; +import org.apache.beam.runners.dataflow.worker.streaming.KeyCommitTooLargeException; +import org.apache.beam.runners.dataflow.worker.streaming.ShardedKey; +import org.apache.beam.runners.dataflow.worker.streaming.StageInfo; +import org.apache.beam.runners.dataflow.worker.streaming.Work; +import org.apache.beam.runners.dataflow.worker.streaming.WorkHeartbeatResponseProcessor; +import org.apache.beam.runners.dataflow.worker.streaming.computations.ComputationState; +import org.apache.beam.runners.dataflow.worker.streaming.computations.ComputationStateCache; +import org.apache.beam.runners.dataflow.worker.streaming.computations.StreamingEngineComputationStateCacheLoader; +import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingConfigLoader; +import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingEngineConfigLoader; +import 
org.apache.beam.runners.dataflow.worker.streaming.config.StreamingEnginePipelineConfig; +import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcher; +import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor; +import org.apache.beam.runners.dataflow.worker.util.MemoryMonitor; +import org.apache.beam.runners.dataflow.worker.util.common.worker.ElementCounter; +import org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor; +import org.apache.beam.runners.dataflow.worker.util.common.worker.OutputObjectAndByteCounter; +import org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.JobHeader; +import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress; +import org.apache.beam.runners.dataflow.worker.windmill.client.CloseableStream; +import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.CommitWorkStream; +import org.apache.beam.runners.dataflow.worker.windmill.client.commits.Commit; +import org.apache.beam.runners.dataflow.worker.windmill.client.commits.CompleteCommit; +import org.apache.beam.runners.dataflow.worker.windmill.client.commits.StreamingEngineWorkCommitter; +import org.apache.beam.runners.dataflow.worker.windmill.client.commits.WorkCommitter; +import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.ChannelzServlet; +import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcDispatcherClient; +import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcWindmillStreamFactory; +import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.StreamingEngineClient; +import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCache; +import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCachingRemoteStubFactory; +import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCachingStubFactory; +import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.IsolationChannel; +import org.apache.beam.runners.dataflow.worker.windmill.state.GetDataClient; +import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache; +import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateReader; +import org.apache.beam.runners.dataflow.worker.windmill.work.ProcessWorkItemClient; +import org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudget; +import org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudgetDistributors; +import org.apache.beam.runners.dataflow.worker.windmill.work.processing.failures.FailureTracker; +import org.apache.beam.runners.dataflow.worker.windmill.work.processing.failures.StreamingEngineFailureTracker; +import org.apache.beam.runners.dataflow.worker.windmill.work.processing.failures.WorkFailureProcessor; +import org.apache.beam.runners.dataflow.worker.windmill.work.refresh.ActiveWorkRefresher; +import org.apache.beam.runners.dataflow.worker.windmill.work.refresh.DirectActiveWorkRefresher; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString; +import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheLoader; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; +import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.graph.MutableNetwork; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@SuppressWarnings({ + "nullness" // TODO(https://github.com/apache/beam/issues/20497) +}) +public final class StreamingEngineDirectPathWorkerHarness implements StreamingWorkerHarness { + private static final Logger LOG = + LoggerFactory.getLogger(StreamingEngineDirectPathWorkerHarness.class); + // Controls processing parallelism. Maximum number of threads for processing. Currently, each + // thread processes one key at a time. + private static final int MAX_PROCESSING_THREADS = 300; Review Comment: There is too much duplication between this and the other harness, which will make it difficult to add new features (such as current PR to make processing threads dynamic). It seems like a lot of the things: executor, metrics, reporting, cache, etc. are not affected by how work is obtained, committed or state fetched. It would be better if we could instead keep the logic and just inject different work obtainer, committer, state fetcher. Or alternatively we could make everything work with direct-path by always plumbing something to use for getdata/commitwork and in the non-direct path cases just having a single one. 
########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/computations/ComputationState.java: ########## @@ -74,8 +83,12 @@ public ImmutableMap<String, String> getTransformUserNameToStateFamily() { return transformUserNameToStateFamily; } - public ConcurrentLinkedQueue<ExecutionState> getExecutionStateQueue() { - return executionStateQueue; + public void releaseExecutionState(ExecutionState executionState) { + executionStateQueue.offer(executionState); + } + + public Optional<ExecutionState> getExecutionState() { Review Comment: name acquireExecutionState or pollExecutionState? get makes it sound like a simple accessor ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java: ########## @@ -64,27 +75,92 @@ private Work(Windmill.WorkItem workItem, Supplier<Instant> clock, Consumer<Work> .setCacheToken(workItem.getCacheToken()) .setWorkToken(workItem.getWorkToken()) .build(); + this.latencyTrackingId = buildLatencyTrackingId(workItem); Review Comment: rm method and just inline? ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/computations/StreamingApplianceComputationStateCacheLoader.java: ########## @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.streaming.computations; + +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap.toImmutableMap; + +import com.google.api.services.dataflow.model.MapTask; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; +import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingConfigLoader; +import org.apache.beam.runners.dataflow.worker.streaming.harness.StreamingEnvironment; +import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill; +import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache; +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.sdk.extensions.gcp.util.Transport; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheLoader; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Internal +public final class StreamingApplianceComputationStateCacheLoader Review Comment: Let's put the config loading 
changes in a separate PR -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org