scwhittle commented on code in PR #31784:
URL: https://github.com/apache/beam/pull/31784#discussion_r1667175263


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingEngineWorkCommitter.java:
##########
@@ -104,17 +116,17 @@ public long currentActiveCommitBytes() {
 
   @Override
   public void stop() {
-    if (!commitSenders.isTerminated()) {
+    if (isRunning.compareAndSet(true, false) && !commitSenders.isTerminated()) {

Review Comment:
   Ditto: can we just enforce that stop() is called once, and only after start()?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/DirectHeartbeatSender.java:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.work.refresh;
+
+import org.apache.beam.runners.dataflow.worker.streaming.RefreshableWork;
+import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream;
+import org.apache.beam.sdk.annotations.Internal;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link HeartbeatSender} implementation that sends heartbeats directly on the underlying stream if
+ * the stream is not closed.
+ *
+ * @implNote
+ *     <p>{@link #equals(Object)} and {@link #hashCode()} implementations delegate to internal

Review Comment:
   Add to the comment:
   "This class is a stateless decorator to the underlying stream."



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/getdata/ThrottlingGetDataMetricTracker.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.getdata;
+
+import com.google.auto.value.AutoValue;
+import java.io.PrintWriter;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.runners.dataflow.worker.util.MemoryMonitor;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Wraps GetData calls that tracks metrics for the number of in-flight requests and throttles
+ * requests when memory pressure is high.
+ */
+@Internal
+@ThreadSafe
+public final class ThrottlingGetDataMetricTracker {
+  private final MemoryMonitor gcThrashingMonitor;
+  private final GetDataMetrics getDataMetrics;
+
+  public ThrottlingGetDataMetricTracker(MemoryMonitor gcThrashingMonitor) {
+    this.gcThrashingMonitor = gcThrashingMonitor;
+    this.getDataMetrics = GetDataMetrics.create();
+  }
+
+  /**
+   * Tracks a GetData call. If there is memory pressure, may throttle requests. Returns an {@link
+   * AutoCloseable} that will decrement the metric after the call is finished.
+   */
+  public AutoCloseable trackSingleCallWithThrottling(Type callType) {
+    gcThrashingMonitor.waitForResources(callType.debugName);
+    AtomicInteger getDataMetricTracker = getDataMetrics.getMetricFor(callType);
+    getDataMetricTracker.getAndIncrement();
+    return getDataMetricTracker::getAndDecrement;
+  }
+
+  /**
+   * Tracks heartbeat request metrics. Returns an {@link AutoCloseable} that will decrement the
+   * metric after the call is finished.
+   */
+  public AutoCloseable trackHeartbeats(int numHeartbeats) {
+    getDataMetrics
+        .activeHeartbeats()
+        .getAndUpdate(currentActiveHeartbeats -> currentActiveHeartbeats + numHeartbeats);
+    // Active heartbeats should never drop below 0.
+    return () ->
+        getDataMetrics
+            .activeHeartbeats()
+            .getAndUpdate(existing -> Math.max(existing - numHeartbeats, 0));

Review Comment:
   I think you can get rid of the Math.max now that we're no longer setting the counter to 0.
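
   A minimal sketch of what dropping the clamp could look like, assuming the counter is only ever changed through paired increments and decrements. `ActiveHeartbeatCounter` and `Tracked` are hypothetical stand-ins for illustration, not classes from this PR:

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for the heartbeat portion of the metric tracker. */
final class ActiveHeartbeatCounter {
  /** AutoCloseable whose close() declares no checked exception (an assumption, for brevity). */
  interface Tracked extends AutoCloseable {
    @Override
    void close();
  }

  private final AtomicInteger activeHeartbeats = new AtomicInteger();

  Tracked trackHeartbeats(int numHeartbeats) {
    activeHeartbeats.addAndGet(numHeartbeats);
    // Each decrement is paired with the increment above, so no Math.max clamp is needed
    // as long as nothing else resets the counter and close() is called once.
    return () -> activeHeartbeats.addAndGet(-numHeartbeats);
  }

  int active() {
    return activeHeartbeats.get();
  }
}
```

   Note that the sketch relies on each returned handle being closed exactly once; closing a handle twice would drive the count negative, which is the case the clamp used to hide.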



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillStreamFactory.java:
##########
@@ -190,14 +193,15 @@ public GetWorkStream createGetWorkStream(
   }
 
   public GetWorkStream createDirectGetWorkStream(
-      CloudWindmillServiceV1Alpha1Stub stub,
+      WindmillConnection connection,
       GetWorkRequest request,
       ThrottleTimer getWorkThrottleTimer,
       Supplier<GetDataStream> getDataStream,
       Supplier<WorkCommitter> workCommitter,
       WorkItemScheduler workItemScheduler) {
     return GrpcDirectGetWorkStream.create(
-        responseObserver -> withDefaultDeadline(stub).getWorkStream(responseObserver),
+        connection.backendWorkerToken().orElse(NO_BACKEND_WORKER_TOKEN),

Review Comment:
   Perhaps we should remove the use of Optional for this class?
   It seems like an unnecessary tri-state of not-present, empty, and set.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/getdata/ApplianceGetDataClient.java:
##########
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.getdata;
+
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.runners.dataflow.worker.WindmillComputationKey;
+import org.apache.beam.runners.dataflow.worker.windmill.ApplianceWindmillClient;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill.ComputationGetDataRequest;
+import org.apache.beam.runners.dataflow.worker.windmill.work.refresh.Heartbeat;
+import org.apache.beam.runners.dataflow.worker.windmill.work.refresh.HeartbeatSender;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.SettableFuture;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/** Appliance implementation of {@link GetDataClient}. */
+@Internal
+@ThreadSafe
+public final class ApplianceGetDataClient implements GetDataClient, WorkRefreshClient {
+  private static final int MAX_READS_PER_BATCH = 60;
+  private static final int MAX_ACTIVE_READS = 10;
+
+  private final ApplianceWindmillClient windmillClient;
+  private final ThrottlingGetDataMetricTracker getDataMetricTracker;
+
+  @GuardedBy("this")
+  private final List<ReadBatch> pendingReadBatches;
+
+  @GuardedBy("this")
+  private int activeReadThreads;
+
+  public ApplianceGetDataClient(
+      ApplianceWindmillClient windmillClient, ThrottlingGetDataMetricTracker getDataMetricTracker) {
+    this.windmillClient = windmillClient;
+    this.getDataMetricTracker = getDataMetricTracker;
+    this.pendingReadBatches = new ArrayList<>();
+    this.activeReadThreads = 0;
+  }
+
+  public static GetDataClient create(
+      ApplianceWindmillClient windmillClient, ThrottlingGetDataMetricTracker getDataMetricTracker) {
+    return new ApplianceGetDataClient(windmillClient, getDataMetricTracker);
+  }
+
+  @Override
+  public Windmill.KeyedGetDataResponse getStateData(
+      String computation, Windmill.KeyedGetDataRequest request) {
+    try (AutoCloseable ignored =
+        getDataMetricTracker.trackSingleCallWithThrottling(
+            ThrottlingGetDataMetricTracker.Type.STATE)) {
+      SettableFuture<Windmill.KeyedGetDataResponse> response = SettableFuture.create();
+      ReadBatch batch = addToReadBatch(new QueueEntry(computation, request, response));
+      if (batch != null) {
+        issueReadBatch(batch);
+      }
+      return response.get();
+    } catch (Exception e) {
+      throw new GetDataException(
+          "Error occurred fetching state for computation="
+              + computation
+              + ", key="
+              + request.getShardingKey(),
+          e);
+    }
+  }
+
+  @Override
+  public Windmill.GlobalData getSideInputData(Windmill.GlobalDataRequest request) {
+    try (AutoCloseable ignored =
+        getDataMetricTracker.trackSingleCallWithThrottling(
+            ThrottlingGetDataMetricTracker.Type.STATE)) {
+      return windmillClient
+          .getData(Windmill.GetDataRequest.newBuilder().addGlobalDataFetchRequests(request).build())
+          .getGlobalData(0);
+    } catch (Exception e) {
+      throw new GetDataException(
+          "Error occurred fetching side input for tag=" + request.getDataId(), e);
+    }
+  }
+
+  /**
+   * Appliance sends heartbeats (used to refresh active work) as KeyedGetDataRequests. So we must
+   * translate the HeartbeatRequest to a KeyedGetDataRequest.
+   */
+  @Override
+  public void refreshActiveWork(Map<HeartbeatSender, Heartbeat> heartbeats) {
+    if (heartbeats.isEmpty()) {
+      return;
+    }
+
+    for (Map.Entry<HeartbeatSender, Heartbeat> heartbeatToSend : heartbeats.entrySet()) {

Review Comment:
   There should just be a single sender in this case; we could assert that and simplify the code.
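
   A rough sketch of the assertion, applied after the existing isEmpty() early return. `SingleSender.onlyEntry` is a hypothetical helper written against plain `java.util.Map`; Guava's `Iterables.getOnlyElement` over `entrySet()` would be a similar option:

```java
import java.util.Map;

/** Hypothetical helper: returns the single entry of a map, or fails loudly. */
final class SingleSender {
  private SingleSender() {}

  static <K, V> Map.Entry<K, V> onlyEntry(Map<K, V> heartbeatsBySender) {
    // Fail fast instead of silently looping over entries that should not exist.
    if (heartbeatsBySender.size() != 1) {
      throw new IllegalStateException(
          "Expected exactly one sender, got " + heartbeatsBySender.size());
    }
    return heartbeatsBySender.entrySet().iterator().next();
  }
}
```

   With a helper like this, the for loop over entrySet() could collapse to handling one entry.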



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java:
##########
@@ -147,6 +177,29 @@ private static LatencyAttribution.Builder createLatencyAttributionWithActiveLate
     return latencyAttribution;
   }
 
+  private static String buildLatencyTrackingId(WorkItem workItem) {
+    return Long.toHexString(workItem.getShardingKey())
+        + '-'
+        + Long.toHexString(workItem.getWorkToken());
+  }
+
+  /** Returns a new {@link Work} instance with the same state and a different failure handler. */
+  public Work withFailureHandler(Runnable onFailed) {
+    return new Work(

Review Comment:
   Assert that there was not already a failure handler? Or maybe just have a setFailureHandler method?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingEngineWorkCommitter.java:
##########
@@ -85,7 +88,7 @@ public static StreamingEngineWorkCommitter create(
   @Override
   @SuppressWarnings("FutureReturnValueIgnored")
   public void start() {
-    if (!commitSenders.isShutdown()) {
+    if (isRunning.compareAndSet(false, true) && !commitSenders.isShutdown()) {

Review Comment:
   Do we call start() multiple times?
   It seems simpler to just fail if it is called more than once and, if that can currently happen, to remove the extra calls.
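
   For illustration, a minimal sketch of a fail-fast lifecycle in which start() may be called once and stop() once, only after start(). `OneShotLifecycle` and its `State` enum are hypothetical names, not part of StreamingEngineWorkCommitter:

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical lifecycle guard: NEW -> RUNNING -> STOPPED, each transition allowed once. */
final class OneShotLifecycle {
  enum State {
    NEW,
    RUNNING,
    STOPPED
  }

  private final AtomicReference<State> state = new AtomicReference<>(State.NEW);

  void start() {
    // Only the first caller moves NEW -> RUNNING; any other call is a programming error.
    if (!state.compareAndSet(State.NEW, State.RUNNING)) {
      throw new IllegalStateException("start() called in state " + state.get());
    }
  }

  void stop() {
    // stop() is only legal after start() and only once.
    if (!state.compareAndSet(State.RUNNING, State.STOPPED)) {
      throw new IllegalStateException("stop() called in state " + state.get());
    }
  }

  boolean isRunning() {
    return state.get() == State.RUNNING;
  }
}
```

   Compared with silently ignoring repeated calls, throwing surfaces misuse at the call site, at the cost of requiring callers to coordinate who owns the lifecycle.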



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/ActiveWorkRefresher.java:
##########
@@ -103,5 +116,35 @@ private void invalidateStuckCommits() {
     }
   }
 
-  protected abstract void refreshActiveWork();
+  private void refreshActiveWork() {
+    Instant refreshDeadline = clock.get().minus(Duration.millis(activeWorkRefreshPeriodMillis));
+
+    Map<HeartbeatSender, Heartbeat> fannedOutHeartbeatRequests = new HashMap<>();

Review Comment:
   Maybe heartbeatRequestsBySender? It seems to capture the intent more clearly.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/ActiveWorkRefresher.java:
##########
@@ -103,5 +116,35 @@ private void invalidateStuckCommits() {
     }
   }
 
-  protected abstract void refreshActiveWork();
+  private void refreshActiveWork() {
+    Instant refreshDeadline = clock.get().minus(Duration.millis(activeWorkRefreshPeriodMillis));
+
+    Map<HeartbeatSender, Heartbeat> fannedOutHeartbeatRequests = new HashMap<>();
+
+    for (ComputationState computationState : computations.get()) {
+      String computationId = computationState.getComputationId();
+
+      // Get heartbeat requests for computation's current active work, aggregated by GetDataStream

Review Comment:
   Update the comments that mention GetDataStream throughout to refer to HeartbeatSender.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStream.java:
##########
@@ -41,6 +45,24 @@ public interface WindmillStream {
   /** Returns when the stream was opened. */
   Instant startTime();
 
+  /**
+   * Shutdown the stream. There should be no further interactions with the stream once this has been
+   * called.
+   */
+  void shutdown();

Review Comment:
   Clarify how this is different from close().
   
   Maybe rename close() to halfClose()? That is the gRPC terminology, and it may be clearer, since in Java closing is usually the final thing.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/DirectHeartbeatSender.java:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.work.refresh;
+
+import org.apache.beam.runners.dataflow.worker.streaming.RefreshableWork;
+import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream;
+import org.apache.beam.sdk.annotations.Internal;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link HeartbeatSender} implementation that sends heartbeats directly on the underlying stream if
+ * the stream is not closed.
+ *
+ * @implNote
+ *     <p>{@link #equals(Object)} and {@link #hashCode()} implementations delegate to internal
+ *     {@link GetDataStream} implementations so that requests can be grouped and sent on the same
+ *     stream instance.
+ */
+@Internal
+public final class DirectHeartbeatSender implements HeartbeatSender {

Review Comment:
   Maybe FixedStreamHeartbeatSender / SingleStreamHeartbeatSender?
   
   The code would work whether the stream was direct or to the dispatcher.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/Heartbeat.java:
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.work.refresh;
+
+import com.google.auto.value.AutoValue;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.runners.dataflow.worker.streaming.RefreshableWork;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+
+/** Heartbeat requests and the work that was used to generate the heartbeat requests. */
+@AutoValue
+public abstract class Heartbeat {

Review Comment:
   Seems like this should be Heartbeats or HeartbeatsBatch, something to indicate that it holds multiple heartbeats.
   
   Can we change the collections of work and requests to be immutable?
   Since it is created in just one place, you could use an ImmutableMap builder there and change create() to take those immutable maps.
   
   Or you could add a builder class for this that has an immutable map builder inside of it.
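
   A sketch of the builder variant, under stated assumptions: `HeartbeatBatch` and its generic `W` (work) and `R` (request) parameters are hypothetical, and the JDK's unmodifiable map wrapper is used here only to keep the example self-contained where the real code would more likely use Guava's ImmutableMap.Builder:

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical immutable batch of heartbeat requests keyed by the work that produced them. */
final class HeartbeatBatch<W, R> {
  private final Map<W, R> requestsByWork;

  private HeartbeatBatch(Map<W, R> requestsByWork) {
    this.requestsByWork = requestsByWork;
  }

  static <W, R> Builder<W, R> builder() {
    return new Builder<>();
  }

  /** Read-only view; callers cannot mutate the batch after it is built. */
  Map<W, R> requestsByWork() {
    return requestsByWork;
  }

  static final class Builder<W, R> {
    private final Map<W, R> requests = new LinkedHashMap<>();

    Builder<W, R> add(W work, R request) {
      requests.put(work, request);
      return this;
    }

    HeartbeatBatch<W, R> build() {
      // Copy first so later add() calls on the builder do not leak into the built batch.
      return new HeartbeatBatch<>(Collections.unmodifiableMap(new LinkedHashMap<>(requests)));
    }
  }
}
```

   The single creation site would accumulate entries through the builder and hand out only the built, read-only batch.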



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/DirectHeartbeatSender.java:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.work.refresh;
+
+import org.apache.beam.runners.dataflow.worker.streaming.RefreshableWork;
+import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream;
+import org.apache.beam.sdk.annotations.Internal;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link HeartbeatSender} implementation that sends heartbeats directly on the underlying stream if
+ * the stream is not closed.
+ *
+ * @implNote
+ *     <p>{@link #equals(Object)} and {@link #hashCode()} implementations delegate to internal
+ *     {@link GetDataStream} implementations so that requests can be grouped and sent on the same
+ *     stream instance.
+ */
+@Internal
+public final class DirectHeartbeatSender implements HeartbeatSender {
+  private static final Logger LOG = LoggerFactory.getLogger(DirectHeartbeatSender.class);
+  private final GetDataStream getDataStream;
+
+  private DirectHeartbeatSender(GetDataStream getDataStream) {
+    this.getDataStream = getDataStream;
+  }
+
+  public static DirectHeartbeatSender create(GetDataStream getDataStream) {
+    return new DirectHeartbeatSender(getDataStream);
+  }
+
+  @Override
+  public void sendHeartbeats(Heartbeat heartbeats) {
+    if (getDataStream.isShutdown()) {
+      LOG.warn(
+          "Trying to refresh work w/ {} heartbeats on stream={} after work has moved off of worker."
+              + " heartbeats",
+          getDataStream.backendWorkerToken(),
+          heartbeats.heartbeatRequests().size());
+      heartbeats.work().forEach(RefreshableWork::setFailed);
+    } else {
+      getDataStream.refreshActiveWork(heartbeats.heartbeatRequests());
+    }
+  }
+
+  @Override
+  public int hashCode() {
+    return getDataStream.hashCode();

Review Comment:
   could use Objects.hash(DirectHeartbeatSender.class, getDataStream);
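
   A small sketch of the suggestion, with a matching equals(). `StreamBackedSender` is a hypothetical stand-in that wraps a plain Object instead of a GetDataStream, so that the example is self-contained:

```java
import java.util.Objects;

/** Hypothetical sender whose identity is defined by the stream it wraps. */
final class StreamBackedSender {
  private final Object stream;

  StreamBackedSender(Object stream) {
    this.stream = Objects.requireNonNull(stream);
  }

  @Override
  public int hashCode() {
    // Mixing in the class keeps the hash from simply mirroring the wrapped stream's hash.
    return Objects.hash(StreamBackedSender.class, stream);
  }

  @Override
  public boolean equals(Object obj) {
    // Two senders are equal exactly when they wrap equal streams.
    return obj instanceof StreamBackedSender
        && stream.equals(((StreamBackedSender) obj).stream);
  }
}
```

   Senders that wrap the same stream still group under one map key, while a sender is never equal to the stream itself.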



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/ActiveWorkRefresher.java:
##########
@@ -103,5 +116,35 @@ private void invalidateStuckCommits() {
     }
   }
 
-  protected abstract void refreshActiveWork();
+  private void refreshActiveWork() {
+    Instant refreshDeadline = clock.get().minus(Duration.millis(activeWorkRefreshPeriodMillis));
+
+    Map<HeartbeatSender, Heartbeat> fannedOutHeartbeatRequests = new HashMap<>();
+
+    for (ComputationState computationState : computations.get()) {
+      String computationId = computationState.getComputationId();
+
+      // Get heartbeat requests for computation's current active work, aggregated by GetDataStream
+      // to correctly fan-out the heartbeat requests.
+      Table<HeartbeatSender, RefreshableWork, HeartbeatRequest> heartbeats =
+          HeartbeatRequests.getRefreshableKeyHeartbeats(

Review Comment:
   This seems to be the only use of this method, and it doesn't really use the table; it just iterates over the cells. Can this method instead be changed to add to the HeartbeatSender -> heartbeats builder? (See other comment.)
   
   It seems like we're going to have a lot of unneeded allocations to build the rows and tables instead of just building into what we want in the end.
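
   To illustrate the shape of that change, a minimal sketch that groups work by sender in a single pass without an intermediate table. `HeartbeatGrouping.bySender` and the `senderOf` function are hypothetical; the real method would build heartbeat requests rather than collect the work items themselves:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical single-pass grouping of active work by the sender that should refresh it. */
final class HeartbeatGrouping {
  private HeartbeatGrouping() {}

  static <S, W> Map<S, List<W>> bySender(Iterable<W> refreshableWork, Function<W, S> senderOf) {
    Map<S, List<W>> grouped = new HashMap<>();
    for (W work : refreshableWork) {
      // Append straight into the per-sender bucket; no row or cell objects are allocated.
      grouped.computeIfAbsent(senderOf.apply(work), unused -> new ArrayList<>()).add(work);
    }
    return grouped;
  }
}
```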


