This is an automated email from the ASF dual-hosted git repository. lcwik pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 3d9318bff08 Remove deprecated first implementations of data channel connector code on the runner side (#25104) 3d9318bff08 is described below commit 3d9318bff0873fb7426cff7cf0ee4eeedbf5796f Author: Luke Cwik <lc...@google.com> AuthorDate: Tue Jan 24 09:59:04 2023 -0800 Remove deprecated first implementations of data channel connector code on the runner side (#25104) This allows us to clean up the existing code base now that the Dataflow JRH is gone, and should improve the stability and performance of Java-based portable runners built on top of this code, since we have had a lot more use of the SDK harness variants at scale. --- .../fnexecution/control/SdkHarnessClient.java | 135 ++++++----- .../runners/fnexecution/data/FnDataService.java | 53 +++-- .../runners/fnexecution/data/GrpcDataService.java | 65 +++--- .../control/DefaultJobBundleFactoryTest.java | 6 +- .../fnexecution/control/SdkHarnessClientTest.java | 259 +++++++++++---------- .../fnexecution/data/GrpcDataServiceTest.java | 37 +-- .../sdk/fn/data/BeamFnDataGrpcMultiplexer.java | 227 ------------------ .../sdk/fn/data/BeamFnDataGrpcMultiplexer2.java | 2 - .../sdk/fn/data/BeamFnDataInboundObserver.java | 103 -------- .../sdk/fn/data/BeamFnDataOutboundObserver.java | 87 ------- .../data/CompletableFutureInboundDataClient.java | 78 ------- .../sdk/fn/data/BeamFnDataGrpcMultiplexerTest.java | 127 ---------- .../sdk/fn/data/BeamFnDataInboundObserverTest.java | 108 --------- .../CompletableFutureInboundDataClientTest.java | 163 ------------- .../beam/fn/harness/data/BeamFnDataClient.java | 9 +- 15 files changed, 298 insertions(+), 1161 deletions(-) diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java index 99cc2f793dc..609988b7d4e 100644 --- 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java @@ -20,10 +20,12 @@ package org.apache.beam.runners.fnexecution.control; import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Phaser; @@ -46,12 +48,16 @@ import org.apache.beam.runners.fnexecution.data.FnDataService; import org.apache.beam.runners.fnexecution.data.RemoteInputDestination; import org.apache.beam.runners.fnexecution.state.StateDelegator; import org.apache.beam.runners.fnexecution.state.StateRequestHandler; +import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.fn.IdGenerator; import org.apache.beam.sdk.fn.IdGenerators; +import org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2; +import org.apache.beam.sdk.fn.data.BeamFnDataOutboundAggregator; import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver; +import org.apache.beam.sdk.fn.data.DataEndpoint; import org.apache.beam.sdk.fn.data.FnDataReceiver; -import org.apache.beam.sdk.fn.data.InboundDataClient; import org.apache.beam.sdk.fn.data.LogicalEndpoint; +import org.apache.beam.sdk.fn.data.TimerEndpoint; import org.apache.beam.sdk.util.MoreFutures; import org.apache.beam.sdk.values.KV; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; @@ -262,44 +268,64 @@ public class SdkHarnessClient implements AutoCloseable { CompletionStage<BeamFnApi.ProcessBundleResponse> specificResponse = 
genericResponse.thenApply(InstructionResponse::getProcessBundle); - Map<LogicalEndpoint, InboundDataClient> outputClients = new HashMap<>(); - for (Map.Entry<String, RemoteOutputReceiver<?>> receiver : outputReceivers.entrySet()) { - LogicalEndpoint endpoint = LogicalEndpoint.data(bundleId, receiver.getKey()); - InboundDataClient outputClient = - attachReceiver(endpoint, (RemoteOutputReceiver) receiver.getValue()); - outputClients.put(endpoint, outputClient); - } - for (Map.Entry<KV<String, String>, RemoteOutputReceiver<Timer<?>>> timerReceiver : - timerReceivers.entrySet()) { - LogicalEndpoint endpoint = - LogicalEndpoint.timer( - bundleId, timerReceiver.getKey().getKey(), timerReceiver.getKey().getValue()); - InboundDataClient outputClient = attachReceiver(endpoint, timerReceiver.getValue()); - outputClients.put(endpoint, outputClient); + Optional<BeamFnDataInboundObserver2> beamFnDataInboundObserver; + if (outputReceivers.isEmpty() && timerReceivers.isEmpty()) { + beamFnDataInboundObserver = Optional.empty(); + } else { + List<DataEndpoint<?>> dataEndpoints = new ArrayList<>(outputReceivers.size()); + for (Map.Entry<String, RemoteOutputReceiver<?>> receiver : outputReceivers.entrySet()) { + dataEndpoints.add( + DataEndpoint.create( + receiver.getKey(), + (Coder<Object>) receiver.getValue().getCoder(), + (FnDataReceiver<Object>) receiver.getValue().getReceiver())); + } + List<TimerEndpoint<?>> timerEndpoints = new ArrayList<>(timerReceivers.size()); + for (Map.Entry<KV<String, String>, RemoteOutputReceiver<Timer<?>>> timerReceiver : + timerReceivers.entrySet()) { + timerEndpoints.add( + TimerEndpoint.create( + timerReceiver.getKey().getKey(), + timerReceiver.getKey().getValue(), + timerReceiver.getValue().getCoder(), + timerReceiver.getValue().getReceiver())); + } + beamFnDataInboundObserver = + Optional.of(BeamFnDataInboundObserver2.forConsumers(dataEndpoints, timerEndpoints)); + fnApiDataService.registerReceiver(bundleId, beamFnDataInboundObserver.get()); } - 
ImmutableMap.Builder<LogicalEndpoint, CloseableFnDataReceiver> receiverBuilder = + ImmutableMap.Builder<LogicalEndpoint, FnDataReceiver<?>> receiverBuilder = ImmutableMap.builder(); + BeamFnDataOutboundAggregator beamFnDataOutboundAggregator = + fnApiDataService.createOutboundAggregator(() -> bundleId, false); for (RemoteInputDestination remoteInput : remoteInputs) { LogicalEndpoint endpoint = LogicalEndpoint.data(bundleId, remoteInput.getPTransformId()); receiverBuilder.put( endpoint, - new CountingFnDataReceiver(fnApiDataService.send(endpoint, remoteInput.getCoder()))); + new CountingFnDataReceiver( + beamFnDataOutboundAggregator.registerOutputDataLocation( + remoteInput.getPTransformId(), remoteInput.getCoder()))); } for (Map.Entry<String, Map<String, TimerSpec>> entry : timerSpecs.entrySet()) { for (TimerSpec timerSpec : entry.getValue().values()) { LogicalEndpoint endpoint = LogicalEndpoint.timer(bundleId, timerSpec.transformId(), timerSpec.timerId()); - receiverBuilder.put(endpoint, fnApiDataService.send(endpoint, timerSpec.coder())); + receiverBuilder.put( + endpoint, + beamFnDataOutboundAggregator.registerOutputTimersLocation( + timerSpec.transformId(), timerSpec.timerId(), timerSpec.coder())); } } + beamFnDataOutboundAggregator.start(); return new ActiveBundle( bundleId, specificResponse, + beamFnDataOutboundAggregator, receiverBuilder.build(), - outputClients, + beamFnDataInboundObserver, stateDelegator.registerForProcessBundleInstructionId(bundleId, stateRequestHandler), progressHandler, splitHandler, @@ -307,17 +333,13 @@ public class SdkHarnessClient implements AutoCloseable { finalizationHandler); } - private <OutputT> InboundDataClient attachReceiver( - LogicalEndpoint endpoint, RemoteOutputReceiver<OutputT> receiver) { - return fnApiDataService.receive(endpoint, receiver.getCoder(), receiver.getReceiver()); - } - /** An active bundle for a particular {@link BeamFnApi.ProcessBundleDescriptor}. 
*/ public class ActiveBundle implements RemoteBundle { private final String bundleId; private final CompletionStage<BeamFnApi.ProcessBundleResponse> response; - private final Map<LogicalEndpoint, CloseableFnDataReceiver> inputReceivers; - private final Map<LogicalEndpoint, InboundDataClient> outputClients; + private final BeamFnDataOutboundAggregator beamFnDataOutboundAggregator; + private final Map<LogicalEndpoint, FnDataReceiver<?>> inputReceivers; + private final Optional<BeamFnDataInboundObserver2> beamFnDataInboundObserver; private final StateDelegator.Registration stateRegistration; private final BundleProgressHandler progressHandler; private final BundleSplitHandler splitHandler; @@ -330,8 +352,9 @@ public class SdkHarnessClient implements AutoCloseable { private ActiveBundle( String bundleId, CompletionStage<ProcessBundleResponse> response, - Map<LogicalEndpoint, CloseableFnDataReceiver> inputReceivers, - Map<LogicalEndpoint, InboundDataClient> outputClients, + BeamFnDataOutboundAggregator beamFnDataOutboundAggregator, + Map<LogicalEndpoint, FnDataReceiver<?>> inputReceivers, + Optional<BeamFnDataInboundObserver2> beamFnDataInboundObserver, StateDelegator.Registration stateRegistration, BundleProgressHandler progressHandler, BundleSplitHandler splitHandler, @@ -339,8 +362,9 @@ public class SdkHarnessClient implements AutoCloseable { BundleFinalizationHandler finalizationHandler) { this.bundleId = bundleId; this.response = response; + this.beamFnDataOutboundAggregator = beamFnDataOutboundAggregator; this.inputReceivers = inputReceivers; - this.outputClients = outputClients; + this.beamFnDataInboundObserver = beamFnDataInboundObserver; this.stateRegistration = stateRegistration; this.progressHandler = progressHandler; this.splitHandler = splitHandler; @@ -371,8 +395,7 @@ public class SdkHarnessClient implements AutoCloseable { @Override public Map<String, FnDataReceiver> getInputReceivers() { ImmutableMap.Builder<String, FnDataReceiver> rval = 
ImmutableMap.builder(); - for (Map.Entry<LogicalEndpoint, CloseableFnDataReceiver> entry : - inputReceivers.entrySet()) { + for (Map.Entry<LogicalEndpoint, FnDataReceiver<?>> entry : inputReceivers.entrySet()) { if (!entry.getKey().isTimer()) { rval.put(entry.getKey().getTransformId(), entry.getValue()); } @@ -384,12 +407,11 @@ public class SdkHarnessClient implements AutoCloseable { public Map<KV<String, String>, FnDataReceiver<Timer>> getTimerReceivers() { ImmutableMap.Builder<KV<String, String>, FnDataReceiver<Timer>> rval = ImmutableMap.builder(); - for (Map.Entry<LogicalEndpoint, CloseableFnDataReceiver> entry : - inputReceivers.entrySet()) { + for (Map.Entry<LogicalEndpoint, FnDataReceiver<?>> entry : inputReceivers.entrySet()) { if (entry.getKey().isTimer()) { rval.put( KV.of(entry.getKey().getTransformId(), entry.getKey().getTimerFamilyId()), - entry.getValue()); + (FnDataReceiver<Timer>) entry.getValue()); } } return rval.build(); @@ -432,7 +454,7 @@ public class SdkHarnessClient implements AutoCloseable { outstandingRequests.register(); } Map<String, DesiredSplit> splits = new HashMap<>(); - for (Map.Entry<LogicalEndpoint, CloseableFnDataReceiver> ptransformToInput : + for (Map.Entry<LogicalEndpoint, FnDataReceiver<?>> ptransformToInput : inputReceivers.entrySet()) { if (!ptransformToInput.getKey().isTimer()) { splits.put( @@ -487,16 +509,12 @@ public class SdkHarnessClient implements AutoCloseable { } Exception exception = null; - for (CloseableFnDataReceiver<?> inputReceiver : inputReceivers.values()) { - try { - inputReceiver.close(); - } catch (Exception e) { - if (exception == null) { - exception = e; - } else { - exception.addSuppressed(e); - } - } + try { + beamFnDataOutboundAggregator.sendOrCollectBufferedDataAndFinishOutboundStreams(); + } catch (Exception e) { + exception = e; + } finally { + beamFnDataOutboundAggregator.discard(); } try { // We don't have to worry about the completion stage. 
@@ -537,12 +555,13 @@ public class SdkHarnessClient implements AutoCloseable { exception.addSuppressed(e); } } - for (InboundDataClient outputClient : outputClients.values()) { + if (beamFnDataInboundObserver.isPresent()) { try { if (exception == null) { - outputClient.awaitCompletion(); + beamFnDataInboundObserver.get().awaitCompletion(); + fnApiDataService.unregisterReceiver(bundleId); } else { - outputClient.cancel(); + beamFnDataInboundObserver.get().close(); } } catch (Exception e) { if (exception == null) { @@ -696,14 +715,12 @@ public class SdkHarnessClient implements AutoCloseable { } } - /** - * A {@link CloseableFnDataReceiver} which counts the number of elements that have been accepted. - */ - private static class CountingFnDataReceiver<T> implements CloseableFnDataReceiver<T> { - private final CloseableFnDataReceiver delegate; + /** A {@link FnDataReceiver} which counts the number of elements that have been accepted. */ + private static class CountingFnDataReceiver<T> implements FnDataReceiver<T> { + private final FnDataReceiver<T> delegate; private long count; - private CountingFnDataReceiver(CloseableFnDataReceiver delegate) { + private CountingFnDataReceiver(FnDataReceiver<T> delegate) { this.delegate = delegate; } @@ -716,16 +733,6 @@ public class SdkHarnessClient implements AutoCloseable { delegate.accept(input); count += 1; } - - @Override - public void flush() throws Exception { - delegate.flush(); - } - - @Override - public void close() throws Exception { - delegate.close(); - } } /** Registers a {@link BeamFnApi.ProcessBundleDescriptor} for future processing. 
*/ diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/data/FnDataService.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/data/FnDataService.java index 1c33446993c..7c5f110eab2 100644 --- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/data/FnDataService.java +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/data/FnDataService.java @@ -17,12 +17,11 @@ */ package org.apache.beam.runners.fnexecution.data; -import org.apache.beam.sdk.coders.Coder; +import java.util.function.Supplier; +import org.apache.beam.model.fnexecution.v1.BeamFnApi.Elements; import org.apache.beam.sdk.fn.data.BeamFnDataOutboundAggregator; import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver; import org.apache.beam.sdk.fn.data.FnDataReceiver; -import org.apache.beam.sdk.fn.data.InboundDataClient; -import org.apache.beam.sdk.fn.data.LogicalEndpoint; /** * The {@link FnDataService} is able to forward inbound elements to a consumer and is also a @@ -32,33 +31,43 @@ import org.apache.beam.sdk.fn.data.LogicalEndpoint; public interface FnDataService { /** - * Registers a receiver to be notified upon any incoming elements. + * Registers a receiver for the provided instruction id. * - * <p>The provided coder is used to decode inbound elements. The decoded elements are passed to - * the provided receiver. + * <p>The receiver is not required to be thread safe. * - * <p>Any failure during decoding or processing of the element will put the {@link - * InboundDataClient} into an error state such that {@link InboundDataClient#awaitCompletion()} - * will throw an exception. + * <p>Receivers for successfully processed bundles must be unregistered. See {@link + * #unregisterReceiver} for details. * - * <p>The provided receiver is not required to be thread safe. 
+ * <p>Any failure during {@link FnDataReceiver#accept} will mark the provided {@code + * instructionId} as invalid and will ignore any future data. It is expected that if a bundle + * fails during processing then the failure will become visible to the {@link FnDataService} + * during a future {@link FnDataReceiver#accept} invocation. */ - <T> InboundDataClient receive( - LogicalEndpoint inputLocation, Coder<T> coder, FnDataReceiver<T> listener); + void registerReceiver(String instructionId, CloseableFnDataReceiver<Elements> observer); /** - * Creates a receiver to which you can write data values and have them sent over this data plane - * service. + * Receivers are only expected to be unregistered when bundle processing has completed + * successfully. * - * <p>The provided coder is used to encode elements on the outbound stream. - * - * <p>Closing the returned receiver signals the end of the stream. + * <p>It is expected that if a bundle fails during processing then the failure will become visible + * to the {@link FnDataService} during a future {@link FnDataReceiver#accept} invocation. + */ + void unregisterReceiver(String instructionId); + + /** + * Creates a {@link BeamFnDataOutboundAggregator} for buffering and sending outbound data and + * timers over the data plane. It is important that {@link + * BeamFnDataOutboundAggregator#sendOrCollectBufferedDataAndFinishOutboundStreams()} is called on + * the returned BeamFnDataOutboundAggregator at the end of each bundle. If + * collectElementsIfNoFlushes is set to true, {@link + * BeamFnDataOutboundAggregator#sendOrCollectBufferedDataAndFinishOutboundStreams()} returns the + * buffered elements instead of sending them through the outbound StreamObserver if there was no + * previous flush. * - * <p>The returned receiver is not thread safe. + * <p>Closing the returned aggregator signals the end of the streams. * - * @deprecated Migrate to use {@link BeamFnDataOutboundAggregator} directly for sending outbound - * data. 
+ * <p>The returned aggregator is not thread safe. */ - @Deprecated - <T> CloseableFnDataReceiver<T> send(LogicalEndpoint outputLocation, Coder<T> coder); + BeamFnDataOutboundAggregator createOutboundAggregator( + Supplier<String> processBundleRequestIdSupplier, boolean collectElementsIfNoFlushes); } diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/data/GrpcDataService.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/data/GrpcDataService.java index 47430530c8a..883d72103a9 100644 --- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/data/GrpcDataService.java +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/data/GrpcDataService.java @@ -23,17 +23,13 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.Supplier; import org.apache.beam.model.fnexecution.v1.BeamFnApi; +import org.apache.beam.model.fnexecution.v1.BeamFnApi.Elements; import org.apache.beam.model.fnexecution.v1.BeamFnDataGrpc; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer; -import org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver; -import org.apache.beam.sdk.fn.data.BeamFnDataOutboundObserver; +import org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer2; +import org.apache.beam.sdk.fn.data.BeamFnDataOutboundAggregator; import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver; -import org.apache.beam.sdk.fn.data.DecodingFnDataReceiver; -import org.apache.beam.sdk.fn.data.FnDataReceiver; -import org.apache.beam.sdk.fn.data.InboundDataClient; -import org.apache.beam.sdk.fn.data.LogicalEndpoint; import org.apache.beam.sdk.fn.server.FnService; import org.apache.beam.sdk.fn.stream.OutboundObserverFactory; import org.apache.beam.sdk.options.PipelineOptions; @@ 
-64,7 +60,7 @@ public class GrpcDataService extends BeamFnDataGrpc.BeamFnDataImplBase return new GrpcDataService(options, executor, outboundObserverFactory); } - private final SettableFuture<BeamFnDataGrpcMultiplexer> connectedClient; + private final SettableFuture<BeamFnDataGrpcMultiplexer2> connectedClient; /** * A collection of multiplexers which are not used to send data. A handle to these multiplexers is * maintained in order to perform an orderly shutdown. @@ -72,7 +68,7 @@ public class GrpcDataService extends BeamFnDataGrpc.BeamFnDataImplBase * <p>TODO: (BEAM-3811) Replace with some cancellable collection, to ensure that new clients of a * closed {@link GrpcDataService} are closed with that {@link GrpcDataService}. */ - private final Queue<BeamFnDataGrpcMultiplexer> additionalMultiplexers; + private final Queue<BeamFnDataGrpcMultiplexer2> additionalMultiplexers; private final PipelineOptions options; private final ExecutorService executor; @@ -103,8 +99,8 @@ public class GrpcDataService extends BeamFnDataGrpc.BeamFnDataImplBase public StreamObserver<BeamFnApi.Elements> data( final StreamObserver<BeamFnApi.Elements> outboundElementObserver) { LOG.info("Beam Fn Data client connected."); - BeamFnDataGrpcMultiplexer multiplexer = - new BeamFnDataGrpcMultiplexer( + BeamFnDataGrpcMultiplexer2 multiplexer = + new BeamFnDataGrpcMultiplexer2( null, outboundObserverFactory, inbound -> outboundElementObserver); // First client that connects completes this future. if (!connectedClient.set(multiplexer)) { @@ -125,7 +121,7 @@ public class GrpcDataService extends BeamFnDataGrpc.BeamFnDataImplBase // Multiplexer, but if there isn't any multiplexer it prevents callers blocking forever. 
connectedClient.cancel(true); // Close any other open connections - for (BeamFnDataGrpcMultiplexer additional : additionalMultiplexers) { + for (BeamFnDataGrpcMultiplexer2 additional : additionalMultiplexers) { try { additional.close(); } catch (Exception ignored) { @@ -139,18 +135,11 @@ public class GrpcDataService extends BeamFnDataGrpc.BeamFnDataImplBase @Override @SuppressWarnings("FutureReturnValueIgnored") - public <T> InboundDataClient receive( - final LogicalEndpoint inputLocation, Coder<T> coder, FnDataReceiver<T> listener) { - LOG.debug( - "Registering receiver for instruction {} and transform {}", - inputLocation.getInstructionId(), - inputLocation.getTransformId()); - final BeamFnDataInboundObserver observer = - BeamFnDataInboundObserver.forConsumer( - inputLocation, new DecodingFnDataReceiver<T>(coder, listener)); + public void registerReceiver(String instructionId, CloseableFnDataReceiver<Elements> observer) { + LOG.debug("Registering observer for instruction {}", instructionId); if (connectedClient.isDone()) { try { - connectedClient.get().registerConsumer(inputLocation, observer); + connectedClient.get().registerConsumer(instructionId, observer); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException(e); @@ -161,7 +150,7 @@ public class GrpcDataService extends BeamFnDataGrpc.BeamFnDataImplBase executor.submit( () -> { try { - connectedClient.get().registerConsumer(inputLocation, observer); + connectedClient.get().registerConsumer(instructionId, observer); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException(e); @@ -170,21 +159,29 @@ public class GrpcDataService extends BeamFnDataGrpc.BeamFnDataImplBase } }); } - return observer; } @Override - public <T> CloseableFnDataReceiver<T> send(LogicalEndpoint outputLocation, Coder<T> coder) { - LOG.debug( - "Creating sender for instruction {} and transform {}", - outputLocation.getInstructionId(), - 
outputLocation.getTransformId()); + public void unregisterReceiver(String instructionId) { try { - return new BeamFnDataOutboundObserver<>( - outputLocation, - coder, + connectedClient.get().unregisterConsumer(instructionId); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } catch (ExecutionException e) { + throw new RuntimeException(e.getCause()); + } + } + + @Override + public BeamFnDataOutboundAggregator createOutboundAggregator( + Supplier<String> processBundleRequestIdSupplier, boolean collectElementsIfNoFlushes) { + try { + return new BeamFnDataOutboundAggregator( + options, + processBundleRequestIdSupplier, connectedClient.get(3, TimeUnit.MINUTES).getOutboundObserver(), - options); + collectElementsIfNoFlushes); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException(e); diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactoryTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactoryTest.java index e0d41ba55d7..44ae7e6bca9 100644 --- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactoryTest.java +++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactoryTest.java @@ -20,6 +20,7 @@ package org.apache.beam.runners.fnexecution.control; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; @@ -62,7 +63,7 @@ import org.apache.beam.runners.fnexecution.state.StateDelegator; import org.apache.beam.runners.fnexecution.state.StateRequestHandler; import 
org.apache.beam.sdk.fn.IdGenerator; import org.apache.beam.sdk.fn.IdGenerators; -import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver; +import org.apache.beam.sdk.fn.data.BeamFnDataOutboundAggregator; import org.apache.beam.sdk.fn.server.GrpcFnServer; import org.apache.beam.sdk.fn.server.ServerFactory; import org.apache.beam.sdk.options.ExperimentalOptions; @@ -123,7 +124,8 @@ public class DefaultJobBundleFactoryTest { when(dataServer.getApiServiceDescriptor()) .thenReturn(ApiServiceDescriptor.getDefaultInstance()); GrpcDataService dataService = mock(GrpcDataService.class); - when(dataService.send(any(), any())).thenReturn(mock(CloseableFnDataReceiver.class)); + when(dataService.createOutboundAggregator(any(), anyBoolean())) + .thenReturn(mock(BeamFnDataOutboundAggregator.class)); when(dataServer.getService()).thenReturn(dataService); when(stateServer.getApiServiceDescriptor()) .thenReturn(ApiServiceDescriptor.getDefaultInstance()); diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClientTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClientTest.java index 6c41dd4c290..52f5e2067c5 100644 --- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClientTest.java +++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClientTest.java @@ -24,14 +24,17 @@ import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; 
+import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; import java.util.ArrayList; @@ -46,6 +49,7 @@ import java.util.concurrent.TimeUnit; import org.apache.beam.model.fnexecution.v1.BeamFnApi; import org.apache.beam.model.fnexecution.v1.BeamFnApi.BundleApplication; import org.apache.beam.model.fnexecution.v1.BeamFnApi.DelayedBundleApplication; +import org.apache.beam.model.fnexecution.v1.BeamFnApi.Elements; import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionResponse; import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleDescriptor; import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleProgressResponse; @@ -66,12 +70,13 @@ import org.apache.beam.runners.fnexecution.data.RemoteInputDestination; import org.apache.beam.runners.fnexecution.state.StateDelegator; import org.apache.beam.runners.fnexecution.state.StateRequestHandler; import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.coders.LengthPrefixCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.fn.data.BeamFnDataOutboundAggregator; import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver; import org.apache.beam.sdk.fn.data.FnDataReceiver; -import org.apache.beam.sdk.fn.data.InboundDataClient; import org.apache.beam.sdk.fn.data.RemoteGrpcPortRead; import org.apache.beam.sdk.fn.data.RemoteGrpcPortWrite; import org.apache.beam.sdk.transforms.Create; @@ -93,6 +98,7 @@ import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import org.mockito.ArgumentCaptor; +import org.mockito.Captor; import org.mockito.InOrder; import org.mockito.Mock; import org.mockito.Mockito; @@ -107,7 +113,7 @@ public class 
SdkHarnessClientTest { @Mock public FnApiControlClient fnApiControlClient; @Mock public FnDataService dataService; - + @Captor ArgumentCaptor<CloseableFnDataReceiver<BeamFnApi.Elements>> outputReceiverCaptor; @Rule public EmbeddedSdkHarness harness = EmbeddedSdkHarness.create(); @Rule public ExpectedException thrown = ExpectedException.none(); @@ -240,7 +246,8 @@ public class SdkHarnessClientTest { Collections.singletonList( RemoteInputDestination.of( (FullWindowedValueCoder) coder, SDK_GRPC_READ_TRANSFORM))); - when(dataService.send(any(), eq(coder))).thenReturn(mock(CloseableFnDataReceiver.class)); + when(dataService.createOutboundAggregator(any(), anyBoolean())) + .thenReturn(mock(BeamFnDataOutboundAggregator.class)); try (RemoteBundle activeBundle = processor.newBundle(Collections.emptyMap(), BundleProgressHandler.ignored())) { @@ -270,7 +277,8 @@ public class SdkHarnessClientTest { Collections.singletonList( RemoteInputDestination.of( (FullWindowedValueCoder) coder, SDK_GRPC_READ_TRANSFORM))); - when(dataService.send(any(), eq(coder))).thenReturn(mock(CloseableFnDataReceiver.class)); + when(dataService.createOutboundAggregator(any(), anyBoolean())) + .thenReturn(mock(BeamFnDataOutboundAggregator.class)); RemoteBundle activeBundle = processor.newBundle(Collections.emptyMap(), BundleProgressHandler.ignored()); @@ -302,7 +310,8 @@ public class SdkHarnessClientTest { Collections.singletonList( RemoteInputDestination.of( (FullWindowedValueCoder) coder, SDK_GRPC_READ_TRANSFORM))); - when(dataService.send(any(), eq(coder))).thenReturn(mock(CloseableFnDataReceiver.class)); + when(dataService.createOutboundAggregator(any(), anyBoolean())) + .thenReturn(mock(BeamFnDataOutboundAggregator.class)); RemoteBundle activeBundle = processor.newBundle(Collections.emptyMap(), BundleProgressHandler.ignored()); @@ -352,7 +361,8 @@ public class SdkHarnessClientTest { Collections.singletonList( RemoteInputDestination.of( (FullWindowedValueCoder) coder, SDK_GRPC_READ_TRANSFORM))); - 
when(dataService.send(any(), eq(coder))).thenReturn(mock(CloseableFnDataReceiver.class)); + when(dataService.createOutboundAggregator(any(), anyBoolean())) + .thenReturn(mock(BeamFnDataOutboundAggregator.class)); BundleProgressHandler mockProgressHandler = mock(BundleProgressHandler.class); @@ -423,7 +433,8 @@ public class SdkHarnessClientTest { Collections.singletonList( RemoteInputDestination.of( (FullWindowedValueCoder) coder, SDK_GRPC_READ_TRANSFORM))); - when(dataService.send(any(), eq(coder))).thenReturn(mock(CloseableFnDataReceiver.class)); + when(dataService.createOutboundAggregator(any(), anyBoolean())) + .thenReturn(mock(BeamFnDataOutboundAggregator.class)); BundleCheckpointHandler mockCheckpointHandler = mock(BundleCheckpointHandler.class); BundleSplitHandler mockSplitHandler = mock(BundleSplitHandler.class); @@ -519,10 +530,9 @@ public class SdkHarnessClientTest { @Test public void handleCleanupWhenInputSenderFails() throws Exception { - Exception testException = new Exception(); + RuntimeException testException = new RuntimeException(); - InboundDataClient mockOutputReceiver = mock(InboundDataClient.class); - CloseableFnDataReceiver mockInputSender = mock(CloseableFnDataReceiver.class); + BeamFnDataOutboundAggregator mockInputSender = mock(BeamFnDataOutboundAggregator.class); CompletableFuture<InstructionResponse> processBundleResponseFuture = new CompletableFuture<>(); when(fnApiControlClient.handle(any(BeamFnApi.InstructionRequest.class))) @@ -536,18 +546,21 @@ public class SdkHarnessClientTest { Collections.singletonList( RemoteInputDestination.of( (FullWindowedValueCoder) coder, SDK_GRPC_READ_TRANSFORM))); - when(dataService.receive(any(), any(), any())).thenReturn(mockOutputReceiver); - when(dataService.send(any(), eq(coder))).thenReturn(mockInputSender); + doNothing().when(dataService).registerReceiver(any(), outputReceiverCaptor.capture()); + when(dataService.createOutboundAggregator(any(), anyBoolean())).thenReturn(mockInputSender); - 
doThrow(testException).when(mockInputSender).close(); + doThrow(testException) + .when(mockInputSender) + .sendOrCollectBufferedDataAndFinishOutboundStreams(); - RemoteOutputReceiver mockRemoteOutputReceiver = mock(RemoteOutputReceiver.class); BundleProgressHandler mockProgressHandler = mock(BundleProgressHandler.class); try { try (RemoteBundle activeBundle = processor.newBundle( - ImmutableMap.of(SDK_GRPC_WRITE_TRANSFORM, mockRemoteOutputReceiver), + ImmutableMap.of( + SDK_GRPC_WRITE_TRANSFORM, + RemoteOutputReceiver.of(ByteArrayCoder.of(), mock(FnDataReceiver.class))), mockProgressHandler)) { // We shouldn't be required to complete the process bundle response future. } @@ -555,17 +568,21 @@ public class SdkHarnessClientTest { } catch (Exception e) { assertEquals(testException, e); - verify(mockOutputReceiver).cancel(); - verifyNoMoreInteractions(mockOutputReceiver); + // We expect that we don't unregister the receiver and the next accept call will raise an error + // making the data service aware of the error.
+ verify(dataService, never()).unregisterReceiver(any()); + assertThrows( + "Inbound observer closed.", + Exception.class, + () -> outputReceiverCaptor.getValue().accept(Elements.getDefaultInstance())); } } @Test public void handleCleanupWithStateWhenInputSenderFails() throws Exception { - Exception testException = new Exception(); + RuntimeException testException = new RuntimeException(); - InboundDataClient mockOutputReceiver = mock(InboundDataClient.class); - CloseableFnDataReceiver mockInputSender = mock(CloseableFnDataReceiver.class); + BeamFnDataOutboundAggregator mockInputSender = mock(BeamFnDataOutboundAggregator.class); StateDelegator mockStateDelegator = mock(StateDelegator.class); StateDelegator.Registration mockStateRegistration = mock(StateDelegator.Registration.class); @@ -587,17 +604,18 @@ public class SdkHarnessClientTest { Collections.singletonList( RemoteInputDestination.of((FullWindowedValueCoder) coder, SDK_GRPC_READ_TRANSFORM)), mockStateDelegator); - when(dataService.receive(any(), any(), any())).thenReturn(mockOutputReceiver); - when(dataService.send(any(), eq(coder))).thenReturn(mockInputSender); - - doThrow(testException).when(mockInputSender).close(); + when(dataService.createOutboundAggregator(any(), anyBoolean())).thenReturn(mockInputSender); - RemoteOutputReceiver mockRemoteOutputReceiver = mock(RemoteOutputReceiver.class); + doThrow(testException) + .when(mockInputSender) + .sendOrCollectBufferedDataAndFinishOutboundStreams(); try { try (RemoteBundle activeBundle = processor.newBundle( - ImmutableMap.of(SDK_GRPC_WRITE_TRANSFORM, mockRemoteOutputReceiver), + ImmutableMap.of( + SDK_GRPC_WRITE_TRANSFORM, + RemoteOutputReceiver.of(ByteArrayCoder.of(), mock(FnDataReceiver.class))), mockStateHandler, mockProgressHandler)) { // We shouldn't be required to complete the process bundle response future. 
@@ -607,17 +625,21 @@ public class SdkHarnessClientTest { assertEquals(testException, e); verify(mockStateRegistration).abort(); - verify(mockOutputReceiver).cancel(); - verifyNoMoreInteractions(mockStateRegistration, mockOutputReceiver); + // We expect that we don't unregister the receiver and the next accept call will raise an error + // making the data service aware of the error. + verify(dataService, never()).unregisterReceiver(any()); + assertThrows( + "Inbound observer closed.", + Exception.class, + () -> outputReceiverCaptor.getValue().accept(Elements.getDefaultInstance())); } } @Test public void handleCleanupWhenProcessingBundleFails() throws Exception { - Exception testException = new Exception(); + RuntimeException testException = new RuntimeException(); - InboundDataClient mockOutputReceiver = mock(InboundDataClient.class); - CloseableFnDataReceiver mockInputSender = mock(CloseableFnDataReceiver.class); + BeamFnDataOutboundAggregator mockInputSender = mock(BeamFnDataOutboundAggregator.class); CompletableFuture<InstructionResponse> processBundleResponseFuture = new CompletableFuture<>(); when(fnApiControlClient.handle(any(BeamFnApi.InstructionRequest.class))) @@ -631,16 +653,16 @@ public class SdkHarnessClientTest { Collections.singletonList( RemoteInputDestination.of( (FullWindowedValueCoder) coder, SDK_GRPC_READ_TRANSFORM))); - when(dataService.receive(any(), any(), any())).thenReturn(mockOutputReceiver); - when(dataService.send(any(), eq(coder))).thenReturn(mockInputSender); + when(dataService.createOutboundAggregator(any(), anyBoolean())).thenReturn(mockInputSender); - RemoteOutputReceiver mockRemoteOutputReceiver = mock(RemoteOutputReceiver.class); BundleProgressHandler mockProgressHandler = mock(BundleProgressHandler.class); try { try (RemoteBundle activeBundle = processor.newBundle( - ImmutableMap.of(SDK_GRPC_WRITE_TRANSFORM, mockRemoteOutputReceiver), + ImmutableMap.of( + SDK_GRPC_WRITE_TRANSFORM, + RemoteOutputReceiver.of(ByteArrayCoder.of(),
mock(FnDataReceiver.class))), mockProgressHandler)) { processBundleResponseFuture.completeExceptionally(testException); } @@ -648,17 +670,21 @@ public class SdkHarnessClientTest { } catch (ExecutionException e) { assertEquals(testException, e.getCause()); - verify(mockOutputReceiver).cancel(); - verifyNoMoreInteractions(mockOutputReceiver); + // We expect that we don't unregister the receiver and the next accept call will raise an error + // making the data service aware of the error. + verify(dataService, never()).unregisterReceiver(any()); + assertThrows( + "Inbound observer closed.", + Exception.class, + () -> outputReceiverCaptor.getValue().accept(Elements.getDefaultInstance())); } } @Test public void handleCleanupWithStateWhenProcessingBundleFails() throws Exception { - Exception testException = new Exception(); + RuntimeException testException = new RuntimeException(); - InboundDataClient mockOutputReceiver = mock(InboundDataClient.class); - CloseableFnDataReceiver mockInputSender = mock(CloseableFnDataReceiver.class); + BeamFnDataOutboundAggregator mockInputSender = mock(BeamFnDataOutboundAggregator.class); StateDelegator mockStateDelegator = mock(StateDelegator.class); StateDelegator.Registration mockStateRegistration = mock(StateDelegator.Registration.class); when(mockStateDelegator.registerForProcessBundleInstructionId(any(), any())) @@ -679,15 +705,14 @@ public class SdkHarnessClientTest { Collections.singletonList( RemoteInputDestination.of((FullWindowedValueCoder) coder, SDK_GRPC_READ_TRANSFORM)), mockStateDelegator); - when(dataService.receive(any(), any(), any())).thenReturn(mockOutputReceiver); - when(dataService.send(any(), eq(coder))).thenReturn(mockInputSender); - - RemoteOutputReceiver mockRemoteOutputReceiver = mock(RemoteOutputReceiver.class); + when(dataService.createOutboundAggregator(any(), anyBoolean())).thenReturn(mockInputSender); try { try (RemoteBundle activeBundle = processor.newBundle( - ImmutableMap.of(SDK_GRPC_WRITE_TRANSFORM,
mockRemoteOutputReceiver), + ImmutableMap.of( + SDK_GRPC_WRITE_TRANSFORM, + RemoteOutputReceiver.of(ByteArrayCoder.of(), mock(FnDataReceiver.class))), mockStateHandler, mockProgressHandler)) { processBundleResponseFuture.completeExceptionally(testException); @@ -697,17 +722,19 @@ public class SdkHarnessClientTest { assertEquals(testException, e.getCause()); verify(mockStateRegistration).abort(); - verify(mockOutputReceiver).cancel(); - verifyNoMoreInteractions(mockStateRegistration, mockOutputReceiver); + // We expect that we don't unregister the receiver and the next accept call will raise an error + // making the data service aware of the error. + verify(dataService, never()).unregisterReceiver(any()); + assertThrows( + "Inbound observer closed.", + Exception.class, + () -> outputReceiverCaptor.getValue().accept(Elements.getDefaultInstance())); } } @Test public void handleCleanupWhenAwaitingOnClosingOutputReceivers() throws Exception { - Exception testException = new Exception(); - - InboundDataClient mockOutputReceiver = mock(InboundDataClient.class); - CloseableFnDataReceiver mockInputSender = mock(CloseableFnDataReceiver.class); + BeamFnDataOutboundAggregator mockInputSender = mock(BeamFnDataOutboundAggregator.class); CompletableFuture<InstructionResponse> processBundleResponseFuture = new CompletableFuture<>(); when(fnApiControlClient.handle(any(BeamFnApi.InstructionRequest.class))) @@ -721,41 +748,36 @@ public class SdkHarnessClientTest { Collections.singletonList( RemoteInputDestination.of( (FullWindowedValueCoder) coder, SDK_GRPC_READ_TRANSFORM))); - when(dataService.receive(any(), any(), any())).thenReturn(mockOutputReceiver); - when(dataService.send(any(), eq(coder))).thenReturn(mockInputSender); - doThrow(testException).when(mockOutputReceiver).awaitCompletion(); + doNothing().when(dataService).registerReceiver(any(), outputReceiverCaptor.capture()); + when(dataService.createOutboundAggregator(any(), anyBoolean())).thenReturn(mockInputSender); -
RemoteOutputReceiver mockRemoteOutputReceiver = mock(RemoteOutputReceiver.class); BundleProgressHandler mockProgressHandler = mock(BundleProgressHandler.class); - try { - try (RemoteBundle activeBundle = - processor.newBundle( - ImmutableMap.of(SDK_GRPC_WRITE_TRANSFORM, mockRemoteOutputReceiver), - mockProgressHandler)) { - // Correlating the ProcessBundleRequest and ProcessBundleResponse is owned by the underlying - // FnApiControlClient. The SdkHarnessClient owns just wrapping the request and unwrapping - // the response. - // - // Currently there are no fields so there's nothing to check. This test is formulated - // to match the pattern it should have if/when the response is meaningful. - BeamFnApi.ProcessBundleResponse response = - BeamFnApi.ProcessBundleResponse.getDefaultInstance(); - processBundleResponseFuture.complete( - BeamFnApi.InstructionResponse.newBuilder().setProcessBundle(response).build()); - } - fail("Exception expected"); - } catch (Exception e) { - assertEquals(testException, e); - } + RemoteBundle activeBundle = + processor.newBundle( + ImmutableMap.of( + SDK_GRPC_WRITE_TRANSFORM, + RemoteOutputReceiver.of(ByteArrayCoder.of(), mock(FnDataReceiver.class))), + mockProgressHandler); + // Correlating the ProcessBundleRequest and ProcessBundleResponse is owned by the underlying + // FnApiControlClient. The SdkHarnessClient owns just wrapping the request and unwrapping + // the response. + // + // Currently there are no fields so there's nothing to check. This test is formulated + // to match the pattern it should have if/when the response is meaningful. 
+ BeamFnApi.ProcessBundleResponse response = BeamFnApi.ProcessBundleResponse.getDefaultInstance(); + processBundleResponseFuture.complete( + BeamFnApi.InstructionResponse.newBuilder().setProcessBundle(response).build()); + // Inject an error before we close the bundle as if the data service closed the stream + // explicitly + outputReceiverCaptor.getValue().close(); + + assertThrows("Inbound observer closed.", Exception.class, activeBundle::close); } @Test public void handleCleanupWithStateWhenAwaitingOnClosingOutputReceivers() throws Exception { - Exception testException = new Exception(); - - InboundDataClient mockOutputReceiver = mock(InboundDataClient.class); - CloseableFnDataReceiver mockInputSender = mock(CloseableFnDataReceiver.class); + BeamFnDataOutboundAggregator mockInputSender = mock(BeamFnDataOutboundAggregator.class); StateDelegator mockStateDelegator = mock(StateDelegator.class); StateDelegator.Registration mockStateRegistration = mock(StateDelegator.Registration.class); when(mockStateDelegator.registerForProcessBundleInstructionId(any(), any())) @@ -776,37 +798,34 @@ public class SdkHarnessClientTest { Collections.singletonList( RemoteInputDestination.of((FullWindowedValueCoder) coder, SDK_GRPC_READ_TRANSFORM)), mockStateDelegator); - when(dataService.receive(any(), any(), any())).thenReturn(mockOutputReceiver); - when(dataService.send(any(), eq(coder))).thenReturn(mockInputSender); - doThrow(testException).when(mockOutputReceiver).awaitCompletion(); - - RemoteOutputReceiver mockRemoteOutputReceiver = mock(RemoteOutputReceiver.class); + doNothing().when(dataService).registerReceiver(any(), outputReceiverCaptor.capture()); + when(dataService.createOutboundAggregator(any(), anyBoolean())).thenReturn(mockInputSender); - try { - try (RemoteBundle activeBundle = - processor.newBundle( - ImmutableMap.of(SDK_GRPC_WRITE_TRANSFORM, mockRemoteOutputReceiver), - mockStateHandler, - mockProgressHandler)) { - // Correlating the ProcessBundleRequest and 
ProcessBundleResponse is owned by the underlying - // FnApiControlClient. The SdkHarnessClient owns just wrapping the request and unwrapping - // the response. - // - // Currently there are no fields so there's nothing to check. This test is formulated - // to match the pattern it should have if/when the response is meaningful. - BeamFnApi.ProcessBundleResponse response = - BeamFnApi.ProcessBundleResponse.getDefaultInstance(); - processBundleResponseFuture.complete( - BeamFnApi.InstructionResponse.newBuilder().setProcessBundle(response).build()); - } - fail("Exception expected"); - } catch (Exception e) { - assertEquals(testException, e); - } + RemoteBundle activeBundle = + processor.newBundle( + ImmutableMap.of( + SDK_GRPC_WRITE_TRANSFORM, + RemoteOutputReceiver.of(ByteArrayCoder.of(), mock(FnDataReceiver.class))), + mockStateHandler, + mockProgressHandler); + // Correlating the ProcessBundleRequest and ProcessBundleResponse is owned by the underlying + // FnApiControlClient. The SdkHarnessClient owns just wrapping the request and unwrapping + // the response. + // + // Currently there are no fields so there's nothing to check. This test is formulated + // to match the pattern it should have if/when the response is meaningful. + BeamFnApi.ProcessBundleResponse response = BeamFnApi.ProcessBundleResponse.getDefaultInstance(); + processBundleResponseFuture.complete( + BeamFnApi.InstructionResponse.newBuilder().setProcessBundle(response).build()); + // Inject an error before we close the bundle as if the data service closed the stream + // explicitly. 
+ outputReceiverCaptor.getValue().close(); + assertThrows("Inbound observer closed.", Exception.class, activeBundle::close); } @Test public void verifyCacheTokensAreUsedInNewBundleRequest() throws InterruptedException { + BeamFnDataOutboundAggregator mockInputSender = mock(BeamFnDataOutboundAggregator.class); when(fnApiControlClient.handle(any(BeamFnApi.InstructionRequest.class))) .thenReturn( CompletableFuture.<InstructionResponse>completedFuture( @@ -822,7 +841,7 @@ public class SdkHarnessClientTest { SDK_GRPC_READ_TRANSFORM)); BundleProcessor processor1 = sdkHarnessClient.getProcessor(descriptor1, remoteInputs); - when(dataService.send(any(), any())).thenReturn(mock(CloseableFnDataReceiver.class)); + when(dataService.createOutboundAggregator(any(), anyBoolean())).thenReturn(mockInputSender); StateRequestHandler stateRequestHandler = Mockito.mock(StateRequestHandler.class); List<BeamFnApi.ProcessBundleRequest.CacheToken> cacheTokens = @@ -831,7 +850,9 @@ public class SdkHarnessClientTest { when(stateRequestHandler.getCacheTokens()).thenReturn(cacheTokens); processor1.newBundle( - ImmutableMap.of(SDK_GRPC_WRITE_TRANSFORM, mock(RemoteOutputReceiver.class)), + ImmutableMap.of( + SDK_GRPC_WRITE_TRANSFORM, + RemoteOutputReceiver.of(ByteArrayCoder.of(), mock(FnDataReceiver.class))), stateRequestHandler, BundleProgressHandler.ignored()); @@ -850,8 +871,7 @@ public class SdkHarnessClientTest { @Test public void testBundleCheckpointCallback() throws Exception { - InboundDataClient mockOutputReceiver = mock(InboundDataClient.class); - CloseableFnDataReceiver mockInputSender = mock(CloseableFnDataReceiver.class); + BeamFnDataOutboundAggregator mockInputSender = mock(BeamFnDataOutboundAggregator.class); CompletableFuture<InstructionResponse> processBundleResponseFuture = new CompletableFuture<>(); when(fnApiControlClient.handle(any(BeamFnApi.InstructionRequest.class))) @@ -865,10 +885,8 @@ public class SdkHarnessClientTest { Collections.singletonList( 
RemoteInputDestination.of( (FullWindowedValueCoder) coder, SDK_GRPC_READ_TRANSFORM))); - when(dataService.receive(any(), any(), any())).thenReturn(mockOutputReceiver); - when(dataService.send(any(), eq(coder))).thenReturn(mockInputSender); + when(dataService.createOutboundAggregator(any(), anyBoolean())).thenReturn(mockInputSender); - RemoteOutputReceiver mockRemoteOutputReceiver = mock(RemoteOutputReceiver.class); BundleProgressHandler mockProgressHandler = mock(BundleProgressHandler.class); BundleSplitHandler mockSplitHandler = mock(BundleSplitHandler.class); BundleCheckpointHandler mockCheckpointHandler = mock(BundleCheckpointHandler.class); @@ -880,7 +898,7 @@ public class SdkHarnessClientTest { .build(); try (ActiveBundle activeBundle = processor.newBundle( - ImmutableMap.of(SDK_GRPC_WRITE_TRANSFORM, mockRemoteOutputReceiver), + Collections.emptyMap(), Collections.emptyMap(), (request) -> { throw new UnsupportedOperationException(); @@ -895,13 +913,12 @@ public class SdkHarnessClientTest { verify(mockProgressHandler).onCompleted(response); verify(mockCheckpointHandler).onCheckpoint(response); - verifyZeroInteractions(mockFinalizationHandler, mockSplitHandler); + verifyNoMoreInteractions(mockFinalizationHandler, mockSplitHandler); } @Test public void testBundleFinalizationCallback() throws Exception { - InboundDataClient mockOutputReceiver = mock(InboundDataClient.class); - CloseableFnDataReceiver mockInputSender = mock(CloseableFnDataReceiver.class); + BeamFnDataOutboundAggregator mockInputSender = mock(BeamFnDataOutboundAggregator.class); CompletableFuture<InstructionResponse> processBundleResponseFuture = new CompletableFuture<>(); when(fnApiControlClient.handle(any(BeamFnApi.InstructionRequest.class))) @@ -915,10 +932,8 @@ public class SdkHarnessClientTest { Collections.singletonList( RemoteInputDestination.of( (FullWindowedValueCoder) coder, SDK_GRPC_READ_TRANSFORM))); - when(dataService.receive(any(), any(), any())).thenReturn(mockOutputReceiver); - 
when(dataService.send(any(), eq(coder))).thenReturn(mockInputSender); + when(dataService.createOutboundAggregator(any(), anyBoolean())).thenReturn(mockInputSender); - RemoteOutputReceiver mockRemoteOutputReceiver = mock(RemoteOutputReceiver.class); BundleProgressHandler mockProgressHandler = mock(BundleProgressHandler.class); BundleSplitHandler mockSplitHandler = mock(BundleSplitHandler.class); BundleCheckpointHandler mockCheckpointHandler = mock(BundleCheckpointHandler.class); @@ -929,7 +944,7 @@ public class SdkHarnessClientTest { String bundleId; try (ActiveBundle activeBundle = processor.newBundle( - ImmutableMap.of(SDK_GRPC_WRITE_TRANSFORM, mockRemoteOutputReceiver), + Collections.emptyMap(), Collections.emptyMap(), (request) -> { throw new UnsupportedOperationException(); @@ -945,7 +960,7 @@ public class SdkHarnessClientTest { verify(mockProgressHandler).onCompleted(response); verify(mockFinalizationHandler).requestsFinalization(bundleId); - verifyZeroInteractions(mockCheckpointHandler, mockSplitHandler); + verifyNoMoreInteractions(mockCheckpointHandler, mockSplitHandler); } private static class TestFn extends DoFn<String, String> { diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/data/GrpcDataServiceTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/data/GrpcDataServiceTest.java index ae79d909c2c..86bf247fa34 100644 --- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/data/GrpcDataServiceTest.java +++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/data/GrpcDataServiceTest.java @@ -24,7 +24,9 @@ import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.empty; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; @@ -38,9 
+40,10 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.LengthPrefixCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver; -import org.apache.beam.sdk.fn.data.InboundDataClient; -import org.apache.beam.sdk.fn.data.LogicalEndpoint; +import org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2; +import org.apache.beam.sdk.fn.data.BeamFnDataOutboundAggregator; +import org.apache.beam.sdk.fn.data.DataEndpoint; +import org.apache.beam.sdk.fn.data.FnDataReceiver; import org.apache.beam.sdk.fn.server.GrpcFnServer; import org.apache.beam.sdk.fn.server.InProcessServerFactory; import org.apache.beam.sdk.fn.stream.OutboundObserverFactory; @@ -93,13 +96,16 @@ public class GrpcDataServiceTest { } for (int i = 0; i < 3; ++i) { - CloseableFnDataReceiver<WindowedValue<String>> consumer = - service.send(LogicalEndpoint.data(Integer.toString(i), TRANSFORM_ID), CODER); - + final String instructionId = Integer.toString(i); + BeamFnDataOutboundAggregator aggregator = + service.createOutboundAggregator(() -> instructionId, false); + aggregator.start(); + FnDataReceiver<WindowedValue<String>> consumer = + aggregator.registerOutputDataLocation(TRANSFORM_ID, CODER); consumer.accept(WindowedValue.valueInGlobalWindow("A" + i)); consumer.accept(WindowedValue.valueInGlobalWindow("B" + i)); consumer.accept(WindowedValue.valueInGlobalWindow("C" + i)); - consumer.close(); + aggregator.sendOrCollectBufferedDataAndFinishOutboundStreams(); } waitForInboundElements.countDown(); for (Future<Void> clientFuture : clientFutures) { @@ -144,18 +150,19 @@ public class GrpcDataServiceTest { } List<Collection<WindowedValue<String>>> serverInboundValues = new ArrayList<>(); - Collection<InboundDataClient> readFutures = new ArrayList<>(); + Collection<BeamFnDataInboundObserver2> inboundObservers = new ArrayList<>(); for (int i = 0; i < 3; ++i) { final 
Collection<WindowedValue<String>> serverInboundValue = new ArrayList<>(); serverInboundValues.add(serverInboundValue); - readFutures.add( - service.receive( - LogicalEndpoint.data(Integer.toString(i), TRANSFORM_ID), - CODER, - serverInboundValue::add)); + BeamFnDataInboundObserver2 inboundObserver = + BeamFnDataInboundObserver2.forConsumers( + Arrays.asList(DataEndpoint.create(TRANSFORM_ID, CODER, serverInboundValue::add)), + Collections.emptyList()); + service.registerReceiver(Integer.toString(i), inboundObserver); + inboundObservers.add(inboundObserver); } - for (InboundDataClient readFuture : readFutures) { - readFuture.awaitCompletion(); + for (BeamFnDataInboundObserver2 inboundObserver : inboundObservers) { + inboundObserver.awaitCompletion(); } waitForInboundElements.countDown(); for (Future<Void> clientFuture : clientFutures) { diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexer.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexer.java deleted file mode 100644 index d71c87a80de..00000000000 --- a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexer.java +++ /dev/null @@ -1,227 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.fn.data; - -import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; - -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutionException; -import java.util.function.BiConsumer; -import java.util.function.Consumer; -import org.apache.beam.model.fnexecution.v1.BeamFnApi; -import org.apache.beam.model.fnexecution.v1.BeamFnApi.Elements; -import org.apache.beam.model.pipeline.v1.Endpoints; -import org.apache.beam.sdk.fn.stream.OutboundObserverFactory; -import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString; -import org.apache.beam.vendor.grpc.v1p48p1.io.grpc.Status; -import org.apache.beam.vendor.grpc.v1p48p1.io.grpc.stub.StreamObserver; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; -import org.checkerframework.checker.nullness.qual.Nullable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * A gRPC multiplexer for a specific {@link Endpoints.ApiServiceDescriptor}. - * - * <p>Multiplexes data for inbound consumers based upon their individual {@link - * org.apache.beam.model.fnexecution.v1.BeamFnApi.Target}s. - * - * <p>Multiplexing inbound and outbound streams is as thread safe as the consumers of those streams. - * For inbound streams, this is as thread safe as the inbound observers. For outbound streams, this - * is as thread safe as the underlying stream observer. - * - * <p>TODO: Add support for multiplexing over multiple outbound observers by stickying the output - * location with a specific outbound observer. 
- * - * @deprecated Migrate to {@link BeamFnDataGrpcMultiplexer2}. - */ -@Deprecated -public class BeamFnDataGrpcMultiplexer implements AutoCloseable { - private static final Logger LOG = LoggerFactory.getLogger(BeamFnDataGrpcMultiplexer.class); - private final Endpoints.@Nullable ApiServiceDescriptor apiServiceDescriptor; - private final StreamObserver<BeamFnApi.Elements> inboundObserver; - private final StreamObserver<BeamFnApi.Elements> outboundObserver; - private final ConcurrentMap<LogicalEndpoint, CompletableFuture<BiConsumer<ByteString, Boolean>>> - consumers; - - public BeamFnDataGrpcMultiplexer( - Endpoints.@Nullable ApiServiceDescriptor apiServiceDescriptor, - OutboundObserverFactory outboundObserverFactory, - OutboundObserverFactory.BasicFactory<Elements, Elements> baseOutboundObserverFactory) { - this.apiServiceDescriptor = apiServiceDescriptor; - this.consumers = new ConcurrentHashMap<>(); - this.inboundObserver = new InboundObserver(); - this.outboundObserver = - outboundObserverFactory.outboundObserverFor(baseOutboundObserverFactory, inboundObserver); - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(this) - .omitNullValues() - .add("apiServiceDescriptor", apiServiceDescriptor) - .add("consumers", consumers) - .toString(); - } - - public StreamObserver<BeamFnApi.Elements> getInboundObserver() { - return inboundObserver; - } - - public StreamObserver<BeamFnApi.Elements> getOutboundObserver() { - return outboundObserver; - } - - private CompletableFuture<BiConsumer<ByteString, Boolean>> receiverFuture( - LogicalEndpoint endpoint) { - return consumers.computeIfAbsent( - endpoint, (LogicalEndpoint unused) -> new CompletableFuture<>()); - } - - public <T> void registerConsumer( - LogicalEndpoint inputLocation, BiConsumer<ByteString, Boolean> bytesReceiver) { - receiverFuture(inputLocation).complete(bytesReceiver); - } - - @VisibleForTesting - boolean hasConsumer(LogicalEndpoint outputLocation) { - return 
consumers.containsKey(outputLocation); - } - - @Override - public void close() { - for (CompletableFuture<BiConsumer<ByteString, Boolean>> receiver : - ImmutableList.copyOf(consumers.values())) { - // Cancel any observer waiting for the client to complete. If the receiver has already been - // completed or cancelled, this call will be ignored. - receiver.cancel(true); - } - // Cancel any outbound calls and complete any inbound calls, as this multiplexer is hanging up - outboundObserver.onError( - Status.CANCELLED.withDescription("Multiplexer hanging up").asException()); - inboundObserver.onCompleted(); - } - - /** - * A multiplexing {@link StreamObserver} that selects the inbound {@link Consumer} to pass the - * elements to. - * - * <p>The inbound observer blocks until the {@link Consumer} is bound allowing for the sending - * harness to initiate transmitting data without needing for the receiving harness to signal that - * it is ready to consume that data. - */ - private final class InboundObserver implements StreamObserver<BeamFnApi.Elements> { - @Override - public void onNext(BeamFnApi.Elements value) { - for (BeamFnApi.Elements.Data maybeData : value.getDataList()) { - BeamFnApi.Elements.Data data = checkArgumentNotNull(maybeData); - try { - LogicalEndpoint key = - LogicalEndpoint.data(data.getInstructionId(), data.getTransformId()); - CompletableFuture<BiConsumer<ByteString, Boolean>> consumer = receiverFuture(key); - if (!consumer.isDone()) { - LOG.debug( - "Received data for key {} without consumer ready. " - + "Waiting for consumer to be registered.", - key); - } - consumer.get().accept(data.getData(), data.getIsLast()); - if (data.getIsLast()) { - consumers.remove(key); - } - /* - * TODO: On failure we should fail any bundles that were impacted eagerly - * instead of relying on the Runner harness to do all the failure handling. 
- */ - } catch (ExecutionException | InterruptedException e) { - LOG.error( - "Client interrupted during handling of data for instruction {} and transform {}", - data.getInstructionId(), - data.getTransformId(), - e); - outboundObserver.onError(e); - } catch (RuntimeException e) { - LOG.error( - "Client failed to handle data for instruction {} and transform {}", - data.getInstructionId(), - data.getTransformId(), - e); - outboundObserver.onError(e); - } - } - - for (BeamFnApi.Elements.Timers timer : value.getTimersList()) { - try { - LogicalEndpoint key = - LogicalEndpoint.timer( - timer.getInstructionId(), timer.getTransformId(), timer.getTimerFamilyId()); - CompletableFuture<BiConsumer<ByteString, Boolean>> consumer = receiverFuture(key); - if (!consumer.isDone()) { - LOG.debug( - "Received data for key {} without consumer ready. " - + "Waiting for consumer to be registered.", - key); - } - consumer.get().accept(timer.getTimers(), timer.getIsLast()); - if (timer.getIsLast()) { - consumers.remove(key); - } - /* - * TODO: On failure we should fail any bundles that were impacted eagerly - * instead of relying on the Runner harness to do all the failure handling. - */ - } catch (ExecutionException | InterruptedException e) { - LOG.error( - "Client interrupted during handling of timer for instruction {}, transform {}, and timer family {}", - timer.getInstructionId(), - timer.getTransformId(), - timer.getTimerFamilyId(), - e); - outboundObserver.onError(e); - } catch (RuntimeException e) { - LOG.error( - "Client failed to handle timer for instruction {}, transform {}, and timer family {}", - timer.getInstructionId(), - timer.getTransformId(), - timer.getTimerFamilyId(), - e); - outboundObserver.onError(e); - } - } - } - - @Override - public void onError(Throwable t) { - LOG.error( - "Failed to handle for {}", - apiServiceDescriptor == null ? 
"unknown endpoint" : apiServiceDescriptor, - t); - } - - @Override - public void onCompleted() { - LOG.warn( - "Hanged up for {}.", - apiServiceDescriptor == null ? "unknown endpoint" : apiServiceDescriptor); - } - } -} diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexer2.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexer2.java index 3b80f272a71..bc552dce08d 100644 --- a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexer2.java +++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexer2.java @@ -253,7 +253,6 @@ public class BeamFnDataGrpcMultiplexer2 implements AutoCloseable { "Failed to handle for {}", apiServiceDescriptor == null ? "unknown endpoint" : apiServiceDescriptor, t); - outboundObserver.onCompleted(); } @Override @@ -261,7 +260,6 @@ public class BeamFnDataGrpcMultiplexer2 implements AutoCloseable { LOG.warn( "Hanged up for {}.", apiServiceDescriptor == null ? "unknown endpoint" : apiServiceDescriptor); - outboundObserver.onCompleted(); } } } diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserver.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserver.java deleted file mode 100644 index 324113be4c2..00000000000 --- a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserver.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.fn.data; - -import java.util.function.BiConsumer; -import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Decodes individually consumed {@link ByteString}s with the provided {@link Coder} passing the - * individual decoded elements to the provided consumer. - * - * @deprecated Migrate to {@link BeamFnDataInboundObserver2}. - */ -@Deprecated -public class BeamFnDataInboundObserver - implements BiConsumer<ByteString, Boolean>, InboundDataClient { - private static final Logger LOG = LoggerFactory.getLogger(BeamFnDataInboundObserver.class); - - public static BeamFnDataInboundObserver forConsumer( - LogicalEndpoint endpoint, FnDataReceiver<ByteString> receiver) { - return new BeamFnDataInboundObserver( - endpoint, receiver, CompletableFutureInboundDataClient.create()); - } - - private final LogicalEndpoint endpoint; - private final FnDataReceiver<ByteString> consumer; - private final InboundDataClient readFuture; - private long byteCounter; - - public BeamFnDataInboundObserver( - LogicalEndpoint endpoint, FnDataReceiver<ByteString> consumer, InboundDataClient readFuture) { - this.endpoint = endpoint; - this.consumer = consumer; - this.readFuture = readFuture; - } - - @Override - public void accept(ByteString payload, Boolean isLast) { - if (readFuture.isDone()) { - // Drop any incoming data if the stream processing has finished. 
- return; - } - try { - if (isLast) { - LOG.debug("Closing stream for {} having consumed {} bytes", endpoint, byteCounter); - readFuture.complete(); - return; - } - - byteCounter += payload.size(); - consumer.accept(payload); - } catch (Exception e) { - readFuture.fail(e); - } - } - - @Override - public void awaitCompletion() throws Exception { - readFuture.awaitCompletion(); - } - - @Override - public void runWhenComplete(Runnable completeRunnable) { - readFuture.runWhenComplete(completeRunnable); - } - - @Override - public boolean isDone() { - return readFuture.isDone(); - } - - @Override - public void cancel() { - readFuture.cancel(); - } - - @Override - public void complete() { - readFuture.complete(); - } - - @Override - public void fail(Throwable t) { - readFuture.fail(t); - } -} diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundObserver.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundObserver.java deleted file mode 100644 index 2fca907940d..00000000000 --- a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundObserver.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.fn.data; - -import java.io.IOException; -import org.apache.beam.model.fnexecution.v1.BeamFnApi.Elements; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.vendor.grpc.v1p48p1.io.grpc.stub.StreamObserver; - -/** - * An outbound {@link FnDataReceiver} for the Beam Fn Data API. - * - * <p>TODO: Handle outputting large elements (> 2GiBs). Note that this also applies to the input - * side as well. - * - * <p>TODO: Handle outputting elements that are zero bytes by outputting a single byte as a marker, - * detect on the input side that no bytes were read and force reading a single byte. - * - * @deprecated Migrate to use {@link BeamFnDataOutboundAggregator} directly. - */ -@SuppressWarnings({ - "nullness" // TODO(https://github.com/apache/beam/issues/20497) -}) -@Deprecated -public class BeamFnDataOutboundObserver<T> implements CloseableFnDataReceiver<T> { - - private boolean closed; - private final BeamFnDataOutboundAggregator aggregator; - private final FnDataReceiver<T> dataReceiver; - - public BeamFnDataOutboundObserver( - LogicalEndpoint outputLocation, - Coder<T> coder, - StreamObserver<Elements> outboundObserver, - PipelineOptions options) { - this.aggregator = - new BeamFnDataOutboundAggregator( - options, outputLocation::getInstructionId, outboundObserver, false); - this.dataReceiver = - outputLocation.isTimer() - ? 
(FnDataReceiver<T>) - this.aggregator.registerOutputTimersLocation( - outputLocation.getTransformId(), - outputLocation.getTimerFamilyId(), - (Coder<Object>) coder) - : (FnDataReceiver<T>) - aggregator.registerOutputDataLocation( - outputLocation.getTransformId(), (Coder<Object>) coder); - aggregator.start(); - this.closed = false; - } - - @Override - public void close() throws Exception { - this.aggregator.sendOrCollectBufferedDataAndFinishOutboundStreams(); - this.closed = true; - } - - @Override - public void flush() throws IOException { - aggregator.flush(); - } - - @Override - public void accept(T t) throws Exception { - if (closed) { - throw new IllegalStateException("Already closed."); - } - dataReceiver.accept(t); - } -} diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/CompletableFutureInboundDataClient.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/CompletableFutureInboundDataClient.java deleted file mode 100644 index 73b68222cfa..00000000000 --- a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/CompletableFutureInboundDataClient.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.fn.data; - -import java.util.concurrent.CompletableFuture; - -/** An {@link InboundDataClient} backed by a {@link CompletableFuture}. */ -public class CompletableFutureInboundDataClient implements InboundDataClient { - private static final Object COMPLETED = new Object(); - /** - * Create a new {@link CompletableFutureInboundDataClient} using a new {@link CompletableFuture}. - */ - public static InboundDataClient create() { - return forBackingFuture(new CompletableFuture<>()); - } - - /** - * Create a new {@link CompletableFutureInboundDataClient} wrapping the provided {@link - * CompletableFuture}. - */ - static InboundDataClient forBackingFuture(CompletableFuture<Object> future) { - return new CompletableFutureInboundDataClient(future); - } - - private final CompletableFuture<Object> future; - - private CompletableFutureInboundDataClient(CompletableFuture<Object> future) { - this.future = future; - } - - @Override - public void awaitCompletion() throws Exception { - future.get(); - } - - @Override - @SuppressWarnings("FutureReturnValueIgnored") - public void runWhenComplete(Runnable completeRunnable) { - future.whenComplete((result, throwable) -> completeRunnable.run()); - } - - @Override - public boolean isDone() { - return future.isDone(); - } - - @Override - public void cancel() { - future.cancel(true); - } - - @Override - public void complete() { - future.complete(COMPLETED); - } - - @Override - public void fail(Throwable t) { - // Use obtrudeException instead of CompleteExceptionally, forcing any future calls to .get() - // to raise the execption, even if the future is already compelted. 
- future.obtrudeException(t); - } -} diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexerTest.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexerTest.java deleted file mode 100644 index c82fd98bde6..00000000000 --- a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexerTest.java +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.fn.data; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.contains; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import org.apache.beam.model.fnexecution.v1.BeamFnApi; -import org.apache.beam.model.pipeline.v1.Endpoints; -import org.apache.beam.sdk.fn.stream.OutboundObserverFactory; -import org.apache.beam.sdk.fn.test.TestStreams; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Uninterruptibles; -import org.junit.Test; - -/** Tests for {@link BeamFnDataGrpcMultiplexer}. */ -public class BeamFnDataGrpcMultiplexerTest { - private static final Endpoints.ApiServiceDescriptor DESCRIPTOR = - Endpoints.ApiServiceDescriptor.newBuilder().setUrl("test").build(); - private static final LogicalEndpoint DATA_LOCATION = LogicalEndpoint.data("777L", "888L"); - private static final LogicalEndpoint TIMER_LOCATION = - LogicalEndpoint.timer("999L", "555L", "333L"); - private static final BeamFnApi.Elements ELEMENTS = - BeamFnApi.Elements.newBuilder() - .addData( - BeamFnApi.Elements.Data.newBuilder() - .setInstructionId(DATA_LOCATION.getInstructionId()) - .setTransformId(DATA_LOCATION.getTransformId()) - .setData(ByteString.copyFrom(new byte[1]))) - .addTimers( - BeamFnApi.Elements.Timers.newBuilder() - .setInstructionId(TIMER_LOCATION.getInstructionId()) - .setTransformId(TIMER_LOCATION.getTransformId()) - .setTimerFamilyId(TIMER_LOCATION.getTimerFamilyId()) - .setTimers(ByteString.copyFrom(new byte[2]))) - .build(); - private static final BeamFnApi.Elements TERMINAL_ELEMENTS = - BeamFnApi.Elements.newBuilder() - .addData( - 
BeamFnApi.Elements.Data.newBuilder() - .setInstructionId(DATA_LOCATION.getInstructionId()) - .setTransformId(DATA_LOCATION.getTransformId()) - .setIsLast(true)) - .addTimers( - BeamFnApi.Elements.Timers.newBuilder() - .setInstructionId(TIMER_LOCATION.getInstructionId()) - .setTransformId(TIMER_LOCATION.getTransformId()) - .setTimerFamilyId(TIMER_LOCATION.getTimerFamilyId()) - .setIsLast(true)) - .build(); - - @Test - public void testOutboundObserver() { - final Collection<BeamFnApi.Elements> values = new ArrayList<>(); - BeamFnDataGrpcMultiplexer multiplexer = - new BeamFnDataGrpcMultiplexer( - DESCRIPTOR, - OutboundObserverFactory.clientDirect(), - inboundObserver -> TestStreams.withOnNext(values::add).build()); - multiplexer.getOutboundObserver().onNext(ELEMENTS); - assertThat(values, contains(ELEMENTS)); - } - - @Test - public void testInboundObserverBlocksTillConsumerConnects() throws Exception { - final Collection<BeamFnApi.Elements> outboundValues = new ArrayList<>(); - final Collection<KV<ByteString, Boolean>> dataInboundValues = new ArrayList<>(); - final Collection<KV<ByteString, Boolean>> timerInboundValues = new ArrayList<>(); - final BeamFnDataGrpcMultiplexer multiplexer = - new BeamFnDataGrpcMultiplexer( - DESCRIPTOR, - OutboundObserverFactory.clientDirect(), - inboundObserver -> TestStreams.withOnNext(outboundValues::add).build()); - ExecutorService executorService = Executors.newCachedThreadPool(); - executorService - .submit( - () -> { - // Purposefully sleep to simulate a delay in a consumer connecting. 
- Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); - multiplexer.registerConsumer( - DATA_LOCATION, - (payload, isLast) -> dataInboundValues.add(KV.of(payload, isLast))); - multiplexer.registerConsumer( - TIMER_LOCATION, - (payload, isLast) -> timerInboundValues.add(KV.of(payload, isLast))); - }) - .get(); - multiplexer.getInboundObserver().onNext(ELEMENTS); - assertTrue(multiplexer.hasConsumer(DATA_LOCATION)); - assertTrue(multiplexer.hasConsumer(TIMER_LOCATION)); - // Ensure that when we see a terminal Elements object, we remove the consumer - multiplexer.getInboundObserver().onNext(TERMINAL_ELEMENTS); - assertFalse(multiplexer.hasConsumer(DATA_LOCATION)); - assertFalse(multiplexer.hasConsumer(TIMER_LOCATION)); - - // Assert that normal and terminal Elements are passed to the consumer - assertThat( - dataInboundValues, - contains(KV.of(ELEMENTS.getData(0).getData(), false), KV.of(ByteString.EMPTY, true))); - assertThat( - timerInboundValues, - contains(KV.of(ELEMENTS.getTimers(0).getTimers(), false), KV.of(ByteString.EMPTY, true))); - } -} diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserverTest.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserverTest.java deleted file mode 100644 index cf6747c9754..00000000000 --- a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/BeamFnDataInboundObserverTest.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.fn.data; - -import static org.apache.beam.sdk.util.WindowedValue.valueInGlobalWindow; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.contains; -import static org.hamcrest.Matchers.empty; -import static org.hamcrest.Matchers.instanceOf; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.concurrent.ExecutionException; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.transforms.windowing.GlobalWindow; -import org.apache.beam.sdk.util.ByteStringOutputStream; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** Tests for {@link BeamFnDataInboundObserver}. 
*/ -@RunWith(JUnit4.class) -public class BeamFnDataInboundObserverTest { - private static final Coder<WindowedValue<String>> CODER = - WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE); - private static final LogicalEndpoint DATA_ENDPOINT = LogicalEndpoint.data("777L", "999L"); - - @Rule public ExpectedException thrown = ExpectedException.none(); - - @Test - public void testDecodingElements() throws Exception { - Collection<WindowedValue<String>> values = new ArrayList<>(); - InboundDataClient readFuture = CompletableFutureInboundDataClient.create(); - BeamFnDataInboundObserver observer = - new BeamFnDataInboundObserver( - DATA_ENDPOINT, DecodingFnDataReceiver.create(CODER, values::add), readFuture); - - // Test decoding multiple messages - observer.accept(dataWith("ABC", "DEF", "GHI"), false); - assertThat( - values, - contains( - valueInGlobalWindow("ABC"), valueInGlobalWindow("DEF"), valueInGlobalWindow("GHI"))); - values.clear(); - - // Test empty message signaling end of stream - assertFalse(readFuture.isDone()); - observer.accept(ByteString.EMPTY, true); - assertTrue(readFuture.isDone()); - - // Test messages after stream is finished are discarded - observer.accept(dataWith("ABC", "DEF", "GHI"), false); - assertThat(values, empty()); - } - - @Test - public void testConsumptionFailureCompletesReadFutureAndDiscardsMessages() throws Exception { - InboundDataClient readClient = CompletableFutureInboundDataClient.create(); - BeamFnDataInboundObserver observer = - new BeamFnDataInboundObserver( - DATA_ENDPOINT, DecodingFnDataReceiver.create(CODER, this::throwOnDefValue), readClient); - - assertFalse(readClient.isDone()); - observer.accept(dataWith("ABC", "DEF", "GHI"), false); - assertTrue(readClient.isDone()); - - thrown.expect(ExecutionException.class); - thrown.expectCause(instanceOf(RuntimeException.class)); - thrown.expectMessage("Failure"); - readClient.awaitCompletion(); - } - - private void throwOnDefValue(WindowedValue<String> 
value) { - if ("DEF".equals(value.getValue())) { - throw new RuntimeException("Failure"); - } - } - - private ByteString dataWith(String... values) throws Exception { - ByteStringOutputStream output = new ByteStringOutputStream(); - for (String value : values) { - CODER.encode(valueInGlobalWindow(value), output); - } - return output.toByteString(); - } -} diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/CompletableFutureInboundDataClientTest.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/CompletableFutureInboundDataClientTest.java deleted file mode 100644 index b11dc5d78db..00000000000 --- a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/data/CompletableFutureInboundDataClientTest.java +++ /dev/null @@ -1,163 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.fn.data; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.isA; -import static org.junit.Assert.fail; - -import java.util.concurrent.CancellationException; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** Tests for {@link CompletableFutureInboundDataClient}. */ -@RunWith(JUnit4.class) -public class CompletableFutureInboundDataClientTest { - @Rule public ExpectedException thrown = ExpectedException.none(); - - @Test - public void testComplete() throws Exception { - InboundDataClient client = CompletableFutureInboundDataClient.create(); - - assertThat(client.isDone(), is(false)); - - client.complete(); - - assertThat(client.isDone(), is(true)); - // Should return immediately - client.awaitCompletion(); - } - - @Test - public void testCanceled() throws Exception { - InboundDataClient client = CompletableFutureInboundDataClient.create(); - - assertThat(client.isDone(), is(false)); - - client.cancel(); - - assertThat(client.isDone(), is(true)); - - thrown.expect(CancellationException.class); - // Should return immediately - client.awaitCompletion(); - } - - @Test - public void testFailed() throws Exception { - InboundDataClient client = CompletableFutureInboundDataClient.create(); - - assertThat(client.isDone(), is(false)); - - client.fail(new UnsupportedOperationException("message")); - - assertThat(client.isDone(), is(true)); - - thrown.expect(ExecutionException.class); - thrown.expectCause(isA(UnsupportedOperationException.class)); - thrown.expectMessage("message"); - 
client.awaitCompletion(); - } - - @Test - public void testCompleteMultithreaded() throws Exception { - InboundDataClient client = CompletableFutureInboundDataClient.create(); - Future<Void> waitingFuture = - Executors.newSingleThreadExecutor() - .submit( - () -> { - client.awaitCompletion(); - return null; - }); - - try { - waitingFuture.get(50, TimeUnit.MILLISECONDS); - fail(); - } catch (TimeoutException expected) { - // This should time out, as the client should never complete without external completion - } - - client.complete(); - // Blocks forever if the thread does not continue - waitingFuture.get(); - } - - @Test - public void testCompleteBackingFuture() throws Exception { - CompletableFuture<Object> future = new CompletableFuture<>(); - InboundDataClient client = CompletableFutureInboundDataClient.forBackingFuture(future); - - assertThat(future.isDone(), is(false)); - assertThat(client.isDone(), is(false)); - - client.complete(); - - assertThat(future.isDone(), is(true)); - assertThat(client.isDone(), is(true)); - // Should return immediately - client.awaitCompletion(); - } - - @Test - public void testCancelBackingFuture() throws Exception { - CompletableFuture<Object> future = new CompletableFuture<>(); - InboundDataClient client = CompletableFutureInboundDataClient.forBackingFuture(future); - - assertThat(future.isDone(), is(false)); - assertThat(client.isDone(), is(false)); - - client.cancel(); - - assertThat(future.isDone(), is(true)); - assertThat(client.isDone(), is(true)); - assertThat(future.isCancelled(), is(true)); - thrown.expect(CancellationException.class); - // Should return immediately - future.get(); - } - - @Test - public void testFailBackingFuture() throws Exception { - CompletableFuture<Object> future = new CompletableFuture<>(); - InboundDataClient client = CompletableFutureInboundDataClient.forBackingFuture(future); - - assertThat(future.isDone(), is(false)); - assertThat(client.isDone(), is(false)); - - client.fail(new 
UnsupportedOperationException("message")); - - assertThat(client.isDone(), is(true)); - assertThat(future.isDone(), is(true)); - assertThat(future.isCompletedExceptionally(), is(true)); - - thrown.expect(ExecutionException.class); - thrown.expectCause(isA(UnsupportedOperationException.class)); - thrown.expectMessage("message"); - client.awaitCompletion(); - } -} diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataClient.java index 5ef1ed836e6..75f3a24301c 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataClient.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/BeamFnDataClient.java @@ -33,12 +33,7 @@ import org.apache.beam.sdk.fn.data.FnDataReceiver; */ public interface BeamFnDataClient { /** - * Registers the following inbound receiver for the provided instruction id. - * - * <p>The provided coder is used to decode inbound elements. The decoded elements are passed to - * the provided receiver. Any failure during decoding or processing of the element will complete - * the returned future exceptionally. On successful termination of the stream, the returned future - * is completed successfully. + * Registers a receiver for the provided instruction id. * * <p>The receiver is not required to be thread safe. * @@ -69,7 +64,7 @@ public interface BeamFnDataClient { * timers over the data plane. It is important that {@link * BeamFnDataOutboundAggregator#sendOrCollectBufferedDataAndFinishOutboundStreams()} is called on * the returned BeamFnDataOutboundAggregator at the end of each bundle. 
If - * collectElementsIfNoFlushes is set to true, {@link * + * collectElementsIfNoFlushes is set to true, {@link * BeamFnDataOutboundAggregator#sendOrCollectBufferedDataAndFinishOutboundStreams()} returns the * buffered elements instead of sending it through the outbound StreamObserver if there's no * previous flush.