scwhittle commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1840157947
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/ResettableThrowingStreamObserver.java:
##########
@@ -121,7 +121,7 @@ public void onNext(T t) throws StreamClosedException, WindmillStreamShutdownExce
try {
delegate.onError(e);
- } catch (RuntimeException ignored) {
+ } catch (IllegalStateException ignored) {
Review Comment:
Add `ignored` to the exception (`e`) too? Just in case we catch something
unexpected, it would help debugging.
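One possible reading of this suggestion, sketched with stdlib Java only: attach the `IllegalStateException` that we swallow to the original error with `Throwable.addSuppressed`, so an unexpected failure in the delegate still shows up in the stack trace. `Delegate` and `forwardError` are hypothetical stand-ins for the wrapped observer and the surrounding method, not the PR's actual code.

```java
// Hypothetical sketch; Delegate stands in for the wrapped StreamObserver.
final class SuppressedSketch {
  interface Delegate {
    void onError(Throwable t);
  }

  static void forwardError(Delegate delegate, Throwable e) {
    try {
      delegate.onError(e);
    } catch (IllegalStateException ignored) {
      // Expected if the delegate is already closed, but keep it attached to
      // the original error so anything unexpected is visible when debugging.
      // addSuppressed rejects self-suppression, hence the identity check.
      if (ignored != e) {
        e.addSuppressed(ignored);
      }
    }
  }
}
```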
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java:
##########
@@ -161,19 +162,27 @@ public void onNext(T value) throws StreamObserverCancelledException {
public void onError(Throwable t) {
isReadyNotifier.forceTermination();
synchronized (lock) {
- isClosed = true;
+ markClosedOrThrow();
outboundObserver.onError(t);
}
}
@Override
public void onCompleted() {
+ isReadyNotifier.forceTermination();
synchronized (lock) {
- isClosed = true;
+ markClosedOrThrow();
outboundObserver.onCompleted();
}
}
+ private void markClosedOrThrow() {
+ synchronized (lock) {
+ Preconditions.checkState(!isClosed);
Review Comment:
This is going to throw if we have the sequence:
T1: onNext()
T2: terminate()
T1: onError()
It seems like that could happen if we're terminating from threads other than
the one that generally drives the observer. We could have a separate bool
tracking whether the user closed the observer (userClosed) and base this
exception on that, since a second user close is using the class wrong. Having a
terminate before/during an onCompleted/onError doesn't necessarily seem like
misuse, and I think we should avoid throwing an exception in that case.
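A minimal sketch of the separate-flag idea, assuming a simplified observer where `isClosed` means "closed for any reason" and a hypothetical `isUserClosed` means "the caller already invoked onError/onCompleted". Only a second user close throws; a `terminate()` that races ahead of `onError` just skips forwarding a second close. The outbound gRPC observer is replaced by a list of recorded calls, `terminate(Throwable)` is a hypothetical signature, and the check is duplicated inline in onError/onCompleted rather than factored into a helper.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Phaser;

// Hypothetical, simplified stand-in for DirectStreamObserver.
final class ClosingObserverSketch {
  private final Object lock = new Object();
  private final Phaser isReadyNotifier = new Phaser(1);
  final List<String> outboundCalls = new ArrayList<>(); // guarded by lock
  private boolean isClosed = false; // closed for any reason; guarded by lock
  private boolean isUserClosed = false; // onError/onCompleted called; guarded by lock

  // May run on a different thread than the one driving the observer.
  void terminate(Throwable t) {
    isReadyNotifier.forceTermination();
    synchronized (lock) {
      if (!isClosed) {
        isClosed = true;
        outboundCalls.add("onError:" + t.getMessage());
      }
    }
  }

  void onError(Throwable t) {
    isReadyNotifier.forceTermination();
    synchronized (lock) {
      // A second onError/onCompleted from the caller is misuse of the class.
      if (isUserClosed) {
        throw new IllegalStateException("onError/onCompleted called more than once");
      }
      isUserClosed = true;
      // An earlier terminate() is not misuse; just don't close the outbound twice.
      if (!isClosed) {
        isClosed = true;
        outboundCalls.add("onError:" + t.getMessage());
      }
    }
  }

  void onCompleted() {
    isReadyNotifier.forceTermination();
    synchronized (lock) {
      if (isUserClosed) {
        throw new IllegalStateException("onError/onCompleted called more than once");
      }
      isUserClosed = true;
      if (!isClosed) {
        isClosed = true;
        outboundCalls.add("onCompleted");
      }
    }
  }
}
```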
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java:
##########
@@ -161,19 +162,27 @@ public void onNext(T value) throws StreamObserverCancelledException {
public void onError(Throwable t) {
isReadyNotifier.forceTermination();
synchronized (lock) {
- isClosed = true;
+ markClosedOrThrow();
outboundObserver.onError(t);
}
}
@Override
public void onCompleted() {
+ isReadyNotifier.forceTermination();
synchronized (lock) {
- isClosed = true;
+ markClosedOrThrow();
outboundObserver.onCompleted();
}
}
+ private void markClosedOrThrow() {
Review Comment:
Just make the method synchronized (though with the suggestion below, it's
probably easier to just duplicate this in onCompleted/onError and use the same
if block).
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java:
##########
@@ -188,7 +188,9 @@ protected void onResponse(StreamingGetDataResponse chunk) {
for (int i = 0; i < chunk.getRequestIdCount(); ++i) {
AppendableInputStream responseStream =
pending.get(chunk.getRequestId(i));
- verify(responseStream != null, "No pending response stream");
+ synchronized (this) {
Review Comment:
Avoid synchronizing if it's not necessary. Also, you need to handle the null
case, or you will just get an exception on the following line:
if (responseStream == null) {
synchronized (this) { verify(isShutdown); }
continue;
}
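A self-contained sketch of the suggested shape, assuming a hypothetical `PendingLookupSketch` where `pending`, `isShutdown` and the `StringBuilder` payloads stand in for GrpcGetDataStream's pending map, shutdown flag and `AppendableInputStream`. The lock is only taken on the unexpected null path; `IllegalStateException` is used here in place of Guava's `verify`, to keep the example stdlib-only.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified model of the onResponse lookup loop.
final class PendingLookupSketch {
  final Map<Long, StringBuilder> pending = new ConcurrentHashMap<>();
  private boolean isShutdown = false; // guarded by this

  synchronized void shutdown() {
    isShutdown = true;
    pending.clear();
  }

  void onResponse(List<Long> requestIds, String data) {
    for (long id : requestIds) {
      StringBuilder responseStream = pending.get(id);
      if (responseStream == null) {
        // Only synchronize on the rare path: a missing stream is acceptable
        // only if the stream was shut down concurrently.
        synchronized (this) {
          if (!isShutdown) {
            throw new IllegalStateException("No pending response stream");
          }
        }
        continue; // skip instead of dereferencing null below
      }
      responseStream.append(data);
    }
  }
}
```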
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStreamTest.java:
##########
@@ -131,13 +136,27 @@ public void testShutdown_abortsQueuedCommits() throws InterruptedException {
commitProcessed.countDown();
});
}
+ } catch (StreamObserverCancelledException ignored) {
}
// Verify that we sent the commits above in a request + the initial header.
- verify(requestObserver, times(2)).onNext(any(Windmill.StreamingCommitWorkRequest.class));
+ verify(requestObserver, times(2))
+ .onNext(
+ argThat(
+ request -> {
+ if (request.getHeader().equals(TEST_JOB_HEADER)) {
Review Comment:
nit: how about separate onNext verifications? As is, two headers would pass.
May need some sequence object, not sure.
verify(requestObserver).onNext(argThat(r -> r.getHeader().equals(TEST_JOB_HEADER)));
verify(requestObserver).onNext(argThat(r -> !r.getCommitChunkList().isEmpty()));
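To illustrate why the combined matcher is weak, here is a stdlib-only model (not Mockito itself): a single "header OR commits" predicate counted twice accepts two header requests, while separate, ordered checks (the kind of sequence check Mockito's `InOrder` provides) reject it. `Request`, `combinedTimes2` and `headerThenCommits` are hypothetical simplifications of `StreamingCommitWorkRequest` and the two verification styles.

```java
import java.util.List;

// Hypothetical model of the two verification styles discussed above.
final class VerifySketch {
  static final class Request {
    final boolean hasHeader;
    final int commitChunks;

    Request(boolean hasHeader, int commitChunks) {
      this.hasHeader = hasHeader;
      this.commitChunks = commitChunks;
    }
  }

  // Models times(2) with one matcher that accepts a header OR commit chunks.
  static boolean combinedTimes2(List<Request> sent) {
    return sent.stream().filter(r -> r.hasHeader || r.commitChunks > 0).count() == 2;
  }

  // Models separate ordered checks: a header, then later a request with commits.
  static boolean headerThenCommits(List<Request> sent) {
    int i = 0;
    while (i < sent.size() && !sent.get(i).hasHeader) {
      i++;
    }
    for (int j = i + 1; j < sent.size(); j++) {
      if (sent.get(j).commitChunks > 0) {
        return true;
      }
    }
    return false;
  }
}
```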
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java:
##########
@@ -128,7 +128,9 @@ public void onNext(T value) throws StreamObserverCancelledException {
// Phaser is terminated so don't use the outboundObserver. Since onError and onCompleted
// are synchronized after terminating the phaser if we observe that the phaser is not
// terminated the onNext calls below are guaranteed to not be called on a closed observer.
- if (currentPhase < 0) return;
+ if (currentPhase < 0) {
+ throw new StreamObserverCancelledException("StreamObserver was terminated.");
+ }
Review Comment:
assert(!isClosed);
Since we terminate before closing, this seems guaranteed, but I think an assert
makes it clearer to the reader.
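A simplified model of why the assert holds, assuming (as the diff's comment states) that closing terminates the phaser before taking the lock. `AssertSketch`, `CancelledSketchException` and the `sent` counter are hypothetical stand-ins for DirectStreamObserver, StreamObserverCancelledException and the outbound `onNext`. Note that a Java `assert` statement is only evaluated when the JVM runs with `-ea`; it documents the invariant rather than enforcing it in production.

```java
import java.util.concurrent.Phaser;

// Hypothetical model: a non-negative phase observed under the lock implies
// the observer is still open, because closing terminates the phaser first.
final class AssertSketch {
  static final class CancelledSketchException extends RuntimeException {
    CancelledSketchException(String message) {
      super(message);
    }
  }

  private final Object lock = new Object();
  private final Phaser isReadyNotifier = new Phaser(1);
  private boolean isClosed = false; // guarded by lock
  int sent = 0; // guarded by lock; stands in for outboundObserver.onNext

  void onNext(String value) {
    synchronized (lock) {
      int currentPhase = isReadyNotifier.getPhase(); // negative once terminated
      if (currentPhase < 0) {
        throw new CancelledSketchException("StreamObserver was terminated.");
      }
      // Documents the invariant for readers; only checked when run with -ea.
      assert !isClosed;
      sent++;
    }
  }

  void onCompleted() {
    isReadyNotifier.forceTermination();
    synchronized (lock) {
      isClosed = true;
    }
  }
}
```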
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStreamTest.java:
##########
@@ -104,6 +106,10 @@ public void setMessageCompression(boolean b) {}
() ->
assertThrows(WindmillStreamShutdownException.class, () ->
testStream.testSend(1)));
testStream.shutdown();
+
+ // Sleep a bit to give sendExecutor time to execute the send().
+ Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS);
Review Comment:
Sleep for less, maybe 100ms? Tests take long enough already.
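Where a test has something observable to wait on, a common alternative to tuning a fixed sleep (not what the review proposes, just a related pattern) is to poll with a short interval and a generous upper bound, so the test usually finishes in milliseconds but still tolerates a slow executor. `WaitSketch.awaitCondition` is a hypothetical helper written against the JDK only.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

final class WaitSketch {
  // Polls `condition` every `pollInterval` until it holds (returns true) or
  // `timeout` elapses (returns false). Interruption also returns false.
  static boolean awaitCondition(
      BooleanSupplier condition, Duration timeout, Duration pollInterval) {
    long deadline = System.nanoTime() + timeout.toNanos();
    while (!condition.getAsBoolean()) {
      if (System.nanoTime() - deadline >= 0) {
        return false;
      }
      try {
        Thread.sleep(pollInterval.toMillis());
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt status
        return false;
      }
    }
    return true;
  }
}
```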
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java:
##########
@@ -128,7 +128,9 @@ public void onNext(T value) throws StreamObserverCancelledException {
// Phaser is terminated so don't use the outboundObserver. Since onError and onCompleted
// are synchronized after terminating the phaser if we observe that the phaser is not
// terminated the onNext calls below are guaranteed to not be called on a closed observer.
- if (currentPhase < 0) return;
+ if (currentPhase < 0) {
+ throw new StreamObserverCancelledException("StreamObserver was terminated.");
+ }
messagesSinceReady = 0;
Review Comment:
assert(!isClosed);
Since we terminate before closing, this seems guaranteed, but I think an assert
makes it clearer to the reader.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java:
##########
@@ -83,16 +85,14 @@ public void onNext(T value) throws StreamObserverCancelledException {
// Phaser is terminated so don't use the outboundObserver. Since onError and onCompleted
// are synchronized after terminating the phaser if we observe that the phaser is not
// terminated the onNext calls below are guaranteed to not be called on a closed observer.
- if (currentPhase < 0) return;
+ if (currentPhase < 0) {
+ throw new StreamObserverCancelledException("StreamObserver was terminated.");
+ }
Review Comment:
assert(!isClosed);
Since we terminate before entering the synchronized block to close, it is
guaranteed, but I think an assert makes it clearer to the reader.
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStreamTest.java:
##########
@@ -102,6 +105,48 @@ private GrpcGetDataStream createGetDataStream(GetDataStreamTestStub testStub) {
return getDataStream;
}
+ @Test
+ public void testRequestKeyedData() {
+ GetDataStreamTestStub testStub =
+ new GetDataStreamTestStub(new TestGetDataStreamRequestObserver());
+ GrpcGetDataStream getDataStream = createGetDataStream(testStub);
+ // These will block until they are successfully sent.
+ CompletableFuture<Windmill.KeyedGetDataResponse> sendFuture =
+ CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ return getDataStream.requestKeyedData(
+ "computationId",
+ Windmill.KeyedGetDataRequest.newBuilder()
+ .setKey(ByteString.EMPTY)
+ .setShardingKey(1)
+ .setCacheToken(1)
+ .setWorkToken(1)
+ .build());
+ } catch (WindmillStreamShutdownException e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ // Sleep a bit to allow future to run.
+ Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS);
Review Comment:
Ditto, sleep for less.
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStreamRequestsTest.java:
##########
@@ -142,7 +144,8 @@ public void testQueuedBatch_notifyFailed_throwsWindmillStreamShutdownExceptionOn
assertThrows(
WindmillStreamShutdownException.class,
queuedBatch::waitForSendOrFailNotification));
-
+ // Wait a few seconds for the above future to get scheduled and run.
+ Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS);
Review Comment:
Ditto, how about 100ms?
--
This is an automated message from the Apache Git Service.