Abacn commented on code in PR #25186:
URL: https://github.com/apache/beam/pull/25186#discussion_r1235381896


##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java:
##########
@@ -87,28 +93,93 @@ public class BeamFnLoggingClient implements AutoCloseable {
 
   private static final Object COMPLETED = new Object();
 
+  private final Endpoints.ApiServiceDescriptor apiServiceDescriptor;
+
+  private final StreamWriter streamWriter;
+
+  private final LogRecordHandler logRecordHandler;
+
   /* We need to store a reference to the configured loggers so that they are 
not
    * garbage collected. java.util.logging only has weak references to the 
loggers
    * so if they are garbage collected, our hierarchical configuration will be 
lost. */
-  private final Collection<Logger> configuredLoggers;
-  private final Endpoints.ApiServiceDescriptor apiServiceDescriptor;
-  private final ManagedChannel channel;
-  private final CallStreamObserver<BeamFnApi.LogEntry.List> outboundObserver;
-  private final LogControlObserver inboundObserver;
-  private final LogRecordHandler logRecordHandler;
-  private final CompletableFuture<Object> inboundObserverCompletion;
-  private final Phaser phaser;
+  private final Collection<Logger> configuredLoggers = new ArrayList<>();
+
   private @Nullable ProcessBundleHandler processBundleHandler;
 
-  public BeamFnLoggingClient(
+  private final BlockingQueue<LogEntry> bufferedLogEntries =
+      new ArrayBlockingQueue<>(MAX_BUFFERED_LOG_ENTRY_COUNT);
+
+  //

Review Comment:
   Possible leftover: the empty comment line (`//`) above.



##########
sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DirectStreamObserver.java:
##########
@@ -76,12 +77,13 @@ public void onNext(T value) {
         // Record the initial phase in case we are in the inbound gRPC thread 
where the phase won't
         // advance.
         int initialPhase = phase;
-        while (!outboundObserver.isReady()) {
+        // A negative phase indicates that the phaser is terminated.
+        while (phase >= 0 && !outboundObserver.isReady()) {
           try {
             phase = phaser.awaitAdvanceInterruptibly(phase, waitTime, 
TimeUnit.SECONDS);
           } catch (TimeoutException e) {
             totalTimeWaited += waitTime;
-            waitTime = waitTime * 2;
+            waitTime = Math.min(waitTime * 2, 60);

Review Comment:
   There is a hard-coded constant here (60 s); could you add a comment
explaining its purpose?



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java:
##########
@@ -117,75 +188,216 @@ public BeamFnLoggingClient(
       rootLogger.removeHandler(handler);
     }
     // configure loggers from default sdk harness log level and log level 
overrides
-    this.configuredLoggers =
-        
SdkHarnessOptions.getConfiguredLoggerFromOptions(options.as(SdkHarnessOptions.class));
+    
this.configuredLoggers.addAll(SdkHarnessOptions.getConfiguredLoggerFromOptions(options));
 
-    BeamFnLoggingGrpc.BeamFnLoggingStub stub = 
BeamFnLoggingGrpc.newStub(channel);
-    inboundObserver = new LogControlObserver();
-    logRecordHandler = new LogRecordHandler();
-    logRecordHandler.setLevel(Level.ALL);
-    logRecordHandler.setFormatter(DEFAULT_FORMATTER);
-    
logRecordHandler.executeOn(options.as(ExecutorOptions.class).getScheduledExecutorService());
-    boolean logMdc = options.as(SdkHarnessOptions.class).getLogMdc();
-    logRecordHandler.setLogMdc(logMdc);
-    QuotaEvent.setEnabled(logMdc);
-    outboundObserver = (CallStreamObserver<BeamFnApi.LogEntry.List>) 
stub.logging(inboundObserver);
+    // Install a handler that queues to the buffer read by the background 
thread.
     rootLogger.addHandler(logRecordHandler);
   }
 
-  public void setProcessBundleHandler(ProcessBundleHandler 
processBundleHandler) {
-    this.processBundleHandler = processBundleHandler;
+  private static class StreamWriter {
+    private final ManagedChannel channel;
+    private final StreamObserver<BeamFnApi.LogEntry.List> outboundObserver;
+    private final LogControlObserver inboundObserver;
+
+    private final CompletableFuture<Object> inboundObserverCompletion;
+    private final AdvancingPhaser streamPhaser;
+
+    // Used to note we are attempting to close the logging client and to 
gracefully drain the
+    // current logs to the stream.
+    private final CompletableFuture<Object> softClosing = new 
CompletableFuture<>();
+
+    public StreamWriter(ManagedChannel channel) {
+      this.inboundObserverCompletion = new CompletableFuture<>();
+      this.streamPhaser = new AdvancingPhaser(1);
+      this.channel = channel;
+
+      BeamFnLoggingGrpc.BeamFnLoggingStub stub = 
BeamFnLoggingGrpc.newStub(channel);
+      this.inboundObserver = new LogControlObserver();
+      this.outboundObserver =
+          new DirectStreamObserver<BeamFnApi.LogEntry.List>(
+              this.streamPhaser,
+              (CallStreamObserver<BeamFnApi.LogEntry.List>) 
stub.logging(inboundObserver));
+    }
+
+    public void drainQueueToStream(BlockingQueue<BeamFnApi.LogEntry> 
bufferedLogEntries) {
+      Throwable thrown = null;
+      try {
+        List<BeamFnApi.LogEntry> additionalLogEntries =
+            new ArrayList<>(MAX_BUFFERED_LOG_ENTRY_COUNT);
+        // As long as we haven't yet terminated the stream, then attempt to 
send on it.
+        while (!streamPhaser.isTerminated()) {
+          // We wait for a limited period so that we can evaluate if the 
stream closed or if
+          // we are gracefully closing the client.
+          BeamFnApi.LogEntry logEntry = bufferedLogEntries.poll(1, SECONDS);
+          if (logEntry == null) {
+            if (softClosing.isDone()) {
+              break;
+            }
+            continue;
+          }
+
+          // Batch together as many log messages as possible that are held 
within the buffer
+          BeamFnApi.LogEntry.List.Builder builder =
+              BeamFnApi.LogEntry.List.newBuilder().addLogEntries(logEntry);
+          bufferedLogEntries.drainTo(additionalLogEntries);
+          builder.addAllLogEntries(additionalLogEntries);
+          outboundObserver.onNext(builder.build());
+          additionalLogEntries.clear();
+        }
+        if (inboundObserverCompletion.isDone()) {
+          try {
+            // If the inbound observer failed with an exception, get() will 
throw an
+            // ExecutionException.
+            inboundObserverCompletion.get();
+            // Otherwise it is an error for the server to close the stream 
before we closed our end.
+            throw new IllegalStateException(
+                "Logging stream terminated unexpectedly with success before it 
was closed by the client.");
+          } catch (ExecutionException e) {
+            throw new IllegalStateException(
+                "Logging stream terminated unexpectedly before it was closed 
by the client with error: "
+                    + e.getCause());
+          } catch (InterruptedException e) {
+            // Should never happen because of the isDone check.
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+          }
+        }
+      } catch (Throwable t) {
+        thrown = t;
+        throw new RuntimeException(t);
+      } finally {
+        if (thrown == null) {
+          outboundObserver.onCompleted();
+        } else {
+          outboundObserver.onError(thrown);
+        }
+        channel.shutdown();
+        boolean shutdownFinished = false;
+        try {
+          shutdownFinished = channel.awaitTermination(10, SECONDS);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        } finally {
+          if (!shutdownFinished) {
+            channel.shutdownNow();
+          }
+        }
+      }
+    }
+
+    public void softClose() {
+      softClosing.complete(COMPLETED);
+    }
+
+    public void hardClose() {
+      streamPhaser.forceTermination();
+    }
+
+    private class LogControlObserver
+        implements ClientResponseObserver<BeamFnApi.LogEntry, 
BeamFnApi.LogControl> {
+
+      @Override
+      public void beforeStart(ClientCallStreamObserver<BeamFnApi.LogEntry> 
requestStream) {
+        requestStream.setOnReadyHandler(streamPhaser::arrive);
+      }
+
+      @Override
+      public void onNext(BeamFnApi.LogControl value) {}
+
+      @Override
+      public void onError(Throwable t) {
+        inboundObserverCompletion.completeExceptionally(t);
+        hardClose();
+      }
+
+      @Override
+      public void onCompleted() {
+        inboundObserverCompletion.complete(COMPLETED);
+        hardClose();
+      }
+    }
   }
 
   @Override
+  @SuppressWarnings("nullness")

Review Comment:
   There is a `Preconditions.checkNotNull` below; is this `@SuppressWarnings`
still needed? (Same question for `terminationFuture` below.)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to