This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 5e942ae3790 Ensure that the BeamFnLoggingClient terminates process if
stream breaks (#25186)
5e942ae3790 is described below
commit 5e942ae3790bc95148413c43ab7e43a01a2d82ae
Author: Sam Whittle <[email protected]>
AuthorDate: Wed Jun 21 02:19:32 2023 +0200
Ensure that the BeamFnLoggingClient terminates process if stream breaks
(#25186)
* Ensure that a failure of the logging stream in BeamFnLoggingClient
does not cause logging to block indefinitely but instead triggers
SDK teardown.
* fix test
* fix racy test
* address comments
---
.../beam/sdk/fn/stream/DirectStreamObserver.java | 24 +-
.../jmh/logging/BeamFnLoggingClientBenchmark.java | 2 +-
.../java/org/apache/beam/fn/harness/FnHarness.java | 7 +-
.../fn/harness/control/BeamFnControlClient.java | 5 +-
.../fn/harness/logging/BeamFnLoggingClient.java | 454 +++++++++++++--------
.../harness/control/BeamFnControlClientTest.java | 4 +-
.../harness/logging/BeamFnLoggingClientTest.java | 200 ++++++++-
7 files changed, 497 insertions(+), 199 deletions(-)
diff --git
a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DirectStreamObserver.java
b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DirectStreamObserver.java
index 16677a18e39..99a831c20ee 100644
---
a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DirectStreamObserver.java
+++
b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DirectStreamObserver.java
@@ -33,7 +33,8 @@ import org.slf4j.LoggerFactory;
* <p>Flow control with the underlying {@link CallStreamObserver} is handled
with a {@link Phaser}
* which waits for advancement of the phase if the {@link CallStreamObserver}
is not ready. Creator
* is expected to advance the {@link Phaser} whenever the underlying {@link
CallStreamObserver}
- * becomes ready.
+ * becomes ready. If the {@link Phaser} is terminated, {@link
DirectStreamObserver<T>.onNext(T)}
+ * will no longer wait for the {@link CallStreamObserver} to become ready.
*/
@ThreadSafe
public final class DirectStreamObserver<T> implements StreamObserver<T> {
@@ -70,24 +71,27 @@ public final class DirectStreamObserver<T> implements
StreamObserver<T> {
synchronized (lock) {
if (++numMessages >= maxMessagesBeforeCheck) {
numMessages = 0;
- int waitTime = 1;
- int totalTimeWaited = 0;
+ int waitSeconds = 1;
+ int totalSecondsWaited = 0;
int phase = phaser.getPhase();
// Record the initial phase in case we are in the inbound gRPC thread
where the phase won't
// advance.
int initialPhase = phase;
- while (!outboundObserver.isReady()) {
+ // A negative phase indicates that the phaser is terminated.
+ while (phase >= 0 && !outboundObserver.isReady()) {
try {
- phase = phaser.awaitAdvanceInterruptibly(phase, waitTime,
TimeUnit.SECONDS);
+ phase = phaser.awaitAdvanceInterruptibly(phase, waitSeconds,
TimeUnit.SECONDS);
} catch (TimeoutException e) {
- totalTimeWaited += waitTime;
- waitTime = waitTime * 2;
+ totalSecondsWaited += waitSeconds;
+ // Double the backoff for re-evaluating the isReady bit up to a
maximum of once per
+ // minute. This bounds the waiting if the onReady callback is not
called as expected.
+ waitSeconds = Math.min(waitSeconds * 2, 60);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
- if (totalTimeWaited > 0) {
+ if (totalSecondsWaited > 0) {
// If the phase didn't change, this means that the installed onReady
callback had not
// been invoked.
if (initialPhase == phase) {
@@ -95,12 +99,12 @@ public final class DirectStreamObserver<T> implements
StreamObserver<T> {
"Output channel stalled for {}s, outbound thread {}. See: "
+ "https://issues.apache.org/jira/browse/BEAM-4280 for the
history for "
+ "this issue.",
- totalTimeWaited,
+ totalSecondsWaited,
Thread.currentThread().getName());
} else {
LOG.debug(
"Output channel stalled for {}s, outbound thread {}.",
- totalTimeWaited,
+ totalSecondsWaited,
Thread.currentThread().getName());
}
}
diff --git
a/sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/logging/BeamFnLoggingClientBenchmark.java
b/sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/logging/BeamFnLoggingClientBenchmark.java
index d7c18059dc2..1a5558ed670 100644
---
a/sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/logging/BeamFnLoggingClientBenchmark.java
+++
b/sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/logging/BeamFnLoggingClientBenchmark.java
@@ -98,7 +98,7 @@ public class BeamFnLoggingClientBenchmark {
.build();
server.start();
loggingClient =
- new BeamFnLoggingClient(
+ BeamFnLoggingClient.createAndStart(
PipelineOptionsFactory.create(),
apiServiceDescriptor,
managedChannelFactory::forDescriptor);
diff --git
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
index 59d21f25391..0d25137beef 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
@@ -20,6 +20,7 @@ package org.apache.beam.fn.harness;
import java.util.Collections;
import java.util.EnumMap;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import javax.annotation.Nullable;
@@ -228,7 +229,7 @@ public class FnHarness {
// The logging client variable is not used per se, but during its lifetime
(until close()) it
// intercepts logging and sends it to the logging service.
try (BeamFnLoggingClient logging =
- new BeamFnLoggingClient(
+ BeamFnLoggingClient.createAndStart(
options, loggingApiServiceDescriptor,
channelFactory::forDescriptor)) {
LOG.info("Fn Harness started");
// Register standard file systems.
@@ -353,11 +354,13 @@ public class FnHarness {
outboundObserverFactory,
executorService,
handlers);
- control.waitForTermination();
+ CompletableFuture.anyOf(control.terminationFuture(),
logging.terminationFuture()).get();
if (beamFnStatusClient != null) {
beamFnStatusClient.close();
}
processBundleHandler.shutdown();
+ } catch (Exception e) {
+ System.out.println("Shutting down harness due to exception: " +
e.toString());
} finally {
System.out.println("Shutting SDK harness down.");
executionStateSampler.stop();
diff --git
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java
index ca4428a3462..ffac6d7aa98 100644
---
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java
+++
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java
@@ -21,7 +21,6 @@ import static
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Thro
import java.util.EnumMap;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import org.apache.beam.fn.harness.logging.BeamFnLoggingMDC;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
@@ -139,8 +138,8 @@ public class BeamFnControlClient {
}
/** This method blocks until the control stream has completed. */
- public void waitForTermination() throws InterruptedException,
ExecutionException {
- onFinish.get();
+ public CompletableFuture<Object> terminationFuture() {
+ return onFinish;
}
public BeamFnApi.InstructionResponse delegateOnInstructionRequestType(
diff --git
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
index 68ad6727c4d..3b76f7fc4d0 100644
---
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
+++
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
@@ -17,22 +17,21 @@
*/
package org.apache.beam.fn.harness.logging;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
import static
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables.getStackTraceAsString;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.Phaser;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.logging.Formatter;
import java.util.logging.Handler;
@@ -47,6 +46,8 @@ import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.LogEntry;
import org.apache.beam.model.fnexecution.v1.BeamFnLoggingGrpc;
import org.apache.beam.model.pipeline.v1.Endpoints;
+import org.apache.beam.sdk.fn.stream.AdvancingPhaser;
+import org.apache.beam.sdk.fn.stream.DirectStreamObserver;
import org.apache.beam.sdk.options.ExecutorOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.SdkHarnessOptions;
@@ -54,13 +55,15 @@ import
org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.Struct;
import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.Timestamp;
import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.Value;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.ManagedChannel;
-import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.Status;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.CallStreamObserver;
import
org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.ClientCallStreamObserver;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.ClientResponseObserver;
+import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.StreamObserver;
import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.checkerframework.checker.initialization.qual.UnderInitialization;
import org.checkerframework.checker.nullness.qual.Nullable;
+import org.checkerframework.checker.nullness.qual.RequiresNonNull;
import org.slf4j.MDC;
/**
@@ -76,6 +79,10 @@ public class BeamFnLoggingClient implements AutoCloseable {
.put(Level.FINE, BeamFnApi.LogEntry.Severity.Enum.DEBUG)
.put(Level.FINEST, BeamFnApi.LogEntry.Severity.Enum.TRACE)
.build();
+ private static final ImmutableMap<BeamFnApi.LogEntry.Severity.Enum, Level>
REVERSE_LOG_LEVEL_MAP =
+ ImmutableMap.<BeamFnApi.LogEntry.Severity.Enum, Level>builder()
+ .putAll(LOG_LEVEL_MAP.asMultimap().inverse().entries())
+ .build();
private static final Formatter DEFAULT_FORMATTER = new SimpleFormatter();
@@ -87,28 +94,98 @@ public class BeamFnLoggingClient implements AutoCloseable {
private static final Object COMPLETED = new Object();
+ private final Endpoints.ApiServiceDescriptor apiServiceDescriptor;
+
+ private final StreamWriter streamWriter;
+
+ private final LogRecordHandler logRecordHandler;
+
/* We need to store a reference to the configured loggers so that they are
not
* garbage collected. java.util.logging only has weak references to the
loggers
* so if they are garbage collected, our hierarchical configuration will be
lost. */
- private final Collection<Logger> configuredLoggers;
- private final Endpoints.ApiServiceDescriptor apiServiceDescriptor;
- private final ManagedChannel channel;
- private final CallStreamObserver<BeamFnApi.LogEntry.List> outboundObserver;
- private final LogControlObserver inboundObserver;
- private final LogRecordHandler logRecordHandler;
- private final CompletableFuture<Object> inboundObserverCompletion;
- private final Phaser phaser;
+ private final Collection<Logger> configuredLoggers = new ArrayList<>();
+
private @Nullable ProcessBundleHandler processBundleHandler;
- public BeamFnLoggingClient(
+ private final BlockingQueue<LogEntry> bufferedLogEntries =
+ new ArrayBlockingQueue<>(MAX_BUFFERED_LOG_ENTRY_COUNT);
+
+ /**
+ * Future that completes with the background thread consuming logs from
bufferedLogEntries.
+ * Completes with COMPLETED or with exception.
+ */
+ private final CompletableFuture<?> bufferedLogConsumer;
+
+ /**
+ * Safe object publishing is not required since we only care if the thread
that set this field is
+ * equal to the thread also attempting to add a log entry.
+ */
+ private @Nullable Thread logEntryHandlerThread = null;
+
+ public static BeamFnLoggingClient createAndStart(
PipelineOptions options,
Endpoints.ApiServiceDescriptor apiServiceDescriptor,
Function<Endpoints.ApiServiceDescriptor, ManagedChannel> channelFactory)
{
+ BeamFnLoggingClient client =
+ new BeamFnLoggingClient(
+ apiServiceDescriptor,
+ new StreamWriter(channelFactory.apply(apiServiceDescriptor)),
+ options.as(SdkHarnessOptions.class).getLogMdc(),
+ options.as(ExecutorOptions.class).getScheduledExecutorService(),
+ options.as(SdkHarnessOptions.class));
+ return client;
+ }
+
+ private BeamFnLoggingClient(
+ Endpoints.ApiServiceDescriptor apiServiceDescriptor,
+ StreamWriter streamWriter,
+ boolean logMdc,
+ ScheduledExecutorService executorService,
+ SdkHarnessOptions options) {
this.apiServiceDescriptor = apiServiceDescriptor;
- this.inboundObserverCompletion = new CompletableFuture<>();
- this.phaser = new Phaser(1);
- this.channel = channelFactory.apply(apiServiceDescriptor);
+ this.streamWriter = streamWriter;
+ this.logRecordHandler = new LogRecordHandler(logMdc);
+ logRecordHandler.setLevel(Level.ALL);
+ logRecordHandler.setFormatter(DEFAULT_FORMATTER);
+ CompletableFuture<Object> started = new CompletableFuture<>();
+ this.bufferedLogConsumer =
+ CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ logEntryHandlerThread = Thread.currentThread();
+ installLogging(options);
+ started.complete(COMPLETED);
+
+ // Logging which occurs in this thread will attempt to publish
log entries into the
+ // above handler which should never block if the queue is full
otherwise
+ // this thread will get stuck.
+ streamWriter.drainQueueToStream(bufferedLogEntries);
+ } finally {
+ restoreLoggers();
+ // Now that loggers are restored, do a final flush of any
buffered logs
+ // in case they help with understanding above failures.
+ flushFinalLogs();
+ }
+ return COMPLETED;
+ },
+ executorService);
+ try {
+ // Wait for the thread to be running and log handlers installed or an
error with the thread
+ // that is supposed to be consuming logs.
+ CompletableFuture.anyOf(this.bufferedLogConsumer, started).get();
+ } catch (ExecutionException e) {
+ throw new RuntimeException("Error starting background log thread " +
e.getCause());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ }
+
+ @RequiresNonNull("logRecordHandler")
+ @RequiresNonNull("configuredLoggers")
+ private void installLogging(
+ @UnderInitialization BeamFnLoggingClient this, SdkHarnessOptions
options) {
// Reset the global log manager, get the root logger and remove the
default log handlers.
LogManager logManager = LogManager.getLogManager();
logManager.reset();
@@ -117,50 +194,205 @@ public class BeamFnLoggingClient implements
AutoCloseable {
rootLogger.removeHandler(handler);
}
// configure loggers from default sdk harness log level and log level
overrides
- this.configuredLoggers =
-
SdkHarnessOptions.getConfiguredLoggerFromOptions(options.as(SdkHarnessOptions.class));
+
this.configuredLoggers.addAll(SdkHarnessOptions.getConfiguredLoggerFromOptions(options));
- BeamFnLoggingGrpc.BeamFnLoggingStub stub =
BeamFnLoggingGrpc.newStub(channel);
- inboundObserver = new LogControlObserver();
- logRecordHandler = new LogRecordHandler();
- logRecordHandler.setLevel(Level.ALL);
- logRecordHandler.setFormatter(DEFAULT_FORMATTER);
-
logRecordHandler.executeOn(options.as(ExecutorOptions.class).getScheduledExecutorService());
- boolean logMdc = options.as(SdkHarnessOptions.class).getLogMdc();
- logRecordHandler.setLogMdc(logMdc);
- QuotaEvent.setEnabled(logMdc);
- outboundObserver = (CallStreamObserver<BeamFnApi.LogEntry.List>)
stub.logging(inboundObserver);
+ // Install a handler that queues to the buffer read by the background
thread.
rootLogger.addHandler(logRecordHandler);
}
- public void setProcessBundleHandler(ProcessBundleHandler
processBundleHandler) {
- this.processBundleHandler = processBundleHandler;
+ private static class StreamWriter {
+ private final ManagedChannel channel;
+ private final StreamObserver<BeamFnApi.LogEntry.List> outboundObserver;
+ private final LogControlObserver inboundObserver;
+
+ private final CompletableFuture<Object> inboundObserverCompletion;
+ private final AdvancingPhaser streamPhaser;
+
+ // Used to note we are attempting to close the logging client and to
gracefully drain the
+ // current logs to the stream.
+ private final CompletableFuture<Object> softClosing = new
CompletableFuture<>();
+
+ public StreamWriter(ManagedChannel channel) {
+ this.inboundObserverCompletion = new CompletableFuture<>();
+ this.streamPhaser = new AdvancingPhaser(1);
+ this.channel = channel;
+
+ BeamFnLoggingGrpc.BeamFnLoggingStub stub =
BeamFnLoggingGrpc.newStub(channel);
+ this.inboundObserver = new LogControlObserver();
+ this.outboundObserver =
+ new DirectStreamObserver<BeamFnApi.LogEntry.List>(
+ this.streamPhaser,
+ (CallStreamObserver<BeamFnApi.LogEntry.List>)
stub.logging(inboundObserver));
+ }
+
+ public void drainQueueToStream(BlockingQueue<BeamFnApi.LogEntry>
bufferedLogEntries) {
+ Throwable thrown = null;
+ try {
+ List<BeamFnApi.LogEntry> additionalLogEntries =
+ new ArrayList<>(MAX_BUFFERED_LOG_ENTRY_COUNT);
+ // As long as we haven't yet terminated the stream, then attempt to
send on it.
+ while (!streamPhaser.isTerminated()) {
+ // We wait for a limited period so that we can evaluate if the
stream closed or if
+ // we are gracefully closing the client.
+ BeamFnApi.LogEntry logEntry = bufferedLogEntries.poll(1, SECONDS);
+ if (logEntry == null) {
+ if (softClosing.isDone()) {
+ break;
+ }
+ continue;
+ }
+
+ // Batch together as many log messages as possible that are held
within the buffer
+ BeamFnApi.LogEntry.List.Builder builder =
+ BeamFnApi.LogEntry.List.newBuilder().addLogEntries(logEntry);
+ bufferedLogEntries.drainTo(additionalLogEntries);
+ builder.addAllLogEntries(additionalLogEntries);
+ outboundObserver.onNext(builder.build());
+ additionalLogEntries.clear();
+ }
+ if (inboundObserverCompletion.isDone()) {
+ try {
+ // If the inbound observer failed with an exception, get() will
throw an
+ // ExecutionException.
+ inboundObserverCompletion.get();
+ // Otherwise it is an error for the server to close the stream
before we closed our end.
+ throw new IllegalStateException(
+ "Logging stream terminated unexpectedly with success before it
was closed by the client.");
+ } catch (ExecutionException e) {
+ throw new IllegalStateException(
+ "Logging stream terminated unexpectedly before it was closed
by the client with error: "
+ + e.getCause());
+ } catch (InterruptedException e) {
+ // Should never happen because of the isDone check.
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ }
+ } catch (Throwable t) {
+ thrown = t;
+ throw new RuntimeException(t);
+ } finally {
+ if (thrown == null) {
+ outboundObserver.onCompleted();
+ } else {
+ outboundObserver.onError(thrown);
+ }
+ channel.shutdown();
+ boolean shutdownFinished = false;
+ try {
+ shutdownFinished = channel.awaitTermination(10, SECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ if (!shutdownFinished) {
+ channel.shutdownNow();
+ }
+ }
+ }
+ }
+
+ public void softClose() {
+ softClosing.complete(COMPLETED);
+ }
+
+ public void hardClose() {
+ streamPhaser.forceTermination();
+ }
+
+ private class LogControlObserver
+ implements ClientResponseObserver<BeamFnApi.LogEntry,
BeamFnApi.LogControl> {
+
+ @Override
+ public void beforeStart(ClientCallStreamObserver<BeamFnApi.LogEntry>
requestStream) {
+ requestStream.setOnReadyHandler(streamPhaser::arrive);
+ }
+
+ @Override
+ public void onNext(BeamFnApi.LogControl value) {}
+
+ @Override
+ public void onError(Throwable t) {
+ inboundObserverCompletion.completeExceptionally(t);
+ hardClose();
+ }
+
+ @Override
+ public void onCompleted() {
+ inboundObserverCompletion.complete(COMPLETED);
+ hardClose();
+ }
+ }
}
@Override
public void close() throws Exception {
+ checkNotNull(bufferedLogConsumer, "BeamFnLoggingClient not fully started");
try {
- // Reset the logging configuration to what it is at startup
- for (Logger logger : configuredLoggers) {
- logger.setLevel(null);
+ try {
+ // Wait for buffered log messages to drain for a short period.
+ streamWriter.softClose();
+ bufferedLogConsumer.get(10, SECONDS);
+ } catch (TimeoutException e) {
+ // Terminate the phaser that we block on when attempting to honor flow
control on the
+ // outbound observer.
+ streamWriter.hardClose();
+ // Wait for the sending thread to exit.
+ bufferedLogConsumer.get();
}
- configuredLoggers.clear();
- LogManager.getLogManager().readConfiguration();
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof Exception) {
+ throw (Exception) e.getCause();
+ }
+ throw e;
+ }
+ }
- // Hang up with the server
- logRecordHandler.close();
+ public void setProcessBundleHandler(ProcessBundleHandler
processBundleHandler) {
+ this.processBundleHandler = processBundleHandler;
+ }
+
+ // Reset the logging configuration to what it is at startup.
+ @RequiresNonNull("configuredLoggers")
+ @RequiresNonNull("logRecordHandler")
+ private void restoreLoggers(@UnderInitialization BeamFnLoggingClient this) {
+ for (Logger logger : configuredLoggers) {
+ logger.setLevel(null);
+ // Explicitly remove the installed handler in case reading the
configuration fails.
+ logger.removeHandler(logRecordHandler);
+ }
+ configuredLoggers.clear();
+
LogManager.getLogManager().getLogger(ROOT_LOGGER_NAME).removeHandler(logRecordHandler);
+ try {
+ LogManager.getLogManager().readConfiguration();
+ } catch (IOException e) {
+ System.out.print("Unable to restore log managers from configuration: " +
e.toString());
+ }
+ }
- // Wait for the server to hang up
- inboundObserverCompletion.get();
- } finally {
- // Shut the channel down
- channel.shutdown();
- if (!channel.awaitTermination(10, TimeUnit.SECONDS)) {
- channel.shutdownNow();
+ @RequiresNonNull("bufferedLogEntries")
+ void flushFinalLogs(@UnderInitialization BeamFnLoggingClient this) {
+ List<BeamFnApi.LogEntry> finalLogEntries = new
ArrayList<>(MAX_BUFFERED_LOG_ENTRY_COUNT);
+ bufferedLogEntries.drainTo(finalLogEntries);
+ for (BeamFnApi.LogEntry logEntry : finalLogEntries) {
+ LogRecord logRecord =
+ new LogRecord(REVERSE_LOG_LEVEL_MAP.get(logEntry.getSeverity()),
logEntry.getMessage());
+ logRecord.setLoggerName(logEntry.getLogLocation());
+ logRecord.setMillis(
+ logEntry.getTimestamp().getSeconds() * 1000
+ + logEntry.getTimestamp().getNanos() / 1_000_000);
+ logRecord.setThreadID(Integer.parseInt(logEntry.getThread()));
+ if (!logEntry.getTrace().isEmpty()) {
+ logRecord.setThrown(new Throwable(logEntry.getTrace()));
}
+ LogManager.getLogManager().getLogger(ROOT_LOGGER_NAME).log(logRecord);
}
}
+ public CompletableFuture<?> terminationFuture() {
+ checkNotNull(bufferedLogConsumer, "BeamFnLoggingClient not fully started");
+ return bufferedLogConsumer;
+ }
+
@Override
public String toString() {
return MoreObjects.toStringHelper(BeamFnLoggingClient.class)
@@ -168,24 +400,11 @@ public class BeamFnLoggingClient implements AutoCloseable
{
.toString();
}
- private class LogRecordHandler extends Handler implements Runnable {
- private final BlockingQueue<LogEntry> bufferedLogEntries =
- new ArrayBlockingQueue<>(MAX_BUFFERED_LOG_ENTRY_COUNT);
- private @Nullable Future<?> bufferedLogWriter = null;
- /**
- * Safe object publishing is not required since we only care if the thread
that set this field
- * is equal to the thread also attempting to add a log entry.
- */
- private @Nullable Thread logEntryHandlerThread = null;
-
- private boolean logMdc = true;
-
- private void setLogMdc(boolean value) {
- logMdc = value;
- }
+ private class LogRecordHandler extends Handler {
+ private final boolean logMdc;
- private void executeOn(ExecutorService executorService) {
- bufferedLogWriter = executorService.submit(this);
+ LogRecordHandler(boolean logMdc) {
+ this.logMdc = logMdc;
}
@Override
@@ -242,7 +461,6 @@ public class BeamFnLoggingClient implements AutoCloseable {
}
// The thread that sends log records should never perform a blocking
publish and
- // only insert log records best effort.
if (Thread.currentThread() != logEntryHandlerThread) {
// Blocks caller till enough space exists to publish this log entry.
try {
@@ -257,114 +475,14 @@ public class BeamFnLoggingClient implements
AutoCloseable {
}
}
- @SuppressFBWarnings("RV_RETURN_VALUE_IGNORED_BAD_PRACTICE")
- private void dropIfBufferFull(BeamFnApi.LogEntry logEntry) {
- bufferedLogEntries.offer(logEntry);
- }
-
- @Override
- public void run() {
- // Logging which occurs in this thread will attempt to publish log
entries into the
- // above handler which should never block if the queue is full otherwise
- // this thread will get stuck.
- logEntryHandlerThread = Thread.currentThread();
-
- List<BeamFnApi.LogEntry> additionalLogEntries = new
ArrayList<>(MAX_BUFFERED_LOG_ENTRY_COUNT);
- Throwable thrown = null;
- try {
- // As long as we haven't yet terminated, then attempt
- while (!phaser.isTerminated()) {
- // Try to wait for a message to show up.
- BeamFnApi.LogEntry logEntry = bufferedLogEntries.poll(1,
TimeUnit.SECONDS);
- // If we don't have a message then we need to try this loop again.
- if (logEntry == null) {
- continue;
- }
-
- // Attempt to honor flow control. Phaser termination causes await
advance to return
- // immediately.
- int phase = phaser.getPhase();
- if (!outboundObserver.isReady()) {
- phaser.awaitAdvance(phase);
- }
-
- // Batch together as many log messages as possible that are held
within the buffer
- BeamFnApi.LogEntry.List.Builder builder =
- BeamFnApi.LogEntry.List.newBuilder().addLogEntries(logEntry);
- bufferedLogEntries.drainTo(additionalLogEntries);
- builder.addAllLogEntries(additionalLogEntries);
- outboundObserver.onNext(builder.build());
- additionalLogEntries.clear();
- }
-
- // Perform one more final check to see if there are any log entries to
guarantee that
- // if a log entry was added on the thread performing termination that
we will send it.
- bufferedLogEntries.drainTo(additionalLogEntries);
- if (!additionalLogEntries.isEmpty()) {
- outboundObserver.onNext(
-
BeamFnApi.LogEntry.List.newBuilder().addAllLogEntries(additionalLogEntries).build());
- }
- } catch (Throwable t) {
- thrown = t;
- }
- if (thrown != null) {
- outboundObserver.onError(
-
Status.INTERNAL.withDescription(getStackTraceAsString(thrown)).asException());
- throw new IllegalStateException(thrown);
- } else {
- outboundObserver.onCompleted();
- }
+ private boolean dropIfBufferFull(BeamFnApi.LogEntry logEntry) {
+ return bufferedLogEntries.offer(logEntry);
}
@Override
public void flush() {}
@Override
- public synchronized void close() {
- // If we are done, then a previous caller has already shutdown the queue
processing thread
- // hence we don't need to do it again.
- if (phaser.isTerminated()) {
- return;
- }
-
- // Terminate the phaser that we block on when attempting to honor flow
control on the
- // outbound observer.
- phaser.forceTermination();
-
- if (bufferedLogWriter != null) {
- try {
- bufferedLogWriter.get();
- } catch (CancellationException e) {
- // Ignore cancellations
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException(e);
- } catch (ExecutionException e) {
- throw new RuntimeException(e);
- }
- }
- }
- }
-
- private class LogControlObserver
- implements ClientResponseObserver<BeamFnApi.LogEntry,
BeamFnApi.LogControl> {
-
- @Override
- public void beforeStart(ClientCallStreamObserver<BeamFnApi.LogEntry>
requestStream) {
- requestStream.setOnReadyHandler(phaser::arrive);
- }
-
- @Override
- public void onNext(BeamFnApi.LogControl value) {}
-
- @Override
- public void onError(Throwable t) {
- inboundObserverCompletion.completeExceptionally(t);
- }
-
- @Override
- public void onCompleted() {
- inboundObserverCompletion.complete(COMPLETED);
- }
+ public synchronized void close() {}
}
}
diff --git
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/BeamFnControlClientTest.java
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/BeamFnControlClientTest.java
index ad763a58887..5157e124b31 100644
---
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/BeamFnControlClientTest.java
+++
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/BeamFnControlClientTest.java
@@ -166,7 +166,7 @@ public class BeamFnControlClientTest {
// Ensure that the server completing the stream translates to the
completable future
// being completed allowing for a successful shutdown of the client.
outboundServerObserver.onCompleted();
- client.waitForTermination();
+ client.terminationFuture().get();
} finally {
server.shutdownNow();
}
@@ -236,7 +236,7 @@ public class BeamFnControlClientTest {
// Ensure that the client shuts down when an Error is thrown from the
harness
try {
- client.waitForTermination();
+ client.terminationFuture().get();
throw new IllegalStateException("The future should have terminated
with an error");
} catch (ExecutionException errorWrapper) {
assertThat(errorWrapper.getCause().getMessage(), containsString("Test
Error"));
diff --git
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java
index 66b9246d619..1fd8e249dd0 100644
---
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java
+++
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java
@@ -28,6 +28,7 @@ import static org.junit.Assert.assertTrue;
import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Handler;
@@ -44,7 +45,13 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.Struct;
import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.Timestamp;
import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.Value;
+import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.CallOptions;
+import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.Channel;
+import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.ClientCall;
+import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.ClientInterceptor;
+import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.ForwardingClientCall;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.ManagedChannel;
+import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.MethodDescriptor;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.Server;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.Status;
import
org.apache.beam.vendor.grpc.v1p54p0.io.grpc.inprocess.InProcessChannelBuilder;
@@ -157,7 +164,7 @@ public class BeamFnLoggingClientTest {
try {
BeamFnLoggingClient client =
- new BeamFnLoggingClient(
+ BeamFnLoggingClient.createAndStart(
PipelineOptionsFactory.fromArgs(
new String[] {
"--defaultSdkHarnessLogLevel=OFF",
@@ -223,6 +230,12 @@ public class BeamFnLoggingClientTest {
values.addAll(logEntries.getLogEntriesList()))
.build();
+ // Keep a strong reference to the loggers. Otherwise the call to
client.close()
+ // removes the only reference and the logger may get GC'd before the
assertions (BEAM-4136).
+ Logger rootLogger = null;
+ Logger configuredLogger = null;
+ Phaser streamBlocker = new Phaser(1);
+
Endpoints.ApiServiceDescriptor apiServiceDescriptor =
Endpoints.ApiServiceDescriptor.newBuilder()
.setUrl(this.getClass().getName() + "-" +
UUID.randomUUID().toString())
@@ -234,6 +247,9 @@ public class BeamFnLoggingClientTest {
@Override
public StreamObserver<BeamFnApi.LogEntry.List> logging(
StreamObserver<BeamFnApi.LogControl> outboundObserver) {
+ // Block before returning an error on the stream so that
we can observe the
+ // loggers before they are reset.
+ streamBlocker.awaitAdvance(1);
outboundServerObserver.set(outboundObserver);
outboundObserver.onError(
Status.INTERNAL.withDescription("TEST
ERROR").asException());
@@ -244,15 +260,9 @@ public class BeamFnLoggingClientTest {
server.start();
ManagedChannel channel =
InProcessChannelBuilder.forName(apiServiceDescriptor.getUrl()).build();
-
- // Keep a strong reference to the loggers. Otherwise the call to
client.close()
- // removes the only reference and the logger may get GC'd before the
assertions (BEAM-4136).
- Logger rootLogger = null;
- Logger configuredLogger = null;
-
try {
BeamFnLoggingClient client =
- new BeamFnLoggingClient(
+ BeamFnLoggingClient.createAndStart(
PipelineOptionsFactory.fromArgs(
new String[] {
"--defaultSdkHarnessLogLevel=OFF",
@@ -261,15 +271,16 @@ public class BeamFnLoggingClientTest {
.create(),
apiServiceDescriptor,
(Endpoints.ApiServiceDescriptor descriptor) -> channel);
-
+ // The loggers should be installed before createAndStart returns.
rootLogger = LogManager.getLogManager().getLogger("");
configuredLogger =
LogManager.getLogManager().getLogger("ConfiguredLogger");
-
+ // Allow the stream to return with an error.
+ assertEquals(0, streamBlocker.arrive());
thrown.expectMessage("TEST ERROR");
client.close();
} finally {
assertNotNull("rootLogger should be initialized before exception",
rootLogger);
- assertNotNull("configuredLogger should be initialized before exception",
rootLogger);
+ assertNotNull("configuredLogger should be initialized before exception",
configuredLogger);
// Verify that after close, log levels are reset.
assertEquals(Level.INFO, rootLogger.getLevel());
@@ -311,11 +322,12 @@ public class BeamFnLoggingClientTest {
})
.build();
server.start();
+ thrown.expectMessage("Logging stream terminated unexpectedly");
ManagedChannel channel =
InProcessChannelBuilder.forName(apiServiceDescriptor.getUrl()).build();
try {
BeamFnLoggingClient client =
- new BeamFnLoggingClient(
+ BeamFnLoggingClient.createAndStart(
PipelineOptionsFactory.fromArgs(
new String[] {
"--defaultSdkHarnessLogLevel=OFF",
@@ -329,7 +341,6 @@ public class BeamFnLoggingClientTest {
// removes the only reference and the logger may get GC'd before the
assertions (BEAM-4136).
Logger rootLogger = LogManager.getLogManager().getLogger("");
Logger configuredLogger =
LogManager.getLogManager().getLogger("ConfiguredLogger");
-
client.close();
// Verify that after close, log levels are reset.
@@ -341,4 +352,167 @@ public class BeamFnLoggingClientTest {
server.shutdownNow();
}
}
+
+ @Test
+ public void testClosableWhenBlockingForOnReady() throws Exception {
+ BeamFnLoggingMDC.setInstructionId("instruction-1");
+ Collection<BeamFnApi.LogEntry> values = new ConcurrentLinkedQueue<>();
+ AtomicReference<StreamObserver<BeamFnApi.LogControl>>
outboundServerObserver =
+ new AtomicReference<>();
+
+ final AtomicBoolean elementsAllowed = new AtomicBoolean(true);
+ CallStreamObserver<BeamFnApi.LogEntry.List> inboundServerObserver =
+ TestStreams.withOnNext(
+ (BeamFnApi.LogEntry.List logEntries) ->
+ values.addAll(logEntries.getLogEntriesList()))
+ .build();
+
+ Endpoints.ApiServiceDescriptor apiServiceDescriptor =
+ Endpoints.ApiServiceDescriptor.newBuilder()
+ .setUrl(this.getClass().getName() + "-" +
UUID.randomUUID().toString())
+ .build();
+ Server server =
+ InProcessServerBuilder.forName(apiServiceDescriptor.getUrl())
+ .addService(
+ new BeamFnLoggingGrpc.BeamFnLoggingImplBase() {
+ @Override
+ public StreamObserver<BeamFnApi.LogEntry.List> logging(
+ StreamObserver<BeamFnApi.LogControl> outboundObserver) {
+ outboundServerObserver.set(outboundObserver);
+ return inboundServerObserver;
+ }
+ })
+ .build();
+ server.start();
+
+ ManagedChannel channel =
+ InProcessChannelBuilder.forName(apiServiceDescriptor.getUrl())
+ .intercept(
+ new ClientInterceptor() {
+ @Override
+ public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
+ MethodDescriptor<ReqT, RespT> method, CallOptions
callOptions, Channel next) {
+ ClientCall<ReqT, RespT> delegate = next.newCall(method,
callOptions);
+ return new
ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
+ delegate) {
+ @Override
+ public boolean isReady() {
+ return elementsAllowed.get();
+ }
+ };
+ }
+ })
+ .build();
+
+ // Keep a strong reference to the loggers. Otherwise the call to
client.close()
+ // removes the only reference and the logger may get GC'd before the
assertions (BEAM-4136).
+ Logger rootLogger = null;
+ Logger configuredLogger = null;
+
+ try {
+ BeamFnLoggingClient client =
+ BeamFnLoggingClient.createAndStart(
+ PipelineOptionsFactory.fromArgs(
+ new String[] {
+ "--defaultSdkHarnessLogLevel=OFF",
+ "--sdkHarnessLogLevelOverrides={\"ConfiguredLogger\":
\"DEBUG\"}"
+ })
+ .create(),
+ apiServiceDescriptor,
+ (Endpoints.ApiServiceDescriptor descriptor) -> channel);
+
+ rootLogger = LogManager.getLogManager().getLogger("");
+ configuredLogger =
LogManager.getLogManager().getLogger("ConfiguredLogger");
+
+ long numEntries = 2000;
+ for (int i = 0; i < numEntries; ++i) {
+ configuredLogger.log(TEST_RECORD);
+ }
+ // Measure how long it takes all the logs to appear.
+ int sleepTime = 0;
+ while (values.size() < numEntries) {
+ ++sleepTime;
+ Thread.sleep(1);
+ }
+ // Attempt to enter the blocking state by pushing back on the stream,
publishing records and
+ // then giving them time for it to block.
+ elementsAllowed.set(false);
+ for (int i = 0; i < numEntries; ++i) {
+ configuredLogger.log(TEST_RECORD);
+ }
+ Thread.sleep(sleepTime * 3);
+ // At this point, the background thread is either blocking as intended
or the background
+ // thread hasn't yet observed all the input. In either case the test
should pass.
+ assertTrue(values.size() < numEntries * 2);
+
+ client.close();
+
+ assertNotNull("rootLogger should be initialized before exception",
rootLogger);
+ assertNotNull("configuredLogger should be initialized before exception",
rootLogger);
+
+ // Verify that after stream terminates, log levels are reset.
+ assertEquals(Level.INFO, rootLogger.getLevel());
+ assertNull(configuredLogger.getLevel());
+
+ assertTrue(channel.isShutdown());
+ } finally {
+ server.shutdownNow();
+ }
+ }
+
+ @Test
+ public void testServerCloseNotifiesTermination() throws Exception {
+ BeamFnLoggingMDC.setInstructionId("instruction-1");
+ Collection<BeamFnApi.LogEntry> values = new ConcurrentLinkedQueue<>();
+ AtomicReference<StreamObserver<BeamFnApi.LogControl>>
outboundServerObserver =
+ new AtomicReference<>();
+ CallStreamObserver<BeamFnApi.LogEntry.List> inboundServerObserver =
+ TestStreams.withOnNext(
+ (BeamFnApi.LogEntry.List logEntries) ->
+ values.addAll(logEntries.getLogEntriesList()))
+ .build();
+
+ Endpoints.ApiServiceDescriptor apiServiceDescriptor =
+ Endpoints.ApiServiceDescriptor.newBuilder()
+ .setUrl(this.getClass().getName() + "-" +
UUID.randomUUID().toString())
+ .build();
+ Server server =
+ InProcessServerBuilder.forName(apiServiceDescriptor.getUrl())
+ .addService(
+ new BeamFnLoggingGrpc.BeamFnLoggingImplBase() {
+ @Override
+ public StreamObserver<BeamFnApi.LogEntry.List> logging(
+ StreamObserver<BeamFnApi.LogControl> outboundObserver) {
+ outboundServerObserver.set(outboundObserver);
+ outboundObserver.onCompleted();
+ return inboundServerObserver;
+ }
+ })
+ .build();
+ server.start();
+
+ ManagedChannel channel =
InProcessChannelBuilder.forName(apiServiceDescriptor.getUrl()).build();
+ try {
+ BeamFnLoggingClient client =
+ BeamFnLoggingClient.createAndStart(
+ PipelineOptionsFactory.fromArgs(
+ new String[] {
+ "--defaultSdkHarnessLogLevel=OFF",
+ "--sdkHarnessLogLevelOverrides={\"ConfiguredLogger\":
\"DEBUG\"}"
+ })
+ .create(),
+ apiServiceDescriptor,
+ (Endpoints.ApiServiceDescriptor descriptor) -> channel);
+
+ thrown.expectMessage("Logging stream terminated unexpectedly");
+ client.terminationFuture().get();
+ } finally {
+ // Verify that after termination, log levels are reset.
+ assertEquals(Level.INFO,
LogManager.getLogManager().getLogger("").getLevel());
+
assertNull(LogManager.getLogManager().getLogger("ConfiguredLogger").getLevel());
+
+ assertTrue(channel.isShutdown());
+ server.shutdownNow();
+ }
+ }
}