kennknowles commented on a change in pull request #12155:
URL: https://github.com/apache/beam/pull/12155#discussion_r450454617
##########
File path:
runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
##########
@@ -1110,17 +1109,7 @@ public void updateWatermarks(
pendingUpdates.offer(
PendingWatermarkUpdate.create(
executable, completed, timerUpdate, unprocessedInputs, outputs,
earliestHold));
- tryApplyPendingUpdates();
- }
-
- private void tryApplyPendingUpdates() {
- if (refreshLock.tryLock()) {
- try {
- applyNUpdates(MAX_INCREMENTAL_UPDATES);
- } finally {
- refreshLock.unlock();
- }
- }
Review comment:
Updates can only move watermarks forward. I think this is just a
rate-limiting effect. This code is from #1287. I believe it is correct.
Whenever a bundle completes, it can allow the watermark to move forward, but
not past other holds. This way, adding updates does not need to share a lock
with applying and removing updates.
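
To illustrate the point about locking, here is a minimal sketch, not the actual `WatermarkManager` code. The class `PendingUpdateSketch` and its methods are hypothetical names, updates are modeled as plain `long` candidate watermarks, and holds are not modeled. It only shows that producers can enqueue on a concurrent queue without taking the lock that the applier uses, and that applying updates never moves the watermark backward.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical sketch; names and types are assumptions, not Beam's API. */
class PendingUpdateSketch {
  // Lock-free queue: producers never contend with the applier's lock.
  private final Queue<Long> pendingUpdates = new ConcurrentLinkedQueue<>();
  // Guards only the applied watermark state, not the queue.
  private final ReentrantLock refreshLock = new ReentrantLock();
  private long watermark = Long.MIN_VALUE;

  /** Called when a bundle completes; only enqueues, takes no lock. */
  void offerUpdate(long candidateWatermark) {
    pendingUpdates.offer(candidateWatermark);
  }

  /** Applies at most maxUpdates queued updates and returns how many were applied. */
  int applyNUpdates(int maxUpdates) {
    refreshLock.lock();
    try {
      int applied = 0;
      Long next;
      while (applied < maxUpdates && (next = pendingUpdates.poll()) != null) {
        // Updates can only move the watermark forward.
        watermark = Math.max(watermark, next);
        applied++;
      }
      return applied;
    } finally {
      refreshLock.unlock();
    }
  }

  /** Returns the watermark as of the last applied update. */
  long currentWatermark() {
    refreshLock.lock();
    try {
      return watermark;
    } finally {
      refreshLock.unlock();
    }
  }
}
```

In this sketch, delaying `applyNUpdates` only delays when the watermark advances; it cannot change the value it eventually reaches, which is why dropping an eager apply call would act as rate limiting rather than a correctness change.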
##########
File path:
runners/direct-java/src/main/java/org/apache/beam/runners/direct/QuiescenceDriver.java
##########
@@ -70,6 +71,8 @@ public static ExecutionDriver create(
private final Map<AppliedPTransform<?, ?, ?>, ConcurrentLinkedQueue<CommittedBundle<?>>>
pendingRootBundles;
private final Queue<WorkUpdate> pendingWork = new ConcurrentLinkedQueue<>();
+ private final Map<AppliedPTransform<?, ?, ?>, Collection<CommittedBundle<?>>> inflightBundles =
+ new ConcurrentHashMap<>();
Review comment:
The watermark should be held by all timestamps in the bundle, until the
bundle is completed and committed.
Can you move your GitHub review comment into a code comment that explains
this field?
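
A rough sketch of the hold I have in mind, under simplifying assumptions: transforms are keyed by a `String` name, and each in-flight bundle is represented only by its minimum element timestamp as a `long`. `InflightHoldSketch` and its methods are hypothetical and are not part of the direct runner.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Hypothetical sketch; names and types are assumptions, not Beam's API. */
class InflightHoldSketch {
  // Transform name -> minimum timestamps of bundles that are still in flight.
  private final Map<String, Collection<Long>> inflightBundles = new ConcurrentHashMap<>();

  /** Records a bundle as in flight when its processing starts. */
  void bundleStarted(String transform, long minTimestamp) {
    inflightBundles
        .computeIfAbsent(transform, k -> new ConcurrentLinkedQueue<>())
        .add(minTimestamp);
  }

  /** Releases the hold once the bundle is completed and committed. */
  void bundleCommitted(String transform, long minTimestamp) {
    Collection<Long> held = inflightBundles.get(transform);
    if (held != null) {
      held.remove(minTimestamp);
    }
  }

  /** Returns the candidate watermark, held back by any in-flight bundle. */
  long heldWatermark(String transform, long candidate) {
    long result = candidate;
    for (long ts : inflightBundles.getOrDefault(transform, Collections.emptyList())) {
      result = Math.min(result, ts);
    }
    return result;
  }
}
```

The watermark for a transform can only reach the candidate value after every in-flight bundle with an earlier timestamp has been committed.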
##########
File path:
runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
##########
@@ -128,19 +108,6 @@ public void cleanup() throws Exception {
final DoFn<KV<K, InputT>, OutputT> doFn =
application.getTransform().getDoFn();
final DoFnSignature signature =
DoFnSignatures.getSignature(doFn.getClass());
- // If the DoFn is stateful, schedule state clearing.
- // It is semantically correct to schedule any number of redundant clear tasks; the
- // cache is used to limit the number of tasks to avoid performance degradation.
- if (signature.stateDeclarations().size() > 0) {
- for (final WindowedValue<?> element : inputBundle.getElements()) {
- for (final BoundedWindow window : element.getWindows()) {
- cleanupRegistry.get(
- AppliedPTransformOutputKeyAndWindow.create(
- application, (StructuralKey<K>) inputBundle.getKey(), window));
- }
- }
- }
-
Review comment:
Please do make it a separate commit, because it will make more sense when
reading the code history. Single commits can also be rolled back. It is fine
to have it in the same PR, in my opinion.
I agree that this seems redundant with
https://github.com/apache/beam/blob/c3d1e5d71ada0097a48d235cb717c61f63b5eb80/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java#L113
I think this code predates `StatefulDoFnRunner`, so perhaps this was missed
when it was ported to use `StatefulDoFnRunner`. (I could be wrong; I did not
check the git history.)