scwhittle commented on code in PR #29963:
URL: https://github.com/apache/beam/pull/29963#discussion_r1461551044
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -1864,6 +1889,25 @@ private void sendWorkerUpdatesToDataflowService(
}
}
+ public void
handleHeartbeatResponses(List<Windmill.ComputationHeartbeatResponse> responses)
{
+ for (Windmill.ComputationHeartbeatResponse computationHeartbeatResponse :
responses) {
+ Map<Long, List<FailedTokens>> failedWork = new HashMap<>();
+ for (Windmill.HeartbeatResponse heartbeatResponse :
+ computationHeartbeatResponse.getHeartbeatResponsesList()) {
+ if (heartbeatResponse.getFailed()) {
+ List<FailedTokens> keyTokens =
+ failedWork.putIfAbsent(heartbeatResponse.getShardingKey(), new
ArrayList<>());
+ if (keyTokens == null) keyTokens =
failedWork.get(heartbeatResponse.getShardingKey());
Review Comment:
nit: computeIfAbsent returns the existing or newly computed value (unlike
putIfAbsent, which returns the previous value and so returns null when the key
was absent), so if you use that instead you can remove this null check. It
also avoids constructing an ArrayList if there is one already.
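A minimal sketch of the suggested pattern, using plain `Long` tokens in place of `FailedTokens`. The class and method names here are hypothetical and are not part of the PR:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration only; not part of the PR.
class FailedWorkGrouping {
  /** Groups work tokens by sharding key. The two arrays are parallel stand-ins for responses. */
  static Map<Long, List<Long>> groupByShardingKey(long[] shardingKeys, long[] workTokens) {
    Map<Long, List<Long>> failedWork = new HashMap<>();
    for (int i = 0; i < shardingKeys.length; i++) {
      // computeIfAbsent returns the list already mapped to the key, or the one it
      // just created, so no null check or second lookup is needed. The lambda only
      // runs when the key is absent, so no throwaway ArrayList is allocated.
      failedWork.computeIfAbsent(shardingKeys[i], k -> new ArrayList<>()).add(workTokens[i]);
    }
    return failedWork;
  }
}
```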
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkItemFailedException.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker;
+
+/** Indicates that the work item was failed and should not be retried. */
+@SuppressWarnings({
+ "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+public class WorkItemFailedException extends RuntimeException {
+ public WorkItemFailedException(String key) {
+ super("Work item failed for key " + key);
Review Comment:
Keys can be large. I think that instead we should just log the sharding key.
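Something along these lines, as a rough sketch. The class name and the fixed-width hex formatting are assumptions for illustration; the point is only that the message size stays bounded regardless of the user key:

```java
// Hypothetical sketch; the class name and message format are assumptions.
class WorkItemFailedSketch extends RuntimeException {
  WorkItemFailedSketch(long shardingKey) {
    // A sharding key is a fixed-size long, so the message length is bounded
    // no matter how large the user key is. %016x prints it as 16 hex digits.
    super("Work item failed for sharding key " + String.format("%016x", shardingKey));
  }
}
```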
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java:
##########
@@ -120,6 +122,45 @@ synchronized ActivateWorkResult
activateWorkForKey(ShardedKey shardedKey, Work w
return ActivateWorkResult.QUEUED;
}
+ public static class FailedTokens {
+ public long workToken;
Review Comment:
final
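For example, a sketch with both token fields made final so that instances are immutable once constructed (the class name is changed here to mark it as illustrative):

```java
// Illustrative sketch of an immutable token pair; not the PR's actual class.
final class FailedTokensSketch {
  final long workToken;
  final long cacheToken;

  FailedTokensSketch(long workToken, long cacheToken) {
    // final fields must be assigned exactly once, here in the constructor.
    this.workToken = workToken;
    this.cacheToken = cacheToken;
  }
}
```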
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -1872,15 +1916,19 @@ private void sendWorkerUpdatesToDataflowService(
* StreamingDataflowWorkerOptions#getActiveWorkRefreshPeriodMillis}.
*/
private void refreshActiveWork() {
- Map<String, List<Windmill.KeyedGetDataRequest>> active = new HashMap<>();
+ Map<String, List<Windmill.HeartbeatRequest>> heartbeats = new HashMap<>();
Instant refreshDeadline =
clock.get().minus(Duration.millis(options.getActiveWorkRefreshPeriodMillis()));
+ boolean sendKeyedGetDataRequests =
Review Comment:
Could this be moved into the stub itself instead of being a parameter on
refreshActiveWork?
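Roughly what I have in mind, as a sketch with hypothetical names and strings standing in for the proto messages. The stub is configured once and callers no longer pass the flag:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a Windmill stub; strings replace the proto messages.
class StubWithFlagSketch {
  private final boolean sendKeyedGetDataRequests;
  final List<String> sent = new ArrayList<>();

  StubWithFlagSketch(boolean sendKeyedGetDataRequests) {
    // The wire format is decided once, when the stub is created.
    this.sendKeyedGetDataRequests = sendKeyedGetDataRequests;
  }

  /** Callers pass only the heartbeats (computation id to work tokens). */
  void refreshActiveWork(Map<String, List<Long>> heartbeats) {
    String kind = sendKeyedGetDataRequests ? "KeyedGetDataRequest" : "HeartbeatRequest";
    for (Map.Entry<String, List<Long>> entry : heartbeats.entrySet()) {
      for (Long workToken : entry.getValue()) {
        sent.add(kind + ":" + entry.getKey() + ":" + workToken);
      }
    }
  }
}
```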
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -496,17 +497,23 @@ public static void main(String[] args) throws Exception {
public static StreamingDataflowWorker fromDataflowWorkerHarnessOptions(
DataflowWorkerHarnessOptions options) throws IOException {
- return new StreamingDataflowWorker(
- Collections.emptyList(),
- IntrinsicMapTaskExecutorFactory.defaultFactory(),
- new DataflowWorkUnitClient(options, LOG),
- options.as(StreamingDataflowWorkerOptions.class),
- true,
- new HotKeyLogger(),
- Instant::now,
- (threadName) ->
- Executors.newSingleThreadScheduledExecutor(
- new ThreadFactoryBuilder().setNameFormat(threadName).build()));
+ StreamingDataflowWorker worker =
+ new StreamingDataflowWorker(
+ Collections.emptyList(),
+ IntrinsicMapTaskExecutorFactory.defaultFactory(),
+ new DataflowWorkUnitClient(options, LOG),
+ options.as(StreamingDataflowWorkerOptions.class),
+ true,
+ new HotKeyLogger(),
+ Instant::now,
+ (threadName) ->
+ Executors.newSingleThreadScheduledExecutor(
+ new
ThreadFactoryBuilder().setNameFormat(threadName).build()));
+ options
+ .as(StreamingDataflowWorkerOptions.class)
+ .getWindmillServerStub()
+ .setProcessHeartbeatResponses(worker::handleHeartbeatResponses);
Review Comment:
Can this be done in the StreamingDataflowWorker constructor instead? Then if
a stub is injected another way, it is still wired up appropriately (and it
avoids grabbing the stub from options twice).
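A sketch of the idea with hypothetical stand-in classes (strings replace `Windmill.ComputationHeartbeatResponse`). One caveat: registering `this::handleHeartbeatResponses` from the constructor publishes `this` before construction finishes, so it should probably be the last statement.

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for the Windmill server stub.
class StubSketch {
  private Consumer<List<String>> heartbeatHandler = responses -> {};

  void setProcessHeartbeatResponses(Consumer<List<String>> handler) {
    this.heartbeatHandler = handler;
  }

  /** Simulates the stub receiving heartbeat responses from the service. */
  void deliver(List<String> responses) {
    heartbeatHandler.accept(responses);
  }
}

// Hypothetical stand-in for the worker.
class WorkerSketch {
  int handled = 0;

  WorkerSketch(StubSketch stub) {
    // Wiring here means any stub passed in, including a test fake, gets the
    // handler, and the factory method does not fetch the stub a second time.
    stub.setProcessHeartbeatResponses(this::handleHeartbeatResponses);
  }

  void handleHeartbeatResponses(List<String> responses) {
    handled += responses.size();
  }
}
```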
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -1864,6 +1889,25 @@ private void sendWorkerUpdatesToDataflowService(
}
}
+ public void
handleHeartbeatResponses(List<Windmill.ComputationHeartbeatResponse> responses)
{
+ for (Windmill.ComputationHeartbeatResponse computationHeartbeatResponse :
responses) {
+ Map<Long, List<FailedTokens>> failedWork = new HashMap<>();
Review Comment:
add comment, maps from sharding_key to failed tokens for the key.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java:
##########
@@ -120,6 +122,45 @@ synchronized ActivateWorkResult
activateWorkForKey(ShardedKey shardedKey, Work w
return ActivateWorkResult.QUEUED;
}
+ public static class FailedTokens {
+ public long workToken;
+ public long cacheToken;
+
+ public FailedTokens(long workToken, long cacheToken) {
+ this.workToken = workToken;
+ this.cacheToken = cacheToken;
+ }
+ }
+
+ synchronized void failWorkForKey(Map<Long, List<FailedTokens>> failedWork) {
Review Comment:
comment on what the key is
##########
runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto:
##########
@@ -91,6 +91,7 @@ message LatencyAttribution {
// GetWorkResponse and ends with the user worker receiving the
// GetWorkResponse.
GET_WORK_IN_TRANSIT_TO_USER_WORKER = 7;
+ FAILED = 8;
Review Comment:
remove? we won't actually send more heartbeats or a commit to communicate
this
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java:
##########
@@ -180,6 +182,10 @@ private static LatencyAttribution.Builder
addActiveLatencyBreakdownToBuilder(
return builder;
}
+ public boolean isFailed() {
+ return currentState.state() == Work.State.FAILED;
Review Comment:
What if we had a bool for failed instead of a new state type? The latency
analysis probably doesn't make sense for failed work, and it could help with
debugging to see what state the item is in if it has failed but is not
cancelling.
Then we can still allow state transitions after failing as well. Since we
expose getState(), it seems unintuitive that setState doesn't change the
state, especially if there are asserts on the expected state (for example,
expecting the READ state when transitioning back to processing after a read
completes).
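A minimal sketch of that shape. The state names below are placeholders rather than the actual `Work.State` values, and making the flag volatile is an assumption about how it would be read across threads:

```java
// Hypothetical sketch; state names are placeholders, not the real Work.State values.
class WorkSketch {
  enum State { QUEUED, PROCESSING, READING, COMMITTING }

  private State state = State.QUEUED;
  // Assumed to be set by the heartbeat-handling thread and read by the processing thread.
  private volatile boolean failed = false;

  void setState(State newState) {
    // Transitions keep working after a failure, so getState() stays accurate.
    this.state = newState;
  }

  State getState() {
    return state;
  }

  void setFailed() {
    failed = true;
  }

  boolean isFailed() {
    return failed;
  }
}
```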
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java:
##########
@@ -171,30 +182,72 @@ public GlobalData requestGlobalData(GlobalDataRequest
request) {
}
@Override
- public void refreshActiveWork(Map<String, List<KeyedGetDataRequest>> active)
{
- long builderBytes = 0;
+ public void refreshActiveWork(
+ Map<String, List<HeartbeatRequest>> heartbeats, boolean
sendKeyedGetDataRequests) {
StreamingGetDataRequest.Builder builder =
StreamingGetDataRequest.newBuilder();
- for (Map.Entry<String, List<KeyedGetDataRequest>> entry :
active.entrySet()) {
- for (KeyedGetDataRequest request : entry.getValue()) {
- // Calculate the bytes with some overhead for proto encoding.
- long bytes = (long) entry.getKey().length() +
request.getSerializedSize() + 10;
- if (builderBytes > 0
- && (builderBytes + bytes >
AbstractWindmillStream.RPC_STREAM_CHUNK_SIZE
- || builder.getRequestIdCount() >= streamingRpcBatchLimit)) {
- send(builder.build());
- builderBytes = 0;
- builder.clear();
+ if (sendKeyedGetDataRequests) {
+ long builderBytes = 0;
+ for (Map.Entry<String, List<HeartbeatRequest>> entry :
heartbeats.entrySet()) {
+ for (HeartbeatRequest request : entry.getValue()) {
+ // Calculate the bytes with some overhead for proto encoding.
+ long bytes = (long) entry.getKey().length() +
request.getSerializedSize() + 10;
+ if (builderBytes > 0
+ && (builderBytes + bytes >
AbstractWindmillStream.RPC_STREAM_CHUNK_SIZE
+ || builder.getRequestIdCount() >= streamingRpcBatchLimit)) {
+ send(builder.build());
+ builderBytes = 0;
+ builder.clear();
+ }
+ builderBytes += bytes;
+ builder.addStateRequest(
+ ComputationGetDataRequest.newBuilder()
+ .setComputationId(entry.getKey())
+ .addRequests(
+ Windmill.KeyedGetDataRequest.newBuilder()
Review Comment:
would be nice to share this conversion with the appliance path.
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java:
##########
@@ -304,17 +319,41 @@ public Windmill.GlobalData
requestGlobalData(Windmill.GlobalDataRequest request)
}
@Override
- public void refreshActiveWork(Map<String, List<KeyedGetDataRequest>>
active) {
+ public void refreshActiveWork(
+ Map<String, List<HeartbeatRequest>> heartbeats, boolean
sendKeyedGetDataRequests) {
Windmill.GetDataRequest.Builder builder =
Windmill.GetDataRequest.newBuilder();
- for (Map.Entry<String, List<KeyedGetDataRequest>> entry :
active.entrySet()) {
- builder.addRequests(
- ComputationGetDataRequest.newBuilder()
- .setComputationId(entry.getKey())
- .addAllRequests(entry.getValue()));
+ if (sendKeyedGetDataRequests) {
+ for (Map.Entry<String, List<HeartbeatRequest>> entry :
heartbeats.entrySet()) {
Review Comment:
I would remove this conversion; it is just testing the fake's own version of
the conversion.
If you remove the boolean parameter as suggested in the other comment, it is
more obviously unnecessary.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java:
##########
@@ -50,7 +52,7 @@
* activate, queue, and complete {@link Work} (including invalidating stuck
{@link Work}).
*/
@ThreadSafe
-final class ActiveWorkState {
+public final class ActiveWorkState {
Review Comment:
mark with `@Internal` which means we can change it later
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkItemFailedException.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker;
+
+/** Indicates that the work item was failed and should not be retried. */
+@SuppressWarnings({
+ "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+public class WorkItemFailedException extends RuntimeException {
Review Comment:
Failed is a little generic. Should it be WorkItemAbortedException or
WorkItemCancelledException, to indicate that we chose to stop processing it?