m-trieu commented on code in PR #31504:
URL: https://github.com/apache/beam/pull/31504#discussion_r1643812956
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricTrackingWindmillServerStub.java:
##########
@@ -278,47 +278,82 @@ public Windmill.GlobalData getSideInputData(Windmill.GlobalDataRequest request)
}
}
- /** Tells windmill processing is ongoing for the given keys. */
- public void refreshActiveWork(Map<String, List<HeartbeatRequest>> heartbeats) {
+ public Windmill.GlobalData getSideInputData(
+ GetDataStream getDataStream, Windmill.GlobalDataRequest request) {
+ gcThrashingMonitor.waitForResources("GetSideInputData");
+ activeSideInputs.getAndIncrement();
+ try {
+ return getDataStream.requestGlobalData(request);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to get side input: ", e);
+ } finally {
+ activeSideInputs.getAndDecrement();
+ }
+ }
+
+ public WindmillStreamPool<GetDataStream> getGetDataStreamPool() {
+ return getDataStreamPool;
+ }
+
+ /**
+ * Attempts to refresh active work, fanning out to each {@link GetDataStream} in parallel.
+ *
+ * @implNote Skips closed {@link GetDataStream}(s).
+ */
+ public void refreshActiveWork(
+ Map<HeartbeatSender, Map<String, List<HeartbeatRequest>>> heartbeats) {
if (heartbeats.isEmpty()) {
return;
}
- activeHeartbeats.set(heartbeats.size());
+
try {
- if (useStreamingRequests) {
- GetDataStream stream = heartbeatStreamPool.getStream();
- try {
- stream.refreshActiveWork(heartbeats);
- } finally {
- heartbeatStreamPool.releaseStream(stream);
- }
- } else {
- // This code path is only used by appliance which sends heartbeats (used to refresh active
- // work) as KeyedGetDataRequests. So we must translate the HeartbeatRequest to a
- // KeyedGetDataRequest here regardless of the value of sendKeyedGetDataRequests.
- Windmill.GetDataRequest.Builder builder = Windmill.GetDataRequest.newBuilder();
- for (Map.Entry<String, List<HeartbeatRequest>> entry : heartbeats.entrySet()) {
- Windmill.ComputationGetDataRequest.Builder perComputationBuilder =
- Windmill.ComputationGetDataRequest.newBuilder();
- perComputationBuilder.setComputationId(entry.getKey());
- for (HeartbeatRequest request : entry.getValue()) {
- perComputationBuilder.addRequests(
- Windmill.KeyedGetDataRequest.newBuilder()
- .setShardingKey(request.getShardingKey())
- .setWorkToken(request.getWorkToken())
- .setCacheToken(request.getCacheToken())
- .addAllLatencyAttribution(request.getLatencyAttributionList())
- .build());
- }
- builder.addRequests(perComputationBuilder.build());
- }
- server.getData(builder.build());
+ // There is 1 destination to send heartbeat requests.
+ if (heartbeats.size() == 1) {
+ Map.Entry<HeartbeatSender, Map<String, List<HeartbeatRequest>>> heartbeat =
+ Iterables.getOnlyElement(heartbeats.entrySet());
+ HeartbeatSender sender = heartbeat.getKey();
+ sender.sendHeartbeats(heartbeat.getValue());
+ }
+
+ // There are multiple destinations to send heartbeat requests. Fan out requests in parallel.
Review Comment:
done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]