m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1806032259
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -63,32 +68,48 @@
*/
public abstract class AbstractWindmillStream<RequestT, ResponseT> implements
WindmillStream {
- public static final long DEFAULT_STREAM_RPC_DEADLINE_SECONDS = 300;
// Default gRPC streams to 2MB chunks, which has shown to be a large enough
chunk size to reduce
// per-chunk overhead, and small enough that we can still perform granular
flow-control.
protected static final int RPC_STREAM_CHUNK_SIZE = 2 << 20;
- private static final Logger LOG =
LoggerFactory.getLogger(AbstractWindmillStream.class);
+ private static final Status OK_STATUS = Status.fromCode(Status.Code.OK);
+
protected final AtomicBoolean clientClosed;
- private final AtomicBoolean isShutdown;
+ protected final Sleeper sleeper;
private final AtomicLong lastSendTimeMs;
- private final Executor executor;
+ private final ExecutorService executor;
private final BackOff backoff;
private final AtomicLong startTimeMs;
private final AtomicLong lastResponseTimeMs;
+ private final AtomicInteger restartCount;
private final AtomicInteger errorCount;
- private final AtomicReference<String> lastError;
- private final AtomicReference<DateTime> lastErrorTime;
+ private final AtomicReference<String> lastRestartReason;
+ private final AtomicReference<DateTime> lastRestartTime;
private final AtomicLong sleepUntil;
private final CountDownLatch finishLatch;
private final Set<AbstractWindmillStream<?, ?>> streamRegistry;
private final int logEveryNStreamFailures;
- private final Supplier<StreamObserver<RequestT>> requestObserverSupplier;
- // Indicates if the current stream in requestObserver is closed by calling
close() method
- private final AtomicBoolean streamClosed;
private final String backendWorkerToken;
- private @Nullable StreamObserver<RequestT> requestObserver;
+ private final ResettableRequestObserver<RequestT> requestObserver;
+
+ /** Guards {@link #start()} and {@link #shutdown()} methods. */
+ private final Object shutdownLock = new Object();
+
+ /** Reads are lock free, writes are guarded by shutdownLock. */
+ private final AtomicBoolean isShutdown;
+
+ private final AtomicBoolean started;
Review Comment:
done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]