scwhittle commented on code in PR #29963:
URL: https://github.com/apache/beam/pull/29963#discussion_r1450262899
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricTrackingWindmillServerStub.java:
##########
@@ -239,19 +240,23 @@ public Windmill.GlobalData getSideInputData(Windmill.GlobalDataRequest request)
}
/** Tells windmill processing is ongoing for the given keys. */
Review Comment:
Can you update the comment? It's unclear what heartbeats are.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -1363,6 +1385,10 @@ private void commitLoop() {
// Adds the commit to the commitStream if it fits, returning true iff it is consumed.
private boolean addCommitToStream(Commit commit, CommitWorkStream commitStream) {
Preconditions.checkNotNull(commit);
+ // Drop commits for failed work. Such commits will be dropped by Windmill anyway.
+ if (commit.work().isFailed()) {
+ return true;
Review Comment:
Should we count these so they show up on the status page?
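A minimal sketch of the counting idea, assuming a plain AtomicLong is enough and that the status page can render one line of text. FailedCommitCounter, maybeDrop and statusLine are hypothetical names, not existing Beam APIs.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper (not a Beam API): counts commits dropped because their
// work item was failed by Windmill, so the total can be shown on a status page.
final class FailedCommitCounter {
  private final AtomicLong droppedCommits = new AtomicLong();

  /**
   * Returns true if the commit should be treated as consumed without being sent
   * (mirroring addCommitToStream returning true for failed work) and records the drop.
   */
  boolean maybeDrop(boolean workFailed) {
    if (!workFailed) {
      return false;
    }
    droppedCommits.incrementAndGet();
    return true;
  }

  long droppedCount() {
    return droppedCommits.get();
  }

  /** A single line suitable for a plain-text status page. */
  String statusLine() {
    return "Commits dropped for failed work: " + droppedCommits.get();
  }
}
```

An AtomicLong keeps the increment cheap on the commit thread while the status page reads it from another thread.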
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java:
##########
@@ -120,6 +123,42 @@ synchronized ActivateWorkResult activateWorkForKey(ShardedKey shardedKey, Work w
return ActivateWorkResult.QUEUED;
}
+ public static class FailedTokens {
+ public long workToken;
+ public long cacheToken;
+
+ public FailedTokens(long workToken, long cacheToken) {
+ this.workToken = workToken;
+ this.cacheToken = cacheToken;
+ }
+ }
+
+ synchronized void failWorkForKey(Map<Long, List<FailedTokens>> failedWork) {
+ // Note we can't construct a ShardedKey and look it up in activeWork directly since
+ // HeartbeatResponse doesn't include the user key.
+ for (Entry<ShardedKey, Deque<Work>> entry : activeWork.entrySet()) {
+ List<FailedTokens> failedTokens = failedWork.get(entry.getKey().shardingKey());
+ if (failedTokens == null) continue;
+ for (FailedTokens failedToken : failedTokens) {
+ for (Work queuedWork : entry.getValue()) {
+ WorkItem workItem = queuedWork.getWorkItem();
+ if (workItem.getWorkToken() == failedToken.workToken
+ && workItem.getCacheToken() == failedToken.cacheToken) {
+ LOG.error(
Review Comment:
Warning or info? Users don't like errors.
In other cases we've also added reassuring text like "This happens during autoscaling and backend events, the data will be reprocessed and not lost", and we might want something like that here too.
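A sketch of what the suggested message could look like, assuming java.util.logging purely to keep the example self-contained (the worker's own LOG would use its warn/info methods instead). FailedWorkLogging and failedWorkMessage are hypothetical names; the reassuring sentence paraphrases the text quoted above.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical sketch (names are illustrative): log failed work at WARNING rather
// than an error level, with text telling users the data is not lost.
final class FailedWorkLogging {
  private static final Logger LOG = Logger.getLogger(FailedWorkLogging.class.getName());

  static String failedWorkMessage(
      String computationId, long shardingKey, long workToken, long cacheToken) {
    return String.format(
        "Failing work for computation %s, sharding key %016x, work token %d, cache token %d. "
            + "This happens during autoscaling and backend events; the data will be "
            + "reprocessed and not lost.",
        computationId, shardingKey, workToken, cacheToken);
  }

  static void logFailedWork(
      String computationId, long shardingKey, long workToken, long cacheToken) {
    // Expected and recoverable, so WARNING (or INFO) instead of SEVERE.
    LOG.log(Level.WARNING, failedWorkMessage(computationId, shardingKey, workToken, cacheToken));
  }
}
```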
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java:
##########
@@ -192,11 +205,35 @@ public void refreshActiveWork(Map<String, List<KeyedGetDataRequest>> active) {
.addRequests(request));
}
}
+ for (Map.Entry<String, List<HeartbeatRequest>> entry : heartbeats.entrySet()) {
+ for (HeartbeatRequest request : entry.getValue()) {
+ // Calculate the bytes with some overhead for proto encoding.
+ long bytes = (long) entry.getKey().length() + request.getSerializedSize() + 10;
+ if (builderBytes > 0
+ && (builderBytes + bytes > AbstractWindmillStream.RPC_STREAM_CHUNK_SIZE
+ || builder.getRequestIdCount() >= streamingRpcBatchLimit)) {
Review Comment:
We don't need the batch limit check here; we're not adding request IDs for heartbeats.
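A sketch of the loop with only the byte budget, assuming each item is represented by its serialized size. ByteBudgetBatcher and batchBySize are hypothetical; overheadBytes plays the role of the "+ 10" proto-encoding overhead in the diff, and chunkSizeBytes stands in for RPC_STREAM_CHUNK_SIZE.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: split items into batches bounded only by an approximate
// byte budget. No request-count limit is applied, since heartbeats carry no request ids.
final class ByteBudgetBatcher {
  static List<List<Integer>> batchBySize(
      List<Integer> itemSizes, long chunkSizeBytes, int overheadBytes) {
    List<List<Integer>> batches = new ArrayList<>();
    List<Integer> current = new ArrayList<>();
    long currentBytes = 0;
    for (int size : itemSizes) {
      long bytes = (long) size + overheadBytes;
      // Flush only when the batch is non-empty and this item would push it over budget,
      // so a single oversized item still goes out on its own.
      if (currentBytes > 0 && currentBytes + bytes > chunkSizeBytes) {
        batches.add(current);
        current = new ArrayList<>();
        currentBytes = 0;
      }
      current.add(size);
      currentBytes += bytes;
    }
    if (!current.isEmpty()) {
      batches.add(current);
    }
    return batches;
  }
}
```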
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java:
##########
@@ -192,11 +205,35 @@ public void refreshActiveWork(Map<String, List<KeyedGetDataRequest>> active) {
.addRequests(request));
}
}
+ for (Map.Entry<String, List<HeartbeatRequest>> entry : heartbeats.entrySet()) {
+ for (HeartbeatRequest request : entry.getValue()) {
+ // Calculate the bytes with some overhead for proto encoding.
+ long bytes = (long) entry.getKey().length() + request.getSerializedSize() + 10;
+ if (builderBytes > 0
+ && (builderBytes + bytes > AbstractWindmillStream.RPC_STREAM_CHUNK_SIZE
+ || builder.getRequestIdCount() >= streamingRpcBatchLimit)) {
+ send(builder.build());
+ builderBytes = 0;
+ builder.clear();
+ }
+ builderBytes += bytes;
+ builder.addComputationHeartbeatRequest(
+ ComputationHeartbeatRequest.newBuilder()
+ .setComputationId(entry.getKey())
Review Comment:
This adds a new ComputationHeartbeatRequest for each heartbeat. We should have just one per computation, with all of its heartbeats inside it (add a test to catch that).
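A sketch of the grouping, using hypothetical plain-Java stand-ins for the proto messages (Heartbeat, ComputationHeartbeats and group are illustrative names). It leaves out the byte-size chunking; if that is kept, a computation whose heartbeats straddle a chunk boundary would presumably get one group per chunk rather than one per heartbeat.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for HeartbeatRequest / ComputationHeartbeatRequest.
final class HeartbeatGrouping {
  static final class Heartbeat {
    final long shardingKey;
    final long workToken;

    Heartbeat(long shardingKey, long workToken) {
      this.shardingKey = shardingKey;
      this.workToken = workToken;
    }
  }

  static final class ComputationHeartbeats {
    final String computationId;
    final List<Heartbeat> heartbeats = new ArrayList<>();

    ComputationHeartbeats(String computationId) {
      this.computationId = computationId;
    }
  }

  /** Builds exactly one ComputationHeartbeats per computation that has heartbeats. */
  static List<ComputationHeartbeats> group(Map<String, List<Heartbeat>> byComputation) {
    List<ComputationHeartbeats> result = new ArrayList<>();
    for (Map.Entry<String, List<Heartbeat>> entry : byComputation.entrySet()) {
      if (entry.getValue().isEmpty()) {
        continue; // Nothing to send for this computation.
      }
      ComputationHeartbeats group = new ComputationHeartbeats(entry.getKey());
      group.heartbeats.addAll(entry.getValue());
      result.add(group);
    }
    return result;
  }
}
```

The test the comment asks for could assert that the number of groups equals the number of computations with heartbeats, not the number of heartbeats.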
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java:
##########
@@ -120,6 +123,42 @@ synchronized ActivateWorkResult activateWorkForKey(ShardedKey shardedKey, Work w
return ActivateWorkResult.QUEUED;
}
+ public static class FailedTokens {
+ public long workToken;
+ public long cacheToken;
+
+ public FailedTokens(long workToken, long cacheToken) {
+ this.workToken = workToken;
+ this.cacheToken = cacheToken;
+ }
+ }
+
+ synchronized void failWorkForKey(Map<Long, List<FailedTokens>> failedWork) {
+ // Note we can't construct a ShardedKey and look it up in activeWork directly since
Review Comment:
We could change the activeWork map to a multimap keyed by sharding_key. A lookup would have to find all entries with the same sharding_key and compare the user key, but we don't expect collisions.
We could also keep a map keyed by sharding_key and use the existing queue. We'd want to change the queueing logic to support possibly different keys, though. But I don't think serializing processing in the case of such a collision is a real problem.
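A sketch of the first option, assuming a two-level map (sharding key, then user key) as the "multimap" and a String in place of the real user key bytes. ShardingKeyIndexedWork, WorkEntry, queue, queueFor and failWork are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: active work indexed first by sharding key. Collisions (two
// user keys with the same sharding key) are expected to be rare, so the inner map
// usually has a single entry.
final class ShardingKeyIndexedWork {
  static final class WorkEntry {
    final long workToken;
    final long cacheToken;
    boolean failed;

    WorkEntry(long workToken, long cacheToken) {
      this.workToken = workToken;
      this.cacheToken = cacheToken;
    }
  }

  // shardingKey -> (userKey -> queued work for that full key)
  private final Map<Long, Map<String, Deque<WorkEntry>>> active = new HashMap<>();

  synchronized void queue(long shardingKey, String userKey, WorkEntry work) {
    active.computeIfAbsent(shardingKey, k -> new HashMap<>())
        .computeIfAbsent(userKey, k -> new ArrayDeque<>())
        .addLast(work);
  }

  /** Full-key lookup: sharding key first, then compare the user key. May return null. */
  synchronized Deque<WorkEntry> queueFor(long shardingKey, String userKey) {
    Map<String, Deque<WorkEntry>> byUserKey = active.get(shardingKey);
    return byUserKey == null ? null : byUserKey.get(userKey);
  }

  /** Marks matching work failed using only the sharding key and tokens; returns the count. */
  synchronized int failWork(long shardingKey, long workToken, long cacheToken) {
    Map<String, Deque<WorkEntry>> byUserKey = active.get(shardingKey);
    if (byUserKey == null) {
      return 0;
    }
    int failed = 0;
    for (Deque<WorkEntry> queue : byUserKey.values()) {
      for (WorkEntry work : queue) {
        if (work.workToken == workToken && work.cacheToken == cacheToken) {
          work.failed = true;
          failed++;
        }
      }
    }
    return failed;
  }
}
```

Compared with scanning every entry of the active map, failWork here only touches the work that shares the response's sharding key.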
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -1864,6 +1890,22 @@ private void sendWorkerUpdatesToDataflowService(
}
}
+ public void handleHeartbeatResponses(List<Windmill.ComputationHeartbeatResponse> responses) {
+ for (Windmill.ComputationHeartbeatResponse computationHeartbeatResponse : responses) {
+ Map<Long, List<FailedTokens>> failedWork = new HashMap<>();
+ for (Windmill.HeartbeatResponse heartbeatResponse :
+ computationHeartbeatResponse.getHeartbeatResponsesList()) {
+ failedWork.putIfAbsent(heartbeatResponse.getShardingKey(), new ArrayList<>());
+ failedWork
+ .get(heartbeatResponse.getShardingKey())
+ .add(
+ new FailedTokens(
+ heartbeatResponse.getWorkToken(), heartbeatResponse.getCacheToken()));
+ }
+ computationMap.get(computationHeartbeatResponse.getComputationId()).failWork(failedWork);
Review Comment:
This could throw a NullPointerException if the computation ID isn't in the map.
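A sketch of a null-safe lookup, assuming unknown computation IDs should be logged and skipped rather than treated as fatal. ComputationStateStub stands in for ComputationState, List<Long> stands in for List<FailedTokens>, and dispatch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.logging.Logger;

// Hypothetical sketch: look the computation up once and skip unknown IDs instead of
// dereferencing a possibly-null result of computationMap.get(...).
final class HeartbeatResponseDispatch {
  private static final Logger LOG = Logger.getLogger(HeartbeatResponseDispatch.class.getName());

  /** Simplified stand-in for ComputationState; records the failWork calls it receives. */
  static final class ComputationStateStub {
    final List<Map<Long, List<Long>>> failWorkCalls = new ArrayList<>();

    void failWork(Map<Long, List<Long>> failedWork) {
      failWorkCalls.add(failedWork);
    }
  }

  /** Returns true if the response was dispatched, false if the computation is unknown. */
  static boolean dispatch(
      Map<String, ComputationStateStub> computationMap,
      String computationId,
      Map<Long, List<Long>> failedWork) {
    ComputationStateStub state = computationMap.get(computationId);
    if (state == null) {
      // The computation is not known to this worker, so there is no work to fail.
      LOG.warning("Ignoring heartbeat response for unknown computation " + computationId);
      return false;
    }
    state.failWork(failedWork);
    return true;
  }
}
```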
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java:
##########
@@ -150,9 +159,13 @@ public void start(
@Nullable Instant synchronizedProcessingTime,
WindmillStateReader stateReader,
SideInputStateFetcher sideInputStateFetcher,
- Windmill.WorkItemCommitRequest.Builder outputBuilder) {
+ Windmill.WorkItemCommitRequest.Builder outputBuilder,
+ @Nullable Supplier<Boolean> workFailed) {
this.key = key;
this.work = work;
+ if (workFailed != null) {
+ this.workIsFailed = workFailed;
+ }
Review Comment:
This should clear the existing value in the else case; otherwise it could be examining the failure state of a previous work item.
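A sketch of the reset, assuming "not failed" is the right default when no supplier is passed. ExecutionContextSketch is a hypothetical, heavily simplified stand-in for StreamingModeExecutionContext that keeps only the workIsFailed field.

```java
import java.util.function.Supplier;

// Hypothetical, simplified stand-in for the per-work-item execution context.
final class ExecutionContextSketch {
  private static final Supplier<Boolean> NOT_FAILED = () -> false;

  private Supplier<Boolean> workIsFailed = NOT_FAILED;

  void start(Supplier<Boolean> workFailed /* nullable */) {
    // Always overwrite: a null supplier resets to "not failed", so the failure state
    // of the previous work item processed by this context is never observed.
    this.workIsFailed = (workFailed != null) ? workFailed : NOT_FAILED;
  }

  boolean isWorkFailed() {
    return workIsFailed.get();
  }
}
```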
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -946,6 +953,10 @@ private void process(
final Windmill.WorkItem workItem = work.getWorkItem();
final String computationId = computationState.getComputationId();
final ByteString key = workItem.getKey();
+ if (work.isFailed()) {
+ LOG.debug("Not processing failed work for {}:\n{}", computationId, work);
+ return;
Review Comment:
I think just returning will leave things in a possibly odd state. At the very least, computationState.completeWorkAndScheduleNextWorkForKey(...) needs to be called.
Perhaps we could move this check down to just where we would do the execution, and throw the special exception there instead of executing. I don't think the work done to get to that point is very expensive, and it makes this error path more consistent with existing processing errors.
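A sketch of the suggested control flow, assuming a finally block is an acceptable place to release the key. ProcessSketch, process and events are hypothetical; the nested WorkItemFailedException only mirrors the name used elsewhere in this PR, and the "completed" event stands in for computationState.completeWorkAndScheduleNextWorkForKey(...).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;

// Hypothetical sketch: check for failure right before execution and throw, so failed
// work takes the same path as other processing errors and the key is always released.
final class ProcessSketch {
  static final class WorkItemFailedException extends RuntimeException {
    WorkItemFailedException(String message) {
      super(message);
    }
  }

  final List<String> events = new ArrayList<>();

  void process(String workId, BooleanSupplier isFailed, Runnable execute) {
    try {
      // ... existing setup (building the execution context, etc.) would happen here ...
      if (isFailed.getAsBoolean()) {
        throw new WorkItemFailedException("Work item " + workId + " is no longer valid in backend");
      }
      execute.run();
      events.add("executed " + workId);
    } catch (RuntimeException e) {
      // Stand-in for the existing processing-error handling path.
      events.add("failed " + workId + ": " + e.getClass().getSimpleName());
    } finally {
      // Stand-in for completing the work and scheduling the next work for the key.
      events.add("completed " + workId);
    }
  }
}
```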
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java:
##########
@@ -120,6 +123,42 @@ synchronized ActivateWorkResult activateWorkForKey(ShardedKey shardedKey, Work w
return ActivateWorkResult.QUEUED;
}
+ public static class FailedTokens {
+ public long workToken;
+ public long cacheToken;
+
+ public FailedTokens(long workToken, long cacheToken) {
+ this.workToken = workToken;
+ this.cacheToken = cacheToken;
+ }
+ }
+
+ synchronized void failWorkForKey(Map<Long, List<FailedTokens>> failedWork) {
+ // Note we can't construct a ShardedKey and look it up in activeWork directly since
+ // HeartbeatResponse doesn't include the user key.
+ for (Entry<ShardedKey, Deque<Work>> entry : activeWork.entrySet()) {
+ List<FailedTokens> failedTokens = failedWork.get(entry.getKey().shardingKey());
+ if (failedTokens == null) continue;
+ for (FailedTokens failedToken : failedTokens) {
+ for (Work queuedWork : entry.getValue()) {
+ WorkItem workItem = queuedWork.getWorkItem();
+ if (workItem.getWorkToken() == failedToken.workToken
+ && workItem.getCacheToken() == failedToken.cacheToken) {
+ LOG.error(
+ "failing work "
+ + entry.getKey().shardingKey()
Review Comment:
Include the computation ID.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -1864,6 +1890,22 @@ private void sendWorkerUpdatesToDataflowService(
}
}
+ public void handleHeartbeatResponses(List<Windmill.ComputationHeartbeatResponse> responses) {
+ for (Windmill.ComputationHeartbeatResponse computationHeartbeatResponse : responses) {
+ Map<Long, List<FailedTokens>> failedWork = new HashMap<>();
+ for (Windmill.HeartbeatResponse heartbeatResponse :
+ computationHeartbeatResponse.getHeartbeatResponsesList()) {
+ failedWork.putIfAbsent(heartbeatResponse.getShardingKey(), new ArrayList<>());
+ failedWork
+ .get(heartbeatResponse.getShardingKey())
Review Comment:
computeIfAbsent returns the (possibly newly created) value, so you can avoid this get.
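Map.putIfAbsent returns the previous value, which is null when the key was absent, so the follow-up get cannot simply be chained away; Map.computeIfAbsent returns the existing or newly created value. A sketch with a simplified FailedTokens mirroring the class in the diff; GroupBySharding and group are hypothetical names, and parallel arrays stand in for the heartbeat responses.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class GroupBySharding {
  // Simplified copy of the FailedTokens holder from the diff.
  static final class FailedTokens {
    final long workToken;
    final long cacheToken;

    FailedTokens(long workToken, long cacheToken) {
      this.workToken = workToken;
      this.cacheToken = cacheToken;
    }
  }

  static Map<Long, List<FailedTokens>> group(
      long[] shardingKeys, long[] workTokens, long[] cacheTokens) {
    Map<Long, List<FailedTokens>> failedWork = new HashMap<>();
    for (int i = 0; i < shardingKeys.length; i++) {
      // computeIfAbsent returns the list now mapped to the key, so no separate get() is needed.
      failedWork
          .computeIfAbsent(shardingKeys[i], k -> new ArrayList<>())
          .add(new FailedTokens(workTokens[i], cacheTokens[i]));
    }
    return failedWork;
  }
}
```

Chaining .add(...) directly on putIfAbsent would throw a NullPointerException the first time a sharding key is seen, because that call returns null.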
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -1873,14 +1915,20 @@ private void sendWorkerUpdatesToDataflowService(
*/
private void refreshActiveWork() {
Map<String, List<Windmill.KeyedGetDataRequest>> active = new HashMap<>();
+ Map<String, List<Windmill.HeartbeatRequest>> heartbeats = new HashMap<>();
Instant refreshDeadline =
clock.get().minus(Duration.millis(options.getActiveWorkRefreshPeriodMillis()));
for (Map.Entry<String, ComputationState> entry : computationMap.entrySet()) {
- active.put(entry.getKey(), entry.getValue().getKeysToRefresh(refreshDeadline, sampler));
+ if (windmillServiceEnabled
+ && DataflowRunner.hasExperiment(options, "send_new_heartbeat_requests")) {
+ heartbeats.put(entry.getKey(), entry.getValue().getKeyHeartbeats(refreshDeadline, sampler));
+ } else {
+ active.put(entry.getKey(), entry.getValue().getKeysToRefresh(refreshDeadline, sampler));
Review Comment:
Since it is the same information, can the switch on which type of request is sent to Windmill be moved into the actual RPC layer? Then you could just use the heartbeat objects internally and convert there if needed.
I think that would be simpler than the somewhat duplicated methods and params, and it keeps the logic more self-contained.
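A sketch of that layering, assuming one internal heartbeat type that carries every field either wire format needs (including the user key for the legacy request). HeartbeatRpcLayer, Heartbeat and the two converter methods are hypothetical, and formatted strings stand in for the HeartbeatRequest and KeyedGetDataRequest protos.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: the worker always builds Heartbeat objects, and only the
// RPC layer decides which wire format to send. Strings stand in for the protos.
final class HeartbeatRpcLayer {
  /** Internal heartbeat; assumed to carry every field either wire format needs. */
  static final class Heartbeat {
    final String userKey;
    final long shardingKey;
    final long workToken;
    final long cacheToken;

    Heartbeat(String userKey, long shardingKey, long workToken, long cacheToken) {
      this.userKey = userKey;
      this.shardingKey = shardingKey;
      this.workToken = workToken;
      this.cacheToken = cacheToken;
    }
  }

  private final boolean sendNewHeartbeatRequests;
  final List<String> sent = new ArrayList<>();

  HeartbeatRpcLayer(boolean sendNewHeartbeatRequests) {
    // E.g. computed once from windmillServiceEnabled and the "send_new_heartbeat_requests" experiment.
    this.sendNewHeartbeatRequests = sendNewHeartbeatRequests;
  }

  /** Callers always pass heartbeats; only this layer knows which request type goes on the wire. */
  void refreshActiveWork(Map<String, List<Heartbeat>> heartbeats) {
    for (Map.Entry<String, List<Heartbeat>> entry : heartbeats.entrySet()) {
      for (Heartbeat hb : entry.getValue()) {
        sent.add(
            sendNewHeartbeatRequests
                ? toHeartbeatRequest(entry.getKey(), hb)
                : toKeyedGetDataRequest(entry.getKey(), hb));
      }
    }
  }

  private static String toHeartbeatRequest(String computationId, Heartbeat hb) {
    return String.format(
        "HeartbeatRequest{computation=%s, shardingKey=%d, workToken=%d, cacheToken=%d}",
        computationId, hb.shardingKey, hb.workToken, hb.cacheToken);
  }

  private static String toKeyedGetDataRequest(String computationId, Heartbeat hb) {
    return String.format(
        "KeyedGetDataRequest{computation=%s, key=%s, shardingKey=%d, workToken=%d, cacheToken=%d}",
        computationId, hb.userKey, hb.shardingKey, hb.workToken, hb.cacheToken);
  }
}
```

With this shape, refreshActiveWork in the worker would need only the getKeyHeartbeats path, and the experiment check would live in one place.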
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateReader.java:
##########
@@ -404,6 +415,9 @@ public void performReads() {
private KeyedGetDataResponse tryGetDataFromWindmill(HashSet<StateTag<?>> stateTags)
throws Exception {
+ if (workItemIsFailed.get()) {
+ throw new WorkItemFailedException("Windmill failed work item.");
Review Comment:
Since the exception name already captures that the work item failed, perhaps the message would be better used to indicate the location, e.g. "Skipping state read as work item is no longer valid in backend".
##########
runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto:
##########
@@ -784,6 +827,17 @@ message StreamingGetDataResponse {
repeated bytes serialized_response = 2;
// Remaining bytes field applies only to the last serialized_response
optional int64 remaining_bytes_for_response = 3;
+
+ // Work item fingerprints available only for complete items. So if
+ // remaining_bytes_for_response > 0 the size of farmhash_fingerprint array is
+ // one less than the size of serialized_response as the fingerprint for the
+ // last message is not yet available.
+ // Will not be populated when heartbeat_response is set.
+ repeated uint64 farmhash_fingerprint = 4;
Review Comment:
I'd say omit it, and either reserve the field number or just skip it.
Since this matches the internal proto, I'm not really that worried that someone will add a field with a conflicting number here.