[
https://issues.apache.org/jira/browse/BEAM-4658?focusedWorklogId=127769&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-127769
]
ASF GitHub Bot logged work on BEAM-4658:
----------------------------------------
Author: ASF GitHub Bot
Created on: 26/Jul/18 14:45
Start Date: 26/Jul/18 14:45
Worklog Time Spent: 10m
Work Description: lukecwik closed pull request #6002: [BEAM-4658] Modify
RemoteBundle interface to allow for multiple inputs.
URL: https://github.com/apache/beam/pull/6002
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one from
a fork) once it is merged, the diff is reproduced below for the sake of
provenance:
diff --git
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactory.java
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactory.java
index ca20949c953..9a9121b92a4 100644
---
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactory.java
+++
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactory.java
@@ -18,6 +18,7 @@
package org.apache.beam.runners.direct.portable;
+import com.google.common.collect.Iterables;
import java.util.ArrayList;
import java.util.Collection;
import javax.annotation.Nullable;
@@ -29,6 +30,7 @@
import org.apache.beam.runners.fnexecution.control.RemoteBundle;
import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.util.WindowedValue;
/**
@@ -59,7 +61,8 @@ public void cleanup() throws Exception {
private class RemoteStageEvaluator<T> implements TransformEvaluator<T> {
private final PTransformNode transform;
- private final RemoteBundle<T> bundle;
+ private final RemoteBundle bundle;
+ private final FnDataReceiver<WindowedValue<?>> mainInput;
private final Collection<UncommittedBundle<?>> outputs;
private RemoteStageEvaluator(PTransformNode transform) throws Exception {
@@ -67,19 +70,20 @@ private RemoteStageEvaluator(PTransformNode transform)
throws Exception {
ExecutableStage stage =
ExecutableStage.fromPayload(
ExecutableStagePayload.parseFrom(transform.getTransform().getSpec().getPayload()));
- outputs = new ArrayList<>();
+ this.outputs = new ArrayList<>();
StageBundleFactory<T> stageFactory = jobFactory.forStage(stage);
- bundle =
+ this.bundle =
stageFactory.getBundle(
BundleFactoryOutputReceiverFactory.create(
bundleFactory, stage.getComponents(), outputs::add),
StateRequestHandler.unsupported(),
BundleProgressHandler.unsupported());
+ this.mainInput = Iterables.getOnlyElement(bundle.getInputReceivers().values());
}
@Override
public void processElement(WindowedValue<T> element) throws Exception {
- bundle.getInputReceiver().accept(element);
+ mainInput.accept(element);
}
@Override
diff --git
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/SplittableRemoteStageEvaluatorFactory.java
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/SplittableRemoteStageEvaluatorFactory.java
index d1aaf174a0b..8119c40d3cb 100644
---
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/SplittableRemoteStageEvaluatorFactory.java
+++
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/SplittableRemoteStageEvaluatorFactory.java
@@ -36,6 +36,7 @@
import org.apache.beam.runners.fnexecution.wire.WireCoders;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
@@ -89,7 +90,8 @@ public void cleanup() throws Exception {
private final CopyOnAccessInMemoryStateInternals<byte[]> stateInternals;
private final DirectTimerInternals timerInternals;
- private final RemoteBundle<KV<InputT, RestrictionT>> bundle;
+ private final RemoteBundle bundle;
+ private final FnDataReceiver<WindowedValue<?>> mainInput;
private final Collection<UncommittedBundle<?>> outputs;
private final SDFFeederViaStateAndTimers<InputT, RestrictionT> feeder;
@@ -144,6 +146,7 @@ public void onCompleted(ProcessBundleResponse response) {
}
}
});
+ this.mainInput = Iterables.getOnlyElement(bundle.getInputReceivers().values());
}
@Override
@@ -158,7 +161,7 @@ public void processElement(
} else {
elementRestriction =
feeder.resume(Iterables.getOnlyElement(kwi.timersIterable()));
}
- bundle.getInputReceiver().accept(elementRestriction);
+ mainInput.accept(elementRestriction);
}
@Override
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
index daf5ab4beaf..6de971413e7 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
@@ -20,6 +20,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
+import com.google.common.collect.Iterables;
import java.util.Map;
import javax.annotation.concurrent.GuardedBy;
import org.apache.beam.model.pipeline.v1.RunnerApi;
@@ -109,10 +110,11 @@ public void mapPartition(
checkState(
stateRequestHandler != null, "%s not yet prepared",
StateRequestHandler.class.getName());
- try (RemoteBundle<InputT> bundle =
+ try (RemoteBundle bundle =
stageBundleFactory.getBundle(
new ReceiverFactory(collector, outputMap), stateRequestHandler,
progressHandler)) {
- FnDataReceiver<WindowedValue<InputT>> receiver = bundle.getInputReceiver();
+ FnDataReceiver<WindowedValue<?>> receiver =
+ Iterables.getOnlyElement(bundle.getInputReceivers().values());
for (WindowedValue<InputT> input : iterable) {
receiver.accept(input);
}
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
index cc0ed8849a5..c1fa7bb28ab 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
@@ -19,6 +19,7 @@
import static org.apache.flink.util.Preconditions.checkState;
+import com.google.common.collect.Iterables;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -132,11 +133,11 @@ private void
processElementWithSdkHarness(WindowedValue<InputT> element) throws
checkState(
stateRequestHandler != null, "%s not yet prepared",
StateRequestHandler.class.getName());
- try (RemoteBundle<InputT> bundle =
+ try (RemoteBundle bundle =
stageBundleFactory.getBundle(
new ReceiverFactory(outputManager, outputMap),
stateRequestHandler, progressHandler)) {
logger.debug(String.format("Sending value: %s", element));
- bundle.getInputReceiver().accept(element);
+ Iterables.getOnlyElement(bundle.getInputReceivers().values()).accept(element);
}
}
diff --git
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/ExecutableStageDoFnOperatorTest.java
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/ExecutableStageDoFnOperatorTest.java
index 234684a10a9..c0b86f9fe19 100644
---
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/ExecutableStageDoFnOperatorTest.java
+++
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/ExecutableStageDoFnOperatorTest.java
@@ -118,12 +118,12 @@ public void sdkErrorsSurfaceOnClose() throws Exception {
testHarness.open();
@SuppressWarnings("unchecked")
- RemoteBundle<Integer> bundle = Mockito.mock(RemoteBundle.class);
+ RemoteBundle bundle = Mockito.mock(RemoteBundle.class);
when(stageBundleFactory.getBundle(any(), any(), any())).thenReturn(bundle);
@SuppressWarnings("unchecked")
- FnDataReceiver<WindowedValue<Integer>> receiver = Mockito.mock(FnDataReceiver.class);
- when(bundle.getInputReceiver()).thenReturn(receiver);
+ FnDataReceiver<WindowedValue<?>> receiver = Mockito.mock(FnDataReceiver.class);
+ when(bundle.getInputReceivers()).thenReturn(ImmutableMap.of("pCollectionId", receiver));
Exception expected = new Exception();
doThrow(expected).when(bundle).close();
@@ -141,12 +141,12 @@ public void expectedInputsAreSent() throws Exception {
getOperator(mainOutput, Collections.emptyList(), outputManagerFactory);
@SuppressWarnings("unchecked")
- RemoteBundle<Integer> bundle = Mockito.mock(RemoteBundle.class);
+ RemoteBundle bundle = Mockito.mock(RemoteBundle.class);
when(stageBundleFactory.getBundle(any(), any(), any())).thenReturn(bundle);
@SuppressWarnings("unchecked")
- FnDataReceiver<WindowedValue<Integer>> receiver = Mockito.mock(FnDataReceiver.class);
- when(bundle.getInputReceiver()).thenReturn(receiver);
+ FnDataReceiver<WindowedValue<?>> receiver = Mockito.mock(FnDataReceiver.class);
+ when(bundle.getInputReceivers()).thenReturn(ImmutableMap.of("pCollectionId", receiver));
WindowedValue<Integer> one = WindowedValue.valueInGlobalWindow(1);
WindowedValue<Integer> two = WindowedValue.valueInGlobalWindow(2);
@@ -209,21 +209,23 @@ public void outputsAreTaggedCorrectly() throws Exception {
StageBundleFactory<Void> stageBundleFactory =
new StageBundleFactory<Void>() {
@Override
- public RemoteBundle<Void> getBundle(
+ public RemoteBundle getBundle(
OutputReceiverFactory receiverFactory,
StateRequestHandler stateRequestHandler,
BundleProgressHandler progressHandler) {
- return new RemoteBundle<Void>() {
+ return new RemoteBundle() {
@Override
public String getId() {
return "bundle-id";
}
@Override
- public FnDataReceiver<WindowedValue<Void>> getInputReceiver() {
- return input -> {
- /* Ignore input*/
- };
+ public Map<String, FnDataReceiver<WindowedValue<?>>>
getInputReceivers() {
+ return ImmutableMap.of(
+ "pCollectionId",
+ input -> {
+ /* Ignore input*/
+ });
}
@Override
diff --git
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunctionTest.java
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunctionTest.java
index 545789f28a7..e29732e3e48 100644
---
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunctionTest.java
+++
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunctionTest.java
@@ -95,12 +95,12 @@ public void sdkErrorsSurfaceOnClose() throws Exception {
function.open(new Configuration());
@SuppressWarnings("unchecked")
- RemoteBundle<Integer> bundle = Mockito.mock(RemoteBundle.class);
+ RemoteBundle bundle = Mockito.mock(RemoteBundle.class);
when(stageBundleFactory.getBundle(any(), any(), any())).thenReturn(bundle);
@SuppressWarnings("unchecked")
- FnDataReceiver<WindowedValue<Integer>> receiver = Mockito.mock(FnDataReceiver.class);
- when(bundle.getInputReceiver()).thenReturn(receiver);
+ FnDataReceiver<WindowedValue<?>> receiver = Mockito.mock(FnDataReceiver.class);
+ when(bundle.getInputReceivers()).thenReturn(ImmutableMap.of("pCollectionId", receiver));
Exception expected = new Exception();
doThrow(expected).when(bundle).close();
@@ -124,12 +124,12 @@ public void expectedInputsAreSent() throws Exception {
function.open(new Configuration());
@SuppressWarnings("unchecked")
- RemoteBundle<Integer> bundle = Mockito.mock(RemoteBundle.class);
+ RemoteBundle bundle = Mockito.mock(RemoteBundle.class);
when(stageBundleFactory.getBundle(any(), any(), any())).thenReturn(bundle);
@SuppressWarnings("unchecked")
- FnDataReceiver<WindowedValue<Integer>> receiver = Mockito.mock(FnDataReceiver.class);
- when(bundle.getInputReceiver()).thenReturn(receiver);
+ FnDataReceiver<WindowedValue<?>> receiver = Mockito.mock(FnDataReceiver.class);
+ when(bundle.getInputReceivers()).thenReturn(ImmutableMap.of("pCollectionId", receiver));
WindowedValue<Integer> one = WindowedValue.valueInGlobalWindow(1);
WindowedValue<Integer> two = WindowedValue.valueInGlobalWindow(2);
@@ -157,21 +157,23 @@ public void outputsAreTaggedCorrectly() throws Exception {
StageBundleFactory<Integer> stageBundleFactory =
new StageBundleFactory<Integer>() {
@Override
- public RemoteBundle<Integer> getBundle(
+ public RemoteBundle getBundle(
OutputReceiverFactory receiverFactory,
StateRequestHandler stateRequestHandler,
BundleProgressHandler progressHandler) {
- return new RemoteBundle<Integer>() {
+ return new RemoteBundle() {
@Override
public String getId() {
return "bundle-id";
}
@Override
- public FnDataReceiver<WindowedValue<Integer>> getInputReceiver() {
- return input -> {
- /* Ignore input*/
- };
+ public Map<String, FnDataReceiver<WindowedValue<?>>> getInputReceivers() {
+ return ImmutableMap.of(
+ "pCollectionId",
+ input -> {
+ /* Ignore input*/
+ });
}
@Override
diff --git
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DockerJobBundleFactory.java
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DockerJobBundleFactory.java
index 95735d76c28..dee5f6f5c0b 100644
---
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DockerJobBundleFactory.java
+++
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DockerJobBundleFactory.java
@@ -270,7 +270,7 @@ private static Platform getPlatform() {
}
@Override
- public RemoteBundle<InputT> getBundle(
+ public RemoteBundle getBundle(
OutputReceiverFactory outputReceiverFactory,
StateRequestHandler stateRequestHandler,
BundleProgressHandler progressHandler)
diff --git
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/RemoteBundle.java
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/RemoteBundle.java
index d28fdfd6b02..ff3066a4470 100644
---
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/RemoteBundle.java
+++
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/RemoteBundle.java
@@ -18,6 +18,7 @@
package org.apache.beam.runners.fnexecution.control;
+import java.util.Map;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.util.WindowedValue;
@@ -29,15 +30,15 @@
* <p>When a RemoteBundle is closed, it will block until bundle processing is
finished on remote
* resources, and throw an exception if bundle processing has failed.
*/
-public interface RemoteBundle<InputT> extends AutoCloseable {
+public interface RemoteBundle extends AutoCloseable {
/** Get an id used to represent this bundle. */
String getId();
/**
- * Get a {@link FnDataReceiver receiver} which consumes input elements, forwarding them to the
- * remote environment.
+ * Get a map of PCollection ids to {@link FnDataReceiver receiver}s which consume input elements,
+ * forwarding them to the remote environment.
*/
- FnDataReceiver<WindowedValue<InputT>> getInputReceiver();
+ Map<String, FnDataReceiver<WindowedValue<?>>> getInputReceivers();
/**
* Closes this bundle. This causes the input {@link FnDataReceiver} to be
closed (future calls to
diff --git
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java
index 605a8ea3e94..7c5b0acd169 100644
---
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java
+++
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java
@@ -20,6 +20,8 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletionStage;
@@ -90,13 +92,14 @@ private BundleProcessor(
* <p>NOTE: It is important to {@link #close()} each bundle after all
elements are emitted.
*
* <pre>{@code
- * try (ActiveBundle<InputT> bundle = SdkHarnessClient.newBundle(...)) {
- * FnDataReceiver<InputT> inputReceiver = bundle.getInputReceiver();
- * // send all elements ...
+ * try (ActiveBundle bundle = SdkHarnessClient.newBundle(...)) {
+ * FnDataReceiver<InputT> inputReceiver =
+ * (FnDataReceiver)
bundle.getInputReceivers().get(mainPCollectionId);
+ * // send all main input elements ...
* }
* }</pre>
*/
- public ActiveBundle<T> newBundle(
+ public ActiveBundle newBundle(
Map<BeamFnApi.Target, RemoteOutputReceiver<?>> outputReceivers,
BundleProgressHandler progressHandler) {
return newBundle(
@@ -119,13 +122,14 @@ private BundleProcessor(
* <p>NOTE: It is important to {@link #close()} each bundle after all
elements are emitted.
*
* <pre>{@code
- * try (ActiveBundle<InputT> bundle = SdkHarnessClient.newBundle(...)) {
- * FnDataReceiver<InputT> inputReceiver = bundle.getInputReceiver();
+ * try (ActiveBundle bundle = SdkHarnessClient.newBundle(...)) {
+ * FnDataReceiver<InputT> inputReceiver =
+ * (FnDataReceiver)
bundle.getInputReceivers().get(mainPCollectionId);
* // send all elements ...
* }
* }</pre>
*/
- public ActiveBundle<T> newBundle(
+ public ActiveBundle newBundle(
Map<BeamFnApi.Target, RemoteOutputReceiver<?>> outputReceivers,
StateRequestHandler stateRequestHandler,
BundleProgressHandler progressHandler) {
@@ -159,14 +163,25 @@ private BundleProcessor(
outputClients.put(targetReceiver.getKey(), outputClient);
}
- CloseableFnDataReceiver<WindowedValue<T>> dataReceiver =
- fnApiDataService.send(
- LogicalEndpoint.of(bundleId, remoteInput.getTarget()),
remoteInput.getCoder());
-
- return new ActiveBundle<>(
+ String bundleOutputPCollection =
+ Iterables.getOnlyElement(
+ processBundleDescriptor
+
.getTransformsOrThrow(remoteInput.getTarget().getPrimitiveTransformReference())
+ .getOutputsMap()
+ .values());
+ Map<String, CloseableFnDataReceiver<WindowedValue<?>>> dataReceivers =
+ ImmutableMap.of(
+ bundleOutputPCollection,
+ (CloseableFnDataReceiver<WindowedValue<?>>)
+ (CloseableFnDataReceiver)
+ fnApiDataService.send(
+ LogicalEndpoint.of(bundleId,
remoteInput.getTarget()),
+ remoteInput.getCoder()));
+
+ return new ActiveBundle(
bundleId,
specificResponse,
- dataReceiver,
+ dataReceivers,
outputClients,
stateDelegator.registerForProcessBundleInstructionId(bundleId,
stateRequestHandler),
progressHandler);
@@ -182,10 +197,10 @@ private BundleProcessor(
}
/** An active bundle for a particular {@link
BeamFnApi.ProcessBundleDescriptor}. */
- public static class ActiveBundle<InputT> implements RemoteBundle<InputT> {
+ public static class ActiveBundle implements RemoteBundle {
private final String bundleId;
private final CompletionStage<BeamFnApi.ProcessBundleResponse> response;
- private final CloseableFnDataReceiver<WindowedValue<InputT>> inputReceiver;
+ private final Map<String, CloseableFnDataReceiver<WindowedValue<?>>>
inputReceivers;
private final Map<BeamFnApi.Target, InboundDataClient> outputClients;
private final StateDelegator.Registration stateRegistration;
private final BundleProgressHandler progressHandler;
@@ -193,13 +208,13 @@ private BundleProcessor(
private ActiveBundle(
String bundleId,
CompletionStage<ProcessBundleResponse> response,
- CloseableFnDataReceiver<WindowedValue<InputT>> inputReceiver,
+ Map<String, CloseableFnDataReceiver<WindowedValue<?>>> inputReceivers,
Map<Target, InboundDataClient> outputClients,
Registration stateRegistration,
BundleProgressHandler progressHandler) {
this.bundleId = bundleId;
this.response = response;
- this.inputReceiver = inputReceiver;
+ this.inputReceivers = inputReceivers;
this.outputClients = outputClients;
this.stateRegistration = stateRegistration;
this.progressHandler = progressHandler;
@@ -212,19 +227,19 @@ public String getId() {
}
/**
- * Returns a {@link FnDataReceiver receiver} which consumes input elements
forwarding them to
- * the SDK.
+ * Get a map of PCollection ids to {@link FnDataReceiver receiver}s which
consume input
+ * elements, forwarding them to the remote environment.
*/
@Override
- public FnDataReceiver<WindowedValue<InputT>> getInputReceiver() {
- return inputReceiver;
+ public Map<String, FnDataReceiver<WindowedValue<?>>> getInputReceivers() {
+ return (Map) inputReceivers;
}
/**
* Blocks till bundle processing is finished. This is comprised of:
*
* <ul>
- * <li>closing the {@link #getInputReceiver() input receiver}.
+ * <li>closing each {@link #getInputReceivers() input receiver}.
* <li>waiting for the SDK to say that processing the bundle is finished.
* <li>waiting for all inbound data clients to complete
* </ul>
@@ -235,10 +250,16 @@ public String getId() {
@Override
public void close() throws Exception {
Exception exception = null;
- try {
- inputReceiver.close();
- } catch (Exception e) {
- exception = e;
+ for (CloseableFnDataReceiver<?> inputReceiver : inputReceivers.values()) {
+ try {
+ inputReceiver.close();
+ } catch (Exception e) {
+ if (exception == null) {
+ exception = e;
+ } else {
+ exception.addSuppressed(e);
+ }
+ }
}
try {
// We don't have to worry about the completion stage.
diff --git
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactory.java
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactory.java
index 122bf3eb502..cb224d64746 100644
---
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactory.java
+++
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SingleEnvironmentInstanceJobBundleFactory.java
@@ -149,7 +149,7 @@ private BundleProcessorStageBundleFactory(
}
@Override
- public RemoteBundle<T> getBundle(
+ public RemoteBundle getBundle(
OutputReceiverFactory outputReceiverFactory,
StateRequestHandler stateRequestHandler,
BundleProgressHandler progressHandler) {
@@ -167,7 +167,7 @@ private BundleProcessorStageBundleFactory(
outputReceiverFactory.create(bundleOutputPCollection);
outputReceivers.put(
targetCoders.getKey(),
- RemoteOutputReceiver.of((Coder) targetCoders.getValue(),
outputReceiver));
+ RemoteOutputReceiver.of(targetCoders.getValue(), outputReceiver));
}
return processor.newBundle(outputReceivers, stateRequestHandler,
progressHandler);
}
diff --git
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/StageBundleFactory.java
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/StageBundleFactory.java
index 8dae649c197..df4f4f200f5 100644
---
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/StageBundleFactory.java
+++
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/StageBundleFactory.java
@@ -30,7 +30,7 @@
*/
public interface StageBundleFactory<T> extends AutoCloseable {
/** Get a new {@link RemoteBundle bundle} for processing the data in an
executable stage. */
- RemoteBundle<T> getBundle(
+ RemoteBundle getBundle(
OutputReceiverFactory outputReceiverFactory,
StateRequestHandler stateRequestHandler,
BundleProgressHandler progressHandler)
diff --git
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
index 13a533ae2cd..f4ffc03efed 100644
---
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
+++
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
@@ -246,9 +246,10 @@ public void process(ProcessContext ctxt) {
}
// The impulse example
- try (ActiveBundle<byte[]> bundle =
+ try (ActiveBundle bundle =
processor.newBundle(outputReceivers,
BundleProgressHandler.unsupported())) {
- bundle.getInputReceiver().accept(WindowedValue.valueInGlobalWindow(new byte[0]));
+ Iterables.getOnlyElement(bundle.getInputReceivers().values())
+ .accept(WindowedValue.valueInGlobalWindow(new byte[0]));
}
for (Collection<? super WindowedValue<?>> windowedValues :
outputValues.values()) {
@@ -361,15 +362,13 @@ public void processElement(ProcessContext context) {
});
BundleProgressHandler progressHandler =
BundleProgressHandler.unsupported();
- try (ActiveBundle<byte[]> bundle =
+ try (ActiveBundle bundle =
processor.newBundle(outputReceivers, stateRequestHandler,
progressHandler)) {
- bundle
- .getInputReceiver()
+ Iterables.getOnlyElement(bundle.getInputReceivers().values())
.accept(
WindowedValue.valueInGlobalWindow(
CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "X")));
- bundle
- .getInputReceiver()
+ Iterables.getOnlyElement(bundle.getInputReceivers().values())
.accept(
WindowedValue.valueInGlobalWindow(
CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "Y")));
@@ -515,10 +514,11 @@ public void clear(K key, W window) {
}
});
- try (ActiveBundle<KV<byte[], byte[]>> bundle =
+ try (ActiveBundle bundle =
processor.newBundle(
outputReceivers, stateRequestHandler,
BundleProgressHandler.unsupported())) {
- bundle.getInputReceiver().accept(WindowedValue.valueInGlobalWindow(kvBytes("X", "Y")));
+ Iterables.getOnlyElement(bundle.getInputReceivers().values())
+ .accept(WindowedValue.valueInGlobalWindow(kvBytes("X", "Y")));
}
for (Collection<WindowedValue<?>> windowedValues : outputValues.values()) {
assertThat(
diff --git
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClientTest.java
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClientTest.java
index 1c20d7428f8..3be8760b840 100644
---
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClientTest.java
+++
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClientTest.java
@@ -33,7 +33,7 @@
import static org.mockito.Mockito.when;
import com.google.common.collect.ImmutableMap;
-import java.io.IOException;
+import com.google.common.collect.Iterables;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -44,8 +44,6 @@
import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleDescriptor;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleResponse;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.RemoteGrpcPort;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.Target;
-import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
@@ -97,11 +95,70 @@
@Rule public ExpectedException thrown = ExpectedException.none();
private SdkHarnessClient sdkHarnessClient;
+ private ProcessBundleDescriptor descriptor;
+ private BeamFnApi.Target sdkGrpcReadTarget;
+ private BeamFnApi.Target sdkGrpcWriteTarget;
@Before
- public void setup() {
+ public void setup() throws Exception {
MockitoAnnotations.initMocks(this);
sdkHarnessClient = SdkHarnessClient.usingFnApiClient(fnApiControlClient,
dataService);
+
+ Pipeline userPipeline = Pipeline.create();
+ TupleTag<String> outputTag = new TupleTag<>();
+ userPipeline
+ .apply("create", Create.of("foo"))
+ .apply("proc", ParDo.of(new TestFn()).withOutputTags(outputTag,
TupleTagList.empty()));
+ RunnerApi.Pipeline userProto = PipelineTranslation.toProto(userPipeline);
+
+ ProcessBundleDescriptor.Builder pbdBuilder =
+ ProcessBundleDescriptor.newBuilder()
+ .setId("my_id")
+ .putAllEnvironments(userProto.getComponents().getEnvironmentsMap())
+
.putAllWindowingStrategies(userProto.getComponents().getWindowingStrategiesMap())
+ .putAllCoders(userProto.getComponents().getCodersMap());
+ RunnerApi.Coder fullValueCoder =
+
CoderTranslation.toProto(WindowedValue.getFullCoder(StringUtf8Coder.of(),
Coder.INSTANCE))
+ .getCoder();
+ pbdBuilder.putCoders("wire_coder", fullValueCoder);
+
+ PTransform targetProcessor =
userProto.getComponents().getTransformsOrThrow("proc");
+ RemoteGrpcPort port =
+ RemoteGrpcPort.newBuilder()
+ .setApiServiceDescriptor(harness.dataEndpoint())
+ .setCoderId("wire_coder")
+ .build();
+ RemoteGrpcPortRead readNode =
+ RemoteGrpcPortRead.readFromPort(
+ port, getOnlyElement(targetProcessor.getInputsMap().values()));
+ RemoteGrpcPortWrite writeNode =
+ RemoteGrpcPortWrite.writeToPort(
+ getOnlyElement(targetProcessor.getOutputsMap().values()), port);
+ // TODO: Ensure cross-env (Runner <-> SDK GRPC Read/Write Node) coders are
length-prefixed
+ for (String pc : targetProcessor.getInputsMap().values()) {
+ pbdBuilder.putPcollections(pc,
userProto.getComponents().getPcollectionsOrThrow(pc));
+ }
+ for (String pc : targetProcessor.getOutputsMap().values()) {
+ pbdBuilder.putPcollections(pc,
userProto.getComponents().getPcollectionsOrThrow(pc));
+ }
+ pbdBuilder
+ .putTransforms("proc", targetProcessor)
+ .putTransforms("read", readNode.toPTransform())
+ .putTransforms("write", writeNode.toPTransform());
+ descriptor = pbdBuilder.build();
+
+ sdkGrpcReadTarget =
+ BeamFnApi.Target.newBuilder()
+ .setName(
+
getOnlyElement(descriptor.getTransformsOrThrow("read").getOutputsMap().keySet()))
+ .setPrimitiveTransformReference("read")
+ .build();
+ sdkGrpcWriteTarget =
+ BeamFnApi.Target.newBuilder()
+ .setName(
+
getOnlyElement(descriptor.getTransformsOrThrow("write").getInputsMap().keySet()))
+ .setPrimitiveTransformReference("write")
+ .build();
}
@Test
@@ -118,7 +175,7 @@ public void testRegisterCachesBundleProcessors() throws
Exception {
RemoteInputDestination<WindowedValue<Integer>> remoteInputs =
RemoteInputDestination.of(
FullWindowedValueCoder.of(VarIntCoder.of(),
GlobalWindow.Coder.INSTANCE),
- Target.getDefaultInstance());
+ sdkGrpcReadTarget);
BundleProcessor<?> processor1 = sdkHarnessClient.getProcessor(descriptor1,
remoteInputs);
BundleProcessor<?> processor2 = sdkHarnessClient.getProcessor(descriptor2,
remoteInputs);
@@ -144,7 +201,7 @@ public void testRegisterWithStateRequiresStateDelegator()
throws Exception {
RemoteInputDestination<WindowedValue<Integer>> remoteInputs =
RemoteInputDestination.of(
FullWindowedValueCoder.of(VarIntCoder.of(),
GlobalWindow.Coder.INSTANCE),
- Target.getDefaultInstance());
+ sdkGrpcReadTarget);
thrown.expect(IllegalStateException.class);
thrown.expectMessage("containing a state");
@@ -153,10 +210,6 @@ public void testRegisterWithStateRequiresStateDelegator()
throws Exception {
@Test
public void testNewBundleNoDataDoesNotCrash() throws Exception {
- String descriptorId1 = "descriptor1";
-
- ProcessBundleDescriptor descriptor =
- ProcessBundleDescriptor.newBuilder().setId(descriptorId1).build();
CompletableFuture<InstructionResponse> processBundleResponseFuture = new
CompletableFuture<>();
when(fnApiControlClient.handle(any(BeamFnApi.InstructionRequest.class)))
.thenReturn(new CompletableFuture<>())
@@ -166,10 +219,10 @@ public void testNewBundleNoDataDoesNotCrash() throws
Exception {
FullWindowedValueCoder.of(StringUtf8Coder.of(), Coder.INSTANCE);
BundleProcessor<String> processor =
sdkHarnessClient.getProcessor(
- descriptor, RemoteInputDestination.of(coder,
Target.getDefaultInstance()));
+ descriptor, RemoteInputDestination.of(coder, sdkGrpcReadTarget));
when(dataService.send(any(),
eq(coder))).thenReturn(mock(CloseableFnDataReceiver.class));
- try (ActiveBundle<String> activeBundle =
+ try (ActiveBundle activeBundle =
processor.newBundle(Collections.emptyMap(),
BundleProgressHandler.unsupported())) {
// Correlating the ProcessBundleRequest and ProcessBundleResponse is
owned by the underlying
// FnApiControlClient. The SdkHarnessClient owns just wrapping the
request and unwrapping
@@ -185,34 +238,16 @@ public void testNewBundleNoDataDoesNotCrash() throws
Exception {
@Test
public void testNewBundleAndProcessElements() throws Exception {
- ProcessBundleDescriptor processBundleDescriptor =
- getProcessBundleDescriptor(harness.dataEndpoint());
-
- BeamFnApi.Target sdkGrpcReadTarget =
- BeamFnApi.Target.newBuilder()
- .setName(
- getOnlyElement(
-
processBundleDescriptor.getTransformsOrThrow("read").getOutputsMap().keySet()))
- .setPrimitiveTransformReference("read")
- .build();
- BeamFnApi.Target sdkGrpcWriteTarget =
- BeamFnApi.Target.newBuilder()
- .setName(
- getOnlyElement(
-
processBundleDescriptor.getTransformsOrThrow("write").getInputsMap().keySet()))
- .setPrimitiveTransformReference("write")
- .build();
-
SdkHarnessClient client = harness.client();
BundleProcessor<String> processor =
client.getProcessor(
- processBundleDescriptor,
+ descriptor,
RemoteInputDestination.of(
FullWindowedValueCoder.of(StringUtf8Coder.of(),
Coder.INSTANCE),
sdkGrpcReadTarget));
Collection<WindowedValue<String>> outputs = new ArrayList<>();
- try (ActiveBundle<String> activeBundle =
+ try (ActiveBundle activeBundle =
processor.newBundle(
Collections.singletonMap(
sdkGrpcWriteTarget,
@@ -221,7 +256,8 @@ public void testNewBundleAndProcessElements() throws
Exception {
LengthPrefixCoder.of(StringUtf8Coder.of()),
Coder.INSTANCE),
outputs::add)),
BundleProgressHandler.unsupported())) {
- FnDataReceiver<WindowedValue<String>> bundleInputReceiver =
activeBundle.getInputReceiver();
+ FnDataReceiver<WindowedValue<?>> bundleInputReceiver =
+ Iterables.getOnlyElement(activeBundle.getInputReceivers().values());
bundleInputReceiver.accept(WindowedValue.valueInGlobalWindow("foo"));
bundleInputReceiver.accept(WindowedValue.valueInGlobalWindow("bar"));
bundleInputReceiver.accept(WindowedValue.valueInGlobalWindow("baz"));
@@ -238,14 +274,11 @@ public void testNewBundleAndProcessElements() throws
Exception {
@Test
public void handleCleanupWhenInputSenderFails() throws Exception {
- String descriptorId1 = "descriptor1";
Exception testException = new Exception();
InboundDataClient mockOutputReceiver = mock(InboundDataClient.class);
CloseableFnDataReceiver mockInputSender =
mock(CloseableFnDataReceiver.class);
- ProcessBundleDescriptor descriptor =
- ProcessBundleDescriptor.newBuilder().setId(descriptorId1).build();
CompletableFuture<InstructionResponse> processBundleResponseFuture = new
CompletableFuture<>();
when(fnApiControlClient.handle(any(BeamFnApi.InstructionRequest.class)))
.thenReturn(new CompletableFuture<>())
@@ -255,7 +288,7 @@ public void handleCleanupWhenInputSenderFails() throws
Exception {
FullWindowedValueCoder.of(StringUtf8Coder.of(), Coder.INSTANCE);
BundleProcessor<String> processor =
sdkHarnessClient.getProcessor(
- descriptor, RemoteInputDestination.of(coder,
Target.getDefaultInstance()));
+ descriptor, RemoteInputDestination.of(coder, sdkGrpcReadTarget));
when(dataService.receive(any(), any(),
any())).thenReturn(mockOutputReceiver);
when(dataService.send(any(), eq(coder))).thenReturn(mockInputSender);
@@ -265,10 +298,9 @@ public void handleCleanupWhenInputSenderFails() throws
Exception {
BundleProgressHandler mockProgressHandler =
mock(BundleProgressHandler.class);
try {
- try (ActiveBundle<String> activeBundle =
+ try (ActiveBundle activeBundle =
processor.newBundle(
- ImmutableMap.of(Target.getDefaultInstance(),
mockRemoteOutputReceiver),
- mockProgressHandler)) {
+ ImmutableMap.of(sdkGrpcWriteTarget, mockRemoteOutputReceiver),
mockProgressHandler)) {
// We shouldn't be required to complete the process bundle response
future.
}
fail("Exception expected");
@@ -282,7 +314,6 @@ public void handleCleanupWhenInputSenderFails() throws
Exception {
@Test
public void handleCleanupWithStateWhenInputSenderFails() throws Exception {
- String descriptorId1 = "descriptor1";
Exception testException = new Exception();
InboundDataClient mockOutputReceiver = mock(InboundDataClient.class);
@@ -295,8 +326,6 @@ public void handleCleanupWithStateWhenInputSenderFails()
throws Exception {
StateRequestHandler mockStateHandler = mock(StateRequestHandler.class);
BundleProgressHandler mockProgressHandler =
mock(BundleProgressHandler.class);
- ProcessBundleDescriptor descriptor =
- ProcessBundleDescriptor.newBuilder().setId(descriptorId1).build();
CompletableFuture<InstructionResponse> processBundleResponseFuture = new
CompletableFuture<>();
when(fnApiControlClient.handle(any(BeamFnApi.InstructionRequest.class)))
.thenReturn(new CompletableFuture<>())
@@ -306,9 +335,7 @@ public void handleCleanupWithStateWhenInputSenderFails()
throws Exception {
FullWindowedValueCoder.of(StringUtf8Coder.of(), Coder.INSTANCE);
BundleProcessor<String> processor =
sdkHarnessClient.getProcessor(
- descriptor,
- RemoteInputDestination.of(coder, Target.getDefaultInstance()),
- mockStateDelegator);
+ descriptor, RemoteInputDestination.of(coder, sdkGrpcReadTarget),
mockStateDelegator);
when(dataService.receive(any(), any(),
any())).thenReturn(mockOutputReceiver);
when(dataService.send(any(), eq(coder))).thenReturn(mockInputSender);
@@ -317,9 +344,9 @@ public void handleCleanupWithStateWhenInputSenderFails()
throws Exception {
RemoteOutputReceiver mockRemoteOutputReceiver =
mock(RemoteOutputReceiver.class);
try {
- try (ActiveBundle<String> activeBundle =
+ try (ActiveBundle activeBundle =
processor.newBundle(
- ImmutableMap.of(Target.getDefaultInstance(),
mockRemoteOutputReceiver),
+ ImmutableMap.of(sdkGrpcWriteTarget, mockRemoteOutputReceiver),
mockStateHandler,
mockProgressHandler)) {
// We shouldn't be required to complete the process bundle response
future.
@@ -341,8 +368,6 @@ public void handleCleanupWhenProcessingBundleFails() throws
Exception {
InboundDataClient mockOutputReceiver = mock(InboundDataClient.class);
CloseableFnDataReceiver mockInputSender =
mock(CloseableFnDataReceiver.class);
- ProcessBundleDescriptor descriptor =
- ProcessBundleDescriptor.newBuilder().setId("descriptor1").build();
CompletableFuture<InstructionResponse> processBundleResponseFuture = new
CompletableFuture<>();
when(fnApiControlClient.handle(any(BeamFnApi.InstructionRequest.class)))
.thenReturn(new CompletableFuture<>())
@@ -352,7 +377,7 @@ public void handleCleanupWhenProcessingBundleFails() throws
Exception {
FullWindowedValueCoder.of(StringUtf8Coder.of(), Coder.INSTANCE);
BundleProcessor<String> processor =
sdkHarnessClient.getProcessor(
- descriptor, RemoteInputDestination.of(coder,
Target.getDefaultInstance()));
+ descriptor, RemoteInputDestination.of(coder, sdkGrpcReadTarget));
when(dataService.receive(any(), any(),
any())).thenReturn(mockOutputReceiver);
when(dataService.send(any(), eq(coder))).thenReturn(mockInputSender);
@@ -360,10 +385,9 @@ public void handleCleanupWhenProcessingBundleFails()
throws Exception {
BundleProgressHandler mockProgressHandler =
mock(BundleProgressHandler.class);
try {
- try (ActiveBundle<String> activeBundle =
+ try (ActiveBundle activeBundle =
processor.newBundle(
- ImmutableMap.of(Target.getDefaultInstance(),
mockRemoteOutputReceiver),
- mockProgressHandler)) {
+ ImmutableMap.of(sdkGrpcWriteTarget, mockRemoteOutputReceiver),
mockProgressHandler)) {
processBundleResponseFuture.completeExceptionally(testException);
}
fail("Exception expected");
@@ -377,7 +401,6 @@ public void handleCleanupWhenProcessingBundleFails() throws
Exception {
@Test
public void handleCleanupWithStateWhenProcessingBundleFails() throws
Exception {
- String descriptorId1 = "descriptor1";
Exception testException = new Exception();
InboundDataClient mockOutputReceiver = mock(InboundDataClient.class);
@@ -389,8 +412,6 @@ public void
handleCleanupWithStateWhenProcessingBundleFails() throws Exception {
StateRequestHandler mockStateHandler = mock(StateRequestHandler.class);
BundleProgressHandler mockProgressHandler =
mock(BundleProgressHandler.class);
- ProcessBundleDescriptor descriptor =
- ProcessBundleDescriptor.newBuilder().setId(descriptorId1).build();
CompletableFuture<InstructionResponse> processBundleResponseFuture = new
CompletableFuture<>();
when(fnApiControlClient.handle(any(BeamFnApi.InstructionRequest.class)))
.thenReturn(new CompletableFuture<>())
@@ -400,18 +421,16 @@ public void
handleCleanupWithStateWhenProcessingBundleFails() throws Exception {
FullWindowedValueCoder.of(StringUtf8Coder.of(), Coder.INSTANCE);
BundleProcessor<String> processor =
sdkHarnessClient.getProcessor(
- descriptor,
- RemoteInputDestination.of(coder, Target.getDefaultInstance()),
- mockStateDelegator);
+ descriptor, RemoteInputDestination.of(coder, sdkGrpcReadTarget),
mockStateDelegator);
when(dataService.receive(any(), any(),
any())).thenReturn(mockOutputReceiver);
when(dataService.send(any(), eq(coder))).thenReturn(mockInputSender);
RemoteOutputReceiver mockRemoteOutputReceiver =
mock(RemoteOutputReceiver.class);
try {
- try (ActiveBundle<String> activeBundle =
+ try (ActiveBundle activeBundle =
processor.newBundle(
- ImmutableMap.of(Target.getDefaultInstance(),
mockRemoteOutputReceiver),
+ ImmutableMap.of(sdkGrpcWriteTarget, mockRemoteOutputReceiver),
mockStateHandler,
mockProgressHandler)) {
processBundleResponseFuture.completeExceptionally(testException);
@@ -428,14 +447,11 @@ public void
handleCleanupWithStateWhenProcessingBundleFails() throws Exception {
@Test
public void handleCleanupWhenAwaitingOnClosingOutputReceivers() throws
Exception {
- String descriptorId1 = "descriptor1";
Exception testException = new Exception();
InboundDataClient mockOutputReceiver = mock(InboundDataClient.class);
CloseableFnDataReceiver mockInputSender =
mock(CloseableFnDataReceiver.class);
- ProcessBundleDescriptor descriptor =
- ProcessBundleDescriptor.newBuilder().setId(descriptorId1).build();
CompletableFuture<InstructionResponse> processBundleResponseFuture = new
CompletableFuture<>();
when(fnApiControlClient.handle(any(BeamFnApi.InstructionRequest.class)))
.thenReturn(new CompletableFuture<>())
@@ -445,7 +461,7 @@ public void
handleCleanupWhenAwaitingOnClosingOutputReceivers() throws Exception
FullWindowedValueCoder.of(StringUtf8Coder.of(), Coder.INSTANCE);
BundleProcessor<String> processor =
sdkHarnessClient.getProcessor(
- descriptor, RemoteInputDestination.of(coder,
Target.getDefaultInstance()));
+ descriptor, RemoteInputDestination.of(coder, sdkGrpcReadTarget));
when(dataService.receive(any(), any(),
any())).thenReturn(mockOutputReceiver);
when(dataService.send(any(), eq(coder))).thenReturn(mockInputSender);
doThrow(testException).when(mockOutputReceiver).awaitCompletion();
@@ -454,10 +470,9 @@ public void
handleCleanupWhenAwaitingOnClosingOutputReceivers() throws Exception
BundleProgressHandler mockProgressHandler =
mock(BundleProgressHandler.class);
try {
- try (ActiveBundle<String> activeBundle =
+ try (ActiveBundle activeBundle =
processor.newBundle(
- ImmutableMap.of(Target.getDefaultInstance(),
mockRemoteOutputReceiver),
- mockProgressHandler)) {
+ ImmutableMap.of(sdkGrpcWriteTarget, mockRemoteOutputReceiver),
mockProgressHandler)) {
// Correlating the ProcessBundleRequest and ProcessBundleResponse is
owned by the underlying
// FnApiControlClient. The SdkHarnessClient owns just wrapping the
request and unwrapping
// the response.
@@ -477,7 +492,6 @@ public void
handleCleanupWhenAwaitingOnClosingOutputReceivers() throws Exception
@Test
public void handleCleanupWithStateWhenAwaitingOnClosingOutputReceivers()
throws Exception {
- String descriptorId1 = "descriptor1";
Exception testException = new Exception();
InboundDataClient mockOutputReceiver = mock(InboundDataClient.class);
@@ -489,8 +503,6 @@ public void
handleCleanupWithStateWhenAwaitingOnClosingOutputReceivers() throws
StateRequestHandler mockStateHandler = mock(StateRequestHandler.class);
BundleProgressHandler mockProgressHandler =
mock(BundleProgressHandler.class);
- ProcessBundleDescriptor descriptor =
- ProcessBundleDescriptor.newBuilder().setId(descriptorId1).build();
CompletableFuture<InstructionResponse> processBundleResponseFuture = new
CompletableFuture<>();
when(fnApiControlClient.handle(any(BeamFnApi.InstructionRequest.class)))
.thenReturn(new CompletableFuture<>())
@@ -500,9 +512,7 @@ public void
handleCleanupWithStateWhenAwaitingOnClosingOutputReceivers() throws
FullWindowedValueCoder.of(StringUtf8Coder.of(), Coder.INSTANCE);
BundleProcessor<String> processor =
sdkHarnessClient.getProcessor(
- descriptor,
- RemoteInputDestination.of(coder, Target.getDefaultInstance()),
- mockStateDelegator);
+ descriptor, RemoteInputDestination.of(coder, sdkGrpcReadTarget),
mockStateDelegator);
when(dataService.receive(any(), any(),
any())).thenReturn(mockOutputReceiver);
when(dataService.send(any(), eq(coder))).thenReturn(mockInputSender);
doThrow(testException).when(mockOutputReceiver).awaitCompletion();
@@ -510,9 +520,9 @@ public void
handleCleanupWithStateWhenAwaitingOnClosingOutputReceivers() throws
RemoteOutputReceiver mockRemoteOutputReceiver =
mock(RemoteOutputReceiver.class);
try {
- try (ActiveBundle<String> activeBundle =
+ try (ActiveBundle activeBundle =
processor.newBundle(
- ImmutableMap.of(Target.getDefaultInstance(),
mockRemoteOutputReceiver),
+ ImmutableMap.of(sdkGrpcWriteTarget, mockRemoteOutputReceiver),
mockStateHandler,
mockProgressHandler)) {
// Correlating the ProcessBundleRequest and ProcessBundleResponse is
owned by the underlying
@@ -532,52 +542,6 @@ public void
handleCleanupWithStateWhenAwaitingOnClosingOutputReceivers() throws
}
}
- private BeamFnApi.ProcessBundleDescriptor getProcessBundleDescriptor(
- Endpoints.ApiServiceDescriptor endpoint) throws IOException {
- Pipeline userPipeline = Pipeline.create();
- TupleTag<String> outputTag = new TupleTag<>();
- userPipeline
- .apply("create", Create.of("foo"))
- .apply("proc", ParDo.of(new TestFn()).withOutputTags(outputTag,
TupleTagList.empty()));
- RunnerApi.Pipeline userProto = PipelineTranslation.toProto(userPipeline);
-
- ProcessBundleDescriptor.Builder pbdBuilder =
- ProcessBundleDescriptor.newBuilder()
- .setId("my_id")
- .putAllEnvironments(userProto.getComponents().getEnvironmentsMap())
-
.putAllWindowingStrategies(userProto.getComponents().getWindowingStrategiesMap())
- .putAllCoders(userProto.getComponents().getCodersMap());
- RunnerApi.Coder fullValueCoder =
-
CoderTranslation.toProto(WindowedValue.getFullCoder(StringUtf8Coder.of(),
Coder.INSTANCE))
- .getCoder();
- pbdBuilder.putCoders("wire_coder", fullValueCoder);
-
- PTransform targetProcessor =
userProto.getComponents().getTransformsOrThrow("proc");
- RemoteGrpcPort port =
- RemoteGrpcPort.newBuilder()
- .setApiServiceDescriptor(endpoint)
- .setCoderId("wire_coder")
- .build();
- RemoteGrpcPortRead readNode =
- RemoteGrpcPortRead.readFromPort(
- port, getOnlyElement(targetProcessor.getInputsMap().values()));
- RemoteGrpcPortWrite writeNode =
- RemoteGrpcPortWrite.writeToPort(
- getOnlyElement(targetProcessor.getOutputsMap().values()), port);
- // TODO: Ensure cross-env (Runner <-> SDK GRPC Read/Write Node) coders are
length-prefixed
- for (String pc : targetProcessor.getInputsMap().values()) {
- pbdBuilder.putPcollections(pc,
userProto.getComponents().getPcollectionsOrThrow(pc));
- }
- for (String pc : targetProcessor.getOutputsMap().values()) {
- pbdBuilder.putPcollections(pc,
userProto.getComponents().getPcollectionsOrThrow(pc));
- }
- pbdBuilder
- .putTransforms("proc", targetProcessor)
- .putTransforms("read", readNode.toPTransform())
- .putTransforms("write", writeNode.toPTransform());
- return pbdBuilder.build();
- }
-
private static class TestFn extends DoFn<String, String> {
@ProcessElement
public void processElement(ProcessContext context) {
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Issue Time Tracking
-------------------
Worklog Id: (was: 127769)
Time Spent: 1h 20m (was: 1h 10m)
> Update pipeline representation in runner support libraries to handle timers
> ---------------------------------------------------------------------------
>
> Key: BEAM-4658
> URL: https://issues.apache.org/jira/browse/BEAM-4658
> Project: Beam
> Issue Type: Sub-task
> Components: runner-core
> Reporter: Luke Cwik
> Assignee: Luke Cwik
> Priority: Major
> Time Spent: 1h 20m
> Remaining Estimate: 0h
>
> Expose relevant timer information on *ExecutableStage* to runners.
> Update fusion logic to handle timers.
> Provide an execution-time interface to fire timers into *RemoteBundle*s and also
> to receive new timers that are being set.
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)