arunpandianp commented on code in PR #38814:
URL: https://github.com/apache/beam/pull/38814#discussion_r3431487658
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java:
##########
@@ -248,46 +246,39 @@ private void processWork(
}
private void processWork(
- ComputationState computationState, Work work, BoundedQueueExecutorWorkHandle unusedHandle) {
+ ComputationState computationState, Work work, BoundedQueueExecutorWorkHandle handle) {
Windmill.WorkItem workItem = work.getWorkItem();
String computationId = computationState.getComputationId();
- ByteString key = workItem.getKey();
work.setProcessingThreadName(Thread.currentThread().getName());
work.setState(Work.State.PROCESSING);
setUpWorkLoggingContext(work.getLatencyTrackingId(), computationId);
LOG.debug("Starting processing for {}:\n{}", computationId, work);
if (workItem.getSourceState().getOnlyFinalize()) {
- Windmill.WorkItemCommitRequest.Builder outputBuilder = initializeOutputBuilder(key, workItem);
- outputBuilder.setSourceStateUpdates(Windmill.SourceState.newBuilder().setOnlyFinalize(true));
- work.setState(Work.State.COMMIT_QUEUED);
- work.queueCommit(outputBuilder.build(), computationState);
+ handleOnlyFinalize(computationState, work, workItem);
return;
}
long processingStartTimeNanos = System.nanoTime();
- MapTask mapTask = computationState.getMapTask();
- StageInfo stageInfo =
- stageInfoMap.computeIfAbsent(
- mapTask.getStageName(), s -> StageInfo.create(s, mapTask.getSystemName()));
+ StageInfo stageInfo = getStageInfo(computationState);
+ List<Work> workBatch = null;
Review Comment:
The Checker Framework performs "flow-sensitive type refinement" and infers that
workBatch is nullable here. All usages are null-safe, so it did not complain.
I added @Nullable to make this explicit.
Credits: Gemini
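
For readers unfamiliar with the pattern, here is a minimal sketch of a local variable that starts as null and is only used behind a null check. It is not the code from this PR: the class, the method `countBatched`, and its parameters are hypothetical, and the `Nullable` annotation is a locally declared stand-in for `org.checkerframework.checker.nullness.qual.Nullable` so that the sketch compiles without the Checker Framework on the classpath.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Target;
import java.util.ArrayList;
import java.util.List;

public class NullableRefinementSketch {

  // Hypothetical stand-in for org.checkerframework.checker.nullness.qual.Nullable,
  // declared locally so this sketch has no external dependencies.
  @Target(ElementType.TYPE_USE)
  @interface Nullable {}

  // Returns the number of batched items, or 0 when no batch was created.
  static int countBatched(boolean batchingEnabled, List<String> items) {
    // Explicitly marked nullable: the initializer is null, and the variable
    // is only assigned a real list on one branch.
    @Nullable List<String> workBatch = null;
    if (batchingEnabled) {
      workBatch = new ArrayList<>(items);
    }
    // Null-safe usage: the variable is dereferenced only inside the guard,
    // which is the kind of usage a nullness checker accepts.
    if (workBatch != null) {
      return workBatch.size();
    }
    return 0;
  }

  public static void main(String[] args) {
    System.out.println(countBatched(true, List.of("a", "b")));  // prints 2
    System.out.println(countBatched(false, List.of("a", "b"))); // prints 0
  }
}
```

The explicit annotation does not change behavior in this sketch. It only documents for human readers what a flow-sensitive checker would already infer from the null initializer.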