arunpandianp commented on code in PR #38814:
URL: https://github.com/apache/beam/pull/38814#discussion_r3371045067
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java:
##########
@@ -388,86 +372,143 @@ private ExecuteWorkResult executeWork(
SideInputStateFetcher localSideInputStateFetcher =
sideInputStateFetcherFactory.createSideInputStateFetcher(work::fetchSideInput);
- // If the read output KVs, then we can decode Windmill's byte key into
userland
- // key object and provide it to the execution context for use with
per-key state.
- // Otherwise, we pass null.
- //
- // The coder type that will be present is:
- // WindowedValueCoder(TimerOrElementCoder(KvCoder))
- Optional<Coder<?>> keyCoder = computationWorkExecutor.keyCoder();
- @SuppressWarnings("deprecation")
- @Nullable
- final Object executionKey =
- !keyCoder.isPresent() ? null : keyCoder.get().decode(key.newInput(),
Coder.Context.OUTER);
-
- if (workItem.hasHotKeyInfo()) {
- Windmill.HotKeyInfo hotKeyInfo = workItem.getHotKeyInfo();
- Duration hotKeyAge = Duration.millis(hotKeyInfo.getHotKeyAgeUsec() /
1000);
-
- String stepName =
getShuffleTaskStepName(computationState.getMapTask());
- if (executionKey != null
- && (options.isHotKeyLoggingEnabled()
- || hasExperiment(options, "enable_hot_key_logging"))
- && keyCoder.isPresent()) {
- hotKeyLogger.logHotKeyDetection(stepName, hotKeyAge, executionKey);
- } else {
- hotKeyLogger.logHotKeyDetection(stepName, hotKeyAge);
- }
- }
+ StreamingModeExecutionContext.KeySwitchListener keySwitchListener =
+ createKeySwitchListener(computationState);
// Blocks while executing work.
computationWorkExecutor.executeWork(
- executionKey, work, stateReader, localSideInputStateFetcher,
outputBuilder);
+ work, stateReader, localSideInputStateFetcher, workExecutor, handle,
keySwitchListener);
- if (work.isFailed()) {
- throw new WorkItemCancelledException(workItem.getShardingKey());
+ StreamingModeExecutionContext context =
computationWorkExecutor.context();
+ if (context.workIsFailed()) {
+ throw new
WorkItemCancelledException(work.getWorkItem().getShardingKey());
}
- // Reports source bytes processed to WorkItemCommitRequest if available.
- try {
- long sourceBytesProcessed =
- computationWorkExecutor.computeSourceBytesProcessed(
- computationState.sourceBytesProcessCounterName());
- outputBuilder.setSourceBytesProcessed(sourceBytesProcessed);
- } catch (Exception e) {
- LOG.error("{}", e.toString());
- }
-
- commitFinalizer.cacheCommitFinalizers(computationWorkExecutor.context().flushState());
+ // Retrieve executed works, output builders, and accumulated callbacks
from execution context
+ List<Work> workBatch = context.getExecutedWorks();
+ List<Windmill.WorkItemCommitRequest.Builder> outputBuilders =
context.getOutputBuilders();
+ Map<Long, Pair<Instant, Runnable>> accumulatedCallbacks =
context.getAccumulatedCallbacks();
+ context.clear();
// Release the execution state for another thread to use.
computationState.releaseComputationWorkExecutor(computationWorkExecutor);
computationWorkExecutor = null;
- work.setState(Work.State.COMMIT_QUEUED);
- outputBuilder.addAllPerWorkItemLatencyAttributions(work.getLatencyAttributions(sampler));
-
return ExecuteWorkResult.create(
- outputBuilder, stateReader.getBytesRead() +
localSideInputStateFetcher.getBytesRead());
+ workBatch,
+ outputBuilders,
+ accumulatedCallbacks,
+ context.getStateBytesRead() +
localSideInputStateFetcher.getBytesRead());
} catch (Throwable t) {
if (computationWorkExecutor != null) {
// If processing failed due to a thrown exception, close the
executionState. Do not
// return/release the executionState back to computationState as that
will lead to this
// executionState instance being reused.
- LOG.debug("Invalidating executor after work item {} failed",
workItem.getWorkToken(), t);
+ LOG.debug(
+ "Invalidating executor after work item {} failed",
+ work.getWorkItem().getWorkToken(),
+ t);
computationWorkExecutor.invalidate();
}
-
// Re-throw the exception, it will be caught and handled by
workFailureProcessor downstream.
throw t;
}
}
+ private void handleOnlyFinalize(
+ ComputationState computationState, Work work, Windmill.WorkItem
workItem) {
+ Windmill.WorkItemCommitRequest.Builder outputBuilder =
+ initializeOutputBuilder(workItem.getKey(), workItem);
+ outputBuilder.setSourceStateUpdates(Windmill.SourceState.newBuilder().setOnlyFinalize(true));
+ work.setState(Work.State.COMMIT_QUEUED);
+ work.queueCommit(outputBuilder.build(), computationState);
+ }
+
+ private StageInfo getStageInfo(ComputationState computationState) {
+ MapTask mapTask = computationState.getMapTask();
+ return stageInfoMap.computeIfAbsent(
+ mapTask.getStageName(), s -> StageInfo.create(s,
mapTask.getSystemName()));
+ }
+
+ private void commitWorkBatch(
+ ComputationState computationState,
+ List<Work> workBatch,
+ List<Windmill.WorkItemCommitRequest.Builder> outputBuilders) {
+ Preconditions.checkState(
+ workBatch.size() == 1, "Expected single-key work batch, got: " +
workBatch.size());
+ commitSingleKeyWork(computationState, workBatch.get(0),
outputBuilders.get(0));
+ }
+
+ private void commitSingleKeyWork(
+ ComputationState computationState,
+ Work work,
+ Windmill.WorkItemCommitRequest.Builder commitRequestBuilder) {
+ // Validate the commit request, possibly requesting truncation if the
commitSize is too large.
+ Windmill.WorkItemCommitRequest validatedCommitRequest =
+ validateCommitRequestSize(
+ commitRequestBuilder.build(), computationState.getComputationId(),
work.getWorkItem());
+ work.setState(Work.State.COMMIT_QUEUED);
+ validatedCommitRequest =
+ validatedCommitRequest
+ .toBuilder()
+ .addAllPerWorkItemLatencyAttributions(work.getLatencyAttributions(sampler))
+ .build();
+ work.queueCommit(validatedCommitRequest, computationState);
+ }
+
+ private void recordProcessingTime(
+ StageInfo stageInfo,
+ @Nullable List<Work> worksToCleanup,
+ Work work,
+ long processingStartTimeNanos) {
+ // Update total processing time counters. Updating in finally clause
ensures that
+ // work items causing exceptions are also accounted in time spent.
+ long processingTimeMsecs =
+ TimeUnit.NANOSECONDS.toMillis(System.nanoTime() -
processingStartTimeNanos);
+ stageInfo.totalProcessingMsecs().addValue(processingTimeMsecs);
+ if (anyWorkHasTimers(worksToCleanup, work)) {
+ // Attribute all the processing to timers if the work item contains any
timers.
+ // Tests show that work items rarely contain both timers and message
bundles. It should
+ // be a fairly close approximation.
+ // Another option: Derive time split between messages and timers based
on recent totals.
+ // either here or in DFE.
+ stageInfo.timerProcessingMsecs().addValue(processingTimeMsecs);
+ }
+ }
+
+ private static boolean anyWorkHasTimers(@Nullable List<Work> works, Work
primaryWork) {
+ if (works != null && !works.isEmpty()) {
+ return works.stream().anyMatch(w -> w.getWorkItem().hasTimers());
+ }
+ return primaryWork.getWorkItem().hasTimers();
+ }
+
+ private StreamingModeExecutionContext.KeySwitchListener
createKeySwitchListener(
+ ComputationState computationState) {
+ return (oldWork, newWork) -> {
+ resetWorkLoggingContext();
+ setUpWorkLoggingContext(newWork.getLatencyTrackingId(),
computationState.getComputationId());
+ newWork.setProcessingThreadName(Thread.currentThread().getName());
Review Comment:
done.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java:
##########
@@ -388,86 +372,143 @@ private ExecuteWorkResult executeWork(
SideInputStateFetcher localSideInputStateFetcher =
sideInputStateFetcherFactory.createSideInputStateFetcher(work::fetchSideInput);
- // If the read output KVs, then we can decode Windmill's byte key into
userland
- // key object and provide it to the execution context for use with
per-key state.
- // Otherwise, we pass null.
- //
- // The coder type that will be present is:
- // WindowedValueCoder(TimerOrElementCoder(KvCoder))
- Optional<Coder<?>> keyCoder = computationWorkExecutor.keyCoder();
- @SuppressWarnings("deprecation")
- @Nullable
- final Object executionKey =
- !keyCoder.isPresent() ? null : keyCoder.get().decode(key.newInput(),
Coder.Context.OUTER);
-
- if (workItem.hasHotKeyInfo()) {
- Windmill.HotKeyInfo hotKeyInfo = workItem.getHotKeyInfo();
- Duration hotKeyAge = Duration.millis(hotKeyInfo.getHotKeyAgeUsec() /
1000);
-
- String stepName =
getShuffleTaskStepName(computationState.getMapTask());
- if (executionKey != null
- && (options.isHotKeyLoggingEnabled()
- || hasExperiment(options, "enable_hot_key_logging"))
- && keyCoder.isPresent()) {
- hotKeyLogger.logHotKeyDetection(stepName, hotKeyAge, executionKey);
- } else {
- hotKeyLogger.logHotKeyDetection(stepName, hotKeyAge);
- }
- }
+ StreamingModeExecutionContext.KeySwitchListener keySwitchListener =
+ createKeySwitchListener(computationState);
// Blocks while executing work.
computationWorkExecutor.executeWork(
- executionKey, work, stateReader, localSideInputStateFetcher,
outputBuilder);
+ work, stateReader, localSideInputStateFetcher, workExecutor, handle,
keySwitchListener);
- if (work.isFailed()) {
- throw new WorkItemCancelledException(workItem.getShardingKey());
+ StreamingModeExecutionContext context =
computationWorkExecutor.context();
+ if (context.workIsFailed()) {
+ throw new
WorkItemCancelledException(work.getWorkItem().getShardingKey());
}
- // Reports source bytes processed to WorkItemCommitRequest if available.
- try {
- long sourceBytesProcessed =
- computationWorkExecutor.computeSourceBytesProcessed(
- computationState.sourceBytesProcessCounterName());
- outputBuilder.setSourceBytesProcessed(sourceBytesProcessed);
- } catch (Exception e) {
- LOG.error("{}", e.toString());
- }
-
- commitFinalizer.cacheCommitFinalizers(computationWorkExecutor.context().flushState());
+ // Retrieve executed works, output builders, and accumulated callbacks
from execution context
+ List<Work> workBatch = context.getExecutedWorks();
+ List<Windmill.WorkItemCommitRequest.Builder> outputBuilders =
context.getOutputBuilders();
+ Map<Long, Pair<Instant, Runnable>> accumulatedCallbacks =
context.getAccumulatedCallbacks();
+ context.clear();
// Release the execution state for another thread to use.
computationState.releaseComputationWorkExecutor(computationWorkExecutor);
computationWorkExecutor = null;
- work.setState(Work.State.COMMIT_QUEUED);
- outputBuilder.addAllPerWorkItemLatencyAttributions(work.getLatencyAttributions(sampler));
-
return ExecuteWorkResult.create(
- outputBuilder, stateReader.getBytesRead() +
localSideInputStateFetcher.getBytesRead());
+ workBatch,
+ outputBuilders,
+ accumulatedCallbacks,
+ context.getStateBytesRead() +
localSideInputStateFetcher.getBytesRead());
} catch (Throwable t) {
if (computationWorkExecutor != null) {
// If processing failed due to a thrown exception, close the
executionState. Do not
// return/release the executionState back to computationState as that
will lead to this
// executionState instance being reused.
- LOG.debug("Invalidating executor after work item {} failed",
workItem.getWorkToken(), t);
+ LOG.debug(
+ "Invalidating executor after work item {} failed",
+ work.getWorkItem().getWorkToken(),
+ t);
computationWorkExecutor.invalidate();
}
-
// Re-throw the exception, it will be caught and handled by
workFailureProcessor downstream.
throw t;
}
}
+ private void handleOnlyFinalize(
+ ComputationState computationState, Work work, Windmill.WorkItem
workItem) {
+ Windmill.WorkItemCommitRequest.Builder outputBuilder =
+ initializeOutputBuilder(workItem.getKey(), workItem);
+ outputBuilder.setSourceStateUpdates(Windmill.SourceState.newBuilder().setOnlyFinalize(true));
+ work.setState(Work.State.COMMIT_QUEUED);
+ work.queueCommit(outputBuilder.build(), computationState);
+ }
+
+ private StageInfo getStageInfo(ComputationState computationState) {
+ MapTask mapTask = computationState.getMapTask();
+ return stageInfoMap.computeIfAbsent(
+ mapTask.getStageName(), s -> StageInfo.create(s,
mapTask.getSystemName()));
+ }
+
+ private void commitWorkBatch(
+ ComputationState computationState,
+ List<Work> workBatch,
+ List<Windmill.WorkItemCommitRequest.Builder> outputBuilders) {
+ Preconditions.checkState(
+ workBatch.size() == 1, "Expected single-key work batch, got: " +
workBatch.size());
+ commitSingleKeyWork(computationState, workBatch.get(0),
outputBuilders.get(0));
+ }
+
+ private void commitSingleKeyWork(
+ ComputationState computationState,
+ Work work,
+ Windmill.WorkItemCommitRequest.Builder commitRequestBuilder) {
+ // Validate the commit request, possibly requesting truncation if the
commitSize is too large.
+ Windmill.WorkItemCommitRequest validatedCommitRequest =
+ validateCommitRequestSize(
+ commitRequestBuilder.build(), computationState.getComputationId(),
work.getWorkItem());
+ work.setState(Work.State.COMMIT_QUEUED);
+ validatedCommitRequest =
+ validatedCommitRequest
+ .toBuilder()
+ .addAllPerWorkItemLatencyAttributions(work.getLatencyAttributions(sampler))
+ .build();
+ work.queueCommit(validatedCommitRequest, computationState);
+ }
+
+ private void recordProcessingTime(
+ StageInfo stageInfo,
+ @Nullable List<Work> worksToCleanup,
+ Work work,
+ long processingStartTimeNanos) {
+ // Update total processing time counters. Updating in finally clause
ensures that
+ // work items causing exceptions are also accounted in time spent.
+ long processingTimeMsecs =
+ TimeUnit.NANOSECONDS.toMillis(System.nanoTime() -
processingStartTimeNanos);
+ stageInfo.totalProcessingMsecs().addValue(processingTimeMsecs);
+ if (anyWorkHasTimers(worksToCleanup, work)) {
+ // Attribute all the processing to timers if the work item contains any
timers.
+ // Tests show that work items rarely contain both timers and message
bundles. It should
+ // be a fairly close approximation.
+ // Another option: Derive time split between messages and timers based
on recent totals.
+ // either here or in DFE.
+ stageInfo.timerProcessingMsecs().addValue(processingTimeMsecs);
+ }
+ }
+
+ private static boolean anyWorkHasTimers(@Nullable List<Work> works, Work
primaryWork) {
+ if (works != null && !works.isEmpty()) {
+ return works.stream().anyMatch(w -> w.getWorkItem().hasTimers());
+ }
+ return primaryWork.getWorkItem().hasTimers();
+ }
+
+ private StreamingModeExecutionContext.KeySwitchListener
createKeySwitchListener(
+ ComputationState computationState) {
+ return (oldWork, newWork) -> {
+ resetWorkLoggingContext();
+ setUpWorkLoggingContext(newWork.getLatencyTrackingId(),
computationState.getComputationId());
+ newWork.setProcessingThreadName(Thread.currentThread().getName());
+ oldWork.setProcessingThreadName("");
+ };
+ }
+
@AutoValue
abstract static class ExecuteWorkResult {
-
- private static ExecuteWorkResult create(
- Windmill.WorkItemCommitRequest.Builder commitWorkRequest, long
stateBytesRead) {
+ static ExecuteWorkResult create(
+ List<Work> workBatch,
+ List<Windmill.WorkItemCommitRequest.Builder> outputBuilders,
+ Map<Long, Pair<Instant, Runnable>> accumulatedCallbacks,
+ long stateBytesRead) {
return new AutoValue_StreamingWorkScheduler_ExecuteWorkResult(
- commitWorkRequest, stateBytesRead);
+ workBatch, outputBuilders, accumulatedCallbacks, stateBytesRead);
}
- abstract Windmill.WorkItemCommitRequest.Builder commitWorkRequest();
+ abstract List<Work> workBatch();
+
+ abstract List<Windmill.WorkItemCommitRequest.Builder> outputBuilders();
+
+ abstract Map<Long, Pair<Instant, Runnable>> accumulatedCallbacks();
Review Comment:
done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]