scwhittle commented on code in PR #25186:
URL: https://github.com/apache/beam/pull/25186#discussion_r1235538475


##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java:
##########
@@ -117,75 +188,216 @@ public BeamFnLoggingClient(
       rootLogger.removeHandler(handler);
     }
     // configure loggers from default sdk harness log level and log level 
overrides
-    this.configuredLoggers =
-        
SdkHarnessOptions.getConfiguredLoggerFromOptions(options.as(SdkHarnessOptions.class));
+    
this.configuredLoggers.addAll(SdkHarnessOptions.getConfiguredLoggerFromOptions(options));
 
-    BeamFnLoggingGrpc.BeamFnLoggingStub stub = 
BeamFnLoggingGrpc.newStub(channel);
-    inboundObserver = new LogControlObserver();
-    logRecordHandler = new LogRecordHandler();
-    logRecordHandler.setLevel(Level.ALL);
-    logRecordHandler.setFormatter(DEFAULT_FORMATTER);
-    
logRecordHandler.executeOn(options.as(ExecutorOptions.class).getScheduledExecutorService());
-    boolean logMdc = options.as(SdkHarnessOptions.class).getLogMdc();
-    logRecordHandler.setLogMdc(logMdc);
-    QuotaEvent.setEnabled(logMdc);
-    outboundObserver = (CallStreamObserver<BeamFnApi.LogEntry.List>) 
stub.logging(inboundObserver);
+    // Install a handler that queues to the buffer read by the background 
thread.
     rootLogger.addHandler(logRecordHandler);
   }
 
-  public void setProcessBundleHandler(ProcessBundleHandler 
processBundleHandler) {
-    this.processBundleHandler = processBundleHandler;
+  private static class StreamWriter {
+    private final ManagedChannel channel;
+    private final StreamObserver<BeamFnApi.LogEntry.List> outboundObserver;
+    private final LogControlObserver inboundObserver;
+
+    private final CompletableFuture<Object> inboundObserverCompletion;
+    private final AdvancingPhaser streamPhaser;
+
+    // Used to note we are attempting to close the logging client and to 
gracefully drain the
+    // current logs to the stream.
+    private final CompletableFuture<Object> softClosing = new 
CompletableFuture<>();
+
+    public StreamWriter(ManagedChannel channel) {
+      this.inboundObserverCompletion = new CompletableFuture<>();
+      this.streamPhaser = new AdvancingPhaser(1);
+      this.channel = channel;
+
+      BeamFnLoggingGrpc.BeamFnLoggingStub stub = 
BeamFnLoggingGrpc.newStub(channel);
+      this.inboundObserver = new LogControlObserver();
+      this.outboundObserver =
+          new DirectStreamObserver<BeamFnApi.LogEntry.List>(
+              this.streamPhaser,
+              (CallStreamObserver<BeamFnApi.LogEntry.List>) 
stub.logging(inboundObserver));
+    }
+
+    public void drainQueueToStream(BlockingQueue<BeamFnApi.LogEntry> 
bufferedLogEntries) {
+      Throwable thrown = null;
+      try {
+        List<BeamFnApi.LogEntry> additionalLogEntries =
+            new ArrayList<>(MAX_BUFFERED_LOG_ENTRY_COUNT);
+        // As long as we haven't yet terminated the stream, then attempt to 
send on it.
+        while (!streamPhaser.isTerminated()) {
+          // We wait for a limited period so that we can evaluate if the 
stream closed or if
+          // we are gracefully closing the client.
+          BeamFnApi.LogEntry logEntry = bufferedLogEntries.poll(1, SECONDS);
+          if (logEntry == null) {
+            if (softClosing.isDone()) {
+              break;
+            }
+            continue;
+          }
+
+          // Batch together as many log messages as possible that are held 
within the buffer
+          BeamFnApi.LogEntry.List.Builder builder =
+              BeamFnApi.LogEntry.List.newBuilder().addLogEntries(logEntry);
+          bufferedLogEntries.drainTo(additionalLogEntries);
+          builder.addAllLogEntries(additionalLogEntries);
+          outboundObserver.onNext(builder.build());
+          additionalLogEntries.clear();
+        }
+        if (inboundObserverCompletion.isDone()) {
+          try {
+            // If the inbound observer failed with an exception, get() will 
throw an
+            // ExecutionException.
+            inboundObserverCompletion.get();
+            // Otherwise it is an error for the server to close the stream 
before we closed our end.
+            throw new IllegalStateException(
+                "Logging stream terminated unexpectedly with success before it 
was closed by the client.");
+          } catch (ExecutionException e) {
+            throw new IllegalStateException(
+                "Logging stream terminated unexpectedly before it was closed 
by the client with error: "
+                    + e.getCause());
+          } catch (InterruptedException e) {
+            // Should never happen because of the isDone check.
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+          }
+        }
+      } catch (Throwable t) {
+        thrown = t;
+        throw new RuntimeException(t);
+      } finally {
+        if (thrown == null) {
+          outboundObserver.onCompleted();
+        } else {
+          outboundObserver.onError(thrown);
+        }
+        channel.shutdown();
+        boolean shutdownFinished = false;
+        try {
+          shutdownFinished = channel.awaitTermination(10, SECONDS);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        } finally {
+          if (!shutdownFinished) {
+            channel.shutdownNow();
+          }
+        }
+      }
+    }
+
+    public void softClose() {
+      softClosing.complete(COMPLETED);
+    }
+
+    public void hardClose() {
+      streamPhaser.forceTermination();
+    }
+
+    private class LogControlObserver
+        implements ClientResponseObserver<BeamFnApi.LogEntry, 
BeamFnApi.LogControl> {
+
+      @Override
+      public void beforeStart(ClientCallStreamObserver<BeamFnApi.LogEntry> 
requestStream) {
+        requestStream.setOnReadyHandler(streamPhaser::arrive);
+      }
+
+      @Override
+      public void onNext(BeamFnApi.LogControl value) {}
+
+      @Override
+      public void onError(Throwable t) {
+        inboundObserverCompletion.completeExceptionally(t);
+        hardClose();
+      }
+
+      @Override
+      public void onCompleted() {
+        inboundObserverCompletion.complete(COMPLETED);
+        hardClose();
+      }
+    }
   }
 
   @Override
+  @SuppressWarnings("nullness")

Review Comment:
   For some reason with the Preconditions.checkNotNull I still got errors. 
Interestingly I tried casting explicitly to @NonNull afterwards and it said 
both that it wasn't safe and that it was unnecessary :P
   I tried going to CompletableFuture<Object> instead of CompletableFuture<?> 
in case that was messing it up but it didn't help.
   I was able to get it to work by removing the Nullable by initializing it in 
the constructor instead of via the helper start method.  To do so I had to add 
some additional underinitialization annotations.
   PTAL



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to