m-trieu commented on code in PR #31133:
URL: https://github.com/apache/beam/pull/31133#discussion_r1591858956


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming.config;
+
+import static java.util.stream.StreamSupport.stream;
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet.toImmutableSet;
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+
+import com.google.api.services.dataflow.model.MapTask;
+import com.google.api.services.dataflow.model.StreamingComputationConfig;
+import com.google.api.services.dataflow.model.StreamingConfigTask;
+import com.google.api.services.dataflow.model.WorkItem;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.runners.dataflow.worker.WorkUnitClient;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.util.BackOff;
+import org.apache.beam.sdk.util.BackOffUtils;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.sdk.util.Sleeper;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Internal
+@ThreadSafe
+public final class StreamingEngineComputationConfigFetcher implements 
ComputationConfig.Fetcher {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(StreamingEngineComputationConfigFetcher.class);
+  private static final String GLOBAL_PIPELINE_CONFIG_REFRESHER = 
"GlobalPipelineConfigRefresher";
+
+  private final long globalConfigRefreshPeriodMillis;
+  private final WorkUnitClient dataflowServiceClient;
+  private final ScheduledExecutorService globalConfigRefresher;
+  private final Consumer<StreamingEnginePipelineConfig> onStreamingConfig;
+  private final AtomicBoolean hasReceivedGlobalConfig;
+
+  private StreamingEngineComputationConfigFetcher(
+      boolean hasReceivedGlobalConfig,
+      long globalConfigRefreshPeriodMillis,
+      WorkUnitClient dataflowServiceClient,
+      ScheduledExecutorService globalConfigRefresher,
+      Consumer<StreamingEnginePipelineConfig> onStreamingConfig) {
+    this.globalConfigRefreshPeriodMillis = globalConfigRefreshPeriodMillis;
+    this.dataflowServiceClient = dataflowServiceClient;
+    this.globalConfigRefresher = globalConfigRefresher;
+    this.onStreamingConfig = onStreamingConfig;
+    this.hasReceivedGlobalConfig = new AtomicBoolean(hasReceivedGlobalConfig);
+  }
+
+  public static StreamingEngineComputationConfigFetcher create(
+      long globalConfigRefreshPeriodMillis,
+      WorkUnitClient dataflowServiceClient,
+      Consumer<StreamingEnginePipelineConfig> onStreamingConfig) {
+    return new StreamingEngineComputationConfigFetcher(
+        /* hasReceivedGlobalConfig= */ false,
+        globalConfigRefreshPeriodMillis,
+        dataflowServiceClient,
+        Executors.newSingleThreadScheduledExecutor(
+            new 
ThreadFactoryBuilder().setNameFormat(GLOBAL_PIPELINE_CONFIG_REFRESHER).build()),
+        onStreamingConfig);
+  }
+
+  public static StreamingEngineComputationConfigFetcher forTesting(
+      boolean hasReceivedGlobalConfig,
+      long globalConfigRefreshPeriodMillis,
+      WorkUnitClient dataflowServiceClient,
+      Function<String, ScheduledExecutorService> executorSupplier,
+      Consumer<StreamingEnginePipelineConfig> onStreamingConfig) {
+    return new StreamingEngineComputationConfigFetcher(
+        hasReceivedGlobalConfig,
+        globalConfigRefreshPeriodMillis,
+        dataflowServiceClient,
+        executorSupplier.apply(GLOBAL_PIPELINE_CONFIG_REFRESHER),
+        onStreamingConfig);
+  }
+
+  private static BackOff defaultConfigBackoff() {
+    return FluentBackoff.DEFAULT
+        .withInitialBackoff(Duration.millis(100))
+        .withMaxBackoff(Duration.standardMinutes(1))
+        .withMaxCumulativeBackoff(Duration.standardMinutes(5))
+        .backoff();
+  }
+
+  private MapTask createMapTask(StreamingComputationConfig computationConfig) {
+    return new MapTask()
+        .setSystemName(computationConfig.getSystemName())
+        .setStageName(computationConfig.getStageName())
+        .setInstructions(computationConfig.getInstructions());
+  }
+
+  @Override
+  public void start() {
+    fetchInitialPipelineGlobalConfig();
+    schedulePeriodicGlobalConfigRequests();
+  }
+
+  @Override
+  public Optional<ComputationConfig> getConfig(String computationId) {
+    Preconditions.checkArgument(
+        !computationId.isEmpty(),
+        "computationId is empty. Cannot fetch computation config without a 
computationId.");
+    return getComputationConfigInternal(computationId)
+        .flatMap(
+            config -> {
+              Map<String, String> stateNameMap = 
config.userStepToStateFamilyNameMap();
+              return config
+                  .computationConfig()
+                  .map(
+                      computationConfig ->
+                          ComputationConfig.create(
+                              createMapTask(computationConfig),
+                              
computationConfig.getTransformUserNameToStateFamily(),
+                              stateNameMap));
+            });
+  }
+
+  @Override
+  public void stop() {
+    // We have already shut down, or start has not been called.
+    if (globalConfigRefresher.isShutdown() || !hasReceivedGlobalConfig.get()) {
+      return;
+    }
+
+    globalConfigRefresher.shutdown();
+    boolean isTerminated = false;
+    try {
+      isTerminated = globalConfigRefresher.awaitTermination(10, 
TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      LOG.warn("Error occurred shutting down: {}", globalConfigRefresher);
+    }
+    if (!isTerminated) {
+      globalConfigRefresher.shutdownNow();
+    }
+  }
+
+  /**
+   * Schedules a background thread that periodically sends getConfig requests to
+   * Dataflow Service to obtain the windmill service endpoint.
+   */
+  @SuppressWarnings("FutureReturnValueIgnored")
+  private void schedulePeriodicGlobalConfigRequests() {
+    globalConfigRefresher.scheduleWithFixedDelay(
+        () -> getGlobalConfig().ifPresent(onStreamingConfig),
+        0,
+        globalConfigRefreshPeriodMillis,
+        TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Blocks until we have received the initial global config from Dataflow or 
an exception is
+   * thrown.
+   */
+  private synchronized void fetchInitialPipelineGlobalConfig() {
+    if (!hasReceivedGlobalConfig.get()) {
+      // Get the initial global configuration. This will initialize the 
windmillServer stub.
+      while (true) {
+        LOG.info("Sending request to get initial global configuration for this 
worker.");
+        Optional<StreamingEnginePipelineConfig> globalConfig = 
getGlobalConfig();
+        if (globalConfig.isPresent()) {
+          onStreamingConfig.accept(globalConfig.get());
+          hasReceivedGlobalConfig.compareAndSet(false, true);
+          break;
+        }
+        LOG.info("Haven't received initial global configuration, will retry in 
5 seconds");
+        sleepUninterruptibly(5, TimeUnit.SECONDS);
+      }
+    }
+    LOG.info("Initial global configuration received, harness is now ready");
+  }
+
+  private Optional<StreamingEnginePipelineConfig> getComputationConfigInternal(
+      String computationId) {
+    Optional<StreamingEnginePipelineConfig> streamingConfig = 
getConfigInternal(computationId);
+    streamingConfig.ifPresent(onStreamingConfig);
+    return streamingConfig;
+  }
+
+  private Optional<StreamingEnginePipelineConfig> getGlobalConfig() {
+    return getConfigInternal(null);
+  }
+
+  private Optional<StreamingEnginePipelineConfig> getConfigInternal(@Nullable 
String computation) {

Review Comment:
   done



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/ComputationConfig.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming.config;
+
+import com.google.api.services.dataflow.model.MapTask;
+import com.google.auto.value.AutoValue;
+import java.util.Map;
+import java.util.Optional;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Internal;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+
+@Internal
+@AutoValue
+public abstract class ComputationConfig {
+
+  public static ComputationConfig create(
+      MapTask mapTask,
+      @Nullable Map<String, String> userTransformToStateFamilyName,
+      Map<String, String> stateNameMap) {
+    return new AutoValue_ComputationConfig(
+        mapTask,
+        Optional.ofNullable(userTransformToStateFamilyName)
+            .map(ImmutableMap::copyOf)
+            .orElseGet(ImmutableMap::of),
+        ImmutableMap.copyOf(stateNameMap));
+  }
+
+  public abstract MapTask mapTask();
+
+  public abstract ImmutableMap<String, String> 
userTransformToStateFamilyName();
+
+  public abstract ImmutableMap<String, String> stateNameMap();
+
+  /** Interface to fetch configurations for a specific computation. */
+  @FunctionalInterface
+  public interface Fetcher {
+    default void start() {}
+
+    default void stop() {}
+
+    Optional<ComputationConfig> getConfig(String computationId);

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to