scwhittle commented on code in PR #25186:
URL: https://github.com/apache/beam/pull/25186#discussion_r1235538475
##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java:
##########
@@ -117,75 +188,216 @@ public BeamFnLoggingClient(
rootLogger.removeHandler(handler);
}
// configure loggers from default sdk harness log level and log level
overrides
- this.configuredLoggers =
-
SdkHarnessOptions.getConfiguredLoggerFromOptions(options.as(SdkHarnessOptions.class));
+
this.configuredLoggers.addAll(SdkHarnessOptions.getConfiguredLoggerFromOptions(options));
- BeamFnLoggingGrpc.BeamFnLoggingStub stub =
BeamFnLoggingGrpc.newStub(channel);
- inboundObserver = new LogControlObserver();
- logRecordHandler = new LogRecordHandler();
- logRecordHandler.setLevel(Level.ALL);
- logRecordHandler.setFormatter(DEFAULT_FORMATTER);
-
logRecordHandler.executeOn(options.as(ExecutorOptions.class).getScheduledExecutorService());
- boolean logMdc = options.as(SdkHarnessOptions.class).getLogMdc();
- logRecordHandler.setLogMdc(logMdc);
- QuotaEvent.setEnabled(logMdc);
- outboundObserver = (CallStreamObserver<BeamFnApi.LogEntry.List>)
stub.logging(inboundObserver);
+ // Install a handler that queues to the buffer read by the background
thread.
rootLogger.addHandler(logRecordHandler);
}
- public void setProcessBundleHandler(ProcessBundleHandler
processBundleHandler) {
- this.processBundleHandler = processBundleHandler;
+ private static class StreamWriter {
+ private final ManagedChannel channel;
+ private final StreamObserver<BeamFnApi.LogEntry.List> outboundObserver;
+ private final LogControlObserver inboundObserver;
+
+ private final CompletableFuture<Object> inboundObserverCompletion;
+ private final AdvancingPhaser streamPhaser;
+
+ // Used to note we are attempting to close the logging client and to
gracefully drain the
+ // current logs to the stream.
+ private final CompletableFuture<Object> softClosing = new
CompletableFuture<>();
+
+ public StreamWriter(ManagedChannel channel) {
+ this.inboundObserverCompletion = new CompletableFuture<>();
+ this.streamPhaser = new AdvancingPhaser(1);
+ this.channel = channel;
+
+ BeamFnLoggingGrpc.BeamFnLoggingStub stub =
BeamFnLoggingGrpc.newStub(channel);
+ this.inboundObserver = new LogControlObserver();
+ this.outboundObserver =
+ new DirectStreamObserver<BeamFnApi.LogEntry.List>(
+ this.streamPhaser,
+ (CallStreamObserver<BeamFnApi.LogEntry.List>)
stub.logging(inboundObserver));
+ }
+
+ public void drainQueueToStream(BlockingQueue<BeamFnApi.LogEntry>
bufferedLogEntries) {
+ Throwable thrown = null;
+ try {
+ List<BeamFnApi.LogEntry> additionalLogEntries =
+ new ArrayList<>(MAX_BUFFERED_LOG_ENTRY_COUNT);
+ // As long as we haven't yet terminated the stream, then attempt to
send on it.
+ while (!streamPhaser.isTerminated()) {
+ // We wait for a limited period so that we can evaluate if the
stream closed or if
+ // we are gracefully closing the client.
+ BeamFnApi.LogEntry logEntry = bufferedLogEntries.poll(1, SECONDS);
+ if (logEntry == null) {
+ if (softClosing.isDone()) {
+ break;
+ }
+ continue;
+ }
+
+ // Batch together as many log messages as possible that are held
within the buffer
+ BeamFnApi.LogEntry.List.Builder builder =
+ BeamFnApi.LogEntry.List.newBuilder().addLogEntries(logEntry);
+ bufferedLogEntries.drainTo(additionalLogEntries);
+ builder.addAllLogEntries(additionalLogEntries);
+ outboundObserver.onNext(builder.build());
+ additionalLogEntries.clear();
+ }
+ if (inboundObserverCompletion.isDone()) {
+ try {
+ // If the inbound observer failed with an exception, get() will
throw an
+ // ExecutionException.
+ inboundObserverCompletion.get();
+ // Otherwise it is an error for the server to close the stream
before we closed our end.
+ throw new IllegalStateException(
+ "Logging stream terminated unexpectedly with success before it
was closed by the client.");
+ } catch (ExecutionException e) {
+ throw new IllegalStateException(
+ "Logging stream terminated unexpectedly before it was closed
by the client with error: "
+ + e.getCause());
+ } catch (InterruptedException e) {
+ // Should never happen because of the isDone check.
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ }
+ } catch (Throwable t) {
+ thrown = t;
+ throw new RuntimeException(t);
+ } finally {
+ if (thrown == null) {
+ outboundObserver.onCompleted();
+ } else {
+ outboundObserver.onError(thrown);
+ }
+ channel.shutdown();
+ boolean shutdownFinished = false;
+ try {
+ shutdownFinished = channel.awaitTermination(10, SECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ if (!shutdownFinished) {
+ channel.shutdownNow();
+ }
+ }
+ }
+ }
+
+ public void softClose() {
+ softClosing.complete(COMPLETED);
+ }
+
+ public void hardClose() {
+ streamPhaser.forceTermination();
+ }
+
+ private class LogControlObserver
+ implements ClientResponseObserver<BeamFnApi.LogEntry,
BeamFnApi.LogControl> {
+
+ @Override
+ public void beforeStart(ClientCallStreamObserver<BeamFnApi.LogEntry>
requestStream) {
+ requestStream.setOnReadyHandler(streamPhaser::arrive);
+ }
+
+ @Override
+ public void onNext(BeamFnApi.LogControl value) {}
+
+ @Override
+ public void onError(Throwable t) {
+ inboundObserverCompletion.completeExceptionally(t);
+ hardClose();
+ }
+
+ @Override
+ public void onCompleted() {
+ inboundObserverCompletion.complete(COMPLETED);
+ hardClose();
+ }
+ }
}
@Override
+ @SuppressWarnings("nullness")
Review Comment:
For some reason with the Preconditions.checkNotNull I still got errors.
Interestingly I tried casting explicitly to @NonNull afterwards and it said
both that it wasn't safe and that it was unnecessary :P
I tried going to CompletableFuture<Object> instead of CompletableFuture<?>
in case that was messing it up but it didn't help.
I was able to get it to work by removing the Nullable by initializing it in
the constructor instead of via the helper start method. To do so I had to add
some additional underinitialization annotations.
PTAL
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]