Repository: beam
Updated Branches:
  refs/heads/master dc3e2f756 -> 6dd90d89d
[BEAM-3016] Fix blocking issue within run() when channel terminates while blocking within DirectStreamObserver. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b1a22a89 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b1a22a89 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b1a22a89 Branch: refs/heads/master Commit: b1a22a89bd0d66db2754ba86f85d418a8122f9ea Parents: dc3e2f7 Author: Luke Cwik <[email protected]> Authored: Fri Oct 6 09:09:17 2017 -0700 Committer: Luke Cwik <[email protected]> Committed: Wed Oct 11 16:11:59 2017 -0700 ---------------------------------------------------------------------- .../org/apache/beam/fn/harness/FnHarness.java | 10 +- .../fn/harness/logging/BeamFnLoggingClient.java | 139 ++++++++++--------- .../logging/BeamFnLoggingClientTest.java | 117 ++++++++++++++-- 3 files changed, 185 insertions(+), 81 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/b1a22a89/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java index d6c461f..7d78856 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java @@ -20,7 +20,6 @@ package org.apache.beam.fn.harness; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.protobuf.TextFormat; -import java.io.PrintStream; import java.util.EnumMap; import org.apache.beam.fn.harness.channel.ManagedChannelFactory; import org.apache.beam.fn.harness.control.BeamFnControlClient; @@ -93,13 +92,10 @@ public class FnHarness { Endpoints.ApiServiceDescriptor controlApiServiceDescriptor) 
throws Exception { ManagedChannelFactory channelFactory = ManagedChannelFactory.from(options); StreamObserverFactory streamObserverFactory = StreamObserverFactory.fromOptions(options); - PrintStream originalErrStream = System.err; - try (BeamFnLoggingClient logging = new BeamFnLoggingClient( options, loggingApiServiceDescriptor, - channelFactory::forDescriptor, - streamObserverFactory::from)) { + channelFactory::forDescriptor)) { LOG.info("Fn Harness started"); EnumMap<BeamFnApi.InstructionRequest.RequestCase, @@ -134,9 +130,9 @@ public class FnHarness { LOG.info("Entering instruction processing loop"); control.processInstructionRequests(options.as(GcsOptions.class).getExecutorService()); } catch (Throwable t) { - t.printStackTrace(originalErrStream); + t.printStackTrace(); } finally { - originalErrStream.println("Shutting SDK harness down."); + System.out.println("Shutting SDK harness down."); } } } http://git-wip-us.apache.org/repos/asf/beam/blob/b1a22a89/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java index 240e954..d43ab25 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java @@ -24,7 +24,10 @@ import com.google.common.base.MoreObjects; import com.google.common.collect.ImmutableMap; import com.google.protobuf.Timestamp; import io.grpc.ManagedChannel; -import io.grpc.stub.StreamObserver; +import io.grpc.Status; +import io.grpc.stub.CallStreamObserver; +import io.grpc.stub.ClientCallStreamObserver; +import io.grpc.stub.ClientResponseObserver; import java.util.ArrayList; import java.util.Collection; import 
java.util.List; @@ -36,8 +39,8 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.Phaser; import java.util.concurrent.TimeUnit; -import java.util.function.BiFunction; import java.util.function.Function; import java.util.logging.Formatter; import java.util.logging.Handler; @@ -79,12 +82,6 @@ public class BeamFnLoggingClient implements AutoCloseable { private static final Formatter FORMATTER = new SimpleFormatter(); - private static final String FAKE_INSTRUCTION_ID = "FAKE_INSTRUCTION_ID"; - - /* Used to signal to a thread processing a queue to finish its work gracefully. */ - private static final BeamFnApi.LogEntry POISON_PILL = - BeamFnApi.LogEntry.newBuilder().setInstructionReference(FAKE_INSTRUCTION_ID).build(); - /** * The number of log messages that will be buffered. Assuming log messages are at most 1 KiB, * this represents a buffer of about 10 MiBs. 
@@ -97,22 +94,20 @@ public class BeamFnLoggingClient implements AutoCloseable { private final Collection<Logger> configuredLoggers; private final Endpoints.ApiServiceDescriptor apiServiceDescriptor; private final ManagedChannel channel; - private final StreamObserver<BeamFnApi.LogEntry.List> outboundObserver; + private final CallStreamObserver<BeamFnApi.LogEntry.List> outboundObserver; private final LogControlObserver inboundObserver; private final LogRecordHandler logRecordHandler; private final CompletableFuture<Object> inboundObserverCompletion; + private final Phaser phaser; public BeamFnLoggingClient( PipelineOptions options, Endpoints.ApiServiceDescriptor apiServiceDescriptor, - Function<Endpoints.ApiServiceDescriptor, ManagedChannel> channelFactory, - BiFunction<Function<StreamObserver<BeamFnApi.LogControl>, - StreamObserver<BeamFnApi.LogEntry.List>>, - StreamObserver<BeamFnApi.LogControl>, - StreamObserver<BeamFnApi.LogEntry.List>> streamObserverFactory) { + Function<Endpoints.ApiServiceDescriptor, ManagedChannel> channelFactory) { this.apiServiceDescriptor = apiServiceDescriptor; this.inboundObserverCompletion = new CompletableFuture<>(); this.configuredLoggers = new ArrayList<>(); + this.phaser = new Phaser(1); this.channel = channelFactory.apply(apiServiceDescriptor); // Reset the global log manager, get the root logger and remove the default log handlers. 
@@ -142,29 +137,32 @@ public class BeamFnLoggingClient implements AutoCloseable { inboundObserver = new LogControlObserver(); logRecordHandler = new LogRecordHandler(options.as(GcsOptions.class).getExecutorService()); logRecordHandler.setLevel(Level.ALL); - outboundObserver = streamObserverFactory.apply(stub::logging, inboundObserver); + outboundObserver = + (CallStreamObserver<BeamFnApi.LogEntry.List>) stub.logging(inboundObserver); rootLogger.addHandler(logRecordHandler); } @Override public void close() throws Exception { - // Hang up with the server - logRecordHandler.close(); + try { + // Hang up with the server + logRecordHandler.close(); - // Wait for the server to hang up - inboundObserverCompletion.get(); - - // Reset the logging configuration to what it is at startup - for (Logger logger : configuredLoggers) { - logger.setLevel(null); - } - configuredLoggers.clear(); - LogManager.getLogManager().readConfiguration(); + // Wait for the server to hang up + inboundObserverCompletion.get(); + } finally { + // Reset the logging configuration to what it is at startup + for (Logger logger : configuredLoggers) { + logger.setLevel(null); + } + configuredLoggers.clear(); + LogManager.getLogManager().readConfiguration(); - // Shut the channel down - channel.shutdown(); - if (!channel.awaitTermination(10, TimeUnit.SECONDS)) { - channel.shutdownNow(); + // Shut the channel down + channel.shutdown(); + if (!channel.awaitTermination(10, TimeUnit.SECONDS)) { + channel.shutdownNow(); + } } } @@ -231,24 +229,41 @@ public class BeamFnLoggingClient implements AutoCloseable { List<BeamFnApi.LogEntry> additionalLogEntries = new ArrayList<>(MAX_BUFFERED_LOG_ENTRY_COUNT); + Throwable thrown = null; try { - BeamFnApi.LogEntry logEntry; - while ((logEntry = bufferedLogEntries.take()) != POISON_PILL) { + // As long as we haven't yet terminated, then attempt + while (!phaser.isTerminated()) { + // Try to wait for a message to show up. 
+ BeamFnApi.LogEntry logEntry = bufferedLogEntries.poll(1, TimeUnit.SECONDS); + // If we don't have a message then we need to try this loop again. + if (logEntry == null) { + continue; + } + + // Attempt to honor flow control. Phaser termination causes await advance to return + // immediately. + int phase = phaser.getPhase(); + if (!outboundObserver.isReady()) { + phaser.awaitAdvance(phase); + } + + // Batch together as many log messages as possible that are held within the buffer BeamFnApi.LogEntry.List.Builder builder = BeamFnApi.LogEntry.List.newBuilder().addLogEntries(logEntry); bufferedLogEntries.drainTo(additionalLogEntries); - for (int i = 0; i < additionalLogEntries.size(); ++i) { - if (additionalLogEntries.get(i) == POISON_PILL) { - additionalLogEntries = additionalLogEntries.subList(0, i); - break; - } - } builder.addAllLogEntries(additionalLogEntries); outboundObserver.onNext(builder.build()); + additionalLogEntries.clear(); } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IllegalStateException(e); + } catch (Throwable t) { + thrown = t; + } + if (thrown != null) { + outboundObserver.onError( + Status.INTERNAL.withDescription(getStackTraceAsString(thrown)).asException()); + throw new IllegalStateException(thrown); + } else { + outboundObserver.onCompleted(); } } @@ -257,31 +272,17 @@ public class BeamFnLoggingClient implements AutoCloseable { } @Override - public void close() { - synchronized (outboundObserver) { - // If we are done, then a previous caller has already shutdown the queue processing thread - // hence we don't need to do it again. - if (!bufferedLogWriter.isDone()) { - // We check to see if we were able to successfully insert the poison pill at the end of - // the queue forcing the remainder of the elements to be processed or if the processing - // thread is done. 
- try { - // The order of these checks is important because short circuiting will cause us to - // insert into the queue first and only if it fails do we check that the thread is done. - while (!bufferedLogEntries.offer(POISON_PILL, 60, TimeUnit.SECONDS) - || !bufferedLogWriter.isDone()) { - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException(e); - } - waitTillFinish(); - } - outboundObserver.onCompleted(); + public synchronized void close() { + // If we are done, then a previous caller has already shutdown the queue processing thread + // hence we don't need to do it again. + if (phaser.isTerminated()) { + return; } - } - private void waitTillFinish() { + // Terminate the phaser that we block on when attempting to honor flow control on the + // outbound observer. + phaser.arriveAndDeregister(); + try { bufferedLogWriter.get(); } catch (CancellationException e) { @@ -295,7 +296,14 @@ public class BeamFnLoggingClient implements AutoCloseable { } } - private class LogControlObserver implements StreamObserver<BeamFnApi.LogControl> { + private class LogControlObserver + implements ClientResponseObserver<BeamFnApi.LogEntry, BeamFnApi.LogControl> { + + @Override + public void beforeStart(ClientCallStreamObserver requestStream) { + requestStream.setOnReadyHandler(phaser::arrive); + } + @Override public void onNext(BeamFnApi.LogControl value) { } @@ -309,5 +317,6 @@ public class BeamFnLoggingClient implements AutoCloseable { public void onCompleted() { inboundObserverCompletion.complete(null); } + } } http://git-wip-us.apache.org/repos/asf/beam/blob/b1a22a89/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java index 161ce18..015e5ec 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java @@ -28,6 +28,7 @@ import static org.junit.Assert.assertTrue; import com.google.protobuf.Timestamp; import io.grpc.ManagedChannel; import io.grpc.Server; +import io.grpc.Status; import io.grpc.inprocess.InProcessChannelBuilder; import io.grpc.inprocess.InProcessServerBuilder; import io.grpc.stub.CallStreamObserver; @@ -37,7 +38,6 @@ import java.util.UUID; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Function; import java.util.logging.Level; import java.util.logging.LogManager; import java.util.logging.LogRecord; @@ -46,7 +46,9 @@ import org.apache.beam.model.fnexecution.v1.BeamFnApi; import org.apache.beam.model.fnexecution.v1.BeamFnLoggingGrpc; import org.apache.beam.model.pipeline.v1.Endpoints; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -90,6 +92,7 @@ public class BeamFnLoggingClientTest { .setTimestamp(Timestamp.newBuilder().setSeconds(1234567).setNanos(890000000).build()) .setLogLocation("LoggerName") .build(); + @Rule public ExpectedException thrown = ExpectedException.none(); @Test public void testLogging() throws Exception { @@ -124,9 +127,10 @@ public class BeamFnLoggingClientTest { }) .build(); server.start(); + + ManagedChannel channel = + InProcessChannelBuilder.forName(apiServiceDescriptor.getUrl()).build(); try { - ManagedChannel channel = - InProcessChannelBuilder.forName(apiServiceDescriptor.getUrl()).build(); 
BeamFnLoggingClient client = new BeamFnLoggingClient( PipelineOptionsFactory.fromArgs(new String[] { @@ -134,8 +138,7 @@ public class BeamFnLoggingClientTest { "--workerLogLevelOverrides={\"ConfiguredLogger\": \"DEBUG\"}" }).create(), apiServiceDescriptor, - (Endpoints.ApiServiceDescriptor descriptor) -> channel, - this::createStreamForTest); + (Endpoints.ApiServiceDescriptor descriptor) -> channel); // Ensure that log levels were correctly set. assertEquals(Level.OFF, @@ -162,9 +165,105 @@ public class BeamFnLoggingClientTest { } } - private <ReqT, RespT> StreamObserver<RespT> createStreamForTest( - Function<StreamObserver<ReqT>, StreamObserver<RespT>> clientFactory, - StreamObserver<ReqT> handler) { - return clientFactory.apply(handler); + @Test + public void testWhenServerFailsThatClientIsAbleToCleanup() throws Exception { + AtomicBoolean clientClosedStream = new AtomicBoolean(); + Collection<BeamFnApi.LogEntry> values = new ConcurrentLinkedQueue<>(); + AtomicReference<StreamObserver<BeamFnApi.LogControl>> outboundServerObserver = + new AtomicReference<>(); + CallStreamObserver<BeamFnApi.LogEntry.List> inboundServerObserver = TestStreams.withOnNext( + (BeamFnApi.LogEntry.List logEntries) -> values.addAll(logEntries.getLogEntriesList())) + .build(); + + Endpoints.ApiServiceDescriptor apiServiceDescriptor = + Endpoints.ApiServiceDescriptor.newBuilder() + .setUrl(this.getClass().getName() + "-" + UUID.randomUUID().toString()) + .build(); + Server server = InProcessServerBuilder.forName(apiServiceDescriptor.getUrl()) + .addService(new BeamFnLoggingGrpc.BeamFnLoggingImplBase() { + @Override + public StreamObserver<BeamFnApi.LogEntry.List> logging( + StreamObserver<BeamFnApi.LogControl> outboundObserver) { + outboundServerObserver.set(outboundObserver); + outboundObserver.onError(Status.INTERNAL.withDescription("TEST ERROR").asException()); + return inboundServerObserver; + } + }) + .build(); + server.start(); + + ManagedChannel channel = + 
InProcessChannelBuilder.forName(apiServiceDescriptor.getUrl()).build(); + try { + BeamFnLoggingClient client = new BeamFnLoggingClient( + PipelineOptionsFactory.fromArgs(new String[] { + "--defaultWorkerLogLevel=OFF", + "--workerLogLevelOverrides={\"ConfiguredLogger\": \"DEBUG\"}" + }).create(), + apiServiceDescriptor, + (Endpoints.ApiServiceDescriptor descriptor) -> channel); + + thrown.expectMessage("TEST ERROR"); + client.close(); + } finally { + // Verify that after close, log levels are reset. + assertEquals(Level.INFO, LogManager.getLogManager().getLogger("").getLevel()); + assertNull(LogManager.getLogManager().getLogger("ConfiguredLogger").getLevel()); + + assertTrue(channel.isShutdown()); + + server.shutdownNow(); + } + } + + @Test + public void testWhenServerHangsUpEarlyThatClientIsAbleCleanup() throws Exception { + AtomicBoolean clientClosedStream = new AtomicBoolean(); + Collection<BeamFnApi.LogEntry> values = new ConcurrentLinkedQueue<>(); + AtomicReference<StreamObserver<BeamFnApi.LogControl>> outboundServerObserver = + new AtomicReference<>(); + CallStreamObserver<BeamFnApi.LogEntry.List> inboundServerObserver = + TestStreams.withOnNext( + (BeamFnApi.LogEntry.List logEntries) -> values.addAll(logEntries.getLogEntriesList())) + .build(); + + Endpoints.ApiServiceDescriptor apiServiceDescriptor = + Endpoints.ApiServiceDescriptor.newBuilder() + .setUrl(this.getClass().getName() + "-" + UUID.randomUUID().toString()) + .build(); + Server server = InProcessServerBuilder.forName(apiServiceDescriptor.getUrl()) + .addService(new BeamFnLoggingGrpc.BeamFnLoggingImplBase() { + @Override + public StreamObserver<BeamFnApi.LogEntry.List> logging( + StreamObserver<BeamFnApi.LogControl> outboundObserver) { + outboundServerObserver.set(outboundObserver); + outboundObserver.onCompleted(); + return inboundServerObserver; + } + }) + .build(); + server.start(); + + ManagedChannel channel = + InProcessChannelBuilder.forName(apiServiceDescriptor.getUrl()).build(); + try { + 
BeamFnLoggingClient client = new BeamFnLoggingClient( + PipelineOptionsFactory.fromArgs(new String[] { + "--defaultWorkerLogLevel=OFF", + "--workerLogLevelOverrides={\"ConfiguredLogger\": \"DEBUG\"}" + }).create(), + apiServiceDescriptor, + (Endpoints.ApiServiceDescriptor descriptor) -> channel); + + client.close(); + } finally { + // Verify that after close, log levels are reset. + assertEquals(Level.INFO, LogManager.getLogManager().getLogger("").getLevel()); + assertNull(LogManager.getLogManager().getLogger("ConfiguredLogger").getLevel()); + + assertTrue(channel.isShutdown()); + + server.shutdownNow(); + } } }
