scwhittle commented on code in PR #31133:
URL: https://github.com/apache/beam/pull/31133#discussion_r1590985677
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -1114,9 +1092,12 @@ private void process(
streamingCounters.pendingDeltaCounters(),
computationId,
readerCache,
- !computationState.getTransformUserNameToStateFamily().isEmpty()
- ? computationState.getTransformUserNameToStateFamily()
- : stateNameMap,
+ //
!computationState.getTransformUserNameToStateFamily().isEmpty()
Review Comment:
Remove the stray comment (`//`) lines.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationStateCache.java:
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming;
+
+import static java.util.stream.Collectors.toConcurrentMap;
+import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList.toImmutableList;
+
+import com.google.api.services.dataflow.model.MapTask;
+import java.io.PrintWriter;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Function;
+import javax.annotation.concurrent.ThreadSafe;
+import
org.apache.beam.runners.dataflow.worker.apiary.FixMultiOutputInfosOnParDoInstructions;
+import org.apache.beam.runners.dataflow.worker.status.StatusDataProvider;
+import
org.apache.beam.runners.dataflow.worker.streaming.config.ComputationConfig;
+import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
+import
org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache;
+import
org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudget;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.fn.IdGenerator;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheLoader;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LoadingCache;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Cache of {@link String} computationId to {@link ComputationState}. */
+@Internal
+@ThreadSafe
+public final class ComputationStateCache implements StatusDataProvider {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ComputationStateCache.class);
+
+ private final LoadingCache<String, ComputationState> computationCache;
+
+ /**
+ * Fix up MapTask representation because MultiOutputInfos are missing from
system generated
+ * ParDoInstructions.
+ */
+ private final Function<MapTask, MapTask>
fixMultiOutputInfosOnParDoInstructions;
+
+ private ComputationStateCache(
+ LoadingCache<String, ComputationState> computationCache,
+ Function<MapTask, MapTask> fixMultiOutputInfosOnParDoInstructions) {
+ this.computationCache = computationCache;
+ this.fixMultiOutputInfosOnParDoInstructions =
fixMultiOutputInfosOnParDoInstructions;
+ }
+
+ public static ComputationStateCache create(
+ ComputationConfig.Fetcher computationConfigFetcher,
+ BoundedQueueExecutor workUnitExecutor,
+ Function<String, WindmillStateCache.ForComputation>
perComputationStateCacheViewFactory,
+ IdGenerator idGenerator) {
+ Function<MapTask, MapTask> fixMultiOutputInfosOnParDoInstructions =
+ new FixMultiOutputInfosOnParDoInstructions(idGenerator);
+ ConcurrentMap<String, String> stateNameMap = new ConcurrentHashMap<>();
+ return new ComputationStateCache(
+ CacheBuilder.newBuilder()
+ .build(
+ newComputationStateCacheLoader(
+ computationConfigFetcher,
+ workUnitExecutor,
+ perComputationStateCacheViewFactory,
+ fixMultiOutputInfosOnParDoInstructions,
+ stateNameMap)),
+ fixMultiOutputInfosOnParDoInstructions);
+ }
+
+ @VisibleForTesting
+ public static ComputationStateCache forTesting(
+ ComputationConfig.Fetcher computationConfigFetcher,
+ BoundedQueueExecutor workUnitExecutor,
+ Function<String, WindmillStateCache.ForComputation>
perComputationStateCacheViewFactory,
+ IdGenerator idGenerator,
+ ConcurrentMap<String, String> stateNameMap) {
+ Function<MapTask, MapTask> fixMultiOutputInfosOnParDoInstructions =
+ new FixMultiOutputInfosOnParDoInstructions(idGenerator);
+ return new ComputationStateCache(
Review Comment:
Could this instead just call create() and then poke in the initial state
name map values?
e.g.
cache = create(...);
cache.stateNameMap.putAll(stateNameMap);
It's a fair bit of setup to duplicate, and it could drift, meaning we would no
longer be testing the main code path.
Alternatively, to further reduce the parameters duplicated between the
create/forTesting methods, just expose a @VisibleForTesting accessor for the
stateNameMap and have the current caller of this method use that directly.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingApplianceComputationConfigFetcher.java:
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming.config;
+
+import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap.toImmutableMap;
+
+import com.google.api.services.dataflow.model.MapTask;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetConfigResponse;
+import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetConfigResponse.NameMapEntry;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.extensions.gcp.util.Transport;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.HashBasedTable;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Table;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Fetches computation config from Streaming Appliance. */
+@Internal
+@ThreadSafe
+public final class StreamingApplianceComputationConfigFetcher implements
ComputationConfig.Fetcher {
+
+ private static final Logger LOG =
+
LoggerFactory.getLogger(StreamingApplianceComputationConfigFetcher.class);
+
+ private final WindmillServerStub windmillServer;
+ private final ConcurrentHashMap<String, String> systemNameToComputationIdMap;
+
+ public StreamingApplianceComputationConfigFetcher(WindmillServerStub
windmillServer) {
+ this.windmillServer = windmillServer;
+ this.systemNameToComputationIdMap = new ConcurrentHashMap<>();
+ }
+
+ private static Table<String, String, String>
transformUserNameToStateFamilyByComputationId(
+ Windmill.GetConfigResponse response) {
+ // a row in the table is <ComputationId, TransformUserName,
StateFamilyName>
+ Table<String, String, String>
computationIdTransformUserNameStateFamilyNameTable =
+ HashBasedTable.create();
+ for (Windmill.GetConfigResponse.ComputationConfigMapEntry
computationConfig :
+ response.getComputationConfigMapList()) {
+ for (Windmill.ComputationConfig.TransformUserNameToStateFamilyEntry
entry :
+
computationConfig.getComputationConfig().getTransformUserNameToStateFamilyList())
{
+ computationIdTransformUserNameStateFamilyNameTable.put(
+ computationConfig.getComputationId(),
+ entry.getTransformUserName(),
+ entry.getStateFamily());
+ }
+ }
+
+ return computationIdTransformUserNameStateFamilyNameTable;
+ }
+
+ /** Deserialize {@link MapTask} and populate MultiOutputInfos in MapTask. */
+ private Optional<MapTask> deserializeAndFixMapTask(String serializedMapTask)
{
+ try {
+ return
Optional.of(Transport.getJsonFactory().fromString(serializedMapTask,
MapTask.class));
+ } catch (IOException e) {
+ LOG.warn("Parsing MapTask failed: {}", serializedMapTask, e);
+ }
+ return Optional.empty();
+ }
+
+ @Override
+ public Optional<ComputationConfig> getConfig(String computationId) {
+ Preconditions.checkArgument(
+ !computationId.isEmpty(),
+ "computationId is empty. Cannot fetch computation config without a
computationId.");
+
+ GetConfigResponse response =
+ windmillServer.getConfig(
Review Comment:
Instead of taking in a WindmillServerStub, can you take in a functional interface
matching this method?
That would make it clear that the full class isn't used, and it could simplify
testing. It could also help us break up WindmillServerStub in the future, once we
get rid of the JNI class that currently keeps it all tied together.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingApplianceComputationConfigFetcher.java:
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming.config;
+
+import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap.toImmutableMap;
+
+import com.google.api.services.dataflow.model.MapTask;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetConfigResponse;
+import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetConfigResponse.NameMapEntry;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.extensions.gcp.util.Transport;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.HashBasedTable;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Table;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Fetches computation config from Streaming Appliance. */
+@Internal
+@ThreadSafe
+public final class StreamingApplianceComputationConfigFetcher implements
ComputationConfig.Fetcher {
+
+ private static final Logger LOG =
+
LoggerFactory.getLogger(StreamingApplianceComputationConfigFetcher.class);
+
+ private final WindmillServerStub windmillServer;
+ private final ConcurrentHashMap<String, String> systemNameToComputationIdMap;
+
+ public StreamingApplianceComputationConfigFetcher(WindmillServerStub
windmillServer) {
+ this.windmillServer = windmillServer;
+ this.systemNameToComputationIdMap = new ConcurrentHashMap<>();
+ }
+
+ private static Table<String, String, String>
transformUserNameToStateFamilyByComputationId(
+ Windmill.GetConfigResponse response) {
+ // a row in the table is <ComputationId, TransformUserName,
StateFamilyName>
Review Comment:
Move this comment into the method's Javadoc?
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming.config;
+
+import static java.util.stream.StreamSupport.stream;
+import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet.toImmutableSet;
+import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+
+import com.google.api.services.dataflow.model.MapTask;
+import com.google.api.services.dataflow.model.StreamingComputationConfig;
+import com.google.api.services.dataflow.model.StreamingConfigTask;
+import com.google.api.services.dataflow.model.WorkItem;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.runners.dataflow.worker.WorkUnitClient;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.util.BackOff;
+import org.apache.beam.sdk.util.BackOffUtils;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.sdk.util.Sleeper;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Internal
+@ThreadSafe
+public final class StreamingEngineComputationConfigFetcher implements
ComputationConfig.Fetcher {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(StreamingEngineComputationConfigFetcher.class);
+ private static final String GLOBAL_PIPELINE_CONFIG_REFRESHER =
"GlobalPipelineConfigRefresher";
+
+ private final long globalConfigRefreshPeriodMillis;
+ private final WorkUnitClient dataflowServiceClient;
+ private final ScheduledExecutorService globalConfigRefresher;
+ private final Consumer<StreamingEnginePipelineConfig> onStreamingConfig;
+ private final AtomicBoolean hasReceivedGlobalConfig;
+
+ private StreamingEngineComputationConfigFetcher(
+ boolean hasReceivedGlobalConfig,
+ long globalConfigRefreshPeriodMillis,
+ WorkUnitClient dataflowServiceClient,
+ ScheduledExecutorService globalConfigRefresher,
+ Consumer<StreamingEnginePipelineConfig> onStreamingConfig) {
+ this.globalConfigRefreshPeriodMillis = globalConfigRefreshPeriodMillis;
+ this.dataflowServiceClient = dataflowServiceClient;
+ this.globalConfigRefresher = globalConfigRefresher;
+ this.onStreamingConfig = onStreamingConfig;
+ this.hasReceivedGlobalConfig = new AtomicBoolean(hasReceivedGlobalConfig);
+ }
+
+ public static StreamingEngineComputationConfigFetcher create(
+ long globalConfigRefreshPeriodMillis,
+ WorkUnitClient dataflowServiceClient,
+ Consumer<StreamingEnginePipelineConfig> onStreamingConfig) {
+ return new StreamingEngineComputationConfigFetcher(
+ /* hasReceivedGlobalConfig= */ false,
+ globalConfigRefreshPeriodMillis,
+ dataflowServiceClient,
+ Executors.newSingleThreadScheduledExecutor(
+ new
ThreadFactoryBuilder().setNameFormat(GLOBAL_PIPELINE_CONFIG_REFRESHER).build()),
+ onStreamingConfig);
+ }
+
+ public static StreamingEngineComputationConfigFetcher forTesting(
Review Comment:
Annotate this with @VisibleForTesting.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming.config;
+
+import static java.util.stream.StreamSupport.stream;
+import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet.toImmutableSet;
+import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+
+import com.google.api.services.dataflow.model.MapTask;
+import com.google.api.services.dataflow.model.StreamingComputationConfig;
+import com.google.api.services.dataflow.model.StreamingConfigTask;
+import com.google.api.services.dataflow.model.WorkItem;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.runners.dataflow.worker.WorkUnitClient;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.util.BackOff;
+import org.apache.beam.sdk.util.BackOffUtils;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.sdk.util.Sleeper;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Internal
+@ThreadSafe
+public final class StreamingEngineComputationConfigFetcher implements
ComputationConfig.Fetcher {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(StreamingEngineComputationConfigFetcher.class);
+ private static final String GLOBAL_PIPELINE_CONFIG_REFRESHER =
"GlobalPipelineConfigRefresher";
+
+ private final long globalConfigRefreshPeriodMillis;
+ private final WorkUnitClient dataflowServiceClient;
+ private final ScheduledExecutorService globalConfigRefresher;
+ private final Consumer<StreamingEnginePipelineConfig> onStreamingConfig;
+ private final AtomicBoolean hasReceivedGlobalConfig;
+
+ private StreamingEngineComputationConfigFetcher(
+ boolean hasReceivedGlobalConfig,
+ long globalConfigRefreshPeriodMillis,
+ WorkUnitClient dataflowServiceClient,
+ ScheduledExecutorService globalConfigRefresher,
+ Consumer<StreamingEnginePipelineConfig> onStreamingConfig) {
+ this.globalConfigRefreshPeriodMillis = globalConfigRefreshPeriodMillis;
+ this.dataflowServiceClient = dataflowServiceClient;
+ this.globalConfigRefresher = globalConfigRefresher;
+ this.onStreamingConfig = onStreamingConfig;
+ this.hasReceivedGlobalConfig = new AtomicBoolean(hasReceivedGlobalConfig);
+ }
+
+ public static StreamingEngineComputationConfigFetcher create(
+ long globalConfigRefreshPeriodMillis,
+ WorkUnitClient dataflowServiceClient,
+ Consumer<StreamingEnginePipelineConfig> onStreamingConfig) {
+ return new StreamingEngineComputationConfigFetcher(
+ /* hasReceivedGlobalConfig= */ false,
+ globalConfigRefreshPeriodMillis,
+ dataflowServiceClient,
+ Executors.newSingleThreadScheduledExecutor(
+ new
ThreadFactoryBuilder().setNameFormat(GLOBAL_PIPELINE_CONFIG_REFRESHER).build()),
+ onStreamingConfig);
+ }
+
+ public static StreamingEngineComputationConfigFetcher forTesting(
+ boolean hasReceivedGlobalConfig,
+ long globalConfigRefreshPeriodMillis,
+ WorkUnitClient dataflowServiceClient,
+ Function<String, ScheduledExecutorService> executorSupplier,
+ Consumer<StreamingEnginePipelineConfig> onStreamingConfig) {
+ return new StreamingEngineComputationConfigFetcher(
+ hasReceivedGlobalConfig,
+ globalConfigRefreshPeriodMillis,
+ dataflowServiceClient,
+ executorSupplier.apply(GLOBAL_PIPELINE_CONFIG_REFRESHER),
+ onStreamingConfig);
+ }
+
+ private static BackOff defaultConfigBackoff() {
+ return FluentBackoff.DEFAULT
+ .withInitialBackoff(Duration.millis(100))
+ .withMaxBackoff(Duration.standardMinutes(1))
+ .withMaxCumulativeBackoff(Duration.standardMinutes(5))
+ .backoff();
+ }
+
+ private MapTask createMapTask(StreamingComputationConfig computationConfig) {
+ return new MapTask()
+ .setSystemName(computationConfig.getSystemName())
+ .setStageName(computationConfig.getStageName())
+ .setInstructions(computationConfig.getInstructions());
+ }
+
+ @Override
+ public void start() {
+ fetchInitialPipelineGlobalConfig();
+ schedulePeriodicGlobalConfigRequests();
+ }
+
+ @Override
+ public Optional<ComputationConfig> getConfig(String computationId) {
+ Preconditions.checkArgument(
+ !computationId.isEmpty(),
+ "computationId is empty. Cannot fetch computation config without a
computationId.");
+ return getComputationConfigInternal(computationId)
+ .flatMap(
+ config -> {
+ Map<String, String> stateNameMap =
config.userStepToStateFamilyNameMap();
+ return config
+ .computationConfig()
+ .map(
+ computationConfig ->
+ ComputationConfig.create(
+ createMapTask(computationConfig),
+
computationConfig.getTransformUserNameToStateFamily(),
+ stateNameMap));
+ });
+ }
+
+ @Override
+ public void stop() {
+ // We have already shutdown or start has not been called.
+ if (globalConfigRefresher.isShutdown() || !hasReceivedGlobalConfig.get()) {
+ return;
+ }
+
+ globalConfigRefresher.shutdown();
+ boolean isTerminated = false;
+ try {
+ isTerminated = globalConfigRefresher.awaitTermination(10,
TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ LOG.warn("Error occurred shutting down: {}", globalConfigRefresher);
+ }
+ if (!isTerminated) {
+ globalConfigRefresher.shutdownNow();
+ }
+ }
+
+ /**
+ * Initially b Schedules a background thread that periodically sends
getConfig requests to
+ * Dataflow Service to obtain the windmill service endpoint.
+ */
+ @SuppressWarnings("FutureReturnValueIgnored")
+ private void schedulePeriodicGlobalConfigRequests() {
+ globalConfigRefresher.scheduleWithFixedDelay(
+ () -> getGlobalConfig().ifPresent(onStreamingConfig),
+ 0,
+ globalConfigRefreshPeriodMillis,
+ TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Blocks until we have received the initial global config from Dataflow or
an exception is
+ * thrown.
+ */
+ private synchronized void fetchInitialPipelineGlobalConfig() {
+ if (!hasReceivedGlobalConfig.get()) {
+ // Get the initial global configuration. This will initialize the
windmillServer stub.
+ while (true) {
+ LOG.info("Sending request to get initial global configuration for this
worker.");
+ Optional<StreamingEnginePipelineConfig> globalConfig =
getGlobalConfig();
+ if (globalConfig.isPresent()) {
+ onStreamingConfig.accept(globalConfig.get());
+ hasReceivedGlobalConfig.compareAndSet(false, true);
+ break;
+ }
+ LOG.info("Haven't received initial global configuration, will retry in
5 seconds");
+ sleepUninterruptibly(5, TimeUnit.SECONDS);
+ }
+ }
+ LOG.info("Initial global configuration received, harness is now ready");
+ }
+
+ private Optional<StreamingEnginePipelineConfig> getComputationConfigInternal(
+ String computationId) {
+ Optional<StreamingEnginePipelineConfig> streamingConfig =
getConfigInternal(computationId);
+ streamingConfig.ifPresent(onStreamingConfig);
+ return streamingConfig;
+ }
+
+ private Optional<StreamingEnginePipelineConfig> getGlobalConfig() {
+ return getConfigInternal(null);
+ }
+
+ private Optional<StreamingEnginePipelineConfig> getConfigInternal(@Nullable
String computation) {
Review Comment:
Ditto for the other methods in this class: prefer "fetch" over "get" in their names.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming.config;
+
+import static java.util.stream.StreamSupport.stream;
+import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet.toImmutableSet;
+import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+
+import com.google.api.services.dataflow.model.MapTask;
+import com.google.api.services.dataflow.model.StreamingComputationConfig;
+import com.google.api.services.dataflow.model.StreamingConfigTask;
+import com.google.api.services.dataflow.model.WorkItem;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.runners.dataflow.worker.WorkUnitClient;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.util.BackOff;
+import org.apache.beam.sdk.util.BackOffUtils;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.sdk.util.Sleeper;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Internal
+@ThreadSafe
+public final class StreamingEngineComputationConfigFetcher implements
ComputationConfig.Fetcher {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(StreamingEngineComputationConfigFetcher.class);
+ private static final String GLOBAL_PIPELINE_CONFIG_REFRESHER =
"GlobalPipelineConfigRefresher";
Review Comment:
Add THREAD_NAME to the constant's name? Maybe shorten it as well.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming.config;
+
+import static java.util.stream.StreamSupport.stream;
+import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet.toImmutableSet;
+import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+
+import com.google.api.services.dataflow.model.MapTask;
+import com.google.api.services.dataflow.model.StreamingComputationConfig;
+import com.google.api.services.dataflow.model.StreamingConfigTask;
+import com.google.api.services.dataflow.model.WorkItem;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.runners.dataflow.worker.WorkUnitClient;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.util.BackOff;
+import org.apache.beam.sdk.util.BackOffUtils;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.sdk.util.Sleeper;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Internal
+@ThreadSafe
+public final class StreamingEngineComputationConfigFetcher implements
ComputationConfig.Fetcher {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(StreamingEngineComputationConfigFetcher.class);
+ private static final String GLOBAL_PIPELINE_CONFIG_REFRESHER =
"GlobalPipelineConfigRefresher"; // Refresher thread name format; also passed to forTesting's executorSupplier.
+
+ private final long globalConfigRefreshPeriodMillis; // Delay between periodic global config requests, in milliseconds.
+ private final WorkUnitClient dataflowServiceClient; // Dataflow service client; presumably used by the config request helpers (not shown here).
+ private final ScheduledExecutorService globalConfigRefresher; // Runs the periodic global config refresh; shut down by stop().
+ private final Consumer<StreamingEnginePipelineConfig> onStreamingConfig; // Invoked with each fetched StreamingEnginePipelineConfig.
+ private final AtomicBoolean hasReceivedGlobalConfig; // True once the initial global config was received; stop() is a no-op until then.
+
+ private StreamingEngineComputationConfigFetcher(
+ boolean hasReceivedGlobalConfig,
+ long globalConfigRefreshPeriodMillis,
+ WorkUnitClient dataflowServiceClient,
+ ScheduledExecutorService globalConfigRefresher,
+ Consumer<StreamingEnginePipelineConfig> onStreamingConfig) {
+ this.globalConfigRefreshPeriodMillis = globalConfigRefreshPeriodMillis;
+ this.dataflowServiceClient = dataflowServiceClient;
+ this.globalConfigRefresher = globalConfigRefresher;
+ this.onStreamingConfig = onStreamingConfig;
+ this.hasReceivedGlobalConfig = new AtomicBoolean(hasReceivedGlobalConfig);
+ }
+
+ public static StreamingEngineComputationConfigFetcher create(
+ long globalConfigRefreshPeriodMillis,
+ WorkUnitClient dataflowServiceClient,
+ Consumer<StreamingEnginePipelineConfig> onStreamingConfig) {
+ return new StreamingEngineComputationConfigFetcher(
+ /* hasReceivedGlobalConfig= */ false,
+ globalConfigRefreshPeriodMillis,
+ dataflowServiceClient,
+ Executors.newSingleThreadScheduledExecutor(
+ new
ThreadFactoryBuilder().setNameFormat(GLOBAL_PIPELINE_CONFIG_REFRESHER).build()),
+ onStreamingConfig);
+ }
+
+ public static StreamingEngineComputationConfigFetcher forTesting(
+ boolean hasReceivedGlobalConfig,
+ long globalConfigRefreshPeriodMillis,
+ WorkUnitClient dataflowServiceClient,
+ Function<String, ScheduledExecutorService> executorSupplier,
+ Consumer<StreamingEnginePipelineConfig> onStreamingConfig) {
+ return new StreamingEngineComputationConfigFetcher(
+ hasReceivedGlobalConfig,
+ globalConfigRefreshPeriodMillis,
+ dataflowServiceClient,
+ executorSupplier.apply(GLOBAL_PIPELINE_CONFIG_REFRESHER),
+ onStreamingConfig);
+ }
+
+ private static BackOff defaultConfigBackoff() {
+ return FluentBackoff.DEFAULT
+ .withInitialBackoff(Duration.millis(100))
+ .withMaxBackoff(Duration.standardMinutes(1))
+ .withMaxCumulativeBackoff(Duration.standardMinutes(5))
+ .backoff();
+ }
+
+ private MapTask createMapTask(StreamingComputationConfig computationConfig) {
+ return new MapTask()
+ .setSystemName(computationConfig.getSystemName())
+ .setStageName(computationConfig.getStageName())
+ .setInstructions(computationConfig.getInstructions());
+ }
+
+ @Override
+ public void start() {
+ fetchInitialPipelineGlobalConfig();
+ schedulePeriodicGlobalConfigRequests();
+ }
+
+ @Override
+ public Optional<ComputationConfig> getConfig(String computationId) {
+ Preconditions.checkArgument(
+ !computationId.isEmpty(),
+ "computationId is empty. Cannot fetch computation config without a
computationId.");
+ return getComputationConfigInternal(computationId)
+ .flatMap(
+ config -> {
+ Map<String, String> stateNameMap =
config.userStepToStateFamilyNameMap();
+ return config
+ .computationConfig()
+ .map(
+ computationConfig ->
+ ComputationConfig.create(
+ createMapTask(computationConfig),
+
computationConfig.getTransformUserNameToStateFamily(),
+ stateNameMap));
+ });
+ }
+
+ @Override
+ public void stop() {
+ // We have already shutdown or start has not been called.
+ if (globalConfigRefresher.isShutdown() || !hasReceivedGlobalConfig.get()) {
+ return;
+ }
+
+ globalConfigRefresher.shutdown();
+ boolean isTerminated = false;
+ try {
+ isTerminated = globalConfigRefresher.awaitTermination(10,
TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ LOG.warn("Error occurred shutting down: {}", globalConfigRefresher);
+ }
+ if (!isTerminated) {
+ globalConfigRefresher.shutdownNow();
+ }
+ }
+
+ /**
+ * Initially b Schedules a background thread that periodically sends
getConfig requests to
+ * Dataflow Service to obtain the windmill service endpoint.
+ */
+ @SuppressWarnings("FutureReturnValueIgnored")
+ private void schedulePeriodicGlobalConfigRequests() {
+ globalConfigRefresher.scheduleWithFixedDelay(
+ () -> getGlobalConfig().ifPresent(onStreamingConfig),
+ 0,
+ globalConfigRefreshPeriodMillis,
+ TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Blocks until we have received the initial global config from Dataflow or
an exception is
+ * thrown.
+ */
+ private synchronized void fetchInitialPipelineGlobalConfig() {
+ if (!hasReceivedGlobalConfig.get()) {
+ // Get the initial global configuration. This will initialize the
windmillServer stub.
+ while (true) {
Review Comment:
nit: it seems like you could move !hasReceivedGlobalConfig into the while
condition instead. That removes some nesting, and if we did have multiple callers
calling fetchInitialPipelineGlobalConfig it could speed things up.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/ComputationConfig.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming.config;
+
+import com.google.api.services.dataflow.model.MapTask;
+import com.google.auto.value.AutoValue;
+import java.util.Map;
+import java.util.Optional;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Internal;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+
+@Internal
+@AutoValue
+public abstract class ComputationConfig {
+
+ public static ComputationConfig create(
+ MapTask mapTask,
+ @Nullable Map<String, String> userTransformToStateFamilyName,
+ Map<String, String> stateNameMap) {
+ return new AutoValue_ComputationConfig(
+ mapTask,
+ Optional.ofNullable(userTransformToStateFamilyName)
+ .map(ImmutableMap::copyOf)
+ .orElseGet(ImmutableMap::of),
+ ImmutableMap.copyOf(stateNameMap));
+ }
+
+ public abstract MapTask mapTask(); // MapTask passed to create().
+
+ public abstract ImmutableMap<String, String>
userTransformToStateFamilyName(); // Immutable copy of the map passed to create(); empty if null was passed.
+
+ public abstract ImmutableMap<String, String> stateNameMap(); // Immutable copy of the stateNameMap passed to create().
+
+ /** Interface to fetch configurations for a specific computation. */
+ @FunctionalInterface
+ public interface Fetcher {
+ default void start() {}
+
+ default void stop() {}
+
+ Optional<ComputationConfig> getConfig(String computationId);
Review Comment:
Let's call this fetchConfig; it makes it clearer that it is likely an RPC
(and it matches the class name).
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEnginePipelineConfig.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming.config;
+
+import com.google.api.services.dataflow.model.StreamingComputationConfig;
+import com.google.auto.value.AutoValue;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Internal;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
+
+@AutoValue
+@Internal
+public abstract class StreamingEnginePipelineConfig {
+
+ private static final long DEFAULT_MAX_WORK_ITEM_COMMIT_BYTES = 180 << 20; // 180 MiB; default applied by builder().
+
+ public static StreamingEnginePipelineConfig.Builder builder() {
+ return new AutoValue_StreamingEnginePipelineConfig.Builder()
+ .setMaxWorkItemCommitBytes(DEFAULT_MAX_WORK_ITEM_COMMIT_BYTES)
+ .setComputationConfig(null)
+ .setUserStepToStateFamilyNameMap(new HashMap<>())
+ .setWindmillServiceEndpoints(ImmutableSet.of());
+ }
+
+ public abstract long maxWorkItemCommitBytes(); // builder() defaults this to DEFAULT_MAX_WORK_ITEM_COMMIT_BYTES.
+
+ public abstract Map<String, String> userStepToStateFamilyNameMap(); // builder() defaults this to an empty HashMap.
+
+ public abstract Optional<StreamingComputationConfig> computationConfig(); // builder() calls setComputationConfig(null); presumably empty by default.
Review Comment:
See the other comment; I think this class should just be for the global config,
not related to a computation.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming.config;
+
+import static java.util.stream.StreamSupport.stream;
+import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet.toImmutableSet;
+import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+
+import com.google.api.services.dataflow.model.MapTask;
+import com.google.api.services.dataflow.model.StreamingComputationConfig;
+import com.google.api.services.dataflow.model.StreamingConfigTask;
+import com.google.api.services.dataflow.model.WorkItem;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.runners.dataflow.worker.WorkUnitClient;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.util.BackOff;
+import org.apache.beam.sdk.util.BackOffUtils;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.sdk.util.Sleeper;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Internal
+@ThreadSafe
+public final class StreamingEngineComputationConfigFetcher implements
ComputationConfig.Fetcher {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(StreamingEngineComputationConfigFetcher.class);
+ private static final String GLOBAL_PIPELINE_CONFIG_REFRESHER =
"GlobalPipelineConfigRefresher"; // Refresher thread name format; also passed to forTesting's executorSupplier.
+
+ private final long globalConfigRefreshPeriodMillis; // Delay between periodic global config requests, in milliseconds.
+ private final WorkUnitClient dataflowServiceClient; // Dataflow service client; presumably used by the config request helpers (not shown here).
+ private final ScheduledExecutorService globalConfigRefresher; // Runs the periodic global config refresh; shut down by stop().
+ private final Consumer<StreamingEnginePipelineConfig> onStreamingConfig; // Invoked with each fetched StreamingEnginePipelineConfig.
+ private final AtomicBoolean hasReceivedGlobalConfig; // True once the initial global config was received; stop() is a no-op until then.
+
+ private StreamingEngineComputationConfigFetcher(
+ boolean hasReceivedGlobalConfig,
+ long globalConfigRefreshPeriodMillis,
+ WorkUnitClient dataflowServiceClient,
+ ScheduledExecutorService globalConfigRefresher,
+ Consumer<StreamingEnginePipelineConfig> onStreamingConfig) {
+ this.globalConfigRefreshPeriodMillis = globalConfigRefreshPeriodMillis;
+ this.dataflowServiceClient = dataflowServiceClient;
+ this.globalConfigRefresher = globalConfigRefresher;
+ this.onStreamingConfig = onStreamingConfig;
+ this.hasReceivedGlobalConfig = new AtomicBoolean(hasReceivedGlobalConfig);
+ }
+
+ public static StreamingEngineComputationConfigFetcher create(
+ long globalConfigRefreshPeriodMillis,
+ WorkUnitClient dataflowServiceClient,
+ Consumer<StreamingEnginePipelineConfig> onStreamingConfig) {
+ return new StreamingEngineComputationConfigFetcher(
+ /* hasReceivedGlobalConfig= */ false,
+ globalConfigRefreshPeriodMillis,
+ dataflowServiceClient,
+ Executors.newSingleThreadScheduledExecutor(
+ new
ThreadFactoryBuilder().setNameFormat(GLOBAL_PIPELINE_CONFIG_REFRESHER).build()),
+ onStreamingConfig);
+ }
+
+ public static StreamingEngineComputationConfigFetcher forTesting(
+ boolean hasReceivedGlobalConfig,
+ long globalConfigRefreshPeriodMillis,
+ WorkUnitClient dataflowServiceClient,
+ Function<String, ScheduledExecutorService> executorSupplier,
+ Consumer<StreamingEnginePipelineConfig> onStreamingConfig) {
+ return new StreamingEngineComputationConfigFetcher(
+ hasReceivedGlobalConfig,
+ globalConfigRefreshPeriodMillis,
+ dataflowServiceClient,
+ executorSupplier.apply(GLOBAL_PIPELINE_CONFIG_REFRESHER),
+ onStreamingConfig);
+ }
+
+ private static BackOff defaultConfigBackoff() {
+ return FluentBackoff.DEFAULT
+ .withInitialBackoff(Duration.millis(100))
+ .withMaxBackoff(Duration.standardMinutes(1))
+ .withMaxCumulativeBackoff(Duration.standardMinutes(5))
+ .backoff();
+ }
+
+ private MapTask createMapTask(StreamingComputationConfig computationConfig) {
+ return new MapTask()
+ .setSystemName(computationConfig.getSystemName())
+ .setStageName(computationConfig.getStageName())
+ .setInstructions(computationConfig.getInstructions());
+ }
+
+ @Override
+ public void start() {
+ fetchInitialPipelineGlobalConfig();
+ schedulePeriodicGlobalConfigRequests();
+ }
+
+ @Override
+ public Optional<ComputationConfig> getConfig(String computationId) {
+ Preconditions.checkArgument(
+ !computationId.isEmpty(),
+ "computationId is empty. Cannot fetch computation config without a
computationId.");
+ return getComputationConfigInternal(computationId)
+ .flatMap(
+ config -> {
+ Map<String, String> stateNameMap =
config.userStepToStateFamilyNameMap();
+ return config
+ .computationConfig()
+ .map(
+ computationConfig ->
+ ComputationConfig.create(
+ createMapTask(computationConfig),
+
computationConfig.getTransformUserNameToStateFamily(),
+ stateNameMap));
+ });
+ }
+
+ @Override
+ public void stop() {
+ // We have already shutdown or start has not been called.
+ if (globalConfigRefresher.isShutdown() || !hasReceivedGlobalConfig.get()) {
+ return;
+ }
+
+ globalConfigRefresher.shutdown();
+ boolean isTerminated = false;
+ try {
+ isTerminated = globalConfigRefresher.awaitTermination(10,
TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ LOG.warn("Error occurred shutting down: {}", globalConfigRefresher);
+ }
+ if (!isTerminated) {
+ globalConfigRefresher.shutdownNow();
+ }
+ }
+
+ /**
+ * Initially b Schedules a background thread that periodically sends
getConfig requests to
+ * Dataflow Service to obtain the windmill service endpoint.
+ */
+ @SuppressWarnings("FutureReturnValueIgnored")
+ private void schedulePeriodicGlobalConfigRequests() {
+ globalConfigRefresher.scheduleWithFixedDelay(
+ () -> getGlobalConfig().ifPresent(onStreamingConfig),
+ 0,
+ globalConfigRefreshPeriodMillis,
+ TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Blocks until we have received the initial global config from Dataflow or
an exception is
+ * thrown.
+ */
+ private synchronized void fetchInitialPipelineGlobalConfig() {
+ if (!hasReceivedGlobalConfig.get()) {
+ // Get the initial global configuration. This will initialize the
windmillServer stub.
+ while (true) {
+ LOG.info("Sending request to get initial global configuration for this
worker.");
+ Optional<StreamingEnginePipelineConfig> globalConfig =
getGlobalConfig();
+ if (globalConfig.isPresent()) {
+ onStreamingConfig.accept(globalConfig.get());
+ hasReceivedGlobalConfig.compareAndSet(false, true);
+ break;
+ }
+ LOG.info("Haven't received initial global configuration, will retry in
5 seconds");
+ sleepUninterruptibly(5, TimeUnit.SECONDS);
+ }
+ }
+ LOG.info("Initial global configuration received, harness is now ready");
+ }
+
+ private Optional<StreamingEnginePipelineConfig> getComputationConfigInternal(
+ String computationId) {
+ Optional<StreamingEnginePipelineConfig> streamingConfig =
getConfigInternal(computationId);
+ streamingConfig.ifPresent(onStreamingConfig);
Review Comment:
Can we remove this listener call from the computation config fetching? It seems
like only the global config fetched by the background thread should run
onStreamingConfig.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming.config;
+
+import static java.util.stream.StreamSupport.stream;
+import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet.toImmutableSet;
+import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+
+import com.google.api.services.dataflow.model.MapTask;
+import com.google.api.services.dataflow.model.StreamingComputationConfig;
+import com.google.api.services.dataflow.model.StreamingConfigTask;
+import com.google.api.services.dataflow.model.WorkItem;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.runners.dataflow.worker.WorkUnitClient;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.util.BackOff;
+import org.apache.beam.sdk.util.BackOffUtils;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.sdk.util.Sleeper;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Internal
+@ThreadSafe
+public final class StreamingEngineComputationConfigFetcher implements
ComputationConfig.Fetcher {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(StreamingEngineComputationConfigFetcher.class);
+ private static final String GLOBAL_PIPELINE_CONFIG_REFRESHER =
"GlobalPipelineConfigRefresher"; // Refresher thread name format; also passed to forTesting's executorSupplier.
+
+ private final long globalConfigRefreshPeriodMillis; // Delay between periodic global config requests, in milliseconds.
+ private final WorkUnitClient dataflowServiceClient; // Dataflow service client; presumably used by the config request helpers (not shown here).
+ private final ScheduledExecutorService globalConfigRefresher; // Runs the periodic global config refresh; shut down by stop().
+ private final Consumer<StreamingEnginePipelineConfig> onStreamingConfig; // Invoked with each fetched StreamingEnginePipelineConfig.
+ private final AtomicBoolean hasReceivedGlobalConfig; // True once the initial global config was received; stop() is a no-op until then.
+
+ private StreamingEngineComputationConfigFetcher(
+ boolean hasReceivedGlobalConfig,
+ long globalConfigRefreshPeriodMillis,
+ WorkUnitClient dataflowServiceClient,
+ ScheduledExecutorService globalConfigRefresher,
+ Consumer<StreamingEnginePipelineConfig> onStreamingConfig) {
+ this.globalConfigRefreshPeriodMillis = globalConfigRefreshPeriodMillis;
+ this.dataflowServiceClient = dataflowServiceClient;
+ this.globalConfigRefresher = globalConfigRefresher;
+ this.onStreamingConfig = onStreamingConfig;
+ this.hasReceivedGlobalConfig = new AtomicBoolean(hasReceivedGlobalConfig);
+ }
+
+ public static StreamingEngineComputationConfigFetcher create(
+ long globalConfigRefreshPeriodMillis,
+ WorkUnitClient dataflowServiceClient,
+ Consumer<StreamingEnginePipelineConfig> onStreamingConfig) {
+ return new StreamingEngineComputationConfigFetcher(
+ /* hasReceivedGlobalConfig= */ false,
+ globalConfigRefreshPeriodMillis,
+ dataflowServiceClient,
+ Executors.newSingleThreadScheduledExecutor(
+ new
ThreadFactoryBuilder().setNameFormat(GLOBAL_PIPELINE_CONFIG_REFRESHER).build()),
+ onStreamingConfig);
+ }
+
+ public static StreamingEngineComputationConfigFetcher forTesting(
+ boolean hasReceivedGlobalConfig,
+ long globalConfigRefreshPeriodMillis,
+ WorkUnitClient dataflowServiceClient,
+ Function<String, ScheduledExecutorService> executorSupplier,
+ Consumer<StreamingEnginePipelineConfig> onStreamingConfig) {
+ return new StreamingEngineComputationConfigFetcher(
+ hasReceivedGlobalConfig,
+ globalConfigRefreshPeriodMillis,
+ dataflowServiceClient,
+ executorSupplier.apply(GLOBAL_PIPELINE_CONFIG_REFRESHER),
+ onStreamingConfig);
+ }
+
+ private static BackOff defaultConfigBackoff() {
+ return FluentBackoff.DEFAULT
+ .withInitialBackoff(Duration.millis(100))
+ .withMaxBackoff(Duration.standardMinutes(1))
+ .withMaxCumulativeBackoff(Duration.standardMinutes(5))
+ .backoff();
+ }
+
+ private MapTask createMapTask(StreamingComputationConfig computationConfig) {
+ return new MapTask()
+ .setSystemName(computationConfig.getSystemName())
+ .setStageName(computationConfig.getStageName())
+ .setInstructions(computationConfig.getInstructions());
+ }
+
+ @Override
+ public void start() {
+ fetchInitialPipelineGlobalConfig();
+ schedulePeriodicGlobalConfigRequests();
+ }
+
+ @Override
+ public Optional<ComputationConfig> getConfig(String computationId) {
+ Preconditions.checkArgument(
+ !computationId.isEmpty(),
+ "computationId is empty. Cannot fetch computation config without a
computationId.");
+ return getComputationConfigInternal(computationId)
+ .flatMap(
+ config -> {
+ Map<String, String> stateNameMap =
config.userStepToStateFamilyNameMap();
+ return config
+ .computationConfig()
+ .map(
+ computationConfig ->
+ ComputationConfig.create(
+ createMapTask(computationConfig),
+
computationConfig.getTransformUserNameToStateFamily(),
+ stateNameMap));
+ });
+ }
+
+ @Override
+ public void stop() {
+ // We have already shutdown or start has not been called.
+ if (globalConfigRefresher.isShutdown() || !hasReceivedGlobalConfig.get()) {
+ return;
+ }
+
+ globalConfigRefresher.shutdown();
+ boolean isTerminated = false;
+ try {
+ isTerminated = globalConfigRefresher.awaitTermination(10,
TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ LOG.warn("Error occurred shutting down: {}", globalConfigRefresher);
+ }
+ if (!isTerminated) {
+ globalConfigRefresher.shutdownNow();
+ }
+ }
+
+ /**
+ * Initially b Schedules a background thread that periodically sends
getConfig requests to
+ * Dataflow Service to obtain the windmill service endpoint.
+ */
+ @SuppressWarnings("FutureReturnValueIgnored")
+ private void schedulePeriodicGlobalConfigRequests() {
+ globalConfigRefresher.scheduleWithFixedDelay(
+ () -> getGlobalConfig().ifPresent(onStreamingConfig),
+ 0,
+ globalConfigRefreshPeriodMillis,
+ TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Blocks until we have received the initial global config from Dataflow or
an exception is
+ * thrown.
+ */
+ private synchronized void fetchInitialPipelineGlobalConfig() {
+ if (!hasReceivedGlobalConfig.get()) {
+ // Get the initial global configuration. This will initialize the
windmillServer stub.
+ while (true) {
+ LOG.info("Sending request to get initial global configuration for this
worker.");
+ Optional<StreamingEnginePipelineConfig> globalConfig =
getGlobalConfig();
+ if (globalConfig.isPresent()) {
+ onStreamingConfig.accept(globalConfig.get());
+ hasReceivedGlobalConfig.compareAndSet(false, true);
+ break;
+ }
+ LOG.info("Haven't received initial global configuration, will retry in
5 seconds");
+ sleepUninterruptibly(5, TimeUnit.SECONDS);
+ }
+ }
+ LOG.info("Initial global configuration received, harness is now ready");
+ }
+
+ private Optional<StreamingEnginePipelineConfig> getComputationConfigInternal(
Review Comment:
I think this should just return the computation config, not the full
StreamingEnginePipelineConfig. I think StreamingEnginePipelineConfig can
have the computation config removed from it, as we don't care about listening
to computations as they are fetched.
That will then make it clearer that we only want the listener to trigger on
the periodic background fetching, not on the computation fetching driven by the
cache.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming.config;
+
+import static java.util.stream.StreamSupport.stream;
+import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet.toImmutableSet;
+import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+
+import com.google.api.services.dataflow.model.MapTask;
+import com.google.api.services.dataflow.model.StreamingComputationConfig;
+import com.google.api.services.dataflow.model.StreamingConfigTask;
+import com.google.api.services.dataflow.model.WorkItem;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.runners.dataflow.worker.WorkUnitClient;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.util.BackOff;
+import org.apache.beam.sdk.util.BackOffUtils;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.sdk.util.Sleeper;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Internal
+@ThreadSafe
+public final class StreamingEngineComputationConfigFetcher implements
ComputationConfig.Fetcher {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(StreamingEngineComputationConfigFetcher.class);
+ private static final String GLOBAL_PIPELINE_CONFIG_REFRESHER =
"GlobalPipelineConfigRefresher";
+
+ private final long globalConfigRefreshPeriodMillis;
+ private final WorkUnitClient dataflowServiceClient;
+ private final ScheduledExecutorService globalConfigRefresher;
+ private final Consumer<StreamingEnginePipelineConfig> onStreamingConfig;
+ private final AtomicBoolean hasReceivedGlobalConfig;
+
+ private StreamingEngineComputationConfigFetcher(
+ boolean hasReceivedGlobalConfig,
+ long globalConfigRefreshPeriodMillis,
+ WorkUnitClient dataflowServiceClient,
+ ScheduledExecutorService globalConfigRefresher,
+ Consumer<StreamingEnginePipelineConfig> onStreamingConfig) {
+ this.globalConfigRefreshPeriodMillis = globalConfigRefreshPeriodMillis;
+ this.dataflowServiceClient = dataflowServiceClient;
+ this.globalConfigRefresher = globalConfigRefresher;
+ this.onStreamingConfig = onStreamingConfig;
+ this.hasReceivedGlobalConfig = new AtomicBoolean(hasReceivedGlobalConfig);
+ }
+
+ public static StreamingEngineComputationConfigFetcher create(
+ long globalConfigRefreshPeriodMillis,
+ WorkUnitClient dataflowServiceClient,
+ Consumer<StreamingEnginePipelineConfig> onStreamingConfig) {
+ return new StreamingEngineComputationConfigFetcher(
+ /* hasReceivedGlobalConfig= */ false,
+ globalConfigRefreshPeriodMillis,
+ dataflowServiceClient,
+ Executors.newSingleThreadScheduledExecutor(
+ new
ThreadFactoryBuilder().setNameFormat(GLOBAL_PIPELINE_CONFIG_REFRESHER).build()),
+ onStreamingConfig);
+ }
+
+ public static StreamingEngineComputationConfigFetcher forTesting(
+ boolean hasReceivedGlobalConfig,
+ long globalConfigRefreshPeriodMillis,
+ WorkUnitClient dataflowServiceClient,
+ Function<String, ScheduledExecutorService> executorSupplier,
+ Consumer<StreamingEnginePipelineConfig> onStreamingConfig) {
+ return new StreamingEngineComputationConfigFetcher(
+ hasReceivedGlobalConfig,
+ globalConfigRefreshPeriodMillis,
+ dataflowServiceClient,
+ executorSupplier.apply(GLOBAL_PIPELINE_CONFIG_REFRESHER),
+ onStreamingConfig);
+ }
+
+ private static BackOff defaultConfigBackoff() {
+ return FluentBackoff.DEFAULT
+ .withInitialBackoff(Duration.millis(100))
+ .withMaxBackoff(Duration.standardMinutes(1))
+ .withMaxCumulativeBackoff(Duration.standardMinutes(5))
+ .backoff();
+ }
+
+ private MapTask createMapTask(StreamingComputationConfig computationConfig) {
+ return new MapTask()
+ .setSystemName(computationConfig.getSystemName())
+ .setStageName(computationConfig.getStageName())
+ .setInstructions(computationConfig.getInstructions());
+ }
+
+ @Override
+ public void start() {
+ fetchInitialPipelineGlobalConfig();
+ schedulePeriodicGlobalConfigRequests();
+ }
+
+ @Override
+ public Optional<ComputationConfig> getConfig(String computationId) {
+ Preconditions.checkArgument(
+ !computationId.isEmpty(),
+ "computationId is empty. Cannot fetch computation config without a
computationId.");
+ return getComputationConfigInternal(computationId)
+ .flatMap(
+ config -> {
+ Map<String, String> stateNameMap =
config.userStepToStateFamilyNameMap();
+ return config
+ .computationConfig()
+ .map(
+ computationConfig ->
+ ComputationConfig.create(
+ createMapTask(computationConfig),
+
computationConfig.getTransformUserNameToStateFamily(),
+ stateNameMap));
+ });
+ }
+
+ @Override
+ public void stop() {
+ // We have already shutdown or start has not been called.
+ if (globalConfigRefresher.isShutdown() || !hasReceivedGlobalConfig.get()) {
+ return;
+ }
+
+ globalConfigRefresher.shutdown();
+ boolean isTerminated = false;
+ try {
+ isTerminated = globalConfigRefresher.awaitTermination(10,
TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ LOG.warn("Error occurred shutting down: {}", globalConfigRefresher);
+ }
+ if (!isTerminated) {
+ globalConfigRefresher.shutdownNow();
+ }
+ }
+
+ /**
+ * Initially b Schedules a background thread that periodically sends
getConfig requests to
+ * Dataflow Service to obtain the windmill service endpoint.
+ */
+ @SuppressWarnings("FutureReturnValueIgnored")
+ private void schedulePeriodicGlobalConfigRequests() {
+ globalConfigRefresher.scheduleWithFixedDelay(
+ () -> getGlobalConfig().ifPresent(onStreamingConfig),
+ 0,
+ globalConfigRefreshPeriodMillis,
+ TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Blocks until we have received the initial global config from Dataflow or
an exception is
+ * thrown.
+ */
+ private synchronized void fetchInitialPipelineGlobalConfig() {
+ if (!hasReceivedGlobalConfig.get()) {
+ // Get the initial global configuration. This will initialize the
windmillServer stub.
+ while (true) {
+ LOG.info("Sending request to get initial global configuration for this
worker.");
+ Optional<StreamingEnginePipelineConfig> globalConfig =
getGlobalConfig();
+ if (globalConfig.isPresent()) {
+ onStreamingConfig.accept(globalConfig.get());
+ hasReceivedGlobalConfig.compareAndSet(false, true);
+ break;
+ }
+ LOG.info("Haven't received initial global configuration, will retry in
5 seconds");
+ sleepUninterruptibly(5, TimeUnit.SECONDS);
+ }
+ }
+ LOG.info("Initial global configuration received, harness is now ready");
+ }
+
+ private Optional<StreamingEnginePipelineConfig> getComputationConfigInternal(
+ String computationId) {
+ Optional<StreamingEnginePipelineConfig> streamingConfig =
getConfigInternal(computationId);
+ streamingConfig.ifPresent(onStreamingConfig);
+ return streamingConfig;
+ }
+
+ private Optional<StreamingEnginePipelineConfig> getGlobalConfig() {
+ return getConfigInternal(null);
+ }
+
+ private Optional<StreamingEnginePipelineConfig> getConfigInternal(@Nullable
String computation) {
Review Comment:
We probably need to change this to a templated (generic) method if we're
returning different types for the global and per-computation configs.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -718,100 +834,27 @@ int numCommitThreads() {
public void start() {
running.set(true);
- if (windmillServiceEnabled) {
- // Schedule the background getConfig thread. Blocks until windmillServer
stub is ready.
- schedulePeriodicGlobalConfigRequests();
- }
+ configFetcher.start();
memoryMonitorThread.start();
dispatchThread.start();
sampler.start();
- if (options.getPeriodicStatusPageOutputDirectory() != null) {
- ScheduledExecutorService statusPageTimer =
executorSupplier.apply("DumpStatusPages");
- statusPageTimer.scheduleWithFixedDelay(
- () -> {
- Collection<Capturable> pages = statusPages.getDebugCapturePages();
- if (pages.isEmpty()) {
- LOG.warn("No captured status pages.");
- }
- long timestamp = clock.get().getMillis();
- for (Capturable page : pages) {
- PrintWriter writer = null;
- try {
- File outputFile =
- new File(
- options.getPeriodicStatusPageOutputDirectory(),
- ("StreamingDataflowWorker"
- + options.getWorkerId()
- + "_"
- + page.pageName()
- + timestamp
- + ".html")
- .replaceAll("/", "_"));
- writer = new PrintWriter(outputFile, UTF_8.name());
- page.captureData(writer);
- } catch (IOException e) {
- LOG.warn("Error dumping status page.", e);
- } finally {
- if (writer != null) {
- writer.close();
- }
- }
- }
- },
- 60,
- 60,
- TimeUnit.SECONDS);
- scheduledExecutors.add(statusPageTimer);
- }
workCommitter.start();
workerStatusReporter.start();
activeWorkRefresher.start();
}
public void startStatusPages() {
Review Comment:
Add a comment that this may be omitted for lighter-weight testing?
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming.config;
+
+import static java.util.stream.StreamSupport.stream;
+import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet.toImmutableSet;
+import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+
+import com.google.api.services.dataflow.model.MapTask;
+import com.google.api.services.dataflow.model.StreamingComputationConfig;
+import com.google.api.services.dataflow.model.StreamingConfigTask;
+import com.google.api.services.dataflow.model.WorkItem;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.runners.dataflow.worker.WorkUnitClient;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.util.BackOff;
+import org.apache.beam.sdk.util.BackOffUtils;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.sdk.util.Sleeper;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Internal
+@ThreadSafe
+public final class StreamingEngineComputationConfigFetcher implements
ComputationConfig.Fetcher {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(StreamingEngineComputationConfigFetcher.class);
+ private static final String GLOBAL_PIPELINE_CONFIG_REFRESHER =
"GlobalPipelineConfigRefresher";
+
+ private final long globalConfigRefreshPeriodMillis;
+ private final WorkUnitClient dataflowServiceClient;
+ private final ScheduledExecutorService globalConfigRefresher;
+ private final Consumer<StreamingEnginePipelineConfig> onStreamingConfig;
+ private final AtomicBoolean hasReceivedGlobalConfig;
+
+ private StreamingEngineComputationConfigFetcher(
+ boolean hasReceivedGlobalConfig,
+ long globalConfigRefreshPeriodMillis,
+ WorkUnitClient dataflowServiceClient,
+ ScheduledExecutorService globalConfigRefresher,
+ Consumer<StreamingEnginePipelineConfig> onStreamingConfig) {
+ this.globalConfigRefreshPeriodMillis = globalConfigRefreshPeriodMillis;
+ this.dataflowServiceClient = dataflowServiceClient;
+ this.globalConfigRefresher = globalConfigRefresher;
+ this.onStreamingConfig = onStreamingConfig;
+ this.hasReceivedGlobalConfig = new AtomicBoolean(hasReceivedGlobalConfig);
+ }
+
+ public static StreamingEngineComputationConfigFetcher create(
+ long globalConfigRefreshPeriodMillis,
+ WorkUnitClient dataflowServiceClient,
+ Consumer<StreamingEnginePipelineConfig> onStreamingConfig) {
+ return new StreamingEngineComputationConfigFetcher(
+ /* hasReceivedGlobalConfig= */ false,
+ globalConfigRefreshPeriodMillis,
+ dataflowServiceClient,
+ Executors.newSingleThreadScheduledExecutor(
+ new
ThreadFactoryBuilder().setNameFormat(GLOBAL_PIPELINE_CONFIG_REFRESHER).build()),
+ onStreamingConfig);
+ }
+
+ public static StreamingEngineComputationConfigFetcher forTesting(
+ boolean hasReceivedGlobalConfig,
+ long globalConfigRefreshPeriodMillis,
+ WorkUnitClient dataflowServiceClient,
+ Function<String, ScheduledExecutorService> executorSupplier,
+ Consumer<StreamingEnginePipelineConfig> onStreamingConfig) {
+ return new StreamingEngineComputationConfigFetcher(
+ hasReceivedGlobalConfig,
+ globalConfigRefreshPeriodMillis,
+ dataflowServiceClient,
+ executorSupplier.apply(GLOBAL_PIPELINE_CONFIG_REFRESHER),
+ onStreamingConfig);
+ }
+
+ private static BackOff defaultConfigBackoff() {
+ return FluentBackoff.DEFAULT
+ .withInitialBackoff(Duration.millis(100))
+ .withMaxBackoff(Duration.standardMinutes(1))
+ .withMaxCumulativeBackoff(Duration.standardMinutes(5))
+ .backoff();
+ }
+
+ private MapTask createMapTask(StreamingComputationConfig computationConfig) {
+ return new MapTask()
+ .setSystemName(computationConfig.getSystemName())
+ .setStageName(computationConfig.getStageName())
+ .setInstructions(computationConfig.getInstructions());
+ }
+
+ @Override
+ public void start() {
+ fetchInitialPipelineGlobalConfig();
+ schedulePeriodicGlobalConfigRequests();
+ }
+
+ @Override
+ public Optional<ComputationConfig> getConfig(String computationId) {
+ Preconditions.checkArgument(
+ !computationId.isEmpty(),
+ "computationId is empty. Cannot fetch computation config without a
computationId.");
+ return getComputationConfigInternal(computationId)
+ .flatMap(
+ config -> {
+ Map<String, String> stateNameMap =
config.userStepToStateFamilyNameMap();
+ return config
+ .computationConfig()
+ .map(
+ computationConfig ->
+ ComputationConfig.create(
+ createMapTask(computationConfig),
+
computationConfig.getTransformUserNameToStateFamily(),
+ stateNameMap));
+ });
+ }
+
+ @Override
+ public void stop() {
+ // We have already shutdown or start has not been called.
+ if (globalConfigRefresher.isShutdown() || !hasReceivedGlobalConfig.get()) {
+ return;
+ }
+
+ globalConfigRefresher.shutdown();
+ boolean isTerminated = false;
+ try {
+ isTerminated = globalConfigRefresher.awaitTermination(10,
TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ LOG.warn("Error occurred shutting down: {}", globalConfigRefresher);
+ }
+ if (!isTerminated) {
+ globalConfigRefresher.shutdownNow();
+ }
+ }
+
+ /**
+ * Initially b Schedules a background thread that periodically sends
getConfig requests to
+ * Dataflow Service to obtain the windmill service endpoint.
+ */
+ @SuppressWarnings("FutureReturnValueIgnored")
+ private void schedulePeriodicGlobalConfigRequests() {
+ globalConfigRefresher.scheduleWithFixedDelay(
+ () -> getGlobalConfig().ifPresent(onStreamingConfig),
+ 0,
+ globalConfigRefreshPeriodMillis,
+ TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Blocks until we have received the initial global config from Dataflow or
an exception is
+ * thrown.
+ */
+ private synchronized void fetchInitialPipelineGlobalConfig() {
+ if (!hasReceivedGlobalConfig.get()) {
+ // Get the initial global configuration. This will initialize the
windmillServer stub.
+ while (true) {
+ LOG.info("Sending request to get initial global configuration for this
worker.");
+ Optional<StreamingEnginePipelineConfig> globalConfig =
getGlobalConfig();
+ if (globalConfig.isPresent()) {
+ onStreamingConfig.accept(globalConfig.get());
+ hasReceivedGlobalConfig.compareAndSet(false, true);
Review Comment:
Just set it to true; there's no need for the compareAndSet.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming.config;
+
+import static java.util.stream.StreamSupport.stream;
+import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet.toImmutableSet;
+import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+
+import com.google.api.services.dataflow.model.MapTask;
+import com.google.api.services.dataflow.model.StreamingComputationConfig;
+import com.google.api.services.dataflow.model.StreamingConfigTask;
+import com.google.api.services.dataflow.model.WorkItem;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.runners.dataflow.worker.WorkUnitClient;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.util.BackOff;
+import org.apache.beam.sdk.util.BackOffUtils;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.sdk.util.Sleeper;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Internal
+@ThreadSafe
+public final class StreamingEngineComputationConfigFetcher implements
ComputationConfig.Fetcher {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(StreamingEngineComputationConfigFetcher.class);
+ private static final String GLOBAL_PIPELINE_CONFIG_REFRESHER =
"GlobalPipelineConfigRefresher";
+
+ private final long globalConfigRefreshPeriodMillis;
+ private final WorkUnitClient dataflowServiceClient;
+ private final ScheduledExecutorService globalConfigRefresher;
+ private final Consumer<StreamingEnginePipelineConfig> onStreamingConfig;
+ private final AtomicBoolean hasReceivedGlobalConfig;
+
+ private StreamingEngineComputationConfigFetcher(
+ boolean hasReceivedGlobalConfig,
+ long globalConfigRefreshPeriodMillis,
+ WorkUnitClient dataflowServiceClient,
+ ScheduledExecutorService globalConfigRefresher,
+ Consumer<StreamingEnginePipelineConfig> onStreamingConfig) {
+ this.globalConfigRefreshPeriodMillis = globalConfigRefreshPeriodMillis;
+ this.dataflowServiceClient = dataflowServiceClient;
+ this.globalConfigRefresher = globalConfigRefresher;
+ this.onStreamingConfig = onStreamingConfig;
+ this.hasReceivedGlobalConfig = new AtomicBoolean(hasReceivedGlobalConfig);
+ }
+
+ public static StreamingEngineComputationConfigFetcher create(
+ long globalConfigRefreshPeriodMillis,
+ WorkUnitClient dataflowServiceClient,
+ Consumer<StreamingEnginePipelineConfig> onStreamingConfig) {
+ return new StreamingEngineComputationConfigFetcher(
+ /* hasReceivedGlobalConfig= */ false,
+ globalConfigRefreshPeriodMillis,
+ dataflowServiceClient,
+ Executors.newSingleThreadScheduledExecutor(
+ new
ThreadFactoryBuilder().setNameFormat(GLOBAL_PIPELINE_CONFIG_REFRESHER).build()),
+ onStreamingConfig);
+ }
+
+ public static StreamingEngineComputationConfigFetcher forTesting(
+ boolean hasReceivedGlobalConfig,
+ long globalConfigRefreshPeriodMillis,
+ WorkUnitClient dataflowServiceClient,
+ Function<String, ScheduledExecutorService> executorSupplier,
+ Consumer<StreamingEnginePipelineConfig> onStreamingConfig) {
+ return new StreamingEngineComputationConfigFetcher(
+ hasReceivedGlobalConfig,
+ globalConfigRefreshPeriodMillis,
+ dataflowServiceClient,
+ executorSupplier.apply(GLOBAL_PIPELINE_CONFIG_REFRESHER),
+ onStreamingConfig);
+ }
+
+ private static BackOff defaultConfigBackoff() {
+ return FluentBackoff.DEFAULT
+ .withInitialBackoff(Duration.millis(100))
+ .withMaxBackoff(Duration.standardMinutes(1))
+ .withMaxCumulativeBackoff(Duration.standardMinutes(5))
+ .backoff();
+ }
+
+ private MapTask createMapTask(StreamingComputationConfig computationConfig) {
Review Comment:
This could be static.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationStateCache.java:
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming;
+
+import static java.util.stream.Collectors.toConcurrentMap;
+import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList.toImmutableList;
+
+import com.google.api.services.dataflow.model.MapTask;
+import java.io.PrintWriter;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Function;
+import javax.annotation.concurrent.ThreadSafe;
+import
org.apache.beam.runners.dataflow.worker.apiary.FixMultiOutputInfosOnParDoInstructions;
+import org.apache.beam.runners.dataflow.worker.status.StatusDataProvider;
+import
org.apache.beam.runners.dataflow.worker.streaming.config.ComputationConfig;
+import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
+import
org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache;
+import
org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudget;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.fn.IdGenerator;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheLoader;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LoadingCache;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Cache of {@link String} computationId to {@link ComputationState}. */
+@Internal
+@ThreadSafe
+public final class ComputationStateCache implements StatusDataProvider {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ComputationStateCache.class);
+
+ private final LoadingCache<String, ComputationState> computationCache;
+
+ /**
+ * Fix up MapTask representation because MultiOutputInfos are missing from
system generated
+ * ParDoInstructions.
+ */
+ private final Function<MapTask, MapTask>
fixMultiOutputInfosOnParDoInstructions;
+
+ private ComputationStateCache(
+ LoadingCache<String, ComputationState> computationCache,
+ Function<MapTask, MapTask> fixMultiOutputInfosOnParDoInstructions) {
+ this.computationCache = computationCache;
+ this.fixMultiOutputInfosOnParDoInstructions =
fixMultiOutputInfosOnParDoInstructions;
+ }
+
+ public static ComputationStateCache create(
+ ComputationConfig.Fetcher computationConfigFetcher,
+ BoundedQueueExecutor workUnitExecutor,
+ Function<String, WindmillStateCache.ForComputation>
perComputationStateCacheViewFactory,
+ IdGenerator idGenerator) {
+ Function<MapTask, MapTask> fixMultiOutputInfosOnParDoInstructions =
+ new FixMultiOutputInfosOnParDoInstructions(idGenerator);
+ ConcurrentMap<String, String> stateNameMap = new ConcurrentHashMap<>();
+ return new ComputationStateCache(
+ CacheBuilder.newBuilder()
+ .build(
+ newComputationStateCacheLoader(
+ computationConfigFetcher,
+ workUnitExecutor,
+ perComputationStateCacheViewFactory,
+ fixMultiOutputInfosOnParDoInstructions,
+ stateNameMap)),
+ fixMultiOutputInfosOnParDoInstructions);
+ }
+
+ @VisibleForTesting
+ public static ComputationStateCache forTesting(
+ ComputationConfig.Fetcher computationConfigFetcher,
+ BoundedQueueExecutor workUnitExecutor,
+ Function<String, WindmillStateCache.ForComputation>
perComputationStateCacheViewFactory,
+ IdGenerator idGenerator,
+ ConcurrentMap<String, String> stateNameMap) {
+ Function<MapTask, MapTask> fixMultiOutputInfosOnParDoInstructions =
+ new FixMultiOutputInfosOnParDoInstructions(idGenerator);
+ return new ComputationStateCache(
+ CacheBuilder.newBuilder()
+ .build(
+ newComputationStateCacheLoader(
+ computationConfigFetcher,
+ workUnitExecutor,
+ perComputationStateCacheViewFactory,
+ fixMultiOutputInfosOnParDoInstructions,
+ stateNameMap)),
+ fixMultiOutputInfosOnParDoInstructions);
+ }
+
+ private static CacheLoader<String, ComputationState>
newComputationStateCacheLoader(
+ ComputationConfig.Fetcher computationConfigFetcher,
+ BoundedQueueExecutor workUnitExecutor,
+ Function<String, WindmillStateCache.ForComputation>
perComputationStateCacheViewFactory,
+ Function<MapTask, MapTask> fixMultiOutputInfosOnParDoInstructions,
+ ConcurrentMap<String, String> stateNameMap) {
+ return new CacheLoader<String, ComputationState>() {
+ @Override
+ public ComputationState load(String computationId) {
+ // LoadingCache load(K key) will throw an exception if we return null
here,
+ // throw ComputationStateNotFoundException to represent semantics
better.
+ ComputationConfig computationConfig =
+ computationConfigFetcher
+ .getConfig(computationId)
+ .orElseThrow(() -> new
ComputationStateNotFoundException(computationId));
+ stateNameMap.putAll(computationConfig.stateNameMap());
+ return new ComputationState(
+ computationId,
+
fixMultiOutputInfosOnParDoInstructions.apply(computationConfig.mapTask()),
+ workUnitExecutor,
+ !computationConfig.userTransformToStateFamilyName().isEmpty()
+ ? computationConfig.userTransformToStateFamilyName()
+ : stateNameMap,
+ perComputationStateCacheViewFactory.apply(computationId));
+ }
+ };
+ }
+
+ /**
+ * Returns the {@link ComputationState} associated with the given
computationId. May perform IO if
+ * a value is not present, and it is possible that after IO is performed
there is no value
+ * correlated with that computationId.
+ */
+ public Optional<ComputationState> get(String computationId) {
+ try {
+ return Optional.ofNullable(computationCache.get(computationId));
+ } catch (ExecutionException | ComputationStateNotFoundException e) {
+ if (e.getCause() instanceof ComputationStateNotFoundException) {
+ LOG.error(
+ "Trying to fetch unknown computation={}, known computations are
{}.",
+ computationId,
+ ImmutableSet.copyOf(computationCache.asMap().keySet()));
+ } else {
+ LOG.warn("Error occurred fetching computation for computationId={}",
computationId, e);
+ }
+ }
+
+ return Optional.empty();
+ }
+
+ public Optional<ComputationState> getIfPresent(String computationId) {
+ return Optional.ofNullable(computationCache.getIfPresent(computationId));
+ }
+
+ /** Returns a read-only view of all computations. */
+ public ImmutableList<ComputationState> getAllComputations() {
Review Comment:
Should this be getAllPresentComputations?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]