scwhittle commented on code in PR #30764:
URL: https://github.com/apache/beam/pull/30764#discussion_r1574449155
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/HotKeyLogger.java:
##########
@@ -18,30 +18,41 @@
package org.apache.beam.runners.dataflow.worker;
import com.google.api.client.util.Clock;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
import org.apache.beam.runners.dataflow.util.TimeUtil;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+@NotThreadSafe
+@Internal
public class HotKeyLogger {
private final Logger LOG = LoggerFactory.getLogger(HotKeyLogger.class);
/** Clock used to either provide real system time or mocked to virtualize time for testing. */
- private Clock clock = Clock.SYSTEM;
-
+ private final Clock clock;
+ /** Throttles logging the detection to every loggingPeriod */
+ private final Duration loggingPeriod = Duration.standardMinutes(5);
/**
* The previous time the HotKeyDetection was logged. This is used to throttle logging to every 5
* minutes.
*/
private long prevHotKeyDetectionLogMs = 0;
- /** Throttles logging the detection to every loggingPeriod */
- private final Duration loggingPeriod = Duration.standardMinutes(5);
+ private HotKeyLogger(Clock clock) {
+ this.clock = clock;
+ }
- HotKeyLogger() {}
+ public static HotKeyLogger ofSystemClock() {
Review Comment:
Just name it create?
##########
runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsLogger.java:
##########
@@ -33,10 +33,18 @@ public class MetricsLogger extends MetricsContainerImpl {
AtomicLong lastReportedMillis = new AtomicLong(System.currentTimeMillis());
@Nullable MetricsContainerImpl lastMetricsSnapshot = null;
- public MetricsLogger(@Nullable String stepName) {
+ private MetricsLogger(@Nullable String stepName) {
super(stepName);
}
+ public static MetricsLogger createUnboundedMetricsLogger() {
Review Comment:
"unbounded" is easily confused with bounded/unbounded PCollections.
How about workerMetricsLogger, if these are metrics not scoped to a step?
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/logging/DataflowWorkerLoggingMDC.java:
##########
@@ -17,15 +17,19 @@
*/
package org.apache.beam.runners.dataflow.worker.logging;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
/** Mapped diagnostic context for the Dataflow worker. */
@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
Review Comment:
can this be removed with your changes?
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationState.java:
##########
@@ -74,8 +79,22 @@ public ImmutableMap<String, String> getTransformUserNameToStateFamily() {
return transformUserNameToStateFamily;
}
- public ConcurrentLinkedQueue<ExecutionState> getExecutionStateQueue() {
- return executionStateQueue;
+ /**
+ * Adds the {@link ExecutionState} to the internal {@link #executionStateQueue} so that it can be
Review Comment:
nit: I wouldn't reference the internal variable; just note that it adds to a
cache and that the state may be returned by subsequent acquireExecutionState
calls.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationState.java:
##########
@@ -128,21 +152,26 @@ private void forceExecute(Work work) {
executor.forceExecute(work, work.getWorkItem().getSerializedSize());
}
- /** Gets HeartbeatRequests for any work started before refreshDeadline. */
- public ImmutableList<HeartbeatRequest> getKeyHeartbeats(
- Instant refreshDeadline, DataflowExecutionStateSampler sampler) {
- return activeWorkState.getKeyHeartbeats(refreshDeadline, sampler);
+ public ImmutableMap<ShardedKey, Deque<Work>> currentActiveWorkReadOnly() {
+ return activeWorkState.getReadOnlyActiveWork();
}
public void printActiveWork(PrintWriter writer) {
activeWorkState.printActiveWork(writer, Instant.now());
}
- @Override
- public void close() throws Exception {
+ public GetWorkBudget getActiveWorkBudget() {
+ return activeWorkState.currentActiveWorkBudget();
+ }
+
+ public final void close() {
@Nullable ExecutionState executionState;
while ((executionState = executionStateQueue.poll()) != null) {
- executionState.workExecutor().close();
+ try {
+ executionState.workExecutor().close();
Review Comment:
seems better to call executionState.close() instead since it is added
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationState.java:
##########
@@ -74,8 +79,22 @@ public ImmutableMap<String, String> getTransformUserNameToStateFamily() {
return transformUserNameToStateFamily;
}
- public ConcurrentLinkedQueue<ExecutionState> getExecutionStateQueue() {
- return executionStateQueue;
+ /**
+ * Adds the {@link ExecutionState} to the internal {@link #executionStateQueue} so that it can be
+ * re-used in future processing.
+ */
+ public void releaseExecutionState(ExecutionState executionState) {
+ executionStateQueue.offer(executionState);
+ }
+
+ /**
+ * Removes an {@link ExecutionState} instance from {@link #executionStateQueue} if one exists, and
Review Comment:
ditto: instead of referring to the internal impl, just note that it returns
null or a state that was previously offered with releaseExecutionState
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/StreamingApplianceComputationStateCacheLoader.java:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming;
+
+import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap.toImmutableMap;
+
+import com.google.api.services.dataflow.model.MapTask;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+import
org.apache.beam.runners.dataflow.worker.streaming.config.StreamingConfigLoader;
+import
org.apache.beam.runners.dataflow.worker.streaming.harness.StreamingWorkerEnvironment;
+import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import
org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.extensions.gcp.util.Transport;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheLoader;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Internal
+public final class StreamingApplianceComputationStateCacheLoader
+ extends CacheLoader<String, Optional<ComputationState>> {
+ private static final Logger LOG =
+
LoggerFactory.getLogger(StreamingApplianceComputationStateCacheLoader.class);
+
+ private final StreamingConfigLoader<Windmill.GetConfigResponse>
streamingApplianceConfigLoader;
+ private final BoundedQueueExecutor workUnitExecutor;
+ private final ConcurrentHashMap<String, String> systemNameToComputationIdMap;
+ private final Function<String, WindmillStateCache.ForComputation>
+ perComputationStateCacheViewFactory;
+
+ public StreamingApplianceComputationStateCacheLoader(
+ StreamingConfigLoader<Windmill.GetConfigResponse>
streamingApplianceConfigLoader,
+ BoundedQueueExecutor workUnitExecutor,
+ Function<String, WindmillStateCache.ForComputation>
perComputationStateCacheViewFactory) {
+ this.streamingApplianceConfigLoader = streamingApplianceConfigLoader;
+ this.workUnitExecutor = workUnitExecutor;
+ this.systemNameToComputationIdMap = new ConcurrentHashMap<>();
+ this.perComputationStateCacheViewFactory =
perComputationStateCacheViewFactory;
+ }
+
+ private static Map<String, Map<String, String>>
transformUserNameToStateFamilyByComputationId(
+ Windmill.GetConfigResponse response) {
+ // Outer keys are computation ids. Outer values are map from transform
username to state
+ // family.
+ Map<String, Map<String, String>>
transformUserNameToStateFamilyByComputationId =
+ new HashMap<>();
+ for (Windmill.GetConfigResponse.ComputationConfigMapEntry
computationConfig :
+ response.getComputationConfigMapList()) {
+ Map<String, String> transformUserNameToStateFamily =
+ transformUserNameToStateFamilyByComputationId.computeIfAbsent(
+ computationConfig.getComputationId(), k -> new HashMap<>());
+ for (Windmill.ComputationConfig.TransformUserNameToStateFamilyEntry
entry :
+
computationConfig.getComputationConfig().getTransformUserNameToStateFamilyList())
{
+ transformUserNameToStateFamily.put(entry.getTransformUserName(),
entry.getStateFamily());
+ }
+ }
+
+ return transformUserNameToStateFamilyByComputationId;
+ }
+
+ /** Deserialize {@link MapTask} and populate MultiOutputInfos in MapTask. */
+ private static Optional<MapTask> deserializeAndFixMapTask(String
serializedMapTask) {
+ try {
+ return Optional.of(
+ StreamingWorkerEnvironment.fixMapTaskMultiOutputInfoFnInstance()
+ .apply(Transport.getJsonFactory().fromString(serializedMapTask,
MapTask.class)));
+ } catch (IOException e) {
+ LOG.warn("Parsing MapTask failed: {}", serializedMapTask, e);
+ }
+ return Optional.empty();
+ }
+
+ @Override
+ public Optional<ComputationState> load(String computationId) throws Exception {
+ return Preconditions.checkNotNull(
+ loadAll(ImmutableList.of(computationId)).getOrDefault(computationId, Optional.empty()));
+ }
+
+ @Override
+ public Map<String, Optional<ComputationState>> loadAll(
+ Iterable<? extends String> computationIds) {
+ Optional<Windmill.GetConfigResponse> maybeResponse =
+ streamingApplianceConfigLoader.getComputationConfig(
+ Iterables.getOnlyElement(computationIds));
Review Comment:
can there be a separate assert for single element?
Or what about moving the implementation to load and changing loadAll to
iterate over ids and call load for each one?
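
A minimal sketch of the second option, using plain JDK types rather than Guava's CacheLoader. The class name PerKeyLoaderSketch, the fetchConfig function, and the String payload are hypothetical stand-ins for the loader, the single-id config lookup, and ComputationState:

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

/** Hypothetical stand-in for the cache loader; String replaces ComputationState. */
final class PerKeyLoaderSketch {
  // Assumed single-id lookup, standing in for the config loader call.
  private final Function<String, Optional<String>> fetchConfig;

  PerKeyLoaderSketch(Function<String, Optional<String>> fetchConfig) {
    this.fetchConfig = fetchConfig;
  }

  /** The real work happens here, one computation id at a time. */
  Optional<String> load(String computationId) {
    return fetchConfig.apply(computationId);
  }

  /** No single-element assumption: delegate to load for each id. */
  Map<String, Optional<String>> loadAll(Iterable<? extends String> computationIds) {
    Map<String, Optional<String>> loaded = new LinkedHashMap<>();
    for (String computationId : computationIds) {
      loaded.put(computationId, load(computationId));
    }
    return loaded;
  }

  public static void main(String[] args) {
    PerKeyLoaderSketch loader =
        new PerKeyLoaderSketch(
            id -> id.startsWith("c") ? Optional.of("config-" + id) : Optional.empty());
    System.out.println(loader.loadAll(Arrays.asList("c1", "x2")));
  }
}
```

With this shape loadAll no longer needs Iterables.getOnlyElement, and a caller passing several ids gets one entry per id instead of an exception.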
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/ApplianceWorkerHarness.java:
##########
@@ -0,0 +1,572 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming.harness;
+
+import com.google.api.services.dataflow.model.MapTask;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
+import org.apache.beam.runners.dataflow.worker.DataflowExecutionStateSampler;
+import org.apache.beam.runners.dataflow.worker.DataflowMapTaskExecutorFactory;
+import org.apache.beam.runners.dataflow.worker.DataflowWorkUnitClient;
+import org.apache.beam.runners.dataflow.worker.HotKeyLogger;
+import org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory;
+import
org.apache.beam.runners.dataflow.worker.MetricTrackingWindmillServerStub;
+import org.apache.beam.runners.dataflow.worker.ReaderCache;
+import org.apache.beam.runners.dataflow.worker.WindmillComputationKey;
+import org.apache.beam.runners.dataflow.worker.WindmillTimeUtils;
+import org.apache.beam.runners.dataflow.worker.WorkUnitClient;
+import org.apache.beam.runners.dataflow.worker.streaming.ComputationState;
+import org.apache.beam.runners.dataflow.worker.streaming.ComputationStateCache;
+import org.apache.beam.runners.dataflow.worker.streaming.StageInfo;
+import
org.apache.beam.runners.dataflow.worker.streaming.StreamingApplianceComputationStateCacheLoader;
+import
org.apache.beam.runners.dataflow.worker.streaming.config.StreamingApplianceConfigLoader;
+import
org.apache.beam.runners.dataflow.worker.streaming.processing.ExecutionStateFactory;
+import
org.apache.beam.runners.dataflow.worker.streaming.processing.StreamingCommitFinalizer;
+import
org.apache.beam.runners.dataflow.worker.streaming.processing.StreamingWorkScheduler;
+import
org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcher;
+import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
+import org.apache.beam.runners.dataflow.worker.util.MemoryMonitor;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.ComputationWorkItems;
+import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetWorkResponse;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub;
+import
org.apache.beam.runners.dataflow.worker.windmill.appliance.JniWindmillApplianceServer;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.commits.CompleteCommit;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.commits.StreamingApplianceWorkCommitter;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.commits.WorkCommitter;
+import
org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache;
+import
org.apache.beam.runners.dataflow.worker.windmill.work.WorkProcessingContext;
+import
org.apache.beam.runners.dataflow.worker.windmill.work.processing.failures.FailureTracker;
+import
org.apache.beam.runners.dataflow.worker.windmill.work.processing.failures.StreamingApplianceFailureTracker;
+import
org.apache.beam.runners.dataflow.worker.windmill.work.processing.failures.WorkFailureProcessor;
+import
org.apache.beam.runners.dataflow.worker.windmill.work.refresh.ActiveWorkRefresher;
+import
org.apache.beam.runners.dataflow.worker.windmill.work.refresh.DispatchedActiveWorkRefresher;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.io.FileSystems;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheLoader;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Uninterruptibles;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Streaming Appliance worker harness implementation. */
+@Internal
+@ThreadSafe
+public final class ApplianceWorkerHarness implements StreamingWorkerHarness {
+ private static final Logger LOG =
LoggerFactory.getLogger(ApplianceWorkerHarness.class);
+ private static final String MEMORY_MONITOR_EXECUTOR = "MemoryMonitor";
+ private static final String GET_WORK_EXECUTOR = "GetWorkDispatcher";
+ private static final String REFRESH_WORK_EXECUTOR = "RefreshWork";
+
+ private final DataflowWorkerHarnessOptions options;
+ private final long clientId;
+ private final WindmillServerStub windmillServer;
+ private final MetricTrackingWindmillServerStub getDataClient;
+ private final MemoryMonitor memoryMonitor;
+ private final ComputationStateCache computationCache;
+ private final StreamingWorkScheduler streamingWorkScheduler;
+ private final WorkCommitter workCommitter;
+ private final StreamingWorkerStatusReporter workerStatusReporter;
+ private final StreamingStatusPages statusPages;
+ private final DataflowExecutionStateSampler sampler;
+ private final ActiveWorkRefresher activeWorkRefresher;
+ private final BoundedQueueExecutor workExecutor;
+ private final ExecutorService memoryMonitoryExecutor;
+ private final ExecutorService getWorkExecutor;
+ private final AtomicBoolean isRunning;
+
+ private ApplianceWorkerHarness(
+ DataflowWorkerHarnessOptions options,
+ long clientId,
+ WindmillServerStub windmillServer,
+ MetricTrackingWindmillServerStub getDataClient,
+ MemoryMonitor memoryMonitor,
+ ComputationStateCache computationCache,
+ StreamingWorkScheduler streamingWorkScheduler,
+ WorkCommitter workCommitter,
+ StreamingWorkerStatusReporter workerStatusReporter,
+ StreamingStatusPages statusPages,
+ DataflowExecutionStateSampler sampler,
+ ActiveWorkRefresher activeWorkRefresher,
+ ExecutorService memoryMonitoryExecutor,
+ ExecutorService getWorkExecutor,
+ AtomicBoolean isRunning,
+ BoundedQueueExecutor workExecutor) {
+ this.options = options;
+ this.clientId = clientId;
+ this.windmillServer = windmillServer;
+ this.getDataClient = getDataClient;
+ this.memoryMonitor = memoryMonitor;
+ this.computationCache = computationCache;
+ this.streamingWorkScheduler = streamingWorkScheduler;
+ this.workCommitter = workCommitter;
+ this.workerStatusReporter = workerStatusReporter;
+ this.statusPages = statusPages;
+ this.sampler = sampler;
+ this.activeWorkRefresher = activeWorkRefresher;
+ this.memoryMonitoryExecutor = memoryMonitoryExecutor;
+ this.getWorkExecutor = getWorkExecutor;
+ this.isRunning = isRunning;
+ this.workExecutor = workExecutor;
+ // Register standard file systems.
+ FileSystems.setDefaultPipelineOptions(options);
+ }
+
+ public static ApplianceWorkerHarness fromOptions(DataflowWorkerHarnessOptions options) {
Review Comment:
At a high level, the differences between the appliance/dispatcher/direct paths
seem to be limited to:
- how to get the config
- how to get/schedule WorkProcessingContext (which then knows how to
fetch/commit/heartbeat to the right worker).
I think that moving things out of StreamingWorkerHarness into classes can
help testing/readability, but there is still a lot of duplicated setup
between the different harnesses, which will make them harder to maintain.
Can we instead have a shared StreamingDataflowWorker that uses these objects,
with only minimal injection differences when constructing it for
appliance/fanout/direct?
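
To illustrate the shape being suggested, here is a rough sketch under stated assumptions. SharedWorkerSketch, ConfigSource, WorkSource, and the string payloads are all hypothetical names invented for this example; they are not classes from the PR:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical shared worker; only the two injected pieces differ per mode. */
final class SharedWorkerSketch {
  /** Assumed abstraction for "how to get the config". */
  interface ConfigSource {
    String configFor(String computationId);
  }

  /** Assumed abstraction for "how to get/schedule work". */
  interface WorkSource {
    List<String> nextWork();
  }

  private final ConfigSource configSource;
  private final WorkSource workSource;

  private SharedWorkerSketch(ConfigSource configSource, WorkSource workSource) {
    this.configSource = configSource;
    this.workSource = workSource;
  }

  static SharedWorkerSketch forAppliance() {
    return new SharedWorkerSketch(
        id -> "appliance-config/" + id, () -> Collections.singletonList("comp-a"));
  }

  static SharedWorkerSketch forFanout() {
    return new SharedWorkerSketch(
        id -> "service-config/" + id, () -> Arrays.asList("comp-a", "comp-b"));
  }

  /** Shared processing path: identical regardless of which factory built the worker. */
  List<String> processOnce() {
    List<String> processed = new ArrayList<>();
    for (String computationId : workSource.nextWork()) {
      processed.add(configSource.configFor(computationId));
    }
    return processed;
  }

  public static void main(String[] args) {
    System.out.println(forAppliance().processOnce());
    System.out.println(forFanout().processOnce());
  }
}
```

The setup that is common to every mode lives in one constructor, and each factory method only chooses which implementations to inject.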
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationState.java:
##########
@@ -74,8 +79,22 @@ public ImmutableMap<String, String> getTransformUserNameToStateFamily() {
return transformUserNameToStateFamily;
}
- public ConcurrentLinkedQueue<ExecutionState> getExecutionStateQueue() {
- return executionStateQueue;
+ /**
+ * Adds the {@link ExecutionState} to the internal {@link #executionStateQueue} so that it can be
+ * re-used in future processing.
+ */
+ public void releaseExecutionState(ExecutionState executionState) {
+ executionStateQueue.offer(executionState);
Review Comment:
should we reset() the execution state? or add a comment that it should be
reset before adding here?
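
A small sketch of the first option (reset on release), assuming a simplified state object. ExecutionStatePoolSketch, its nested State class, and the Optional return type are hypothetical; the real ExecutionState and its reset semantics are not shown in this hunk:

```java
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Hypothetical pool that resets a state before making it reusable. */
final class ExecutionStatePoolSketch {
  /** Simplified stand-in for ExecutionState with some per-work residue. */
  static final class State {
    private int processedElements;

    void process() {
      processedElements++;
    }

    void reset() {
      processedElements = 0;
    }

    int processedElements() {
      return processedElements;
    }
  }

  private final ConcurrentLinkedQueue<State> pool = new ConcurrentLinkedQueue<>();

  /** Clears per-work residue, then offers the state for reuse. */
  void release(State state) {
    state.reset();
    pool.offer(state);
  }

  /** Returns a previously released state, or empty if none is pooled. */
  Optional<State> acquire() {
    return Optional.ofNullable(pool.poll());
  }

  public static void main(String[] args) {
    ExecutionStatePoolSketch pool = new ExecutionStatePoolSketch();
    State state = new State();
    state.process();
    pool.release(state);
    System.out.println(pool.acquire().get().processedElements());
  }
}
```

Resetting inside release means callers cannot forget the step; the alternative is to document the precondition on the method instead.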
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/StreamingApplianceComputationStateCacheLoader.java:
##########
@@ -0,0 +1,154 @@
+@Internal
+public final class StreamingApplianceComputationStateCacheLoader
Review Comment:
perhaps the computation cache and cache loaders might be a good thing to
separate out from this PR so it isn't as monolithic?
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java:
##########
@@ -90,11 +83,11 @@ private ActiveWorkState(
}
static ActiveWorkState create(WindmillStateCache.ForComputation computationStateCache) {
- return new ActiveWorkState(new HashMap<>(), computationStateCache);
+ return new ActiveWorkState(new ConcurrentHashMap<>(), computationStateCache);
Review Comment:
Why is the ConcurrentHashMap required? It seems getReadOnlyActiveWork is
synchronized below.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java:
##########
@@ -321,11 +291,8 @@ private synchronized ImmutableMap<ShardedKey, WorkId> getStuckCommitsAt(
return stuckCommits.build();
}
- synchronized ImmutableList<HeartbeatRequest> getKeyHeartbeats(
- Instant refreshDeadline, DataflowExecutionStateSampler sampler) {
- return activeWork.entrySet().stream()
- .flatMap(entry -> toHeartbeatRequestStream(entry, refreshDeadline, sampler))
- .collect(toImmutableList());
+ synchronized ImmutableMap<ShardedKey, Deque<Work>> getReadOnlyActiveWork() {
+ return ImmutableMap.copyOf(activeWork);
Review Comment:
This is just a shallow copy: the returned map still shares the underlying
Deque instances, so it seems like there will be concurrency issues.
Instead of having to copy deeper, you could use a visitor pattern where you
accept some function to run over the keys and queued work with appropriate
synchronization.
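
One way the visitor idea could look, as a sketch only. ActiveWorkVisitorSketch, activate, and forEachQueuedWork are hypothetical names, and String stands in for both ShardedKey and Work:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

/** Hypothetical active-work holder that never hands out its internal deques. */
final class ActiveWorkVisitorSketch {
  // Guarded by "this"; String stands in for ShardedKey and Work.
  private final Map<String, Deque<String>> activeWork = new HashMap<>();

  synchronized void activate(String key, String work) {
    activeWork.computeIfAbsent(key, unused -> new ArrayDeque<>()).addLast(work);
  }

  /** Runs the visitor over every (key, queued work) pair while holding the lock. */
  synchronized void forEachQueuedWork(BiConsumer<String, String> visitor) {
    for (Map.Entry<String, Deque<String>> entry : activeWork.entrySet()) {
      for (String work : entry.getValue()) {
        visitor.accept(entry.getKey(), work);
      }
    }
  }

  public static void main(String[] args) {
    ActiveWorkVisitorSketch state = new ActiveWorkVisitorSketch();
    state.activate("key-1", "work-a");
    state.activate("key-1", "work-b");
    state.forEachQueuedWork((key, work) -> System.out.println(key + " -> " + work));
  }
}
```

Because the lock is held for the whole traversal, the visitor should be cheap and must not call back into methods that mutate the map; a visitor that builds heartbeat requests into its own list would fit this constraint.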
##########
runners/google-cloud-dataflow-java/worker/build.gradle:
##########
@@ -257,3 +257,4 @@ checkstyleMain.enabled = false
checkstyleTest.enabled = false
//TODO(https://github.com/apache/beam/issues/19119): javadoc task should be enabled in the future.
javadoc.enabled = false
+test.outputs.upToDateWhen {false}
Review Comment:
remove?
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationStateCache.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming;
+
+import static java.util.stream.Collectors.toMap;
+import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList.toImmutableList;
+import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet.toImmutableSet;
+
+import java.io.PrintWriter;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.runners.dataflow.worker.status.StatusDataProvider;
+import
org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudget;
+import org.apache.beam.sdk.annotations.Internal;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheLoader;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LoadingCache;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Cache of {@link String} computationId to {@link ComputationState} objects.
*/
+@Internal
+@ThreadSafe
+public final class ComputationStateCache implements StatusDataProvider {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ComputationStateCache.class);
+
+ private final LoadingCache<String, Optional<ComputationState>>
computationCache;
+
+ private ComputationStateCache(LoadingCache<String,
Optional<ComputationState>> computationCache) {
+ this.computationCache = computationCache;
+ }
+
+ public static ComputationStateCache create(
+ CacheLoader<String, Optional<ComputationState>> cacheLoader) {
+ return new
ComputationStateCache(CacheBuilder.newBuilder().build(cacheLoader));
+ }
+
+ /**
+ * Returns the {@link ComputationState} associated with the given computationId. May perform IO if
+ * a value is not present, and it is possible that after IO is performed there is no value
+ * correlated with that computationId.
+ */
+ public Optional<ComputationState> getComputationState(String computationId) {
+ try {
+ Optional<ComputationState> computationState =
+ computationCache.getAll(ImmutableList.of(computationId)).get(computationId);
+ return computationState != null ? computationState : Optional.empty();
Review Comment:
I don't think null is possible for loading cache values
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/StreamingApplianceComputationStateCacheLoader.java:
##########
@@ -0,0 +1,154 @@
+@Internal
+public final class StreamingApplianceComputationStateCacheLoader
+ extends CacheLoader<String, Optional<ComputationState>> {
+ private static final Logger LOG =
+
LoggerFactory.getLogger(StreamingApplianceComputationStateCacheLoader.class);
+
+ private final StreamingConfigLoader<Windmill.GetConfigResponse>
streamingApplianceConfigLoader;
+ private final BoundedQueueExecutor workUnitExecutor;
+ private final ConcurrentHashMap<String, String> systemNameToComputationIdMap;
+ private final Function<String, WindmillStateCache.ForComputation>
+ perComputationStateCacheViewFactory;
+
+ public StreamingApplianceComputationStateCacheLoader(
+ StreamingConfigLoader<Windmill.GetConfigResponse>
streamingApplianceConfigLoader,
+ BoundedQueueExecutor workUnitExecutor,
+ Function<String, WindmillStateCache.ForComputation>
perComputationStateCacheViewFactory) {
+ this.streamingApplianceConfigLoader = streamingApplianceConfigLoader;
+ this.workUnitExecutor = workUnitExecutor;
+ this.systemNameToComputationIdMap = new ConcurrentHashMap<>();
+ this.perComputationStateCacheViewFactory =
perComputationStateCacheViewFactory;
+ }
+
+ private static Map<String, Map<String, String>>
transformUserNameToStateFamilyByComputationId(
+ Windmill.GetConfigResponse response) {
+ // Outer keys are computation ids. Outer values are map from transform
username to state
+ // family.
+ Map<String, Map<String, String>>
transformUserNameToStateFamilyByComputationId =
+ new HashMap<>();
+ for (Windmill.GetConfigResponse.ComputationConfigMapEntry
computationConfig :
+ response.getComputationConfigMapList()) {
+ Map<String, String> transformUserNameToStateFamily =
+ transformUserNameToStateFamilyByComputationId.computeIfAbsent(
+ computationConfig.getComputationId(), k -> new HashMap<>());
+ for (Windmill.ComputationConfig.TransformUserNameToStateFamilyEntry
entry :
+
computationConfig.getComputationConfig().getTransformUserNameToStateFamilyList())
{
+ transformUserNameToStateFamily.put(entry.getTransformUserName(),
entry.getStateFamily());
+ }
+ }
+
+ return transformUserNameToStateFamilyByComputationId;
+ }
+
+ /** Deserialize {@link MapTask} and populate MultiOutputInfos in MapTask. */
+ private static Optional<MapTask> deserializeAndFixMapTask(String
serializedMapTask) {
+ try {
+ return Optional.of(
+ StreamingWorkerEnvironment.fixMapTaskMultiOutputInfoFnInstance()
+ .apply(Transport.getJsonFactory().fromString(serializedMapTask,
MapTask.class)));
+ } catch (IOException e) {
+ LOG.warn("Parsing MapTask failed: {}", serializedMapTask, e);
+ }
+ return Optional.empty();
+ }
+
+ @Override
+ public Optional<ComputationState> load(String computationId) throws
Exception {
+ return Preconditions.checkNotNull(
+ loadAll(ImmutableList.of(computationId)).getOrDefault(computationId,
Optional.empty()));
+ }
+
+ @Override
+ public Map<String, Optional<ComputationState>> loadAll(
+ Iterable<? extends String> computationIds) {
+ Optional<Windmill.GetConfigResponse> maybeResponse =
+ streamingApplianceConfigLoader.getComputationConfig(
+ Iterables.getOnlyElement(computationIds));
+ if (!maybeResponse.isPresent()) {
+ return ImmutableMap.of();
+ }
+
+ Windmill.GetConfigResponse response = maybeResponse.get();
+
+ // The max work item commit bytes should be modified to be dynamic once it
is available in
Review Comment:
Odd spot for this comment: the global id has this field, but individual
computations do not.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/StreamingApplianceComputationStateCacheLoader.java:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming;
+
+import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap.toImmutableMap;
+
+import com.google.api.services.dataflow.model.MapTask;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+import
org.apache.beam.runners.dataflow.worker.streaming.config.StreamingConfigLoader;
+import
org.apache.beam.runners.dataflow.worker.streaming.harness.StreamingWorkerEnvironment;
+import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import
org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.extensions.gcp.util.Transport;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheLoader;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Internal
+public final class StreamingApplianceComputationStateCacheLoader
+ extends CacheLoader<String, Optional<ComputationState>> {
+ private static final Logger LOG =
+
LoggerFactory.getLogger(StreamingApplianceComputationStateCacheLoader.class);
+
+ private final StreamingConfigLoader<Windmill.GetConfigResponse>
streamingApplianceConfigLoader;
+ private final BoundedQueueExecutor workUnitExecutor;
+ private final ConcurrentHashMap<String, String> systemNameToComputationIdMap;
+ private final Function<String, WindmillStateCache.ForComputation>
+ perComputationStateCacheViewFactory;
+
+ public StreamingApplianceComputationStateCacheLoader(
+ StreamingConfigLoader<Windmill.GetConfigResponse>
streamingApplianceConfigLoader,
+ BoundedQueueExecutor workUnitExecutor,
+ Function<String, WindmillStateCache.ForComputation>
perComputationStateCacheViewFactory) {
+ this.streamingApplianceConfigLoader = streamingApplianceConfigLoader;
+ this.workUnitExecutor = workUnitExecutor;
+ this.systemNameToComputationIdMap = new ConcurrentHashMap<>();
+ this.perComputationStateCacheViewFactory =
perComputationStateCacheViewFactory;
+ }
+
+ private static Map<String, Map<String, String>>
transformUserNameToStateFamilyByComputationId(
+ Windmill.GetConfigResponse response) {
+ // Outer keys are computation ids. Outer values are map from transform
username to state
Review Comment:
nit: "The inner map maps from ..." seems like it would read better than
"Outer values are map".
Also, since this is returned, it would be better to move it out of the method.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ExecutionState.java:
##########
@@ -51,4 +54,13 @@ public abstract static class Builder {
public abstract ExecutionState build();
}
+
+ public final void close() {
+ try {
+ context().invalidateCache();
+ workExecutor().close();
+ } catch (Exception e) {
+ LOG.warn("Failed to close map task executor: ", e);
Review Comment:
Either update the log message to indicate that it may also be invalidating the
cache, or remove invalidating the cache from the try block.
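
A rough sketch of the second option, under stated assumptions: `CloseSketch`, the `Context` interface, and the `warnings` list (standing in for `LOG.warn`) are hypothetical; only `invalidateCache()` and the executor `close()` call come from the diff.

```java
import java.util.ArrayList;
import java.util.List;

final class CloseSketch {
  /** Hypothetical stand-in for the execution context; only invalidateCache() is from the diff. */
  interface Context {
    void invalidateCache();
  }

  /** Stand-in for LOG.warn so the sketch needs no logging dependency. */
  static final List<String> warnings = new ArrayList<>();

  /** Invalidates the cache outside the try, so the warning only describes the executor close. */
  static void close(Context context, AutoCloseable workExecutor) {
    context.invalidateCache();
    try {
      workExecutor.close();
    } catch (Exception e) {
      warnings.add("Failed to close map task executor: " + e.getMessage());
    }
  }
}
```

One caveat with this shape: if `invalidateCache()` can itself throw, the executor would no longer be closed, in which case broadening the log message may be the safer of the two options.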
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/StreamingApplianceComputationStateCacheLoader.java:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming;
+
+import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap.toImmutableMap;
+
+import com.google.api.services.dataflow.model.MapTask;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+import
org.apache.beam.runners.dataflow.worker.streaming.config.StreamingConfigLoader;
+import
org.apache.beam.runners.dataflow.worker.streaming.harness.StreamingWorkerEnvironment;
+import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import
org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.extensions.gcp.util.Transport;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheLoader;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Internal
+public final class StreamingApplianceComputationStateCacheLoader
+ extends CacheLoader<String, Optional<ComputationState>> {
+ private static final Logger LOG =
+
LoggerFactory.getLogger(StreamingApplianceComputationStateCacheLoader.class);
+
+ private final StreamingConfigLoader<Windmill.GetConfigResponse>
streamingApplianceConfigLoader;
+ private final BoundedQueueExecutor workUnitExecutor;
+ private final ConcurrentHashMap<String, String> systemNameToComputationIdMap;
+ private final Function<String, WindmillStateCache.ForComputation>
+ perComputationStateCacheViewFactory;
+
+ public StreamingApplianceComputationStateCacheLoader(
+ StreamingConfigLoader<Windmill.GetConfigResponse>
streamingApplianceConfigLoader,
+ BoundedQueueExecutor workUnitExecutor,
+ Function<String, WindmillStateCache.ForComputation>
perComputationStateCacheViewFactory) {
+ this.streamingApplianceConfigLoader = streamingApplianceConfigLoader;
+ this.workUnitExecutor = workUnitExecutor;
+ this.systemNameToComputationIdMap = new ConcurrentHashMap<>();
+ this.perComputationStateCacheViewFactory =
perComputationStateCacheViewFactory;
+ }
+
+ private static Map<String, Map<String, String>>
transformUserNameToStateFamilyByComputationId(
+ Windmill.GetConfigResponse response) {
+ // Outer keys are computation ids. Outer values are map from transform
username to state
+ // family.
+ Map<String, Map<String, String>>
transformUserNameToStateFamilyByComputationId =
+ new HashMap<>();
+ for (Windmill.GetConfigResponse.ComputationConfigMapEntry
computationConfig :
+ response.getComputationConfigMapList()) {
+ Map<String, String> transformUserNameToStateFamily =
+ transformUserNameToStateFamilyByComputationId.computeIfAbsent(
+ computationConfig.getComputationId(), k -> new HashMap<>());
+ for (Windmill.ComputationConfig.TransformUserNameToStateFamilyEntry
entry :
+
computationConfig.getComputationConfig().getTransformUserNameToStateFamilyList())
{
+ transformUserNameToStateFamily.put(entry.getTransformUserName(),
entry.getStateFamily());
+ }
+ }
+
+ return transformUserNameToStateFamilyByComputationId;
+ }
+
+ /** Deserialize {@link MapTask} and populate MultiOutputInfos in MapTask. */
+ private static Optional<MapTask> deserializeAndFixMapTask(String
serializedMapTask) {
+ try {
+ return Optional.of(
+ StreamingWorkerEnvironment.fixMapTaskMultiOutputInfoFnInstance()
+ .apply(Transport.getJsonFactory().fromString(serializedMapTask,
MapTask.class)));
+ } catch (IOException e) {
+ LOG.warn("Parsing MapTask failed: {}", serializedMapTask, e);
+ }
+ return Optional.empty();
+ }
+
+ @Override
+ public Optional<ComputationState> load(String computationId) throws
Exception {
+ return Preconditions.checkNotNull(
+ loadAll(ImmutableList.of(computationId)).getOrDefault(computationId,
Optional.empty()));
+ }
+
+ @Override
+ public Map<String, Optional<ComputationState>> loadAll(
+ Iterable<? extends String> computationIds) {
+ Optional<Windmill.GetConfigResponse> maybeResponse =
+ streamingApplianceConfigLoader.getComputationConfig(
+ Iterables.getOnlyElement(computationIds));
+ if (!maybeResponse.isPresent()) {
+ return ImmutableMap.of();
+ }
+
+ Windmill.GetConfigResponse response = maybeResponse.get();
+
+ // The max work item commit bytes should be modified to be dynamic once it
is available in
+ // the request.
+ for (Windmill.GetConfigResponse.SystemNameToComputationIdMapEntry entry :
+ response.getSystemNameToComputationIdMapList()) {
+ systemNameToComputationIdMap.put(entry.getSystemName(),
entry.getComputationId());
Review Comment:
Can you get rid of this?
It seems we just request a single item, so shouldn't we just get a single
response back? Can we then just pass the computationId to
createComputationState instead of trying to map back from the response?
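
Roughly what I have in mind, as a sketch with generic stand-ins rather than the PR's types: `SingleKeyLoaderSketch`, `configFetcher`, and `stateFactory` are hypothetical names, and `stateFactory` plays the role of createComputationState taking the requested id directly.

```java
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Function;

final class SingleKeyLoaderSketch<ConfigT, StateT> {
  // Hypothetical: fetches the config for exactly one computation id.
  private final Function<String, Optional<ConfigT>> configFetcher;
  // Hypothetical: builds the state from the requested id plus its config.
  private final BiFunction<String, ConfigT, Optional<StateT>> stateFactory;

  SingleKeyLoaderSketch(
      Function<String, Optional<ConfigT>> configFetcher,
      BiFunction<String, ConfigT, Optional<StateT>> stateFactory) {
    this.configFetcher = configFetcher;
    this.stateFactory = stateFactory;
  }

  /** Loads one computation; the id is passed through instead of being recovered from the response. */
  Optional<StateT> load(String computationId) {
    return configFetcher
        .apply(computationId)
        .flatMap(config -> stateFactory.apply(computationId, config));
  }
}
```

With this shape there is no shared map to populate as a side effect of loading, and load() no longer has to go through loadAll().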
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/WorkProcessingContext.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.work;
+
+import com.google.auto.value.AutoValue;
+import com.google.auto.value.extension.memoized.Memoized;
+import java.util.Optional;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.beam.runners.dataflow.worker.streaming.ShardedKey;
+import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.KeyedGetDataRequest;
+import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.KeyedGetDataResponse;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItem;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream;
+import org.apache.beam.runners.dataflow.worker.windmill.client.commits.Commit;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.commits.WorkCommitter;
+import org.apache.beam.sdk.annotations.Internal;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+
+/** Context to process {@link WorkItem} */
+@AutoValue
+@Internal
+public abstract class WorkProcessingContext {
+ public static WorkProcessingContext.Builder builder() {
+ return new AutoValue_WorkProcessingContext.Builder();
+ }
+
+ public static WorkProcessingContext.Builder builder(
+ String computationId,
+ BiFunction<String, KeyedGetDataRequest, Optional<KeyedGetDataResponse>>
keyedDataFetcher) {
+ return builder()
+ .setComputationId(computationId)
+ .setKeyedDataFetcher(request -> keyedDataFetcher.apply(computationId,
request));
+ }
+
+ public static WorkProcessingContext.Builder builder(
+ String computationId, GetDataStream getDataStream) {
+ return builder()
+ .setComputationId(computationId)
+ .setGetDataStream(getDataStream)
+ .setKeyedDataFetcher(
+ request ->
Optional.ofNullable(getDataStream.requestKeyedData(computationId, request)));
+ }
+
+ public abstract String computationId();
+
+ public abstract Instant inputDataWatermark();
+
+ public abstract @Nullable Instant synchronizedProcessingTime();
+
+ public abstract @Nullable Instant outputDataWatermark();
+
+ public abstract @Nullable GetDataStream getDataStream();
+
+ /** {@link WorkItem} being processed. */
+ public abstract WorkItem workItem();
+
+ /**
+ * {@link GetDataStream} that connects to the backend Windmill worker
handling the {@link
+ * WorkItem}.
+ */
+ public abstract Function<KeyedGetDataRequest,
Optional<KeyedGetDataResponse>> keyedDataFetcher();
+
+ /**
+ * {@link WorkCommitter} that commits completed work to the backend Windmill
worker handling the
+ * {@link WorkItem}.
+ */
+ public abstract Consumer<Commit> workCommitter();
+
+ public final void queueCommit(Commit commit) {
+ workCommitter().accept(commit);
+ }
+
+ @Memoized
+ public ShardedKey shardedKey() {
+ return ShardedKey.create(workItem().getKey(), workItem().getShardingKey());
+ }
+
+ @AutoValue.Builder
+ public abstract static class Builder {
+ public abstract Builder setComputationId(String value);
+
+ public abstract Builder setInputDataWatermark(Instant value);
+
+ public abstract Builder setSynchronizedProcessingTime(@Nullable Instant
value);
+
+ public abstract Builder setOutputDataWatermark(@Nullable Instant value);
+
+ public abstract Builder setWorkItem(WorkItem value);
+
+ public abstract Builder setKeyedDataFetcher(
+ Function<KeyedGetDataRequest, Optional<KeyedGetDataResponse>> value);
+
+ public abstract Builder setWorkCommitter(Consumer<Commit> value);
+
+ public abstract Builder setGetDataStream(GetDataStream value);
+
+ abstract WorkProcessingContext autoBuild();
Review Comment:
private?
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationStateCache.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming;
+
+import static java.util.stream.Collectors.toMap;
+import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList.toImmutableList;
+import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet.toImmutableSet;
+
+import java.io.PrintWriter;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.runners.dataflow.worker.status.StatusDataProvider;
+import
org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudget;
+import org.apache.beam.sdk.annotations.Internal;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheLoader;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LoadingCache;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Cache of {@link String} computationId to {@link ComputationState} objects.
*/
+@Internal
+@ThreadSafe
+public final class ComputationStateCache implements StatusDataProvider {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ComputationStateCache.class);
+
+ private final LoadingCache<String, Optional<ComputationState>>
computationCache;
+
+ private ComputationStateCache(LoadingCache<String,
Optional<ComputationState>> computationCache) {
+ this.computationCache = computationCache;
+ }
+
+ public static ComputationStateCache create(
+ CacheLoader<String, Optional<ComputationState>> cacheLoader) {
+ return new
ComputationStateCache(CacheBuilder.newBuilder().build(cacheLoader));
+ }
+
+ /**
+ * Returns the {@link ComputationState} associated with the given
computationId. May perform IO if
+ * a value is not present, and it is possible that after IO is performed
there is no value
+ * correlated with that computationId.
+ */
+ public Optional<ComputationState> getComputationState(String computationId) {
+ try {
+ Optional<ComputationState> computationState =
+
computationCache.getAll(ImmutableList.of(computationId)).get(computationId);
Review Comment:
Use get() instead of getAll().
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/StreamingApplianceComputationStateCacheLoader.java:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming;
+
+import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap.toImmutableMap;
+
+import com.google.api.services.dataflow.model.MapTask;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+import
org.apache.beam.runners.dataflow.worker.streaming.config.StreamingConfigLoader;
+import
org.apache.beam.runners.dataflow.worker.streaming.harness.StreamingWorkerEnvironment;
+import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import
org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.extensions.gcp.util.Transport;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheLoader;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Internal
+public final class StreamingApplianceComputationStateCacheLoader
+ extends CacheLoader<String, Optional<ComputationState>> {
+ private static final Logger LOG =
+
LoggerFactory.getLogger(StreamingApplianceComputationStateCacheLoader.class);
+
+ private final StreamingConfigLoader<Windmill.GetConfigResponse>
streamingApplianceConfigLoader;
+ private final BoundedQueueExecutor workUnitExecutor;
+ private final ConcurrentHashMap<String, String> systemNameToComputationIdMap;
+ private final Function<String, WindmillStateCache.ForComputation>
+ perComputationStateCacheViewFactory;
+
+ public StreamingApplianceComputationStateCacheLoader(
+ StreamingConfigLoader<Windmill.GetConfigResponse>
streamingApplianceConfigLoader,
+ BoundedQueueExecutor workUnitExecutor,
+ Function<String, WindmillStateCache.ForComputation>
perComputationStateCacheViewFactory) {
+ this.streamingApplianceConfigLoader = streamingApplianceConfigLoader;
+ this.workUnitExecutor = workUnitExecutor;
+ this.systemNameToComputationIdMap = new ConcurrentHashMap<>();
+ this.perComputationStateCacheViewFactory =
perComputationStateCacheViewFactory;
+ }
+
+ private static Map<String, Map<String, String>>
transformUserNameToStateFamilyByComputationId(
+ Windmill.GetConfigResponse response) {
+ // Outer keys are computation ids. Outer values are map from transform
username to state
+ // family.
+ Map<String, Map<String, String>>
transformUserNameToStateFamilyByComputationId =
+ new HashMap<>();
+ for (Windmill.GetConfigResponse.ComputationConfigMapEntry
computationConfig :
+ response.getComputationConfigMapList()) {
+ Map<String, String> transformUserNameToStateFamily =
+ transformUserNameToStateFamilyByComputationId.computeIfAbsent(
+ computationConfig.getComputationId(), k -> new HashMap<>());
+ for (Windmill.ComputationConfig.TransformUserNameToStateFamilyEntry
entry :
+
computationConfig.getComputationConfig().getTransformUserNameToStateFamilyList())
{
+ transformUserNameToStateFamily.put(entry.getTransformUserName(),
entry.getStateFamily());
+ }
+ }
+
+ return transformUserNameToStateFamilyByComputationId;
+ }
+
+ /** Deserialize {@link MapTask} and populate MultiOutputInfos in MapTask. */
+ private static Optional<MapTask> deserializeAndFixMapTask(String
serializedMapTask) {
+ try {
+ return Optional.of(
+ StreamingWorkerEnvironment.fixMapTaskMultiOutputInfoFnInstance()
+ .apply(Transport.getJsonFactory().fromString(serializedMapTask,
MapTask.class)));
+ } catch (IOException e) {
+ LOG.warn("Parsing MapTask failed: {}", serializedMapTask, e);
+ }
+ return Optional.empty();
Review Comment:
Would it be better to surface an exception so that the loading cache
retries? How does this compare to the existing code? Caching an Optional.empty()
seems like it is going to make processing this computation on the worker never
succeed.
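
To illustrate the concern, here is a minimal sketch using a plain `ConcurrentHashMap` as a stand-in for the loading cache (it is not Guava's implementation, and `NegativeCachingSketch` and `MemoizingCache` are hypothetical names). Both loaders fail on their first call only; the difference is whether the failure is returned as an empty Optional or thrown.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

final class NegativeCachingSketch {
  /** Minimal memoizing cache; a stand-in for a loading cache. */
  static final class MemoizingCache<V> {
    private final Map<String, V> values = new ConcurrentHashMap<>();
    private final Function<String, V> loader;

    MemoizingCache(Function<String, V> loader) {
      this.loader = loader;
    }

    V get(String key) {
      // computeIfAbsent records no mapping when the loader throws, so a later get() retries.
      return values.computeIfAbsent(key, loader);
    }
  }

  /** Loader reports its first failure as Optional.empty(); returns how often it was invoked. */
  static int loadsWhenFailureIsCachedAsEmpty() {
    AtomicInteger calls = new AtomicInteger();
    MemoizingCache<Optional<String>> cache =
        new MemoizingCache<Optional<String>>(
            key ->
                calls.incrementAndGet() == 1
                    ? Optional.<String>empty()
                    : Optional.of("state-" + key));
    cache.get("c1");
    cache.get("c1"); // Served from the cache: the empty value sticks.
    return calls.get();
  }

  /** Loader throws on its first failure; returns how often it was invoked. */
  static int loadsWhenFailureThrows() {
    AtomicInteger calls = new AtomicInteger();
    MemoizingCache<String> cache =
        new MemoizingCache<String>(
            key -> {
              if (calls.incrementAndGet() == 1) {
                throw new IllegalStateException("parse failed");
              }
              return "state-" + key;
            });
    try {
      cache.get("c1");
    } catch (IllegalStateException expected) {
      // Nothing was cached, so the next lookup invokes the loader again.
    }
    cache.get("c1");
    return calls.get();
  }
}
```

In the first case the loader is only ever invoked once, so the empty result is what every later lookup sees; in the second case the second lookup reloads and succeeds.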
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/StreamingApplianceComputationStateCacheLoader.java:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming;
+
+import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap.toImmutableMap;
+
+import com.google.api.services.dataflow.model.MapTask;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+import
org.apache.beam.runners.dataflow.worker.streaming.config.StreamingConfigLoader;
+import
org.apache.beam.runners.dataflow.worker.streaming.harness.StreamingWorkerEnvironment;
+import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import
org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.extensions.gcp.util.Transport;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheLoader;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Internal
+public final class StreamingApplianceComputationStateCacheLoader
+ extends CacheLoader<String, Optional<ComputationState>> {
+ private static final Logger LOG =
+
LoggerFactory.getLogger(StreamingApplianceComputationStateCacheLoader.class);
+
+ private final StreamingConfigLoader<Windmill.GetConfigResponse>
streamingApplianceConfigLoader;
+ private final BoundedQueueExecutor workUnitExecutor;
+ private final ConcurrentHashMap<String, String> systemNameToComputationIdMap;
+ private final Function<String, WindmillStateCache.ForComputation>
+ perComputationStateCacheViewFactory;
+
+ public StreamingApplianceComputationStateCacheLoader(
+ StreamingConfigLoader<Windmill.GetConfigResponse>
streamingApplianceConfigLoader,
+ BoundedQueueExecutor workUnitExecutor,
+ Function<String, WindmillStateCache.ForComputation>
perComputationStateCacheViewFactory) {
+ this.streamingApplianceConfigLoader = streamingApplianceConfigLoader;
+ this.workUnitExecutor = workUnitExecutor;
+ this.systemNameToComputationIdMap = new ConcurrentHashMap<>();
+ this.perComputationStateCacheViewFactory =
perComputationStateCacheViewFactory;
+ }
+
+ private static Map<String, Map<String, String>>
transformUserNameToStateFamilyByComputationId(
+ Windmill.GetConfigResponse response) {
+ // Outer keys are computation ids. Outer values are map from transform
username to state
+ // family.
+ Map<String, Map<String, String>>
transformUserNameToStateFamilyByComputationId =
+ new HashMap<>();
+ for (Windmill.GetConfigResponse.ComputationConfigMapEntry
computationConfig :
+ response.getComputationConfigMapList()) {
+ Map<String, String> transformUserNameToStateFamily =
+ transformUserNameToStateFamilyByComputationId.computeIfAbsent(
+ computationConfig.getComputationId(), k -> new HashMap<>());
+ for (Windmill.ComputationConfig.TransformUserNameToStateFamilyEntry
entry :
+
computationConfig.getComputationConfig().getTransformUserNameToStateFamilyList())
{
+ transformUserNameToStateFamily.put(entry.getTransformUserName(),
entry.getStateFamily());
+ }
+ }
+
+ return transformUserNameToStateFamilyByComputationId;
+ }
+
+ /** Deserialize {@link MapTask} and populate MultiOutputInfos in MapTask. */
+ private static Optional<MapTask> deserializeAndFixMapTask(String serializedMapTask) {
+ try {
+ return Optional.of(
+ StreamingWorkerEnvironment.fixMapTaskMultiOutputInfoFnInstance()
+ .apply(Transport.getJsonFactory().fromString(serializedMapTask, MapTask.class)));
+ } catch (IOException e) {
+ LOG.warn("Parsing MapTask failed: {}", serializedMapTask, e);
+ }
+ return Optional.empty();
+ }
+
+ @Override
+ public Optional<ComputationState> load(String computationId) throws Exception {
+ return Preconditions.checkNotNull(
+ loadAll(ImmutableList.of(computationId)).getOrDefault(computationId, Optional.empty()));
+ }
+
+ @Override
+ public Map<String, Optional<ComputationState>> loadAll(
+ Iterable<? extends String> computationIds) {
+ Optional<Windmill.GetConfigResponse> maybeResponse =
+ streamingApplianceConfigLoader.getComputationConfig(
+ Iterables.getOnlyElement(computationIds));
+ if (!maybeResponse.isPresent()) {
+ return ImmutableMap.of();
+ }
+
+ Windmill.GetConfigResponse response = maybeResponse.get();
+
+ // The max work item commit bytes should be modified to be dynamic once it is available in
+ // the request.
+ for (Windmill.GetConfigResponse.SystemNameToComputationIdMapEntry entry :
+ response.getSystemNameToComputationIdMapList()) {
+ systemNameToComputationIdMap.put(entry.getSystemName(), entry.getComputationId());
+ }
+
+ Map<String, Map<String, String>> transformUserNameToStateFamilyByComputationId =
+ transformUserNameToStateFamilyByComputationId(response);
+
+ return response.getCloudWorksList().stream()
+ .map(
+ serializedMapTask ->
+ createComputationState(
+ serializedMapTask, transformUserNameToStateFamilyByComputationId))
+ .filter(Optional::isPresent)
+ .map(Optional::get)
+ .collect(toImmutableMap(ComputationState::getComputationId, Optional::of));
+ }
+
+ private Optional<ComputationState> createComputationState(
+ String serializedMapTask,
+ Map<String, Map<String, String>> transformUserNameToStateFamilyByComputationId) {
+ return deserializeAndFixMapTask(serializedMapTask)
+ .map(
+ mapTask -> {
+ String computationId =
+ systemNameToComputationIdMap.getOrDefault(
+ mapTask.getSystemName(), mapTask.getSystemName());
+ return new ComputationState(
+ computationId,
+ mapTask,
+ workUnitExecutor,
+ transformUserNameToStateFamilyByComputationId.getOrDefault(
+ computationId, ImmutableMap.of()),
+ perComputationStateCacheViewFactory.apply(computationId));
Review Comment:
It seems like the appliance and Streaming Engine (SE) differ only in how they
get the MapTask and the transform user name -> state family map.
What if, instead of StreamingConfigLoader<Windmill.GetConfigResponse>, we had
an interface that can vend the MapTask and the transform user name -> state
family map for a computation?
Then the ComputationStateCacheLoader could just be a final class taking that
loader. The StreamingEngineConfigLoader could implement that interface in
addition to the one for the dynamic config, and the appliance loader could
implement just that interface.
I'm not sure the templated StreamingConfigLoader interface is buying us much,
since only SE needs the background threads.
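A rough sketch of the shape I have in mind, not a definitive design. All names
here (ComputationConfigFetcher, ComputationConfigSketch, the *Stub classes) are
hypothetical, and the stubs stand in for the real MapTask and ComputationState
so the snippet compiles on its own:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Stub standing in for the real MapTask (assumption, for illustration only).
final class MapTaskStub {
  final String systemName;

  MapTaskStub(String systemName) {
    this.systemName = systemName;
  }
}

// Hypothetical value type: the per-computation config a ComputationState needs.
final class ComputationConfigSketch {
  final MapTaskStub mapTask;
  final Map<String, String> userTransformToStateFamily;

  ComputationConfigSketch(MapTaskStub mapTask, Map<String, String> userTransformToStateFamily) {
    this.mapTask = mapTask;
    // Defensive copy so callers cannot mutate the config after construction.
    this.userTransformToStateFamily =
        Collections.unmodifiableMap(new HashMap<>(userTransformToStateFamily));
  }
}

// Hypothetical interface that both the appliance and the SE loader would implement.
interface ComputationConfigFetcher {
  Optional<ComputationConfigSketch> fetchConfig(String computationId);
}

// Stub standing in for the real ComputationState (assumption, for illustration only).
final class ComputationStateStub {
  final String computationId;
  final ComputationConfigSketch config;

  ComputationStateStub(String computationId, ComputationConfigSketch config) {
    this.computationId = computationId;
    this.config = config;
  }
}

// A single final cache loader; it does not care where the config came from.
final class ComputationStateCacheLoaderSketch {
  private final ComputationConfigFetcher fetcher;

  ComputationStateCacheLoaderSketch(ComputationConfigFetcher fetcher) {
    this.fetcher = fetcher;
  }

  Optional<ComputationStateStub> load(String computationId) {
    // An empty result means the fetcher has no config for this computation.
    return fetcher
        .fetchConfig(computationId)
        .map(config -> new ComputationStateStub(computationId, config));
  }
}
```

With something like this, the appliance and SE paths would each supply their
own fetcher, and the SE loader could keep its background refresh for the
dynamic config behind a separate interface.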