scwhittle commented on code in PR #25186:
URL: https://github.com/apache/beam/pull/25186#discussion_r1235534965


##########
sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DirectStreamObserver.java:
##########
@@ -76,12 +77,13 @@ public void onNext(T value) {
         // Record the initial phase in case we are in the inbound gRPC thread where the phase won't
         // advance.
         int initialPhase = phase;
-        while (!outboundObserver.isReady()) {
+        // A negative phase indicates that the phaser is terminated.
+        while (phase >= 0 && !outboundObserver.isReady()) {
           try {
             phase = phaser.awaitAdvanceInterruptibly(phase, waitTime, TimeUnit.SECONDS);
           } catch (TimeoutException e) {
             totalTimeWaited += waitTime;
-            waitTime = waitTime * 2;
+            waitTime = Math.min(waitTime * 2, 60);

Review Comment:
   Done



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java:
##########
@@ -87,28 +93,93 @@ public class BeamFnLoggingClient implements AutoCloseable {
 
   private static final Object COMPLETED = new Object();
 
+  private final Endpoints.ApiServiceDescriptor apiServiceDescriptor;
+
+  private final StreamWriter streamWriter;
+
+  private final LogRecordHandler logRecordHandler;
+
   /* We need to store a reference to the configured loggers so that they are not
    * garbage collected. java.util.logging only has weak references to the loggers
    * so if they are garbage collected, our hierarchical configuration will be lost. */
-  private final Collection<Logger> configuredLoggers;
-  private final Endpoints.ApiServiceDescriptor apiServiceDescriptor;
-  private final ManagedChannel channel;
-  private final CallStreamObserver<BeamFnApi.LogEntry.List> outboundObserver;
-  private final LogControlObserver inboundObserver;
-  private final LogRecordHandler logRecordHandler;
-  private final CompletableFuture<Object> inboundObserverCompletion;
-  private final Phaser phaser;
+  private final Collection<Logger> configuredLoggers = new ArrayList<>();
+
   private @Nullable ProcessBundleHandler processBundleHandler;
 
-  public BeamFnLoggingClient(
+  private final BlockingQueue<LogEntry> bufferedLogEntries =
+      new ArrayBlockingQueue<>(MAX_BUFFERED_LOG_ENTRY_COUNT);
+
+  //

Review Comment:
   added comment



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to