Repository: beam Updated Branches: refs/heads/master d0deb6cc8 -> 8503adbbc
[BEAM-1347] Plumb through a yet to be created state client through PTransformRunnerFactory Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/cfb79883 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/cfb79883 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/cfb79883 Branch: refs/heads/master Commit: cfb798830042b28eaf343103724779c90092535c Parents: d0deb6c Author: Luke Cwik <[email protected]> Authored: Fri Jul 7 14:02:58 2017 -0700 Committer: Luke Cwik <[email protected]> Committed: Wed Aug 16 14:48:07 2017 -0700 ---------------------------------------------------------------------- .../beam/fn/harness/BeamFnDataReadRunner.java | 2 ++ .../beam/fn/harness/BeamFnDataWriteRunner.java | 2 ++ .../beam/fn/harness/BoundedSourceRunner.java | 2 ++ .../apache/beam/fn/harness/FnApiDoFnRunner.java | 8 +++++++ .../org/apache/beam/fn/harness/FnHarness.java | 7 ++++-- .../fn/harness/PTransformRunnerFactory.java | 5 +++- .../harness/control/ProcessBundleHandler.java | 11 +++++++-- .../fn/harness/state/BeamFnStateClient.java | 25 ++++++++++++++++++++ .../beam/fn/harness/state/package-info.java | 22 +++++++++++++++++ .../fn/harness/BeamFnDataReadRunnerTest.java | 1 + .../fn/harness/BeamFnDataWriteRunnerTest.java | 1 + .../fn/harness/BoundedSourceRunnerTest.java | 1 + .../beam/fn/harness/FnApiDoFnRunnerTest.java | 1 + .../control/ProcessBundleHandlerTest.java | 9 +++++++ 14 files changed, 92 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/cfb79883/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java index df0e5a2..f254ec4 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java @@ -33,6 +33,7 @@ import java.util.function.Supplier; import org.apache.beam.fn.harness.data.BeamFnDataClient; import org.apache.beam.fn.harness.fn.ThrowingConsumer; import org.apache.beam.fn.harness.fn.ThrowingRunnable; +import org.apache.beam.fn.harness.state.BeamFnStateClient; import org.apache.beam.fn.v1.BeamFnApi; import org.apache.beam.runners.core.construction.CoderTranslation; import org.apache.beam.runners.core.construction.RehydratedComponents; @@ -77,6 +78,7 @@ public class BeamFnDataReadRunner<OutputT> { public BeamFnDataReadRunner<OutputT> createRunnerForPTransform( PipelineOptions pipelineOptions, BeamFnDataClient beamFnDataClient, + BeamFnStateClient beamFnStateClient, String pTransformId, RunnerApi.PTransform pTransform, Supplier<String> processBundleInstructionId, http://git-wip-us.apache.org/repos/asf/beam/blob/cfb79883/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java index 48b450a..179a228 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java @@ -32,6 +32,7 @@ import org.apache.beam.fn.harness.data.BeamFnDataClient; import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer; import org.apache.beam.fn.harness.fn.ThrowingConsumer; import org.apache.beam.fn.harness.fn.ThrowingRunnable; +import org.apache.beam.fn.harness.state.BeamFnStateClient; import org.apache.beam.fn.v1.BeamFnApi; import org.apache.beam.runners.core.construction.CoderTranslation; import org.apache.beam.runners.core.construction.RehydratedComponents; @@ -72,6 +73,7 @@ public class BeamFnDataWriteRunner<InputT> { public BeamFnDataWriteRunner<InputT> createRunnerForPTransform( PipelineOptions pipelineOptions, BeamFnDataClient beamFnDataClient, + BeamFnStateClient beamFnStateClient, String pTransformId, RunnerApi.PTransform pTransform, Supplier<String> processBundleInstructionId, http://git-wip-us.apache.org/repos/asf/beam/blob/cfb79883/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BoundedSourceRunner.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BoundedSourceRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BoundedSourceRunner.java index 5f6509f..c4daa0f 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BoundedSourceRunner.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BoundedSourceRunner.java @@ -31,6 +31,7 @@ import java.util.function.Supplier; import org.apache.beam.fn.harness.data.BeamFnDataClient; import org.apache.beam.fn.harness.fn.ThrowingConsumer; import org.apache.beam.fn.harness.fn.ThrowingRunnable; +import org.apache.beam.fn.harness.state.BeamFnStateClient; import org.apache.beam.sdk.common.runner.v1.RunnerApi; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.Source.Reader; @@ -64,6 +65,7 @@ public class BoundedSourceRunner<InputT extends BoundedSource<OutputT>, OutputT> public BoundedSourceRunner<InputT, OutputT> createRunnerForPTransform( PipelineOptions pipelineOptions, BeamFnDataClient beamFnDataClient, + BeamFnStateClient beamFnStateClient, String pTransformId, RunnerApi.PTransform pTransform, Supplier<String> processBundleInstructionId, http://git-wip-us.apache.org/repos/asf/beam/blob/cfb79883/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java index 86168f9..d325bb2 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java @@ -35,6 +35,7 @@ import java.util.function.Supplier; import org.apache.beam.fn.harness.data.BeamFnDataClient; import org.apache.beam.fn.harness.fn.ThrowingConsumer; import org.apache.beam.fn.harness.fn.ThrowingRunnable; +import org.apache.beam.fn.harness.state.BeamFnStateClient; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.construction.ParDoTranslation; import org.apache.beam.runners.dataflow.util.DoFnInfo; @@ -48,6 +49,8 @@ import org.apache.beam.sdk.transforms.DoFn.OnTimerContext; import org.apache.beam.sdk.transforms.DoFn.ProcessContext; import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature; +import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; @@ -86,6 +89,7 @@ public class FnApiDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Outp public DoFnRunner<InputT, OutputT> createRunnerForPTransform( PipelineOptions pipelineOptions, BeamFnDataClient beamFnDataClient, + BeamFnStateClient beamFnStateClient, String pTransformId, RunnerApi.PTransform pTransform, Supplier<String> processBundleInstructionId, @@ -165,6 +169,8 @@ public class FnApiDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Outp private final StartBundleContext startBundleContext; private final ProcessBundleContext processBundleContext; private final FinishBundleContext finishBundleContext; + private final WindowingStrategy windowingStrategy; + private final DoFnSignature doFnSignature; /** * The lifetime of this member is only valid during {@link #processElement(WindowedValue)}. @@ -186,6 +192,8 @@ public class FnApiDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Outp this.doFn = doFn; this.mainOutputConsumers = mainOutputConsumers; this.outputMap = outputMap; + this.windowingStrategy = windowingStrategy; + this.doFnSignature = DoFnSignatures.signatureForDoFn(doFn); this.doFnInvoker = DoFnInvokers.invokerFor(doFn); this.startBundleContext = new StartBundleContext(); this.processBundleContext = new ProcessBundleContext(); http://git-wip-us.apache.org/repos/asf/beam/blob/cfb79883/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java index 05ab44f..a79ecca 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java @@ -109,8 +109,11 @@ public class FnHarness { BeamFnDataGrpcClient beamFnDataMultiplexer = new BeamFnDataGrpcClient( options, channelFactory::forDescriptor, streamObserverFactory::from); - ProcessBundleHandler processBundleHandler = - new ProcessBundleHandler(options, fnApiRegistry::getById, beamFnDataMultiplexer); + ProcessBundleHandler processBundleHandler = new ProcessBundleHandler( + options, + fnApiRegistry::getById, + beamFnDataMultiplexer, + null /* beamFnStateClient */); handlers.put(BeamFnApi.InstructionRequest.RequestCase.REGISTER, fnApiRegistry::register); handlers.put(BeamFnApi.InstructionRequest.RequestCase.PROCESS_BUNDLE, http://git-wip-us.apache.org/repos/asf/beam/blob/cfb79883/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/PTransformRunnerFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/PTransformRunnerFactory.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/PTransformRunnerFactory.java index 7cf0610..4ef56d8 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/PTransformRunnerFactory.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/PTransformRunnerFactory.java @@ -25,6 +25,7 @@ import java.util.function.Supplier; import org.apache.beam.fn.harness.data.BeamFnDataClient; import org.apache.beam.fn.harness.fn.ThrowingConsumer; import org.apache.beam.fn.harness.fn.ThrowingRunnable; +import org.apache.beam.fn.harness.state.BeamFnStateClient; import org.apache.beam.sdk.common.runner.v1.RunnerApi; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.util.WindowedValue; @@ -40,7 +41,8 @@ public interface PTransformRunnerFactory<T> { * element processing, or during execution of start/finish. * * @param pipelineOptions Pipeline options - * @param beamFnDataClient + * @param beamFnDataClient A client for handling inbound and outbound data streams. + * @param beamFnStateClient A client for handling state requests. * @param pTransformId The id of the PTransform. * @param pTransform The PTransform definition. * @param processBundleInstructionId A supplier containing the active process bundle instruction @@ -58,6 +60,7 @@ public interface PTransformRunnerFactory<T> { T createRunnerForPTransform( PipelineOptions pipelineOptions, BeamFnDataClient beamFnDataClient, + BeamFnStateClient beamFnStateClient, String pTransformId, RunnerApi.PTransform pTransform, Supplier<String> processBundleInstructionId, http://git-wip-us.apache.org/repos/asf/beam/blob/cfb79883/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java index 1e73570..67c4d67 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java @@ -39,6 +39,7 @@ import org.apache.beam.fn.harness.PTransformRunnerFactory.Registrar; import org.apache.beam.fn.harness.data.BeamFnDataClient; import org.apache.beam.fn.harness.fn.ThrowingConsumer; import org.apache.beam.fn.harness.fn.ThrowingRunnable; +import org.apache.beam.fn.harness.state.BeamFnStateClient; import org.apache.beam.fn.v1.BeamFnApi; import org.apache.beam.sdk.common.runner.v1.RunnerApi; import org.apache.beam.sdk.options.PipelineOptions; @@ -83,6 +84,7 @@ public class ProcessBundleHandler { private final PipelineOptions options; private final Function<String, Message> fnApiRegistry; private final BeamFnDataClient beamFnDataClient; + private final BeamFnStateClient beamFnStateClient; private final Map<String, PTransformRunnerFactory> urnToPTransformRunnerFactoryMap; private final PTransformRunnerFactory defaultPTransformRunnerFactory; @@ -90,8 +92,9 @@ public class ProcessBundleHandler { public ProcessBundleHandler( PipelineOptions options, Function<String, Message> fnApiRegistry, - BeamFnDataClient beamFnDataClient) { - this(options, fnApiRegistry, beamFnDataClient, REGISTERED_RUNNER_FACTORIES); + BeamFnDataClient beamFnDataClient, + BeamFnStateClient beamFnStateClient) { + this(options, fnApiRegistry, beamFnDataClient, beamFnStateClient, REGISTERED_RUNNER_FACTORIES); } @VisibleForTesting @@ -99,16 +102,19 @@ public class ProcessBundleHandler { PipelineOptions options, Function<String, Message> fnApiRegistry, BeamFnDataClient beamFnDataClient, + BeamFnStateClient beamFnStateClient, Map<String, PTransformRunnerFactory> urnToPTransformRunnerFactoryMap) { this.options = options; this.fnApiRegistry = fnApiRegistry; this.beamFnDataClient = beamFnDataClient; + this.beamFnStateClient = beamFnStateClient; this.urnToPTransformRunnerFactoryMap = urnToPTransformRunnerFactoryMap; this.defaultPTransformRunnerFactory = new PTransformRunnerFactory<Object>() { @Override public Object createRunnerForPTransform( PipelineOptions pipelineOptions, BeamFnDataClient beamFnDataClient, + BeamFnStateClient beanFnStateClient, String pTransformId, RunnerApi.PTransform pTransform, Supplier<String> processBundleInstructionId, @@ -162,6 +168,7 @@ public class ProcessBundleHandler { .createRunnerForPTransform( options, beamFnDataClient, + beamFnStateClient, pTransformId, pTransform, processBundleInstructionId, http://git-wip-us.apache.org/repos/asf/beam/blob/cfb79883/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BeamFnStateClient.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BeamFnStateClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BeamFnStateClient.java new file mode 100644 index 0000000..8150530 --- /dev/null +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/BeamFnStateClient.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.fn.harness.state; + +/** + * TODO: Define interface required for handling state calls. + */ +public interface BeamFnStateClient { + +} http://git-wip-us.apache.org/repos/asf/beam/blob/cfb79883/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/package-info.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/package-info.java new file mode 100644 index 0000000..feadb7d --- /dev/null +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * State client and state caching. + */ +package org.apache.beam.fn.harness.state; http://git-wip-us.apache.org/repos/asf/beam/blob/cfb79883/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java index 92e6088..e5b4968 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java @@ -141,6 +141,7 @@ public class BeamFnDataReadRunnerTest { new BeamFnDataReadRunner.Factory<String>().createRunnerForPTransform( PipelineOptionsFactory.create(), mockBeamFnDataClient, + null /* beamFnStateClient */, "pTransformId", pTransform, Suppliers.ofInstance(bundleId)::get, http://git-wip-us.apache.org/repos/asf/beam/blob/cfb79883/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java index ffa3a2d..c4b717a 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java @@ -127,6 +127,7 @@ public class BeamFnDataWriteRunnerTest { new BeamFnDataWriteRunner.Factory<String>().createRunnerForPTransform( PipelineOptionsFactory.create(), mockBeamFnDataClient, + null /* beamFnStateClient */, "ptransformId", pTransform, Suppliers.ofInstance(bundleId)::get, http://git-wip-us.apache.org/repos/asf/beam/blob/cfb79883/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BoundedSourceRunnerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BoundedSourceRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BoundedSourceRunnerTest.java index b9f22e8..135495a 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BoundedSourceRunnerTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BoundedSourceRunnerTest.java @@ -140,6 +140,7 @@ public class BoundedSourceRunnerTest { new BoundedSourceRunner.Factory<>().createRunnerForPTransform( PipelineOptionsFactory.create(), null /* beamFnDataClient */, + null /* beamFnStateClient */, "pTransformId", pTransform, Suppliers.ofInstance("57L")::get, http://git-wip-us.apache.org/repos/asf/beam/blob/cfb79883/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java index efa8fcf..ebec608 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java @@ -126,6 +126,7 @@ public class FnApiDoFnRunnerTest { new FnApiDoFnRunner.Factory<>().createRunnerForPTransform( PipelineOptionsFactory.create(), null /* beamFnDataClient */, + null /* beamFnStateClient */, pTransformId, pTransform, Suppliers.ofInstance("57L")::get, http://git-wip-us.apache.org/repos/asf/beam/blob/cfb79883/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java index 0a94b5b..d0e1faf 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java @@ -35,6 +35,7 @@ import org.apache.beam.fn.harness.PTransformRunnerFactory; import org.apache.beam.fn.harness.data.BeamFnDataClient; import org.apache.beam.fn.harness.fn.ThrowingConsumer; import org.apache.beam.fn.harness.fn.ThrowingRunnable; +import org.apache.beam.fn.harness.state.BeamFnStateClient; import org.apache.beam.fn.v1.BeamFnApi; import org.apache.beam.sdk.common.runner.v1.RunnerApi; import org.apache.beam.sdk.options.PipelineOptions; @@ -91,6 +92,7 @@ public class ProcessBundleHandlerTest { public Object createRunnerForPTransform( PipelineOptions pipelineOptions, BeamFnDataClient beamFnDataClient, + BeamFnStateClient beamFnStateClient, String pTransformId, RunnerApi.PTransform pTransform, Supplier<String> processBundleInstructionId, @@ -115,6 +117,7 @@ public class ProcessBundleHandlerTest { PipelineOptionsFactory.create(), fnApiRegistry::get, beamFnDataClient, + null /* beamFnStateClient */, ImmutableMap.of( DATA_INPUT_URN, startFinishRecorder, DATA_OUTPUT_URN, startFinishRecorder)); @@ -147,11 +150,13 @@ public class ProcessBundleHandlerTest { PipelineOptionsFactory.create(), fnApiRegistry::get, beamFnDataClient, + null /* beamFnStateClient */, ImmutableMap.of(DATA_INPUT_URN, new PTransformRunnerFactory<Object>() { @Override public Object createRunnerForPTransform( PipelineOptions pipelineOptions, BeamFnDataClient beamFnDataClient, + BeamFnStateClient beamFnStateClient, String pTransformId, RunnerApi.PTransform pTransform, Supplier<String> processBundleInstructionId, @@ -185,11 +190,13 @@ public class ProcessBundleHandlerTest { PipelineOptionsFactory.create(), fnApiRegistry::get, beamFnDataClient, + null /* beamFnStateClient */, ImmutableMap.of(DATA_INPUT_URN, new PTransformRunnerFactory<Object>() { @Override public Object createRunnerForPTransform( PipelineOptions pipelineOptions, BeamFnDataClient beamFnDataClient, + BeamFnStateClient beamFnStateClient, String pTransformId, RunnerApi.PTransform pTransform, Supplier<String> processBundleInstructionId, @@ -224,11 +231,13 @@ public class ProcessBundleHandlerTest { PipelineOptionsFactory.create(), fnApiRegistry::get, beamFnDataClient, + null /* beamFnStateClient */, ImmutableMap.of(DATA_INPUT_URN, new PTransformRunnerFactory<Object>() { @Override public Object createRunnerForPTransform( PipelineOptions pipelineOptions, BeamFnDataClient beamFnDataClient, + BeamFnStateClient beamFnStateClient, String pTransformId, RunnerApi.PTransform pTransform, Supplier<String> processBundleInstructionId,
