This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 5e942ae3790 Ensure that the BeamFnLoggingClient terminates process if stream breaks (#25186)
5e942ae3790 is described below

commit 5e942ae3790bc95148413c43ab7e43a01a2d82ae
Author: Sam Whittle <[email protected]>
AuthorDate: Wed Jun 21 02:19:32 2023 +0200

    Ensure that the BeamFnLoggingClient terminates process if stream breaks (#25186)
    
    * Ensure that a failure of the logging stream in BeamFnLoggingClient
    does not cause logging to block indefinitely but instead triggers
    SDK teardown.
    
    * fix test
    
    * fix racy test
    
    * address comments
---
 .../beam/sdk/fn/stream/DirectStreamObserver.java   |  24 +-
 .../jmh/logging/BeamFnLoggingClientBenchmark.java  |   2 +-
 .../java/org/apache/beam/fn/harness/FnHarness.java |   7 +-
 .../fn/harness/control/BeamFnControlClient.java    |   5 +-
 .../fn/harness/logging/BeamFnLoggingClient.java    | 454 +++++++++++++--------
 .../harness/control/BeamFnControlClientTest.java   |   4 +-
 .../harness/logging/BeamFnLoggingClientTest.java   | 200 ++++++++-
 7 files changed, 497 insertions(+), 199 deletions(-)

diff --git 
a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DirectStreamObserver.java
 
b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DirectStreamObserver.java
index 16677a18e39..99a831c20ee 100644
--- 
a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DirectStreamObserver.java
+++ 
b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DirectStreamObserver.java
@@ -33,7 +33,8 @@ import org.slf4j.LoggerFactory;
  * <p>Flow control with the underlying {@link CallStreamObserver} is handled 
with a {@link Phaser}
  * which waits for advancement of the phase if the {@link CallStreamObserver} 
is not ready. Creator
  * is expected to advance the {@link Phaser} whenever the underlying {@link 
CallStreamObserver}
- * becomes ready.
+ * becomes ready. If the {@link Phaser} is terminated, {@link 
DirectStreamObserver<T>.onNext(T)}
+ * will no longer wait for the {@link CallStreamObserver} to become ready.
  */
 @ThreadSafe
 public final class DirectStreamObserver<T> implements StreamObserver<T> {
@@ -70,24 +71,27 @@ public final class DirectStreamObserver<T> implements 
StreamObserver<T> {
     synchronized (lock) {
       if (++numMessages >= maxMessagesBeforeCheck) {
         numMessages = 0;
-        int waitTime = 1;
-        int totalTimeWaited = 0;
+        int waitSeconds = 1;
+        int totalSecondsWaited = 0;
         int phase = phaser.getPhase();
         // Record the initial phase in case we are in the inbound gRPC thread 
where the phase won't
         // advance.
         int initialPhase = phase;
-        while (!outboundObserver.isReady()) {
+        // A negative phase indicates that the phaser is terminated.
+        while (phase >= 0 && !outboundObserver.isReady()) {
           try {
-            phase = phaser.awaitAdvanceInterruptibly(phase, waitTime, 
TimeUnit.SECONDS);
+            phase = phaser.awaitAdvanceInterruptibly(phase, waitSeconds, 
TimeUnit.SECONDS);
           } catch (TimeoutException e) {
-            totalTimeWaited += waitTime;
-            waitTime = waitTime * 2;
+            totalSecondsWaited += waitSeconds;
+            // Double the backoff for re-evaluating the isReady bit up to a 
maximum of once per
+            // minute. This bounds the waiting if the onReady callback is not 
called as expected.
+            waitSeconds = Math.min(waitSeconds * 2, 60);
           } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             throw new RuntimeException(e);
           }
         }
-        if (totalTimeWaited > 0) {
+        if (totalSecondsWaited > 0) {
           // If the phase didn't change, this means that the installed onReady 
callback had not
           // been invoked.
           if (initialPhase == phase) {
@@ -95,12 +99,12 @@ public final class DirectStreamObserver<T> implements 
StreamObserver<T> {
                 "Output channel stalled for {}s, outbound thread {}. See: "
                     + "https://issues.apache.org/jira/browse/BEAM-4280 for the 
history for "
                     + "this issue.",
-                totalTimeWaited,
+                totalSecondsWaited,
                 Thread.currentThread().getName());
           } else {
             LOG.debug(
                 "Output channel stalled for {}s, outbound thread {}.",
-                totalTimeWaited,
+                totalSecondsWaited,
                 Thread.currentThread().getName());
           }
         }
diff --git 
a/sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/logging/BeamFnLoggingClientBenchmark.java
 
b/sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/logging/BeamFnLoggingClientBenchmark.java
index d7c18059dc2..1a5558ed670 100644
--- 
a/sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/logging/BeamFnLoggingClientBenchmark.java
+++ 
b/sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/logging/BeamFnLoggingClientBenchmark.java
@@ -98,7 +98,7 @@ public class BeamFnLoggingClientBenchmark {
                 .build();
         server.start();
         loggingClient =
-            new BeamFnLoggingClient(
+            BeamFnLoggingClient.createAndStart(
                 PipelineOptionsFactory.create(),
                 apiServiceDescriptor,
                 managedChannelFactory::forDescriptor);
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
index 59d21f25391..0d25137beef 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
@@ -20,6 +20,7 @@ package org.apache.beam.fn.harness;
 import java.util.Collections;
 import java.util.EnumMap;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.function.Function;
 import javax.annotation.Nullable;
@@ -228,7 +229,7 @@ public class FnHarness {
     // The logging client variable is not used per se, but during its lifetime 
(until close()) it
     // intercepts logging and sends it to the logging service.
     try (BeamFnLoggingClient logging =
-        new BeamFnLoggingClient(
+        BeamFnLoggingClient.createAndStart(
             options, loggingApiServiceDescriptor, 
channelFactory::forDescriptor)) {
       LOG.info("Fn Harness started");
       // Register standard file systems.
@@ -353,11 +354,13 @@ public class FnHarness {
               outboundObserverFactory,
               executorService,
               handlers);
-      control.waitForTermination();
+      CompletableFuture.anyOf(control.terminationFuture(), 
logging.terminationFuture()).get();
       if (beamFnStatusClient != null) {
         beamFnStatusClient.close();
       }
       processBundleHandler.shutdown();
+    } catch (Exception e) {
+      System.out.println("Shutting down harness due to exception: " + 
e.toString());
     } finally {
       System.out.println("Shutting SDK harness down.");
       executionStateSampler.stop();
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java
 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java
index ca4428a3462..ffac6d7aa98 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java
@@ -21,7 +21,6 @@ import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Thro
 
 import java.util.EnumMap;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import org.apache.beam.fn.harness.logging.BeamFnLoggingMDC;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
@@ -139,8 +138,8 @@ public class BeamFnControlClient {
   }
 
   /** This method blocks until the control stream has completed. */
-  public void waitForTermination() throws InterruptedException, 
ExecutionException {
-    onFinish.get();
+  public CompletableFuture<Object> terminationFuture() {
+    return onFinish;
   }
 
   public BeamFnApi.InstructionResponse delegateOnInstructionRequestType(
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
index 68ad6727c4d..3b76f7fc4d0 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
@@ -17,22 +17,21 @@
  */
 package org.apache.beam.fn.harness.logging;
 
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
 import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables.getStackTraceAsString;
 
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.Phaser;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeoutException;
 import java.util.function.Function;
 import java.util.logging.Formatter;
 import java.util.logging.Handler;
@@ -47,6 +46,8 @@ import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.LogEntry;
 import org.apache.beam.model.fnexecution.v1.BeamFnLoggingGrpc;
 import org.apache.beam.model.pipeline.v1.Endpoints;
+import org.apache.beam.sdk.fn.stream.AdvancingPhaser;
+import org.apache.beam.sdk.fn.stream.DirectStreamObserver;
 import org.apache.beam.sdk.options.ExecutorOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.SdkHarnessOptions;
@@ -54,13 +55,15 @@ import 
org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.Struct;
 import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.Timestamp;
 import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.Value;
 import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.ManagedChannel;
-import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.Status;
 import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.CallStreamObserver;
 import 
org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.ClientCallStreamObserver;
 import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.ClientResponseObserver;
+import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.StreamObserver;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.checkerframework.checker.initialization.qual.UnderInitialization;
 import org.checkerframework.checker.nullness.qual.Nullable;
+import org.checkerframework.checker.nullness.qual.RequiresNonNull;
 import org.slf4j.MDC;
 
 /**
@@ -76,6 +79,10 @@ public class BeamFnLoggingClient implements AutoCloseable {
           .put(Level.FINE, BeamFnApi.LogEntry.Severity.Enum.DEBUG)
           .put(Level.FINEST, BeamFnApi.LogEntry.Severity.Enum.TRACE)
           .build();
+  private static final ImmutableMap<BeamFnApi.LogEntry.Severity.Enum, Level> 
REVERSE_LOG_LEVEL_MAP =
+      ImmutableMap.<BeamFnApi.LogEntry.Severity.Enum, Level>builder()
+          .putAll(LOG_LEVEL_MAP.asMultimap().inverse().entries())
+          .build();
 
   private static final Formatter DEFAULT_FORMATTER = new SimpleFormatter();
 
@@ -87,28 +94,98 @@ public class BeamFnLoggingClient implements AutoCloseable {
 
   private static final Object COMPLETED = new Object();
 
+  private final Endpoints.ApiServiceDescriptor apiServiceDescriptor;
+
+  private final StreamWriter streamWriter;
+
+  private final LogRecordHandler logRecordHandler;
+
   /* We need to store a reference to the configured loggers so that they are 
not
    * garbage collected. java.util.logging only has weak references to the 
loggers
    * so if they are garbage collected, our hierarchical configuration will be 
lost. */
-  private final Collection<Logger> configuredLoggers;
-  private final Endpoints.ApiServiceDescriptor apiServiceDescriptor;
-  private final ManagedChannel channel;
-  private final CallStreamObserver<BeamFnApi.LogEntry.List> outboundObserver;
-  private final LogControlObserver inboundObserver;
-  private final LogRecordHandler logRecordHandler;
-  private final CompletableFuture<Object> inboundObserverCompletion;
-  private final Phaser phaser;
+  private final Collection<Logger> configuredLoggers = new ArrayList<>();
+
   private @Nullable ProcessBundleHandler processBundleHandler;
 
-  public BeamFnLoggingClient(
+  private final BlockingQueue<LogEntry> bufferedLogEntries =
+      new ArrayBlockingQueue<>(MAX_BUFFERED_LOG_ENTRY_COUNT);
+
+  /**
+   * Future that completes with the background thread consuming logs from 
bufferedLogEntries.
+   * Completes with COMPLETED or with exception.
+   */
+  private final CompletableFuture<?> bufferedLogConsumer;
+
+  /**
+   * Safe object publishing is not required since we only care if the thread 
that set this field is
+   * equal to the thread also attempting to add a log entry.
+   */
+  private @Nullable Thread logEntryHandlerThread = null;
+
+  public static BeamFnLoggingClient createAndStart(
       PipelineOptions options,
       Endpoints.ApiServiceDescriptor apiServiceDescriptor,
       Function<Endpoints.ApiServiceDescriptor, ManagedChannel> channelFactory) 
{
+    BeamFnLoggingClient client =
+        new BeamFnLoggingClient(
+            apiServiceDescriptor,
+            new StreamWriter(channelFactory.apply(apiServiceDescriptor)),
+            options.as(SdkHarnessOptions.class).getLogMdc(),
+            options.as(ExecutorOptions.class).getScheduledExecutorService(),
+            options.as(SdkHarnessOptions.class));
+    return client;
+  }
+
+  private BeamFnLoggingClient(
+      Endpoints.ApiServiceDescriptor apiServiceDescriptor,
+      StreamWriter streamWriter,
+      boolean logMdc,
+      ScheduledExecutorService executorService,
+      SdkHarnessOptions options) {
     this.apiServiceDescriptor = apiServiceDescriptor;
-    this.inboundObserverCompletion = new CompletableFuture<>();
-    this.phaser = new Phaser(1);
-    this.channel = channelFactory.apply(apiServiceDescriptor);
+    this.streamWriter = streamWriter;
+    this.logRecordHandler = new LogRecordHandler(logMdc);
+    logRecordHandler.setLevel(Level.ALL);
+    logRecordHandler.setFormatter(DEFAULT_FORMATTER);
 
+    CompletableFuture<Object> started = new CompletableFuture<>();
+    this.bufferedLogConsumer =
+        CompletableFuture.supplyAsync(
+            () -> {
+              try {
+                logEntryHandlerThread = Thread.currentThread();
+                installLogging(options);
+                started.complete(COMPLETED);
+
+                // Logging which occurs in this thread will attempt to publish 
log entries into the
+                // above handler which should never block if the queue is full 
otherwise
+                // this thread will get stuck.
+                streamWriter.drainQueueToStream(bufferedLogEntries);
+              } finally {
+                restoreLoggers();
+                // Now that loggers are restored, do a final flush of any 
buffered logs
+                // in case they help with understanding above failures.
+                flushFinalLogs();
+              }
+              return COMPLETED;
+            },
+            executorService);
+    try {
+      // Wait for the thread to be running and log handlers installed or an 
error with the thread
+      // that is supposed to be consuming logs.
+      CompletableFuture.anyOf(this.bufferedLogConsumer, started).get();
+    } catch (ExecutionException e) {
+      throw new RuntimeException("Error starting background log thread " + 
e.getCause());
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    }
+  }
+
+  @RequiresNonNull("logRecordHandler")
+  @RequiresNonNull("configuredLoggers")
+  private void installLogging(
+      @UnderInitialization BeamFnLoggingClient this, SdkHarnessOptions 
options) {
     // Reset the global log manager, get the root logger and remove the 
default log handlers.
     LogManager logManager = LogManager.getLogManager();
     logManager.reset();
@@ -117,50 +194,205 @@ public class BeamFnLoggingClient implements 
AutoCloseable {
       rootLogger.removeHandler(handler);
     }
     // configure loggers from default sdk harness log level and log level 
overrides
-    this.configuredLoggers =
-        
SdkHarnessOptions.getConfiguredLoggerFromOptions(options.as(SdkHarnessOptions.class));
+    
this.configuredLoggers.addAll(SdkHarnessOptions.getConfiguredLoggerFromOptions(options));
 
-    BeamFnLoggingGrpc.BeamFnLoggingStub stub = 
BeamFnLoggingGrpc.newStub(channel);
-    inboundObserver = new LogControlObserver();
-    logRecordHandler = new LogRecordHandler();
-    logRecordHandler.setLevel(Level.ALL);
-    logRecordHandler.setFormatter(DEFAULT_FORMATTER);
-    
logRecordHandler.executeOn(options.as(ExecutorOptions.class).getScheduledExecutorService());
-    boolean logMdc = options.as(SdkHarnessOptions.class).getLogMdc();
-    logRecordHandler.setLogMdc(logMdc);
-    QuotaEvent.setEnabled(logMdc);
-    outboundObserver = (CallStreamObserver<BeamFnApi.LogEntry.List>) 
stub.logging(inboundObserver);
+    // Install a handler that queues to the buffer read by the background 
thread.
     rootLogger.addHandler(logRecordHandler);
   }
 
-  public void setProcessBundleHandler(ProcessBundleHandler 
processBundleHandler) {
-    this.processBundleHandler = processBundleHandler;
+  private static class StreamWriter {
+    private final ManagedChannel channel;
+    private final StreamObserver<BeamFnApi.LogEntry.List> outboundObserver;
+    private final LogControlObserver inboundObserver;
+
+    private final CompletableFuture<Object> inboundObserverCompletion;
+    private final AdvancingPhaser streamPhaser;
+
+    // Used to note we are attempting to close the logging client and to 
gracefully drain the
+    // current logs to the stream.
+    private final CompletableFuture<Object> softClosing = new 
CompletableFuture<>();
+
+    public StreamWriter(ManagedChannel channel) {
+      this.inboundObserverCompletion = new CompletableFuture<>();
+      this.streamPhaser = new AdvancingPhaser(1);
+      this.channel = channel;
+
+      BeamFnLoggingGrpc.BeamFnLoggingStub stub = 
BeamFnLoggingGrpc.newStub(channel);
+      this.inboundObserver = new LogControlObserver();
+      this.outboundObserver =
+          new DirectStreamObserver<BeamFnApi.LogEntry.List>(
+              this.streamPhaser,
+              (CallStreamObserver<BeamFnApi.LogEntry.List>) 
stub.logging(inboundObserver));
+    }
+
+    public void drainQueueToStream(BlockingQueue<BeamFnApi.LogEntry> 
bufferedLogEntries) {
+      Throwable thrown = null;
+      try {
+        List<BeamFnApi.LogEntry> additionalLogEntries =
+            new ArrayList<>(MAX_BUFFERED_LOG_ENTRY_COUNT);
+        // As long as we haven't yet terminated the stream, then attempt to 
send on it.
+        while (!streamPhaser.isTerminated()) {
+          // We wait for a limited period so that we can evaluate if the 
stream closed or if
+          // we are gracefully closing the client.
+          BeamFnApi.LogEntry logEntry = bufferedLogEntries.poll(1, SECONDS);
+          if (logEntry == null) {
+            if (softClosing.isDone()) {
+              break;
+            }
+            continue;
+          }
+
+          // Batch together as many log messages as possible that are held 
within the buffer
+          BeamFnApi.LogEntry.List.Builder builder =
+              BeamFnApi.LogEntry.List.newBuilder().addLogEntries(logEntry);
+          bufferedLogEntries.drainTo(additionalLogEntries);
+          builder.addAllLogEntries(additionalLogEntries);
+          outboundObserver.onNext(builder.build());
+          additionalLogEntries.clear();
+        }
+        if (inboundObserverCompletion.isDone()) {
+          try {
+            // If the inbound observer failed with an exception, get() will 
throw an
+            // ExecutionException.
+            inboundObserverCompletion.get();
+            // Otherwise it is an error for the server to close the stream 
before we closed our end.
+            throw new IllegalStateException(
+                "Logging stream terminated unexpectedly with success before it 
was closed by the client.");
+          } catch (ExecutionException e) {
+            throw new IllegalStateException(
+                "Logging stream terminated unexpectedly before it was closed 
by the client with error: "
+                    + e.getCause());
+          } catch (InterruptedException e) {
+            // Should never happen because of the isDone check.
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+          }
+        }
+      } catch (Throwable t) {
+        thrown = t;
+        throw new RuntimeException(t);
+      } finally {
+        if (thrown == null) {
+          outboundObserver.onCompleted();
+        } else {
+          outboundObserver.onError(thrown);
+        }
+        channel.shutdown();
+        boolean shutdownFinished = false;
+        try {
+          shutdownFinished = channel.awaitTermination(10, SECONDS);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        } finally {
+          if (!shutdownFinished) {
+            channel.shutdownNow();
+          }
+        }
+      }
+    }
+
+    public void softClose() {
+      softClosing.complete(COMPLETED);
+    }
+
+    public void hardClose() {
+      streamPhaser.forceTermination();
+    }
+
+    private class LogControlObserver
+        implements ClientResponseObserver<BeamFnApi.LogEntry, 
BeamFnApi.LogControl> {
+
+      @Override
+      public void beforeStart(ClientCallStreamObserver<BeamFnApi.LogEntry> 
requestStream) {
+        requestStream.setOnReadyHandler(streamPhaser::arrive);
+      }
+
+      @Override
+      public void onNext(BeamFnApi.LogControl value) {}
+
+      @Override
+      public void onError(Throwable t) {
+        inboundObserverCompletion.completeExceptionally(t);
+        hardClose();
+      }
+
+      @Override
+      public void onCompleted() {
+        inboundObserverCompletion.complete(COMPLETED);
+        hardClose();
+      }
+    }
   }
 
   @Override
   public void close() throws Exception {
+    checkNotNull(bufferedLogConsumer, "BeamFnLoggingClient not fully started");
     try {
-      // Reset the logging configuration to what it is at startup
-      for (Logger logger : configuredLoggers) {
-        logger.setLevel(null);
+      try {
+        // Wait for buffered log messages to drain for a short period.
+        streamWriter.softClose();
+        bufferedLogConsumer.get(10, SECONDS);
+      } catch (TimeoutException e) {
+        // Terminate the phaser that we block on when attempting to honor flow 
control on the
+        // outbound observer.
+        streamWriter.hardClose();
+        // Wait for the sending thread to exit.
+        bufferedLogConsumer.get();
       }
-      configuredLoggers.clear();
-      LogManager.getLogManager().readConfiguration();
+    } catch (ExecutionException e) {
+      if (e.getCause() instanceof Exception) {
+        throw (Exception) e.getCause();
+      }
+      throw e;
+    }
+  }
 
-      // Hang up with the server
-      logRecordHandler.close();
+  public void setProcessBundleHandler(ProcessBundleHandler 
processBundleHandler) {
+    this.processBundleHandler = processBundleHandler;
+  }
+
+  // Reset the logging configuration to what it is at startup.
+  @RequiresNonNull("configuredLoggers")
+  @RequiresNonNull("logRecordHandler")
+  private void restoreLoggers(@UnderInitialization BeamFnLoggingClient this) {
+    for (Logger logger : configuredLoggers) {
+      logger.setLevel(null);
+      // Explicitly remove the installed handler in case reading the 
configuration fails.
+      logger.removeHandler(logRecordHandler);
+    }
+    configuredLoggers.clear();
+    
LogManager.getLogManager().getLogger(ROOT_LOGGER_NAME).removeHandler(logRecordHandler);
+    try {
+      LogManager.getLogManager().readConfiguration();
+    } catch (IOException e) {
+      System.out.print("Unable to restore log managers from configuration: " + 
e.toString());
+    }
+  }
 
-      // Wait for the server to hang up
-      inboundObserverCompletion.get();
-    } finally {
-      // Shut the channel down
-      channel.shutdown();
-      if (!channel.awaitTermination(10, TimeUnit.SECONDS)) {
-        channel.shutdownNow();
+  @RequiresNonNull("bufferedLogEntries")
+  void flushFinalLogs(@UnderInitialization BeamFnLoggingClient this) {
+    List<BeamFnApi.LogEntry> finalLogEntries = new 
ArrayList<>(MAX_BUFFERED_LOG_ENTRY_COUNT);
+    bufferedLogEntries.drainTo(finalLogEntries);
+    for (BeamFnApi.LogEntry logEntry : finalLogEntries) {
+      LogRecord logRecord =
+          new LogRecord(REVERSE_LOG_LEVEL_MAP.get(logEntry.getSeverity()), 
logEntry.getMessage());
+      logRecord.setLoggerName(logEntry.getLogLocation());
+      logRecord.setMillis(
+          logEntry.getTimestamp().getSeconds() * 1000
+              + logEntry.getTimestamp().getNanos() / 1_000_000);
+      logRecord.setThreadID(Integer.parseInt(logEntry.getThread()));
+      if (!logEntry.getTrace().isEmpty()) {
+        logRecord.setThrown(new Throwable(logEntry.getTrace()));
       }
+      LogManager.getLogManager().getLogger(ROOT_LOGGER_NAME).log(logRecord);
     }
   }
 
+  public CompletableFuture<?> terminationFuture() {
+    checkNotNull(bufferedLogConsumer, "BeamFnLoggingClient not fully started");
+    return bufferedLogConsumer;
+  }
+
   @Override
   public String toString() {
     return MoreObjects.toStringHelper(BeamFnLoggingClient.class)
@@ -168,24 +400,11 @@ public class BeamFnLoggingClient implements AutoCloseable 
{
         .toString();
   }
 
-  private class LogRecordHandler extends Handler implements Runnable {
-    private final BlockingQueue<LogEntry> bufferedLogEntries =
-        new ArrayBlockingQueue<>(MAX_BUFFERED_LOG_ENTRY_COUNT);
-    private @Nullable Future<?> bufferedLogWriter = null;
-    /**
-     * Safe object publishing is not required since we only care if the thread 
that set this field
-     * is equal to the thread also attempting to add a log entry.
-     */
-    private @Nullable Thread logEntryHandlerThread = null;
-
-    private boolean logMdc = true;
-
-    private void setLogMdc(boolean value) {
-      logMdc = value;
-    }
+  private class LogRecordHandler extends Handler {
+    private final boolean logMdc;
 
-    private void executeOn(ExecutorService executorService) {
-      bufferedLogWriter = executorService.submit(this);
+    LogRecordHandler(boolean logMdc) {
+      this.logMdc = logMdc;
     }
 
     @Override
@@ -242,7 +461,6 @@ public class BeamFnLoggingClient implements AutoCloseable {
       }
 
       // The thread that sends log records should never perform a blocking 
publish and
-      // only insert log records best effort.
       if (Thread.currentThread() != logEntryHandlerThread) {
         // Blocks caller till enough space exists to publish this log entry.
         try {
@@ -257,114 +475,14 @@ public class BeamFnLoggingClient implements 
AutoCloseable {
       }
     }
 
-    @SuppressFBWarnings("RV_RETURN_VALUE_IGNORED_BAD_PRACTICE")
-    private void dropIfBufferFull(BeamFnApi.LogEntry logEntry) {
-      bufferedLogEntries.offer(logEntry);
-    }
-
-    @Override
-    public void run() {
-      // Logging which occurs in this thread will attempt to publish log 
entries into the
-      // above handler which should never block if the queue is full otherwise
-      // this thread will get stuck.
-      logEntryHandlerThread = Thread.currentThread();
-
-      List<BeamFnApi.LogEntry> additionalLogEntries = new 
ArrayList<>(MAX_BUFFERED_LOG_ENTRY_COUNT);
-      Throwable thrown = null;
-      try {
-        // As long as we haven't yet terminated, then attempt
-        while (!phaser.isTerminated()) {
-          // Try to wait for a message to show up.
-          BeamFnApi.LogEntry logEntry = bufferedLogEntries.poll(1, 
TimeUnit.SECONDS);
-          // If we don't have a message then we need to try this loop again.
-          if (logEntry == null) {
-            continue;
-          }
-
-          // Attempt to honor flow control. Phaser termination causes await 
advance to return
-          // immediately.
-          int phase = phaser.getPhase();
-          if (!outboundObserver.isReady()) {
-            phaser.awaitAdvance(phase);
-          }
-
-          // Batch together as many log messages as possible that are held 
within the buffer
-          BeamFnApi.LogEntry.List.Builder builder =
-              BeamFnApi.LogEntry.List.newBuilder().addLogEntries(logEntry);
-          bufferedLogEntries.drainTo(additionalLogEntries);
-          builder.addAllLogEntries(additionalLogEntries);
-          outboundObserver.onNext(builder.build());
-          additionalLogEntries.clear();
-        }
-
-        // Perform one more final check to see if there are any log entries to 
guarantee that
-        // if a log entry was added on the thread performing termination that 
we will send it.
-        bufferedLogEntries.drainTo(additionalLogEntries);
-        if (!additionalLogEntries.isEmpty()) {
-          outboundObserver.onNext(
-              
BeamFnApi.LogEntry.List.newBuilder().addAllLogEntries(additionalLogEntries).build());
-        }
-      } catch (Throwable t) {
-        thrown = t;
-      }
-      if (thrown != null) {
-        outboundObserver.onError(
-            
Status.INTERNAL.withDescription(getStackTraceAsString(thrown)).asException());
-        throw new IllegalStateException(thrown);
-      } else {
-        outboundObserver.onCompleted();
-      }
+    private boolean dropIfBufferFull(BeamFnApi.LogEntry logEntry) {
+      return bufferedLogEntries.offer(logEntry);
     }
 
     @Override
     public void flush() {}
 
     @Override
-    public synchronized void close() {
-      // If we are done, then a previous caller has already shutdown the queue 
processing thread
-      // hence we don't need to do it again.
-      if (phaser.isTerminated()) {
-        return;
-      }
-
-      // Terminate the phaser that we block on when attempting to honor flow 
control on the
-      // outbound observer.
-      phaser.forceTermination();
-
-      if (bufferedLogWriter != null) {
-        try {
-          bufferedLogWriter.get();
-        } catch (CancellationException e) {
-          // Ignore cancellations
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          throw new RuntimeException(e);
-        } catch (ExecutionException e) {
-          throw new RuntimeException(e);
-        }
-      }
-    }
-  }
-
-  private class LogControlObserver
-      implements ClientResponseObserver<BeamFnApi.LogEntry, 
BeamFnApi.LogControl> {
-
-    @Override
-    public void beforeStart(ClientCallStreamObserver<BeamFnApi.LogEntry> 
requestStream) {
-      requestStream.setOnReadyHandler(phaser::arrive);
-    }
-
-    @Override
-    public void onNext(BeamFnApi.LogControl value) {}
-
-    @Override
-    public void onError(Throwable t) {
-      inboundObserverCompletion.completeExceptionally(t);
-    }
-
-    @Override
-    public void onCompleted() {
-      inboundObserverCompletion.complete(COMPLETED);
-    }
+    public synchronized void close() {}
   }
 }
diff --git 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/BeamFnControlClientTest.java
 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/BeamFnControlClientTest.java
index ad763a58887..5157e124b31 100644
--- 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/BeamFnControlClientTest.java
+++ 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/BeamFnControlClientTest.java
@@ -166,7 +166,7 @@ public class BeamFnControlClientTest {
       // Ensure that the server completing the stream translates to the 
completable future
       // being completed allowing for a successful shutdown of the client.
       outboundServerObserver.onCompleted();
-      client.waitForTermination();
+      client.terminationFuture().get();
     } finally {
       server.shutdownNow();
     }
@@ -236,7 +236,7 @@ public class BeamFnControlClientTest {
 
       // Ensure that the client shuts down when an Error is thrown from the 
harness
       try {
-        client.waitForTermination();
+        client.terminationFuture().get();
         throw new IllegalStateException("The future should have terminated 
with an error");
       } catch (ExecutionException errorWrapper) {
         assertThat(errorWrapper.getCause().getMessage(), containsString("Test 
Error"));
diff --git 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java
 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java
index 66b9246d619..1fd8e249dd0 100644
--- 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java
+++ 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java
@@ -28,6 +28,7 @@ import static org.junit.Assert.assertTrue;
 import java.util.Collection;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Phaser;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.logging.Handler;
@@ -44,7 +45,13 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.Struct;
 import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.Timestamp;
 import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.Value;
+import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.CallOptions;
+import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.Channel;
+import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.ClientCall;
+import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.ClientInterceptor;
+import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.ForwardingClientCall;
 import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.ManagedChannel;
+import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.MethodDescriptor;
 import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.Server;
 import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.Status;
 import 
org.apache.beam.vendor.grpc.v1p54p0.io.grpc.inprocess.InProcessChannelBuilder;
@@ -157,7 +164,7 @@ public class BeamFnLoggingClientTest {
     try {
 
       BeamFnLoggingClient client =
-          new BeamFnLoggingClient(
+          BeamFnLoggingClient.createAndStart(
               PipelineOptionsFactory.fromArgs(
                       new String[] {
                         "--defaultSdkHarnessLogLevel=OFF",
@@ -223,6 +230,12 @@ public class BeamFnLoggingClientTest {
                     values.addAll(logEntries.getLogEntriesList()))
             .build();
 
+    // Keep a strong reference to the loggers. Otherwise the call to 
client.close()
+    // removes the only reference and the logger may get GC'd before the 
assertions (BEAM-4136).
+    Logger rootLogger = null;
+    Logger configuredLogger = null;
+    Phaser streamBlocker = new Phaser(1);
+
     Endpoints.ApiServiceDescriptor apiServiceDescriptor =
         Endpoints.ApiServiceDescriptor.newBuilder()
             .setUrl(this.getClass().getName() + "-" + 
UUID.randomUUID().toString())
@@ -234,6 +247,9 @@ public class BeamFnLoggingClientTest {
                   @Override
                   public StreamObserver<BeamFnApi.LogEntry.List> logging(
                       StreamObserver<BeamFnApi.LogControl> outboundObserver) {
+                    // Block before returning an error on the stream so that 
we can observe the
+                    // loggers before they are reset (phase 0 ends when the test arrives).
+                    streamBlocker.awaitAdvance(0);
                     outboundServerObserver.set(outboundObserver);
                     outboundObserver.onError(
                         Status.INTERNAL.withDescription("TEST 
ERROR").asException());
@@ -244,15 +260,9 @@ public class BeamFnLoggingClientTest {
     server.start();
 
     ManagedChannel channel = 
InProcessChannelBuilder.forName(apiServiceDescriptor.getUrl()).build();
-
-    // Keep a strong reference to the loggers. Otherwise the call to 
client.close()
-    // removes the only reference and the logger may get GC'd before the 
assertions (BEAM-4136).
-    Logger rootLogger = null;
-    Logger configuredLogger = null;
-
     try {
       BeamFnLoggingClient client =
-          new BeamFnLoggingClient(
+          BeamFnLoggingClient.createAndStart(
               PipelineOptionsFactory.fromArgs(
                       new String[] {
                         "--defaultSdkHarnessLogLevel=OFF",
@@ -261,15 +271,16 @@ public class BeamFnLoggingClientTest {
                   .create(),
               apiServiceDescriptor,
               (Endpoints.ApiServiceDescriptor descriptor) -> channel);
-
+      // The loggers should be installed before createAndStart returns.
       rootLogger = LogManager.getLogManager().getLogger("");
       configuredLogger = 
LogManager.getLogManager().getLogger("ConfiguredLogger");
-
+      // Allow the stream to return with an error.
+      assertEquals(0, streamBlocker.arrive());
       thrown.expectMessage("TEST ERROR");
       client.close();
     } finally {
       assertNotNull("rootLogger should be initialized before exception", 
rootLogger);
-      assertNotNull("configuredLogger should be initialized before exception", 
rootLogger);
+      assertNotNull("configuredLogger should be initialized before exception", 
configuredLogger);
 
       // Verify that after close, log levels are reset.
       assertEquals(Level.INFO, rootLogger.getLevel());
@@ -311,11 +322,12 @@ public class BeamFnLoggingClientTest {
                 })
             .build();
     server.start();
+    thrown.expectMessage("Logging stream terminated unexpectedly");
 
     ManagedChannel channel = 
InProcessChannelBuilder.forName(apiServiceDescriptor.getUrl()).build();
     try {
       BeamFnLoggingClient client =
-          new BeamFnLoggingClient(
+          BeamFnLoggingClient.createAndStart(
               PipelineOptionsFactory.fromArgs(
                       new String[] {
                         "--defaultSdkHarnessLogLevel=OFF",
@@ -329,7 +341,6 @@ public class BeamFnLoggingClientTest {
       // removes the only reference and the logger may get GC'd before the 
assertions (BEAM-4136).
       Logger rootLogger = LogManager.getLogManager().getLogger("");
       Logger configuredLogger = 
LogManager.getLogManager().getLogger("ConfiguredLogger");
-
       client.close();
 
       // Verify that after close, log levels are reset.
@@ -341,4 +352,167 @@ public class BeamFnLoggingClientTest {
       server.shutdownNow();
     }
   }
+
+  @Test
+  public void testClosableWhenBlockingForOnReady() throws Exception {
+    BeamFnLoggingMDC.setInstructionId("instruction-1");
+    Collection<BeamFnApi.LogEntry> values = new ConcurrentLinkedQueue<>();
+    AtomicReference<StreamObserver<BeamFnApi.LogControl>> 
outboundServerObserver =
+        new AtomicReference<>();
+
+    final AtomicBoolean elementsAllowed = new AtomicBoolean(true);
+    CallStreamObserver<BeamFnApi.LogEntry.List> inboundServerObserver =
+        TestStreams.withOnNext(
+                (BeamFnApi.LogEntry.List logEntries) ->
+                    values.addAll(logEntries.getLogEntriesList()))
+            .build();
+
+    Endpoints.ApiServiceDescriptor apiServiceDescriptor =
+        Endpoints.ApiServiceDescriptor.newBuilder()
+            .setUrl(this.getClass().getName() + "-" + 
UUID.randomUUID().toString())
+            .build();
+    Server server =
+        InProcessServerBuilder.forName(apiServiceDescriptor.getUrl())
+            .addService(
+                new BeamFnLoggingGrpc.BeamFnLoggingImplBase() {
+                  @Override
+                  public StreamObserver<BeamFnApi.LogEntry.List> logging(
+                      StreamObserver<BeamFnApi.LogControl> outboundObserver) {
+                    outboundServerObserver.set(outboundObserver);
+                    return inboundServerObserver;
+                  }
+                })
+            .build();
+    server.start();
+
+    ManagedChannel channel =
+        InProcessChannelBuilder.forName(apiServiceDescriptor.getUrl())
+            .intercept(
+                new ClientInterceptor() {
+                  @Override
+                  public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
+                      MethodDescriptor<ReqT, RespT> method, CallOptions 
callOptions, Channel next) {
+                    ClientCall<ReqT, RespT> delegate = next.newCall(method, 
callOptions);
+                    return new 
ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
+                        delegate) {
+                      @Override
+                      public boolean isReady() {
+                        return elementsAllowed.get();
+                      }
+                    };
+                  }
+                })
+            .build();
+
+    // Keep a strong reference to the loggers. Otherwise the call to 
client.close()
+    // removes the only reference and the logger may get GC'd before the 
assertions (BEAM-4136).
+    Logger rootLogger = null;
+    Logger configuredLogger = null;
+
+    try {
+      BeamFnLoggingClient client =
+          BeamFnLoggingClient.createAndStart(
+              PipelineOptionsFactory.fromArgs(
+                      new String[] {
+                        "--defaultSdkHarnessLogLevel=OFF",
+                        "--sdkHarnessLogLevelOverrides={\"ConfiguredLogger\": 
\"DEBUG\"}"
+                      })
+                  .create(),
+              apiServiceDescriptor,
+              (Endpoints.ApiServiceDescriptor descriptor) -> channel);
+
+      rootLogger = LogManager.getLogManager().getLogger("");
+      configuredLogger = 
LogManager.getLogManager().getLogger("ConfiguredLogger");
+
+      long numEntries = 2000;
+      for (int i = 0; i < numEntries; ++i) {
+        configuredLogger.log(TEST_RECORD);
+      }
+      // Measure how long it takes all the logs to appear.
+      int sleepTime = 0;
+      while (values.size() < numEntries) {
+        ++sleepTime;
+        Thread.sleep(1);
+      }
+      // Attempt to enter the blocking state by pushing back on the stream, 
publishing records and
+      // then giving them time for it to block.
+      elementsAllowed.set(false);
+      for (int i = 0; i < numEntries; ++i) {
+        configuredLogger.log(TEST_RECORD);
+      }
+      Thread.sleep(sleepTime * 3);
+      // At this point, the background thread is either blocking as intended 
or the background
+      // thread hasn't yet observed all the input. In either case the test 
should pass.
+      assertTrue(values.size() < numEntries * 2);
+
+      client.close();
+
+      assertNotNull("rootLogger should be initialized before exception", 
rootLogger);
+      assertNotNull("configuredLogger should be initialized before exception", 
configuredLogger);
+
+      // Verify that after close, log levels are reset.
+      assertEquals(Level.INFO, rootLogger.getLevel());
+      assertNull(configuredLogger.getLevel());
+
+      assertTrue(channel.isShutdown());
+    } finally {
+      server.shutdownNow();
+    }
+  }
+
+  @Test
+  public void testServerCloseNotifiesTermination() throws Exception {
+    BeamFnLoggingMDC.setInstructionId("instruction-1");
+    Collection<BeamFnApi.LogEntry> values = new ConcurrentLinkedQueue<>();
+    AtomicReference<StreamObserver<BeamFnApi.LogControl>> 
outboundServerObserver =
+        new AtomicReference<>();
+    CallStreamObserver<BeamFnApi.LogEntry.List> inboundServerObserver =
+        TestStreams.withOnNext(
+                (BeamFnApi.LogEntry.List logEntries) ->
+                    values.addAll(logEntries.getLogEntriesList()))
+            .build();
+
+    Endpoints.ApiServiceDescriptor apiServiceDescriptor =
+        Endpoints.ApiServiceDescriptor.newBuilder()
+            .setUrl(this.getClass().getName() + "-" + 
UUID.randomUUID().toString())
+            .build();
+    Server server =
+        InProcessServerBuilder.forName(apiServiceDescriptor.getUrl())
+            .addService(
+                new BeamFnLoggingGrpc.BeamFnLoggingImplBase() {
+                  @Override
+                  public StreamObserver<BeamFnApi.LogEntry.List> logging(
+                      StreamObserver<BeamFnApi.LogControl> outboundObserver) {
+                    outboundServerObserver.set(outboundObserver);
+                    outboundObserver.onCompleted();
+                    return inboundServerObserver;
+                  }
+                })
+            .build();
+    server.start();
+
+    ManagedChannel channel = 
InProcessChannelBuilder.forName(apiServiceDescriptor.getUrl()).build();
+    try {
+      BeamFnLoggingClient client =
+          BeamFnLoggingClient.createAndStart(
+              PipelineOptionsFactory.fromArgs(
+                      new String[] {
+                        "--defaultSdkHarnessLogLevel=OFF",
+                        "--sdkHarnessLogLevelOverrides={\"ConfiguredLogger\": 
\"DEBUG\"}"
+                      })
+                  .create(),
+              apiServiceDescriptor,
+              (Endpoints.ApiServiceDescriptor descriptor) -> channel);
+
+      thrown.expectMessage("Logging stream terminated unexpectedly");
+      client.terminationFuture().get();
+    } finally {
+      // Verify that after termination, log levels are reset.
+      assertEquals(Level.INFO, 
LogManager.getLogManager().getLogger("").getLevel());
+      
assertNull(LogManager.getLogManager().getLogger("ConfiguredLogger").getLevel());
+
+      assertTrue(channel.isShutdown());
+      server.shutdownNow();
+    }
+  }
 }


Reply via email to