scwhittle commented on code in PR #29082:
URL: https://github.com/apache/beam/pull/29082#discussion_r1384859967
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java:
##########
@@ -107,10 +140,26 @@ synchronized ActivateWorkResult activateWorkForKey(ShardedKey shardedKey, Work w
return ActivateWorkResult.EXECUTE;
}
- // Ensure we don't already have this work token queued.
- for (Work queuedWork : workQueue) {
- if (queuedWork.getWorkItem().getWorkToken() == work.getWorkItem().getWorkToken()) {
+ // Check to see if we have this work token queued.
+ Iterator<Work> workIterator = workQueue.iterator();
+ while (workIterator.hasNext()) {
+ Work queuedWork = workIterator.next();
+ if (queuedWork.id().equals(work.id())) {
return ActivateWorkResult.DUPLICATE;
+ } else if (queuedWork.id().cacheToken() == work.id().cacheToken()) {
+ if (work.id().workToken() > queuedWork.id().workToken()) {
+ removeIfNotActive(queuedWork, workIterator, workQueue);
+ workQueue.addLast(work);
Review Comment:
This could `continue` instead of returning; later iterations might be able to remove other queued items.
For example, if all items have the same cache token and the queue is
[1 active] [2 queued]
and [3] arrives, we currently don't remove [1] since it's active, but by returning early we also never remove [2].
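A minimal sketch of the suggestion, using hypothetical stand-in types (`Id`, `queueAndPrune`) rather than the PR's actual `Work`/`WorkId` classes. It assumes the head of the queue is the active item, and it leaves out the duplicate and stale checks to focus on scanning the whole queue instead of returning after the first match:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;

/** Hypothetical stand-in for the queue handling under review; not the actual Beam code. */
class ContinueScanSketch {

  /** Simplified work identifier: a work token plus a cache token (assumed shape). */
  static final class Id {
    final long workToken;
    final long cacheToken;

    Id(long workToken, long cacheToken) {
      this.workToken = workToken;
      this.cacheToken = cacheToken;
    }
  }

  /**
   * Appends {@code incoming} after removing every non-active queued item that has the same
   * cache token and a smaller work token. The head of the deque is treated as active.
   */
  static void queueAndPrune(Deque<Id> queue, Id incoming) {
    Id active = queue.peekFirst();
    Iterator<Id> it = queue.iterator();
    while (it.hasNext()) {
      Id queued = it.next();
      boolean superseded =
          queued.cacheToken == incoming.cacheToken && queued.workToken < incoming.workToken;
      if (superseded && queued != active) {
        it.remove();
      }
      // No early return here: later entries may be superseded as well.
    }
    queue.addLast(incoming);
  }

  public static void main(String[] args) {
    Deque<Id> queue = new ArrayDeque<>();
    queue.addLast(new Id(1, 7)); // active
    queue.addLast(new Id(2, 7)); // queued
    queueAndPrune(queue, new Id(3, 7));
    for (Id id : queue) {
      System.out.println(id.workToken);
    }
  }
}
```

In this sketch, starting from [1 active] [2 queued] and adding [3] leaves the queue as [1] [3]: the active item stays and the superseded queued item is dropped.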
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java:
##########
@@ -83,6 +85,33 @@ static ActiveWorkState forTesting(
return new ActiveWorkState(activeWork, computationStateCache);
}
+ private static Stream<KeyedGetDataRequest> toKeyedGetDataRequests(
Review Comment:
nit: this is just moved, but since it's further from its context now, how about a
better name capturing that this is for heartbeats?
makeHeartbeatKeyedGetDataRequests?
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java:
##########
@@ -2725,14 +2725,14 @@ public void testActiveWorkForShardedKeys() throws Exception {
// Verify a different shard of key is a separate queue.
Work m4 = createMockWork(3);
- assertFalse(computationState.activateWork(key1Shard1, m4));
+ assertTrue(computationState.activateWork(key1Shard1, m4));
Review Comment:
Why did this change? It seems like it should be a duplicate of m3. I think this
might be related to the early return I commented on in the code.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java:
##########
@@ -147,16 +199,19 @@ private synchronized void removeCompletedWorkFromQueue(
() ->
new IllegalStateException(
String.format(
- "Active key %s without work, expected token %d",
- shardedKey, workToken)));
+ "Active key %s without work, expected work_token %d, expected cache_token %d",
Review Comment:
Log the workId instead of the separate fields?
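A rough sketch of what that could look like. The `WorkId` class here is a hand-written stand-in, and it assumes the real type has a readable `toString()` (for example, one generated by AutoValue); `missingWorkMessage` is a hypothetical helper, not code from the PR:

```java
/** Hypothetical sketch of formatting the whole id; not the actual Beam code. */
class WorkIdLoggingSketch {

  /** Stand-in for the PR's WorkId, assumed to hold a cache token and a work token. */
  static final class WorkId {
    final long cacheToken;
    final long workToken;

    WorkId(long cacheToken, long workToken) {
      this.cacheToken = cacheToken;
      this.workToken = workToken;
    }

    @Override
    public String toString() {
      return "WorkId{cacheToken=" + cacheToken + ", workToken=" + workToken + "}";
    }
  }

  /** Builds the error message from the id as a whole rather than from each field. */
  static String missingWorkMessage(String shardedKey, WorkId expected) {
    return String.format("Active key %s without work, expected %s", shardedKey, expected);
  }

  public static void main(String[] args) {
    System.out.println(missingWorkMessage("key1", new WorkId(5, 9)));
  }
}
```

One format argument then covers both tokens, so the message stays in sync if the id gains fields later.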
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java:
##########
@@ -107,10 +140,26 @@ synchronized ActivateWorkResult activateWorkForKey(ShardedKey shardedKey, Work w
return ActivateWorkResult.EXECUTE;
}
- // Ensure we don't already have this work token queued.
- for (Work queuedWork : workQueue) {
- if (queuedWork.getWorkItem().getWorkToken() == work.getWorkItem().getWorkToken()) {
+ // Check to see if we have this work token queued.
+ Iterator<Work> workIterator = workQueue.iterator();
+ while (workIterator.hasNext()) {
+ Work queuedWork = workIterator.next();
+ if (queuedWork.id().equals(work.id())) {
return ActivateWorkResult.DUPLICATE;
+ } else if (queuedWork.id().cacheToken() == work.id().cacheToken()) {
+ if (work.id().workToken() > queuedWork.id().workToken()) {
+ removeIfNotActive(queuedWork, workIterator, workQueue);
+ workQueue.addLast(work);
+ return ActivateWorkResult.QUEUED;
+ } else {
+ return ActivateWorkResult.STALE;
+ }
+ } else if (queuedWork.id().workToken() == work.id().workToken()) {
+ if (queuedWork.id().cacheToken() != work.id().cacheToken()) {
Review Comment:
This is already known to be true from the previous condition in the else-if chain.
I think the case
queuedWork.id().workToken() == work.id().workToken()
could just be removed, though, and fall through to the bottom. If the cache
tokens are different, we're not actually sure which is the valid item in the
backend. In that case it is safer to keep both, since then we will
eventually complete the right one.
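A minimal sketch of the simplified comparison, assuming the per-item decision depends only on the two tokens. The `Outcome` enum and `compare` method are illustrative names, not part of the PR:

```java
/** Hypothetical sketch of the simplified token comparison; not the actual Beam code. */
class TokenComparisonSketch {

  enum Outcome {
    DUPLICATE,
    STALE,
    SUPERSEDES_QUEUED,
    KEEP_BOTH
  }

  /** Compares one queued item against the incoming item. */
  static Outcome compare(
      long queuedWorkToken, long queuedCacheToken, long newWorkToken, long newCacheToken) {
    if (queuedCacheToken != newCacheToken) {
      // Different cache tokens (even with equal work tokens): we can't tell which item the
      // backend considers valid, so fall through and keep both.
      return Outcome.KEEP_BOTH;
    }
    if (queuedWorkToken == newWorkToken) {
      return Outcome.DUPLICATE;
    }
    // Same cache token: the larger work token wins.
    return newWorkToken > queuedWorkToken ? Outcome.SUPERSEDES_QUEUED : Outcome.STALE;
  }

  public static void main(String[] args) {
    // Same work token, different cache tokens: both items are kept.
    System.out.println(compare(4, 1, 4, 2));
  }
}
```

With no separate branch for equal work tokens, the only paths that drop or reject an item are the ones where the cache tokens match.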
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java:
##########
@@ -58,26 +57,27 @@ private static ShardedKey shardedKey(String str, long shardKey) {
return ShardedKey.create(ByteString.copyFromUtf8(str), shardKey);
}
- private static Work emptyWork() {
- return createWork(null);
- }
-
- private static Work createWork(@Nullable Windmill.WorkItem workItem) {
+ private static Work createWork(WorkItem workItem) {
return Work.create(workItem, Instant::now, Collections.emptyList(), unused -> {});
}
- private static Work expiredWork(Windmill.WorkItem workItem) {
+ private static Work expiredWork(WorkItem workItem) {
return Work.create(workItem, () -> Instant.EPOCH, Collections.emptyList(), unused -> {});
}
- private static Windmill.WorkItem createWorkItem(long workToken) {
- return Windmill.WorkItem.newBuilder()
+ private static WorkItem createWorkItem(long workToken, long cacheToken) {
+ return WorkItem.newBuilder()
.setKey(ByteString.copyFromUtf8(""))
.setShardingKey(1)
.setWorkToken(workToken)
+ .setCacheToken(cacheToken)
.build();
}
+ private static WorkId workDedupeToken(long workToken, long cacheToken) {
Review Comment:
nit: name it workId?
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java:
##########
@@ -83,6 +85,33 @@ static ActiveWorkState forTesting(
return new ActiveWorkState(activeWork, computationStateCache);
}
+ private static Stream<KeyedGetDataRequest> toKeyedGetDataRequests(
+ ShardedKey shardedKey, Collection<Work> workQueue, Instant refreshDeadline) {
+ return workQueue.stream()
+ .filter(work -> work.getStartTime().isBefore(refreshDeadline))
+ .map(
+ work ->
+ Windmill.KeyedGetDataRequest.newBuilder()
+ .setKey(shardedKey.key())
Review Comment:
This could be done separately (and possibly in other places where we make reads or
commits), since this code is just moved.