scwhittle commented on code in PR #31504:
URL: https://github.com/apache/beam/pull/31504#discussion_r1638033694
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java:
##########
@@ -129,8 +131,43 @@ public static GrpcGetDataStream create(
return getDataStream;
}
+ private static String createStreamCancelledErrorMessage(QueuedBatch batch) {
+ return batch.requests().stream()
+ .map(
+ request -> {
+ switch (request.getDataRequest().getKind()) {
+ case GLOBAL:
+ return "GetSideInput=" + request.getDataRequest().global();
+ case COMPUTATION:
+ return request.getDataRequest().computation().getRequestsList().stream()
+ .map(
+ keyedRequest ->
+ "KeyedGetState=["
+ + "key="
+ + keyedRequest.getKey()
Review Comment:
avoid logging customer keys; they might be PII and can also be large
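e.g. something like this keeps the message useful without echoing key bytes (untested sketch; uses the existing Windmill.KeyedGetDataRequest accessors):

    // Sketch: describe a keyed request by sharding key and key size instead of the
    // raw key bytes, which may be PII and can be large.
    private static String describeKeyedRequest(Windmill.KeyedGetDataRequest keyedRequest) {
      return "KeyedGetState=[shardingKey="
          + keyedRequest.getShardingKey()
          + ", keySizeBytes="
          + keyedRequest.getKey().size()
          + "]";
    }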
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -237,32 +244,124 @@ private StreamingDataflowWorker(
dispatchThread.setName("DispatchThread");
this.clientId = clientId;
this.windmillServer = windmillServer;
+
+ WindmillStreamPool<WindmillStream.GetDataStream> getDataStreamPool =
+ WindmillStreamPool.create(
+ Math.max(1, options.getWindmillGetDataStreamCount()),
+ GET_DATA_STREAM_TIMEOUT,
+ windmillServer::getDataStream);
+
this.metricTrackingWindmillServer =
MetricTrackingWindmillServerStub.builder(windmillServer, memoryMonitor)
.setUseStreamingRequests(windmillServiceEnabled)
- .setUseSeparateHeartbeatStreams(options.getUseSeparateWindmillHeartbeatStreams())
- .setNumGetDataStreams(options.getWindmillGetDataStreamCount())
+ .setGetDataStreamPool(getDataStreamPool)
.build();
// Register standard file systems.
FileSystems.setDefaultPipelineOptions(options);
+ WorkerStatusPages workerStatusPages =
+ WorkerStatusPages.create(DEFAULT_STATUS_PORT, memoryMonitor);
+
+ this.streamingCounters = streamingCounters;
+ this.memoryMonitor = memoryMonitor;
+
+ this.streamingEngineClient = null;
+ this.streamingWorkScheduler =
+ StreamingWorkScheduler.create(
+ options,
+ clock,
+ readerCache,
+ mapTaskExecutorFactory,
+ workUnitExecutor,
+ stateCache::forComputation,
+ request ->
+ streamingEngineClient != null
+ ? metricTrackingWindmillServer.getSideInputData(
+ streamingEngineClient.getGlobalDataStream(request.getDataId().getTag()),
+ request)
+ : metricTrackingWindmillServer.getSideInputData(request),
+ failureTracker,
+ workFailureProcessor,
+ streamingCounters,
+ hotKeyLogger,
+ sampler,
+ maxWorkItemCommitBytes,
+ ID_GENERATOR,
+ stageInfoMap);
+
int stuckCommitDurationMillis =
windmillServiceEnabled && options.getStuckCommitDurationMillis() > 0
? options.getStuckCommitDurationMillis()
: 0;
+ if (isDirectPathPipeline(options)) {
+ this.streamingEngineClient =
Review Comment:
For SE non-direct path, can we still use StreamingEngineClient but just hard-code the single dispatcher as the worker endpoint? Seems like it would allow us to remove streamingDispatchLoop and other duplication, and it also seems like we want that to work if we need to fall back to the non-direct path.
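rough sketch of what I'm imagining (the endpoint builder/setter names below are hypothetical, I haven't checked what WindmillEndpoints actually exposes):

    // Hypothetical sketch: hand StreamingEngineClient a single fixed endpoint pointing
    // at the dispatcher, so the non-direct path reuses the same machinery instead of
    // a separate streamingDispatchLoop.
    WindmillEndpoints dispatcherOnly =
        WindmillEndpoints.builder()
            .addWindmillWorker(dispatcherEndpoint) // hypothetical builder method
            .build();
    streamingEngineClient.consumeWindmillWorkerEndpoints(dispatcherOnly); // hypothetical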
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java:
##########
@@ -263,6 +305,22 @@ public void sendHealthCheck() {
}
}
+ @Override
+ public synchronized void close() {
+ super.close();
+
+ // Stream has been explicitly closed. Drain pending input streams and request batches.
+ // Future calls to send RPCs will fail.
+ pending.values().forEach(AppendableInputStream::cancel);
+ pending.clear();
Review Comment:
clearing pending here differs from cancellation in onNewStream and I think
doing so could lead to verify errors in onResponse.
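e.g. keeping close() symmetric with onNewStream (sketch):

    @Override
    public synchronized void close() {
      super.close();
      // Cancel pending response streams but leave their entries in the map, mirroring
      // the onNewStream handling, so a late onResponse() still finds its entry instead
      // of tripping a verify failure.
      pending.values().forEach(AppendableInputStream::cancel);
    }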
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java:
##########
@@ -129,8 +131,43 @@ public static GrpcGetDataStream create(
return getDataStream;
}
+ private static String createStreamCancelledErrorMessage(QueuedBatch batch) {
+ return batch.requests().stream()
+ .map(
+ request -> {
+ switch (request.getDataRequest().getKind()) {
+ case GLOBAL:
+ return "GetSideInput=" + request.getDataRequest().global();
+ case COMPUTATION:
+ return request.getDataRequest().computation().getRequestsList().stream()
+ .map(
+ keyedRequest ->
+ "KeyedGetState=["
+ + "key="
+ + keyedRequest.getKey()
Review Comment:
But also not sure it's worth having all the information in the exception; each key can log itself when its read finishes with the error instead.
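i.e. something like this at the read site instead (sketch; LOG is whatever logger that class already has):

    try {
      return parseFn.parse(request.getResponseStream());
    } catch (CancellationException e) {
      // Sketch: each caller knows which request it issued, so it can log enough to
      // identify the failure here without the stream-level exception carrying every
      // key in the batch.
      LOG.warn("GetData read failed for {} request.", request.getDataRequest().getKind(), e);
      throw e;
    }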
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java:
##########
@@ -308,7 +364,12 @@ private <ResponseT> ResponseT issueRequest(QueuedRequest request, ParseFn<Respon
}
}
- private void queueRequestAndWait(QueuedRequest request) throws InterruptedException {
+ private void tryQueueRequestAndWaitForSend(QueuedRequest request) throws InterruptedException {
+ if (isClosed()) {
Review Comment:
move beneath synchronized block?
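i.e. something like (sketch; the lock object and queueing helper are placeholders for whatever the method already uses):

    synchronized (batches) {
      // Checking under the lock means a concurrent close() can't slip in between the
      // isClosed() check and queueing the request.
      if (isClosed()) {
        throw new WindmillStreamClosedException("Cannot send on a closed stream.");
      }
      queueRequest(request); // placeholder for the existing queueing logic
    }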
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java:
##########
@@ -291,14 +349,12 @@ private <ResponseT> ResponseT issueRequest(QueuedRequest request, ParseFn<Respon
while (true) {
request.resetResponseStream();
try {
- queueRequestAndWait(request);
+ tryQueueRequestAndWaitForSend(request);
return parseFn.parse(request.getResponseStream());
} catch (CancellationException e) {
// Retry issuing the request since the response stream was cancelled.
- continue;
} catch (IOException e) {
Review Comment:
maybe it would help to document that it's expected by explicitly catching the WindmillStreamClosedException and rethrowing it? you could verify isClosed() if you get that exception
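i.e. (sketch; verify is the vendored Guava Verify.verify):

    try {
      tryQueueRequestAndWaitForSend(request);
      return parseFn.parse(request.getResponseStream());
    } catch (WindmillStreamClosedException e) {
      // Expected when the stream was shut down; assert the invariant and propagate
      // rather than retrying.
      verify(isClosed());
      throw e;
    } catch (CancellationException e) {
      // Retry issuing the request since the response stream was cancelled.
    }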
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricTrackingWindmillServerStub.java:
##########
@@ -278,47 +278,82 @@ public Windmill.GlobalData getSideInputData(Windmill.GlobalDataRequest request)
}
}
- /** Tells windmill processing is ongoing for the given keys. */
- public void refreshActiveWork(Map<String, List<HeartbeatRequest>> heartbeats) {
+ public Windmill.GlobalData getSideInputData(
+ GetDataStream getDataStream, Windmill.GlobalDataRequest request) {
+ gcThrashingMonitor.waitForResources("GetSideInputData");
+ activeSideInputs.getAndIncrement();
+ try {
+ return getDataStream.requestGlobalData(request);
+ } catch (Exception e) {
Review Comment:
does this need to handle WindmillStreamClosedException?
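e.g. (sketch):

    try {
      return getDataStream.requestGlobalData(request);
    } catch (WindmillStreamClosedException e) {
      // Sketch: let stream closure propagate distinctly so the caller can retry on a
      // fresh stream from the pool instead of treating it as a generic fetch failure.
      throw e;
    } catch (Exception e) {
      throw new RuntimeException("Failed to get side input: ", e);
    } finally {
      activeSideInputs.getAndDecrement();
    }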
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStreamClosedException.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client;
+
+import org.apache.beam.sdk.annotations.Internal;
+
+@Internal
+public final class WindmillStreamClosedException extends RuntimeException {
+
+ public WindmillStreamClosedException(String message) {
+ super(message);
+ }
+
+ /** Returns whether an exception was caused by a {@link WindmillStreamClosedException}. */
+ public static boolean isWindmillStreamCancelledException(Throwable t) {
Review Comment:
update method name; actually since this is static and nested under the class name maybe the method should just be wasCauseOf so it looks like: WindmillStreamClosedException.wasCauseOf(e);
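i.e. something like:

    /** Returns whether {@code t} or one of its causes is a {@link WindmillStreamClosedException}. */
    public static boolean wasCauseOf(Throwable t) {
      for (Throwable cause = t; cause != null; cause = cause.getCause()) {
        if (cause instanceof WindmillStreamClosedException) {
          return true;
        }
      }
      return false;
    }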
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/HeartbeatSender.java:
##########
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.work.refresh;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+
+/** Interface for sending heartbeats. */
+@FunctionalInterface
+public interface HeartbeatSender {
+ void sendHeartbeats(Map<String, List<Windmill.HeartbeatRequest>> heartbeats);
+
+ default boolean isInvalid() {
Review Comment:
can we remove this and just handle it internally to the stream? In either case it's racy to check this and then send on the stream, so the stream will have to handle it anyway.
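i.e. the stream-backed sender could look something like (sketch; LOG is hypothetical):

    @Override
    public void sendHeartbeats(Map<String, List<Windmill.HeartbeatRequest>> heartbeats) {
      // Sketch: the stream checks its own state at send time, the only non-racy place
      // to do it, instead of callers polling isInvalid() first.
      if (getDataStream.isClosed()) {
        LOG.debug("Dropped heartbeats for {} computations; stream already closed.", heartbeats.size());
        return;
      }
      getDataStream.refreshActiveWork(heartbeats);
    }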
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricTrackingWindmillServerStub.java:
##########
@@ -278,47 +278,82 @@ public Windmill.GlobalData getSideInputData(Windmill.GlobalDataRequest request)
}
}
- /** Tells windmill processing is ongoing for the given keys. */
- public void refreshActiveWork(Map<String, List<HeartbeatRequest>> heartbeats) {
+ public Windmill.GlobalData getSideInputData(
+ GetDataStream getDataStream, Windmill.GlobalDataRequest request) {
+ gcThrashingMonitor.waitForResources("GetSideInputData");
+ activeSideInputs.getAndIncrement();
+ try {
+ return getDataStream.requestGlobalData(request);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to get side input: ", e);
+ } finally {
+ activeSideInputs.getAndDecrement();
+ }
+ }
+
+ public WindmillStreamPool<GetDataStream> getGetDataStreamPool() {
+ return getDataStreamPool;
+ }
+
+ /**
+ * Attempts to refresh active work, fanning out to each {@link GetDataStream} in parallel.
+ *
+ * @implNote Skips closed {@link GetDataStream}(s).
+ */
+ public void refreshActiveWork(
+ Map<HeartbeatSender, Map<String, List<HeartbeatRequest>>> heartbeats) {
if (heartbeats.isEmpty()) {
return;
}
- activeHeartbeats.set(heartbeats.size());
+
try {
- if (useStreamingRequests) {
- GetDataStream stream = heartbeatStreamPool.getStream();
- try {
- stream.refreshActiveWork(heartbeats);
- } finally {
- heartbeatStreamPool.releaseStream(stream);
- }
- } else {
- // This code path is only used by appliance which sends heartbeats (used to refresh active
- // work) as KeyedGetDataRequests. So we must translate the HeartbeatRequest to a
- // KeyedGetDataRequest here regardless of the value of sendKeyedGetDataRequests.
- Windmill.GetDataRequest.Builder builder = Windmill.GetDataRequest.newBuilder();
- for (Map.Entry<String, List<HeartbeatRequest>> entry : heartbeats.entrySet()) {
- Windmill.ComputationGetDataRequest.Builder perComputationBuilder =
- Windmill.ComputationGetDataRequest.newBuilder();
- perComputationBuilder.setComputationId(entry.getKey());
- for (HeartbeatRequest request : entry.getValue()) {
- perComputationBuilder.addRequests(
- Windmill.KeyedGetDataRequest.newBuilder()
- .setShardingKey(request.getShardingKey())
- .setWorkToken(request.getWorkToken())
- .setCacheToken(request.getCacheToken())
- .addAllLatencyAttribution(request.getLatencyAttributionList())
- .build());
- }
- builder.addRequests(perComputationBuilder.build());
- }
- server.getData(builder.build());
+ // There is 1 destination to send heartbeat requests.
+ if (heartbeats.size() == 1) {
+ Map.Entry<HeartbeatSender, Map<String, List<HeartbeatRequest>>> heartbeat =
+ Iterables.getOnlyElement(heartbeats.entrySet());
+ HeartbeatSender sender = heartbeat.getKey();
+ sender.sendHeartbeats(heartbeat.getValue());
+ }
+
+ // There are multiple destinations to send heartbeat requests. Fan out requests in parallel.
Review Comment:
nit: I think

    } else {
      // comment

is easier to read
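i.e. (the helper names are just placeholders for the two existing branches):

    if (heartbeats.size() == 1) {
      // There is 1 destination to send heartbeat requests.
      sendToSingleDestination(heartbeats);
    } else {
      // There are multiple destinations to send heartbeat requests. Fan out in parallel.
      fanOutHeartbeats(heartbeats);
    }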