m-trieu commented on code in PR #30312:
URL: https://github.com/apache/beam/pull/30312#discussion_r1508322579
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -1396,163 +1392,36 @@ private WorkItemCommitRequest
buildWorkItemTruncationRequest(
return outputBuilder.build();
}
- private void commitLoop() {
- Map<ComputationState, Windmill.ComputationCommitWorkRequest.Builder>
computationRequestMap =
- new HashMap<>();
- while (running.get()) {
- computationRequestMap.clear();
- Windmill.CommitWorkRequest.Builder commitRequestBuilder =
- Windmill.CommitWorkRequest.newBuilder();
- long commitBytes = 0;
- // Block until we have a commit, then batch with additional commits.
- Commit commit = null;
- try {
- commit = commitQueue.take();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- continue;
- }
- while (commit != null) {
- ComputationState computationState = commit.computationState();
- commit.work().setState(Work.State.COMMITTING);
- Windmill.ComputationCommitWorkRequest.Builder
computationRequestBuilder =
- computationRequestMap.get(computationState);
- if (computationRequestBuilder == null) {
- computationRequestBuilder =
commitRequestBuilder.addRequestsBuilder();
-
computationRequestBuilder.setComputationId(computationState.getComputationId());
- computationRequestMap.put(computationState,
computationRequestBuilder);
- }
- computationRequestBuilder.addRequests(commit.request());
- // Send the request if we've exceeded the bytes or there is no more
- // pending work. commitBytes is a long, so this cannot overflow.
- commitBytes += commit.getSize();
- if (commitBytes >= TARGET_COMMIT_BUNDLE_BYTES) {
- break;
- }
- commit = commitQueue.poll();
- }
- Windmill.CommitWorkRequest commitRequest = commitRequestBuilder.build();
- LOG.trace("Commit: {}", commitRequest);
- activeCommitBytes.addAndGet(commitBytes);
- windmillServer.commitWork(commitRequest);
- activeCommitBytes.addAndGet(-commitBytes);
- for (Map.Entry<ComputationState,
Windmill.ComputationCommitWorkRequest.Builder> entry :
- computationRequestMap.entrySet()) {
- ComputationState computationState = entry.getKey();
- for (Windmill.WorkItemCommitRequest workRequest :
entry.getValue().getRequestsList()) {
- computationState.completeWorkAndScheduleNextWorkForKey(
- ShardedKey.create(workRequest.getKey(),
workRequest.getShardingKey()),
- WorkId.builder()
- .setCacheToken(workRequest.getCacheToken())
- .setWorkToken(workRequest.getWorkToken())
- .build());
- }
- }
- }
- }
-
- // Adds the commit to the commitStream if it fits, returning true iff it is
consumed.
- private boolean addCommitToStream(Commit commit, CommitWorkStream
commitStream) {
- Preconditions.checkNotNull(commit);
- final ComputationState state = commit.computationState();
- final Windmill.WorkItemCommitRequest request = commit.request();
- // Drop commits for failed work. Such commits will be dropped by Windmill
anyway.
- if (commit.work().isFailed()) {
- readerCache.invalidateReader(
- WindmillComputationKey.create(
- state.getComputationId(), request.getKey(),
request.getShardingKey()));
- stateCache
- .forComputation(state.getComputationId())
- .invalidate(request.getKey(), request.getShardingKey());
- state.completeWorkAndScheduleNextWorkForKey(
- ShardedKey.create(request.getKey(), request.getShardingKey()),
- WorkId.builder()
- .setWorkToken(request.getWorkToken())
- .setCacheToken(request.getCacheToken())
- .build());
- return true;
- }
-
- final int size = commit.getSize();
- commit.work().setState(Work.State.COMMITTING);
- activeCommitBytes.addAndGet(size);
- if (commitStream.commitWorkItem(
- state.getComputationId(),
- request,
- (Windmill.CommitStatus status) -> {
- if (status != Windmill.CommitStatus.OK) {
- readerCache.invalidateReader(
- WindmillComputationKey.create(
- state.getComputationId(), request.getKey(),
request.getShardingKey()));
- stateCache
- .forComputation(state.getComputationId())
- .invalidate(request.getKey(), request.getShardingKey());
- }
- activeCommitBytes.addAndGet(-size);
- state.completeWorkAndScheduleNextWorkForKey(
- ShardedKey.create(request.getKey(), request.getShardingKey()),
- WorkId.builder()
- .setCacheToken(request.getCacheToken())
- .setWorkToken(request.getWorkToken())
- .build());
- })) {
- return true;
- } else {
- // Back out the stats changes since the commit wasn't consumed.
- commit.work().setState(Work.State.COMMIT_QUEUED);
- activeCommitBytes.addAndGet(-size);
- return false;
- }
+ private void onStreamingCommitFailed(Commit commit) {
Review Comment:
done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]