arunpandianp commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1814061667
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java:
##########
@@ -342,64 +379,92 @@ private void queueRequestAndWait(QueuedRequest request) throws InterruptedExcept
batch.addRequest(request);
}
if (responsibleForSend) {
- if (waitForSendLatch == null) {
+ if (prevBatch == null) {
// If there was not a previous batch wait a little while to improve
// batching.
- Thread.sleep(1);
+ sleeper.sleep(1);
} else {
- waitForSendLatch.await();
+ prevBatch.waitForSendOrFailNotification();
}
// Finalize the batch so that no additional requests will be added. Leave the batch in the
// queue so that a subsequent batch will wait for its completion.
synchronized (batches) {
- verify(batch == batches.peekFirst());
+ verify(batch == batches.peekFirst(), "GetDataStream request batch removed before send().");
batch.markFinalized();
}
- sendBatch(batch.requests());
+ trySendBatch(batch);
+ } else {
+ // Wait for this batch to be sent before parsing the response.
+ batch.waitForSendOrFailNotification();
+ }
+ }
+
+ void trySendBatch(QueuedBatch batch) {
+ try {
+ sendBatch(batch.sortedRequestsReadOnly());
synchronized (batches) {
Review Comment:
Changing all `synchronized (batches)` blocks to look like the following should fix the race between mutations to `batches` and shutdown:
```
synchronized (shutdownLock) {
  if (isShutdown()) {
    // Clean up anything that needs to be cleaned up.
    // Returning here should be safe because shutdown cleared all existing `batches`.
    return;
  }
  synchronized (batches) {
    ...
  }
}
```
Maybe we could use only `shutdownLock` to guard access to `batches` and remove `synchronized (batches)` entirely. Would there be any downsides to doing so?
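A minimal sketch of the proposed pattern, assuming a hypothetical `BatchTracker` class (not the actual `GrpcGetDataStream` API) whose `shutdown()` clears all pending batches while holding `shutdownLock`. Because every mutation first checks the shutdown flag under that same lock, no batch can be added after shutdown has cleared the queue. Strings stand in for real batches.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch: BatchTracker and its methods are illustrative, not Beam classes.
class BatchTracker {
  private final Object shutdownLock = new Object();
  private final Deque<String> batches = new ArrayDeque<>();
  private boolean isShutdown = false; // guarded by shutdownLock

  boolean isShutdown() {
    synchronized (shutdownLock) {
      return isShutdown;
    }
  }

  /** Adds a batch unless the tracker is shut down; returns false if the batch was rejected. */
  boolean addBatch(String batch) {
    synchronized (shutdownLock) {
      if (isShutdown) {
        // shutdown() already cleared all existing batches, so there is nothing left to clean up.
        return false;
      }
      synchronized (batches) {
        batches.addLast(batch);
        return true;
      }
    }
  }

  /** Marks the tracker shut down and drops all pending batches in one atomic step. */
  void shutdown() {
    synchronized (shutdownLock) {
      isShutdown = true;
      synchronized (batches) {
        batches.clear();
      }
    }
  }

  /** Reads the queue size while holding only the batches lock. */
  int pendingCount() {
    synchronized (batches) {
      return batches.size();
    }
  }
}
```

Both locks are always acquired in the same order (`shutdownLock`, then `batches`), which avoids lock-ordering deadlocks. In this sketch every mutation already holds `shutdownLock`, so the inner `synchronized (batches)` only adds exclusion against code that touches `batches` without `shutdownLock`, such as `pendingCount()`.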
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStreamRequests.java:
##########
@@ -91,18 +110,26 @@ void addToStreamingGetDataRequest(Windmill.StreamingGetDataRequest.Builder build
}
}
+ /** Represents a batch of queued requests. Methods are thread-safe unless commented otherwise. */
static class QueuedBatch {
private final List<QueuedRequest> requests = new ArrayList<>();
private final CountDownLatch sent = new CountDownLatch(1);
private long byteSize = 0;
- private boolean finalized = false;
+ private volatile boolean finalized = false;
+ private volatile boolean failed = false;
- CountDownLatch getLatch() {
- return sent;
+ /**
+ * Returns a read-only view of requests sorted with {@link QueuedRequest#globalRequestsFirst()}.
+ */
+ List<QueuedRequest> sortedRequestsReadOnly() {
+ // Put all global data requests first because there is only a single repeated field for
+ // request ids and the initial ids correspond to global data requests if they are present.
+ requests.sort(QueuedRequest.globalRequestsFirst());
+ return Collections.unmodifiableList(requests);
Review Comment:
Why do we need this now and not before?
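To illustrate the ordering described in the code comment above, here is a minimal sketch that assumes a simplified, hypothetical `Request` class with an `isGlobal` flag (the real `QueuedRequest` looks different). `ArrayList.sort` is stable, so the comparator moves global data requests to the front while preserving the relative order within each group. `Collections.unmodifiableList` returns a read-only view backed by the list, not a copy.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

// Hypothetical, simplified stand-in for QueuedRequest.
final class Request {
  final long id;
  final boolean isGlobal;

  Request(long id, boolean isGlobal) {
    this.id = id;
    this.isGlobal = isGlobal;
  }

  /** Orders global data requests before keyed requests. */
  static Comparator<Request> globalRequestsFirst() {
    // Boolean.compare orders false before true, so swap the arguments to put global first.
    return (a, b) -> Boolean.compare(b.isGlobal, a.isGlobal);
  }
}

// Hypothetical stand-in for the batch that owns the requests.
final class RequestBatch {
  private final List<Request> requests = new ArrayList<>();

  void add(Request r) {
    requests.add(r);
  }

  /** Sorts in place (stable), then returns an unmodifiable view of the backing list. */
  List<Request> sortedRequestsReadOnly() {
    requests.sort(Request.globalRequestsFirst());
    return Collections.unmodifiableList(requests);
  }
}
```

Because the view is backed by the mutable list, later `add` calls would still show through it. Returning it is presumably only safe once the batch has been finalized.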
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStreamRequests.java:
##########
@@ -91,18 +110,26 @@ void addToStreamingGetDataRequest(Windmill.StreamingGetDataRequest.Builder build
}
}
+ /** Represents a batch of queued requests. Methods are thread-safe unless commented otherwise. */
Review Comment:
Should this be "Methods are **not** thread-safe unless commented otherwise"?
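For context on the thread-safety question, a minimal sketch of how a batch could pair the existing `CountDownLatch sent` with the new `volatile boolean failed` flag. The names `notifySent()` and `notifyFailed()`, and the exception thrown by `waitForSendOrFailNotification()`, are assumptions, not the PR's code. The latch and the volatile flag are safe to use across threads, whereas a plain `ArrayList` of requests is not, so such a class would need to document which methods require external synchronization.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical sketch of a batch's send/fail notification; names are illustrative.
final class BatchNotifier {
  private final CountDownLatch sent = new CountDownLatch(1);
  private volatile boolean failed = false;

  /** Wakes all waiters after the batch was sent successfully. */
  void notifySent() {
    sent.countDown();
  }

  /** Marks the batch failed, then wakes waiters so they observe the failure. */
  void notifyFailed() {
    failed = true; // written before countDown(), so waiters see it once await() returns
    sent.countDown();
  }

  /** Blocks until the batch is sent or failed; throws if it failed. */
  void waitForSendOrFailNotification() throws InterruptedException {
    sent.await();
    if (failed) {
      // Assumption: the real code may report failure differently.
      throw new IllegalStateException("Batch failed before it could be sent.");
    }
  }

  boolean isFailed() {
    return failed;
  }
}
```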
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##########
@@ -283,21 +312,105 @@ private void issueMultiChunkRequest(final long id, PendingRequest pendingRequest
}
}
- private static class PendingRequest {
+ /**
+ * Returns true if the request should be failed due to stream shutdown, else tracks the request to
+ * be sent and returns false.
+ */
+ private boolean isPrepareForSendFailed(long id, PendingRequest request) {
Review Comment:
nit:
```suggestion
private boolean prepareForSend(long id, PendingRequest request) {
```
and have it return true if preparation succeeded.
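A minimal sketch of the suggested inversion, assuming a hypothetical `PendingTracker` that keeps pending requests in a map keyed by request id and guards it with the same lock as shutdown; `String` stands in for `PendingRequest`. Returning `true` on success lets callers write `if (!prepareForSend(id, request)) { /* fail the request */ }`, which reads more naturally than a method named after its failure case.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; String stands in for PendingRequest.
final class PendingTracker {
  private final Object shutdownLock = new Object();
  private final Map<Long, String> pending = new HashMap<>(); // guarded by shutdownLock
  private boolean isShutdown = false; // guarded by shutdownLock

  /**
   * Tracks the request to be sent and returns true, or returns false if the stream is shut
   * down and the request should be failed instead.
   */
  boolean prepareForSend(long id, String request) {
    synchronized (shutdownLock) {
      if (isShutdown) {
        return false;
      }
      pending.put(id, request);
      return true;
    }
  }

  /** Shuts down and returns how many pending requests were dropped. */
  int shutdown() {
    synchronized (shutdownLock) {
      isShutdown = true;
      int dropped = pending.size();
      pending.clear();
      return dropped;
    }
  }
}
```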
--
This is an automated message from the Apache Git Service.