Abacn commented on code in PR #25186:
URL: https://github.com/apache/beam/pull/25186#discussion_r1235381896
##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java:
##########
@@ -87,28 +93,93 @@ public class BeamFnLoggingClient implements AutoCloseable {
private static final Object COMPLETED = new Object();
+ private final Endpoints.ApiServiceDescriptor apiServiceDescriptor;
+
+ private final StreamWriter streamWriter;
+
+ private final LogRecordHandler logRecordHandler;
+
/* We need to store a reference to the configured loggers so that they are
not
* garbage collected. java.util.logging only has weak references to the
loggers
* so if they are garbage collected, our hierarchical configuration will be
lost. */
- private final Collection<Logger> configuredLoggers;
- private final Endpoints.ApiServiceDescriptor apiServiceDescriptor;
- private final ManagedChannel channel;
- private final CallStreamObserver<BeamFnApi.LogEntry.List> outboundObserver;
- private final LogControlObserver inboundObserver;
- private final LogRecordHandler logRecordHandler;
- private final CompletableFuture<Object> inboundObserverCompletion;
- private final Phaser phaser;
+ private final Collection<Logger> configuredLoggers = new ArrayList<>();
+
private @Nullable ProcessBundleHandler processBundleHandler;
- public BeamFnLoggingClient(
+ private final BlockingQueue<LogEntry> bufferedLogEntries =
+ new ArrayBlockingQueue<>(MAX_BUFFERED_LOG_ENTRY_COUNT);
+
+ //
Review Comment:
possible leftover - empty comment line
##########
sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DirectStreamObserver.java:
##########
@@ -76,12 +77,13 @@ public void onNext(T value) {
// Record the initial phase in case we are in the inbound gRPC thread
where the phase won't
// advance.
int initialPhase = phase;
- while (!outboundObserver.isReady()) {
+ // A negative phase indicates that the phaser is terminated.
+ while (phase >= 0 && !outboundObserver.isReady()) {
try {
phase = phaser.awaitAdvanceInterruptibly(phase, waitTime,
TimeUnit.SECONDS);
} catch (TimeoutException e) {
totalTimeWaited += waitTime;
- waitTime = waitTime * 2;
+ waitTime = Math.min(waitTime * 2, 60);
Review Comment:
there is a hard coded constant here (60 s) possibly add some comments for
its purpose?
##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java:
##########
@@ -117,75 +188,216 @@ public BeamFnLoggingClient(
rootLogger.removeHandler(handler);
}
// configure loggers from default sdk harness log level and log level
overrides
- this.configuredLoggers =
-
SdkHarnessOptions.getConfiguredLoggerFromOptions(options.as(SdkHarnessOptions.class));
+
this.configuredLoggers.addAll(SdkHarnessOptions.getConfiguredLoggerFromOptions(options));
- BeamFnLoggingGrpc.BeamFnLoggingStub stub =
BeamFnLoggingGrpc.newStub(channel);
- inboundObserver = new LogControlObserver();
- logRecordHandler = new LogRecordHandler();
- logRecordHandler.setLevel(Level.ALL);
- logRecordHandler.setFormatter(DEFAULT_FORMATTER);
-
logRecordHandler.executeOn(options.as(ExecutorOptions.class).getScheduledExecutorService());
- boolean logMdc = options.as(SdkHarnessOptions.class).getLogMdc();
- logRecordHandler.setLogMdc(logMdc);
- QuotaEvent.setEnabled(logMdc);
- outboundObserver = (CallStreamObserver<BeamFnApi.LogEntry.List>)
stub.logging(inboundObserver);
+ // Install a handler that queues to the buffer read by the background
thread.
rootLogger.addHandler(logRecordHandler);
}
- public void setProcessBundleHandler(ProcessBundleHandler
processBundleHandler) {
- this.processBundleHandler = processBundleHandler;
+ private static class StreamWriter {
+ private final ManagedChannel channel;
+ private final StreamObserver<BeamFnApi.LogEntry.List> outboundObserver;
+ private final LogControlObserver inboundObserver;
+
+ private final CompletableFuture<Object> inboundObserverCompletion;
+ private final AdvancingPhaser streamPhaser;
+
+ // Used to note we are attempting to close the logging client and to
gracefully drain the
+ // current logs to the stream.
+ private final CompletableFuture<Object> softClosing = new
CompletableFuture<>();
+
+ public StreamWriter(ManagedChannel channel) {
+ this.inboundObserverCompletion = new CompletableFuture<>();
+ this.streamPhaser = new AdvancingPhaser(1);
+ this.channel = channel;
+
+ BeamFnLoggingGrpc.BeamFnLoggingStub stub =
BeamFnLoggingGrpc.newStub(channel);
+ this.inboundObserver = new LogControlObserver();
+ this.outboundObserver =
+ new DirectStreamObserver<BeamFnApi.LogEntry.List>(
+ this.streamPhaser,
+ (CallStreamObserver<BeamFnApi.LogEntry.List>)
stub.logging(inboundObserver));
+ }
+
+ public void drainQueueToStream(BlockingQueue<BeamFnApi.LogEntry>
bufferedLogEntries) {
+ Throwable thrown = null;
+ try {
+ List<BeamFnApi.LogEntry> additionalLogEntries =
+ new ArrayList<>(MAX_BUFFERED_LOG_ENTRY_COUNT);
+ // As long as we haven't yet terminated the stream, then attempt to
send on it.
+ while (!streamPhaser.isTerminated()) {
+ // We wait for a limited period so that we can evaluate if the
stream closed or if
+ // we are gracefully closing the client.
+ BeamFnApi.LogEntry logEntry = bufferedLogEntries.poll(1, SECONDS);
+ if (logEntry == null) {
+ if (softClosing.isDone()) {
+ break;
+ }
+ continue;
+ }
+
+ // Batch together as many log messages as possible that are held
within the buffer
+ BeamFnApi.LogEntry.List.Builder builder =
+ BeamFnApi.LogEntry.List.newBuilder().addLogEntries(logEntry);
+ bufferedLogEntries.drainTo(additionalLogEntries);
+ builder.addAllLogEntries(additionalLogEntries);
+ outboundObserver.onNext(builder.build());
+ additionalLogEntries.clear();
+ }
+ if (inboundObserverCompletion.isDone()) {
+ try {
+ // If the inbound observer failed with an exception, get() will
throw an
+ // ExecutionException.
+ inboundObserverCompletion.get();
+ // Otherwise it is an error for the server to close the stream
before we closed our end.
+ throw new IllegalStateException(
+ "Logging stream terminated unexpectedly with success before it
was closed by the client.");
+ } catch (ExecutionException e) {
+ throw new IllegalStateException(
+ "Logging stream terminated unexpectedly before it was closed
by the client with error: "
+ + e.getCause());
+ } catch (InterruptedException e) {
+ // Should never happen because of the isDone check.
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ }
+ } catch (Throwable t) {
+ thrown = t;
+ throw new RuntimeException(t);
+ } finally {
+ if (thrown == null) {
+ outboundObserver.onCompleted();
+ } else {
+ outboundObserver.onError(thrown);
+ }
+ channel.shutdown();
+ boolean shutdownFinished = false;
+ try {
+ shutdownFinished = channel.awaitTermination(10, SECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ if (!shutdownFinished) {
+ channel.shutdownNow();
+ }
+ }
+ }
+ }
+
+ public void softClose() {
+ softClosing.complete(COMPLETED);
+ }
+
+ public void hardClose() {
+ streamPhaser.forceTermination();
+ }
+
+ private class LogControlObserver
+ implements ClientResponseObserver<BeamFnApi.LogEntry,
BeamFnApi.LogControl> {
+
+ @Override
+ public void beforeStart(ClientCallStreamObserver<BeamFnApi.LogEntry>
requestStream) {
+ requestStream.setOnReadyHandler(streamPhaser::arrive);
+ }
+
+ @Override
+ public void onNext(BeamFnApi.LogControl value) {}
+
+ @Override
+ public void onError(Throwable t) {
+ inboundObserverCompletion.completeExceptionally(t);
+ hardClose();
+ }
+
+ @Override
+ public void onCompleted() {
+ inboundObserverCompletion.complete(COMPLETED);
+ hardClose();
+ }
+ }
}
@Override
+ @SuppressWarnings("nullness")
Review Comment:
there is `Preconditions.checkNotNull` below, is this SuppressWarnings still
needed (same as `terminationFuture` below)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]