Fix split package in SDK harness. The Java SDK harness defined classes both in its own namespace org.apache.beam.fn.harness and in the org.apache.beam.runners.core namespace, resulting in a split package across multiple jars.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f1b4700f Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f1b4700f Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f1b4700f Branch: refs/heads/master Commit: f1b4700f32c5ea39559145d6f5db3909439f6c80 Parents: 7e4719c Author: Kenneth Knowles <[email protected]> Authored: Mon Jul 17 13:46:46 2017 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Mon Jul 17 13:46:46 2017 -0700 ---------------------------------------------------------------------- .../beam/fn/harness/BeamFnDataReadRunner.java | 173 ++++++ .../beam/fn/harness/BeamFnDataWriteRunner.java | 159 ++++++ .../beam/fn/harness/BoundedSourceRunner.java | 167 ++++++ .../apache/beam/fn/harness/FnApiDoFnRunner.java | 548 +++++++++++++++++++ .../fn/harness/PTransformRunnerFactory.java | 81 +++ .../harness/control/ProcessBundleHandler.java | 4 +- .../beam/runners/core/BeamFnDataReadRunner.java | 173 ------ .../runners/core/BeamFnDataWriteRunner.java | 159 ------ .../beam/runners/core/BoundedSourceRunner.java | 167 ------ .../beam/runners/core/FnApiDoFnRunner.java | 547 ------------------ .../runners/core/PTransformRunnerFactory.java | 81 --- .../apache/beam/runners/core/package-info.java | 22 - .../fn/harness/BeamFnDataReadRunnerTest.java | 281 ++++++++++ .../fn/harness/BeamFnDataWriteRunnerTest.java | 269 +++++++++ .../fn/harness/BoundedSourceRunnerTest.java | 187 +++++++ .../beam/fn/harness/FnApiDoFnRunnerTest.java | 210 +++++++ .../control/ProcessBundleHandlerTest.java | 2 +- .../runners/core/BeamFnDataReadRunnerTest.java | 281 ---------- .../runners/core/BeamFnDataWriteRunnerTest.java | 269 --------- .../runners/core/BoundedSourceRunnerTest.java | 187 ------- .../beam/runners/core/FnApiDoFnRunnerTest.java | 210 ------- 21 files changed, 2078 insertions(+), 2099 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/beam/blob/f1b4700f/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java new file mode 100644 index 0000000..e2c17b0 --- /dev/null +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.fn.harness; + +import static com.google.common.collect.Iterables.getOnlyElement; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.auto.service.AutoService; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Multimap; +import com.google.protobuf.BytesValue; +import java.io.IOException; +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; +import java.util.function.Supplier; +import org.apache.beam.fn.harness.data.BeamFnDataClient; +import org.apache.beam.fn.harness.fn.ThrowingConsumer; +import org.apache.beam.fn.harness.fn.ThrowingRunnable; +import org.apache.beam.fn.v1.BeamFnApi; +import org.apache.beam.runners.dataflow.util.CloudObject; +import org.apache.beam.runners.dataflow.util.CloudObjects; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.common.runner.v1.RunnerApi; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.KV; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Registers as a consumer for data over the Beam Fn API. Multiplexes any received data + * to all consumers in the specified output map. + * + * <p>Can be re-used serially across {@link org.apache.beam.fn.v1.BeamFnApi.ProcessBundleRequest}s. + * For each request, call {@link #registerInputLocation()} to start and call + * {@link #blockTillReadFinishes()} to finish. + */ +public class BeamFnDataReadRunner<OutputT> { + + private static final Logger LOG = LoggerFactory.getLogger(BeamFnDataReadRunner.class); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final String URN = "urn:org.apache.beam:source:runner:0.1"; + + /** A registrar which provides a factory to handle reading from the Fn Api Data Plane. 
*/ + @AutoService(PTransformRunnerFactory.Registrar.class) + public static class Registrar implements + PTransformRunnerFactory.Registrar { + + @Override + public Map<String, PTransformRunnerFactory> getPTransformRunnerFactories() { + return ImmutableMap.of(URN, new Factory()); + } + } + + /** A factory for {@link BeamFnDataReadRunner}s. */ + static class Factory<OutputT> + implements PTransformRunnerFactory<BeamFnDataReadRunner<OutputT>> { + + @Override + public BeamFnDataReadRunner<OutputT> createRunnerForPTransform( + PipelineOptions pipelineOptions, + BeamFnDataClient beamFnDataClient, + String pTransformId, + RunnerApi.PTransform pTransform, + Supplier<String> processBundleInstructionId, + Map<String, RunnerApi.PCollection> pCollections, + Map<String, RunnerApi.Coder> coders, + Multimap<String, ThrowingConsumer<WindowedValue<?>>> pCollectionIdsToConsumers, + Consumer<ThrowingRunnable> addStartFunction, + Consumer<ThrowingRunnable> addFinishFunction) throws IOException { + + BeamFnApi.Target target = BeamFnApi.Target.newBuilder() + .setPrimitiveTransformReference(pTransformId) + .setName(getOnlyElement(pTransform.getOutputsMap().keySet())) + .build(); + RunnerApi.Coder coderSpec = coders.get(pCollections.get( + getOnlyElement(pTransform.getOutputsMap().values())).getCoderId()); + Collection<ThrowingConsumer<WindowedValue<OutputT>>> consumers = + (Collection) pCollectionIdsToConsumers.get( + getOnlyElement(pTransform.getOutputsMap().values())); + + BeamFnDataReadRunner<OutputT> runner = new BeamFnDataReadRunner<>( + pTransform.getSpec(), + processBundleInstructionId, + target, + coderSpec, + beamFnDataClient, + consumers); + addStartFunction.accept(runner::registerInputLocation); + addFinishFunction.accept(runner::blockTillReadFinishes); + return runner; + } + } + + private final BeamFnApi.ApiServiceDescriptor apiServiceDescriptor; + private final Collection<ThrowingConsumer<WindowedValue<OutputT>>> consumers; + private final Supplier<String> 
processBundleInstructionIdSupplier; + private final BeamFnDataClient beamFnDataClientFactory; + private final Coder<WindowedValue<OutputT>> coder; + private final BeamFnApi.Target inputTarget; + + private CompletableFuture<Void> readFuture; + + BeamFnDataReadRunner( + RunnerApi.FunctionSpec functionSpec, + Supplier<String> processBundleInstructionIdSupplier, + BeamFnApi.Target inputTarget, + RunnerApi.Coder coderSpec, + BeamFnDataClient beamFnDataClientFactory, + Collection<ThrowingConsumer<WindowedValue<OutputT>>> consumers) + throws IOException { + this.apiServiceDescriptor = functionSpec.getParameter().unpack(BeamFnApi.RemoteGrpcPort.class) + .getApiServiceDescriptor(); + this.inputTarget = inputTarget; + this.processBundleInstructionIdSupplier = processBundleInstructionIdSupplier; + this.beamFnDataClientFactory = beamFnDataClientFactory; + this.consumers = consumers; + + @SuppressWarnings("unchecked") + Coder<WindowedValue<OutputT>> coder = + (Coder<WindowedValue<OutputT>>) + CloudObjects.coderFromCloudObject( + CloudObject.fromSpec( + OBJECT_MAPPER.readValue( + coderSpec + .getSpec() + .getSpec() + .getParameter() + .unpack(BytesValue.class) + .getValue() + .newInput(), + Map.class))); + this.coder = coder; + } + + public void registerInputLocation() { + this.readFuture = beamFnDataClientFactory.forInboundConsumer( + apiServiceDescriptor, + KV.of(processBundleInstructionIdSupplier.get(), inputTarget), + coder, + this::multiplexToConsumers); + } + + public void blockTillReadFinishes() throws Exception { + LOG.debug("Waiting for process bundle instruction {} and target {} to close.", + processBundleInstructionIdSupplier.get(), inputTarget); + readFuture.get(); + } + + private void multiplexToConsumers(WindowedValue<OutputT> value) throws Exception { + for (ThrowingConsumer<WindowedValue<OutputT>> consumer : consumers) { + consumer.accept(value); + } + } +} 
http://git-wip-us.apache.org/repos/asf/beam/blob/f1b4700f/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java new file mode 100644 index 0000000..eec4dfd --- /dev/null +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.fn.harness; + +import static com.google.common.collect.Iterables.getOnlyElement; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.auto.service.AutoService; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Multimap; +import com.google.protobuf.BytesValue; +import java.io.IOException; +import java.util.Map; +import java.util.function.Consumer; +import java.util.function.Supplier; +import org.apache.beam.fn.harness.data.BeamFnDataClient; +import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer; +import org.apache.beam.fn.harness.fn.ThrowingConsumer; +import org.apache.beam.fn.harness.fn.ThrowingRunnable; +import org.apache.beam.fn.v1.BeamFnApi; +import org.apache.beam.runners.dataflow.util.CloudObject; +import org.apache.beam.runners.dataflow.util.CloudObjects; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.common.runner.v1.RunnerApi; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.KV; + +/** + * Registers as a consumer with the Beam Fn Data Api. Consumes elements and encodes them for + * transmission. + * + * <p>Can be re-used serially across {@link org.apache.beam.fn.v1.BeamFnApi.ProcessBundleRequest}s. + * For each request, call {@link #registerForOutput()} to start and call {@link #close()} to finish. + */ +public class BeamFnDataWriteRunner<InputT> { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final String URN = "urn:org.apache.beam:sink:runner:0.1"; + + /** A registrar which provides a factory to handle writing to the Fn Api Data Plane. 
*/ + @AutoService(PTransformRunnerFactory.Registrar.class) + public static class Registrar implements + PTransformRunnerFactory.Registrar { + + @Override + public Map<String, PTransformRunnerFactory> getPTransformRunnerFactories() { + return ImmutableMap.of(URN, new Factory()); + } + } + + /** A factory for {@link BeamFnDataWriteRunner}s. */ + static class Factory<InputT> + implements PTransformRunnerFactory<BeamFnDataWriteRunner<InputT>> { + + @Override + public BeamFnDataWriteRunner<InputT> createRunnerForPTransform( + PipelineOptions pipelineOptions, + BeamFnDataClient beamFnDataClient, + String pTransformId, + RunnerApi.PTransform pTransform, + Supplier<String> processBundleInstructionId, + Map<String, RunnerApi.PCollection> pCollections, + Map<String, RunnerApi.Coder> coders, + Multimap<String, ThrowingConsumer<WindowedValue<?>>> pCollectionIdsToConsumers, + Consumer<ThrowingRunnable> addStartFunction, + Consumer<ThrowingRunnable> addFinishFunction) throws IOException { + BeamFnApi.Target target = BeamFnApi.Target.newBuilder() + .setPrimitiveTransformReference(pTransformId) + .setName(getOnlyElement(pTransform.getInputsMap().keySet())) + .build(); + RunnerApi.Coder coderSpec = coders.get( + pCollections.get(getOnlyElement(pTransform.getInputsMap().values())).getCoderId()); + BeamFnDataWriteRunner<InputT> runner = + new BeamFnDataWriteRunner<>( + pTransform.getSpec(), + processBundleInstructionId, + target, + coderSpec, + beamFnDataClient); + addStartFunction.accept(runner::registerForOutput); + pCollectionIdsToConsumers.put( + getOnlyElement(pTransform.getInputsMap().values()), + (ThrowingConsumer) + (ThrowingConsumer<WindowedValue<InputT>>) runner::consume); + addFinishFunction.accept(runner::close); + return runner; + } + } + + private final BeamFnApi.ApiServiceDescriptor apiServiceDescriptor; + private final BeamFnApi.Target outputTarget; + private final Coder<WindowedValue<InputT>> coder; + private final BeamFnDataClient beamFnDataClientFactory; + private 
final Supplier<String> processBundleInstructionIdSupplier; + + private CloseableThrowingConsumer<WindowedValue<InputT>> consumer; + + BeamFnDataWriteRunner( + RunnerApi.FunctionSpec functionSpec, + Supplier<String> processBundleInstructionIdSupplier, + BeamFnApi.Target outputTarget, + RunnerApi.Coder coderSpec, + BeamFnDataClient beamFnDataClientFactory) + throws IOException { + this.apiServiceDescriptor = functionSpec.getParameter().unpack(BeamFnApi.RemoteGrpcPort.class) + .getApiServiceDescriptor(); + this.beamFnDataClientFactory = beamFnDataClientFactory; + this.processBundleInstructionIdSupplier = processBundleInstructionIdSupplier; + this.outputTarget = outputTarget; + + @SuppressWarnings("unchecked") + Coder<WindowedValue<InputT>> coder = + (Coder<WindowedValue<InputT>>) + CloudObjects.coderFromCloudObject( + CloudObject.fromSpec( + OBJECT_MAPPER.readValue( + coderSpec + .getSpec() + .getSpec() + .getParameter() + .unpack(BytesValue.class) + .getValue() + .newInput(), + Map.class))); + this.coder = coder; + } + + public void registerForOutput() { + consumer = beamFnDataClientFactory.forOutboundConsumer( + apiServiceDescriptor, + KV.of(processBundleInstructionIdSupplier.get(), outputTarget), + coder); + } + + public void close() throws Exception { + consumer.close(); + } + + public void consume(WindowedValue<InputT> value) throws Exception { + consumer.accept(value); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/f1b4700f/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BoundedSourceRunner.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BoundedSourceRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BoundedSourceRunner.java new file mode 100644 index 0000000..977e803 --- /dev/null +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BoundedSourceRunner.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.fn.harness; + +import com.google.auto.service.AutoService; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Multimap; +import com.google.protobuf.BytesValue; +import com.google.protobuf.InvalidProtocolBufferException; +import java.io.IOException; +import java.util.Collection; +import java.util.Map; +import java.util.function.Consumer; +import java.util.function.Supplier; +import org.apache.beam.fn.harness.data.BeamFnDataClient; +import org.apache.beam.fn.harness.fn.ThrowingConsumer; +import org.apache.beam.fn.harness.fn.ThrowingRunnable; +import org.apache.beam.sdk.common.runner.v1.RunnerApi; +import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.io.Source.Reader; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.util.SerializableUtils; +import org.apache.beam.sdk.util.WindowedValue; + +/** + * A runner which creates {@link Reader}s for each {@link BoundedSource} sent as an input and + * executes the {@link Reader}s read loop. 
+ */ +public class BoundedSourceRunner<InputT extends BoundedSource<OutputT>, OutputT> { + + private static final String URN = "urn:org.apache.beam:source:java:0.1"; + + /** A registrar which provides a factory to handle Java {@link BoundedSource}s. */ + @AutoService(PTransformRunnerFactory.Registrar.class) + public static class Registrar implements + PTransformRunnerFactory.Registrar { + + @Override + public Map<String, PTransformRunnerFactory> getPTransformRunnerFactories() { + return ImmutableMap.of(URN, new Factory()); + } + } + + /** A factory for {@link BoundedSourceRunner}. */ + static class Factory<InputT extends BoundedSource<OutputT>, OutputT> + implements PTransformRunnerFactory<BoundedSourceRunner<InputT, OutputT>> { + @Override + public BoundedSourceRunner<InputT, OutputT> createRunnerForPTransform( + PipelineOptions pipelineOptions, + BeamFnDataClient beamFnDataClient, + String pTransformId, + RunnerApi.PTransform pTransform, + Supplier<String> processBundleInstructionId, + Map<String, RunnerApi.PCollection> pCollections, + Map<String, RunnerApi.Coder> coders, + Multimap<String, ThrowingConsumer<WindowedValue<?>>> pCollectionIdsToConsumers, + Consumer<ThrowingRunnable> addStartFunction, + Consumer<ThrowingRunnable> addFinishFunction) { + + ImmutableList.Builder<ThrowingConsumer<WindowedValue<?>>> consumers = ImmutableList.builder(); + for (String pCollectionId : pTransform.getOutputsMap().values()) { + consumers.addAll(pCollectionIdsToConsumers.get(pCollectionId)); + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + BoundedSourceRunner<InputT, OutputT> runner = new BoundedSourceRunner( + pipelineOptions, + pTransform.getSpec(), + consumers.build()); + + // TODO: Remove and replace with source being sent across gRPC port + addStartFunction.accept(runner::start); + + ThrowingConsumer runReadLoop = + (ThrowingConsumer<WindowedValue<InputT>>) runner::runReadLoop; + for (String pCollectionId : pTransform.getInputsMap().values()) { + 
pCollectionIdsToConsumers.put( + pCollectionId, + runReadLoop); + } + + return runner; + } + } + + private final PipelineOptions pipelineOptions; + private final RunnerApi.FunctionSpec definition; + private final Collection<ThrowingConsumer<WindowedValue<OutputT>>> consumers; + + BoundedSourceRunner( + PipelineOptions pipelineOptions, + RunnerApi.FunctionSpec definition, + Collection<ThrowingConsumer<WindowedValue<OutputT>>> consumers) { + this.pipelineOptions = pipelineOptions; + this.definition = definition; + this.consumers = consumers; + } + + /** + * The runner harness is meant to send the source over the Beam Fn Data API which would be + * consumed by the {@link #runReadLoop}. Drop this method once the runner harness sends the + * source instead of unpacking it from the data block of the function specification. + */ + @Deprecated + public void start() throws Exception { + try { + // The representation here is defined as the java serialized representation of the + // bounded source object packed into a protobuf Any using a protobuf BytesValue wrapper. + byte[] bytes = definition.getParameter().unpack(BytesValue.class).getValue().toByteArray(); + @SuppressWarnings("unchecked") + InputT boundedSource = + (InputT) SerializableUtils.deserializeFromByteArray(bytes, definition.toString()); + runReadLoop(WindowedValue.valueInGlobalWindow(boundedSource)); + } catch (InvalidProtocolBufferException e) { + throw new IOException( + String.format("Failed to decode %s, expected %s", + definition.getParameter().getTypeUrl(), BytesValue.getDescriptor().getFullName()), + e); + } + } + + /** + * Creates a {@link Reader} for each {@link BoundedSource} and executes the {@link Reader}s + * read loop. See {@link Reader} for further details of the read loop. + * + * <p>Propagates any exceptions caused during reading or processing via a consumer to the + * caller. 
+ */ + public void runReadLoop(WindowedValue<InputT> value) throws Exception { + try (Reader<OutputT> reader = value.getValue().createReader(pipelineOptions)) { + if (!reader.start()) { + // Reader has no data, immediately return + return; + } + do { + // TODO: Should this use the input window as the window for all the outputs? + WindowedValue<OutputT> nextValue = WindowedValue.timestampedValueInGlobalWindow( + reader.getCurrent(), reader.getCurrentTimestamp()); + for (ThrowingConsumer<WindowedValue<OutputT>> consumer : consumers) { + consumer.accept(nextValue); + } + } while (reader.advance()); + } + } + + @Override + public String toString() { + return definition.toString(); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/f1b4700f/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java new file mode 100644 index 0000000..97bd71c --- /dev/null +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java @@ -0,0 +1,548 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.fn.harness; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.auto.service.AutoService; +import com.google.common.collect.Collections2; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableMultimap; +import com.google.common.collect.Multimap; +import com.google.protobuf.ByteString; +import com.google.protobuf.BytesValue; +import com.google.protobuf.InvalidProtocolBufferException; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Objects; +import java.util.function.Consumer; +import java.util.function.Supplier; +import org.apache.beam.fn.harness.data.BeamFnDataClient; +import org.apache.beam.fn.harness.fn.ThrowingConsumer; +import org.apache.beam.fn.harness.fn.ThrowingRunnable; +import org.apache.beam.runners.core.DoFnRunner; +import org.apache.beam.runners.core.construction.ParDoTranslation; +import org.apache.beam.runners.dataflow.util.DoFnInfo; +import org.apache.beam.sdk.common.runner.v1.RunnerApi; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.state.State; +import org.apache.beam.sdk.state.TimeDomain; +import org.apache.beam.sdk.state.Timer; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.OnTimerContext; +import org.apache.beam.sdk.transforms.DoFn.ProcessContext; +import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; +import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.SerializableUtils; +import org.apache.beam.sdk.util.UserCodeException; +import 
org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.WindowingStrategy; +import org.joda.time.Instant; + +/** + * A {@link DoFnRunner} specific to integrating with the Fn Api. This is to remove the layers + * of abstraction caused by StateInternals/TimerInternals since they model state and timer + * concepts differently. + */ +public class FnApiDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT> { + /** + * A registrar which provides a factory to handle Java {@link DoFn}s. + */ + @AutoService(PTransformRunnerFactory.Registrar.class) + public static class Registrar implements + PTransformRunnerFactory.Registrar { + + @Override + public Map<String, PTransformRunnerFactory> getPTransformRunnerFactories() { + return ImmutableMap.of(ParDoTranslation.CUSTOM_JAVA_DO_FN_URN, new Factory()); + } + } + + /** A factory for {@link FnApiDoFnRunner}. */ + static class Factory<InputT, OutputT> + implements PTransformRunnerFactory<DoFnRunner<InputT, OutputT>> { + + @Override + public DoFnRunner<InputT, OutputT> createRunnerForPTransform( + PipelineOptions pipelineOptions, + BeamFnDataClient beamFnDataClient, + String pTransformId, + RunnerApi.PTransform pTransform, + Supplier<String> processBundleInstructionId, + Map<String, RunnerApi.PCollection> pCollections, + Map<String, RunnerApi.Coder> coders, + Multimap<String, ThrowingConsumer<WindowedValue<?>>> pCollectionIdsToConsumers, + Consumer<ThrowingRunnable> addStartFunction, + Consumer<ThrowingRunnable> addFinishFunction) { + + // For every output PCollection, create a map from output name to Consumer + ImmutableMap.Builder<String, Collection<ThrowingConsumer<WindowedValue<?>>>> + outputMapBuilder = ImmutableMap.builder(); + for (Map.Entry<String, String> entry : pTransform.getOutputsMap().entrySet()) { + outputMapBuilder.put( + entry.getKey(), + pCollectionIdsToConsumers.get(entry.getValue())); + 
} + ImmutableMap<String, Collection<ThrowingConsumer<WindowedValue<?>>>> outputMap = + outputMapBuilder.build(); + + // Get the DoFnInfo from the serialized blob. + ByteString serializedFn; + try { + serializedFn = pTransform.getSpec().getParameter().unpack(BytesValue.class).getValue(); + } catch (InvalidProtocolBufferException e) { + throw new IllegalArgumentException( + String.format("Unable to unwrap DoFn %s", pTransform.getSpec()), e); + } + @SuppressWarnings({"unchecked", "rawtypes"}) + DoFnInfo<InputT, OutputT> doFnInfo = (DoFnInfo) SerializableUtils.deserializeFromByteArray( + serializedFn.toByteArray(), "DoFnInfo"); + + // Verify that the DoFnInfo tag to output map matches the output map on the PTransform. + checkArgument( + Objects.equals( + new HashSet<>(Collections2.transform(outputMap.keySet(), Long::parseLong)), + doFnInfo.getOutputMap().keySet()), + "Unexpected mismatch between transform output map %s and DoFnInfo output map %s.", + outputMap.keySet(), + doFnInfo.getOutputMap()); + + ImmutableMultimap.Builder<TupleTag<?>, + ThrowingConsumer<WindowedValue<?>>> tagToOutputMapBuilder = + ImmutableMultimap.builder(); + for (Map.Entry<Long, TupleTag<?>> entry : doFnInfo.getOutputMap().entrySet()) { + @SuppressWarnings({"unchecked", "rawtypes"}) + Collection<ThrowingConsumer<WindowedValue<?>>> consumers = + outputMap.get(Long.toString(entry.getKey())); + tagToOutputMapBuilder.putAll(entry.getValue(), consumers); + } + + ImmutableMultimap<TupleTag<?>, ThrowingConsumer<WindowedValue<?>>> tagToOutputMap = + tagToOutputMapBuilder.build(); + + @SuppressWarnings({"unchecked", "rawtypes"}) + DoFnRunner<InputT, OutputT> runner = new FnApiDoFnRunner<>( + pipelineOptions, + doFnInfo.getDoFn(), + (Collection<ThrowingConsumer<WindowedValue<OutputT>>>) (Collection) + tagToOutputMap.get(doFnInfo.getOutputMap().get(doFnInfo.getMainOutput())), + tagToOutputMap, + doFnInfo.getWindowingStrategy()); + + // Register the appropriate handlers. 
+ addStartFunction.accept(runner::startBundle); + for (String pcollectionId : pTransform.getInputsMap().values()) { + pCollectionIdsToConsumers.put( + pcollectionId, + (ThrowingConsumer) (ThrowingConsumer<WindowedValue<InputT>>) runner::processElement); + } + addFinishFunction.accept(runner::finishBundle); + return runner; + } + } + + ////////////////////////////////////////////////////////////////////////////////////////////////// + + private final PipelineOptions pipelineOptions; + private final DoFn<InputT, OutputT> doFn; + private final Collection<ThrowingConsumer<WindowedValue<OutputT>>> mainOutputConsumers; + private final Multimap<TupleTag<?>, ThrowingConsumer<WindowedValue<?>>> outputMap; + private final DoFnInvoker<InputT, OutputT> doFnInvoker; + private final StartBundleContext startBundleContext; + private final ProcessBundleContext processBundleContext; + private final FinishBundleContext finishBundleContext; + + /** + * The lifetime of this member is only valid during {@link #processElement(WindowedValue)}. + */ + private WindowedValue<InputT> currentElement; + + /** + * The lifetime of this member is only valid during {@link #processElement(WindowedValue)}. 
+ */ + private BoundedWindow currentWindow; + + FnApiDoFnRunner( + PipelineOptions pipelineOptions, + DoFn<InputT, OutputT> doFn, + Collection<ThrowingConsumer<WindowedValue<OutputT>>> mainOutputConsumers, + Multimap<TupleTag<?>, ThrowingConsumer<WindowedValue<?>>> outputMap, + WindowingStrategy windowingStrategy) { + this.pipelineOptions = pipelineOptions; + this.doFn = doFn; + this.mainOutputConsumers = mainOutputConsumers; + this.outputMap = outputMap; + this.doFnInvoker = DoFnInvokers.invokerFor(doFn); + this.startBundleContext = new StartBundleContext(); + this.processBundleContext = new ProcessBundleContext(); + this.finishBundleContext = new FinishBundleContext(); + } + + @Override + public void startBundle() { + doFnInvoker.invokeStartBundle(startBundleContext); + } + + @Override + public void processElement(WindowedValue<InputT> elem) { + currentElement = elem; + try { + Iterator<BoundedWindow> windowIterator = + (Iterator<BoundedWindow>) elem.getWindows().iterator(); + while (windowIterator.hasNext()) { + currentWindow = windowIterator.next(); + doFnInvoker.invokeProcessElement(processBundleContext); + } + } finally { + currentElement = null; + currentWindow = null; + } + } + + @Override + public void onTimer( + String timerId, + BoundedWindow window, + Instant timestamp, + TimeDomain timeDomain) { + throw new UnsupportedOperationException("TODO: Add support for timers"); + } + + @Override + public void finishBundle() { + doFnInvoker.invokeFinishBundle(finishBundleContext); + } + + /** + * Outputs the given element to the specified set of consumers wrapping any exceptions. 
+ */ + private <T> void outputTo( + Collection<ThrowingConsumer<WindowedValue<T>>> consumers, + WindowedValue<T> output) { + Iterator<ThrowingConsumer<WindowedValue<T>>> consumerIterator; + try { + for (ThrowingConsumer<WindowedValue<T>> consumer : consumers) { + consumer.accept(output); + } + } catch (Throwable t) { + throw UserCodeException.wrap(t); + } + } + + /** + * Provides arguments for a {@link DoFnInvoker} for {@link DoFn.StartBundle @StartBundle}. + */ + private class StartBundleContext + extends DoFn<InputT, OutputT>.StartBundleContext + implements DoFnInvoker.ArgumentProvider<InputT, OutputT> { + + private StartBundleContext() { + doFn.super(); + } + + @Override + public PipelineOptions getPipelineOptions() { + return pipelineOptions; + } + + @Override + public PipelineOptions pipelineOptions() { + return pipelineOptions; + } + + @Override + public BoundedWindow window() { + throw new UnsupportedOperationException( + "Cannot access window outside of @ProcessElement and @OnTimer methods."); + } + + @Override + public DoFn<InputT, OutputT>.StartBundleContext startBundleContext( + DoFn<InputT, OutputT> doFn) { + return this; + } + + @Override + public DoFn<InputT, OutputT>.FinishBundleContext finishBundleContext( + DoFn<InputT, OutputT> doFn) { + throw new UnsupportedOperationException( + "Cannot access FinishBundleContext outside of @FinishBundle method."); + } + + @Override + public DoFn<InputT, OutputT>.ProcessContext processContext(DoFn<InputT, OutputT> doFn) { + throw new UnsupportedOperationException( + "Cannot access ProcessContext outside of @ProcessElement method."); + } + + @Override + public DoFn<InputT, OutputT>.OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) { + throw new UnsupportedOperationException( + "Cannot access OnTimerContext outside of @OnTimer methods."); + } + + @Override + public RestrictionTracker<?> restrictionTracker() { + throw new UnsupportedOperationException( + "Cannot access RestrictionTracker outside of 
@ProcessElement method."); + } + + @Override + public State state(String stateId) { + throw new UnsupportedOperationException( + "Cannot access state outside of @ProcessElement and @OnTimer methods."); + } + + @Override + public Timer timer(String timerId) { + throw new UnsupportedOperationException( + "Cannot access timers outside of @ProcessElement and @OnTimer methods."); + } + } + + /** + * Provides arguments for a {@link DoFnInvoker} for {@link DoFn.ProcessElement @ProcessElement}. + */ + private class ProcessBundleContext + extends DoFn<InputT, OutputT>.ProcessContext + implements DoFnInvoker.ArgumentProvider<InputT, OutputT> { + + private ProcessBundleContext() { + doFn.super(); + } + + @Override + public BoundedWindow window() { + return currentWindow; + } + + @Override + public DoFn.StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) { + throw new UnsupportedOperationException( + "Cannot access StartBundleContext outside of @StartBundle method."); + } + + @Override + public DoFn.FinishBundleContext finishBundleContext(DoFn<InputT, OutputT> doFn) { + throw new UnsupportedOperationException( + "Cannot access FinishBundleContext outside of @FinishBundle method."); + } + + @Override + public ProcessContext processContext(DoFn<InputT, OutputT> doFn) { + return this; + } + + @Override + public OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) { + throw new UnsupportedOperationException("TODO: Add support for timers"); + } + + @Override + public RestrictionTracker<?> restrictionTracker() { + throw new UnsupportedOperationException("TODO: Add support for SplittableDoFn"); + } + + @Override + public State state(String stateId) { + throw new UnsupportedOperationException("TODO: Add support for state"); + } + + @Override + public Timer timer(String timerId) { + throw new UnsupportedOperationException("TODO: Add support for timers"); + } + + @Override + public PipelineOptions getPipelineOptions() { + return pipelineOptions; + } + + @Override + 
public PipelineOptions pipelineOptions() { + return pipelineOptions; + } + + @Override + public void output(OutputT output) { + outputTo(mainOutputConsumers, + WindowedValue.of( + output, + currentElement.getTimestamp(), + currentWindow, + currentElement.getPane())); + } + + @Override + public void outputWithTimestamp(OutputT output, Instant timestamp) { + outputTo(mainOutputConsumers, + WindowedValue.of( + output, + timestamp, + currentWindow, + currentElement.getPane())); + } + + @Override + public <T> void output(TupleTag<T> tag, T output) { + Collection<ThrowingConsumer<WindowedValue<T>>> consumers = (Collection) outputMap.get(tag); + if (consumers == null) { + throw new IllegalArgumentException(String.format("Unknown output tag %s", tag)); + } + outputTo(consumers, + WindowedValue.of( + output, + currentElement.getTimestamp(), + currentWindow, + currentElement.getPane())); + } + + @Override + public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { + Collection<ThrowingConsumer<WindowedValue<T>>> consumers = (Collection) outputMap.get(tag); + if (consumers == null) { + throw new IllegalArgumentException(String.format("Unknown output tag %s", tag)); + } + outputTo(consumers, + WindowedValue.of( + output, + timestamp, + currentWindow, + currentElement.getPane())); + } + + @Override + public InputT element() { + return currentElement.getValue(); + } + + @Override + public <T> T sideInput(PCollectionView<T> view) { + throw new UnsupportedOperationException("TODO: Support side inputs"); + } + + @Override + public Instant timestamp() { + return currentElement.getTimestamp(); + } + + @Override + public PaneInfo pane() { + return currentElement.getPane(); + } + + @Override + public void updateWatermark(Instant watermark) { + throw new UnsupportedOperationException("TODO: Add support for SplittableDoFn"); + } + } + + /** + * Provides arguments for a {@link DoFnInvoker} for {@link DoFn.FinishBundle @FinishBundle}. 
+ */ + private class FinishBundleContext + extends DoFn<InputT, OutputT>.FinishBundleContext + implements DoFnInvoker.ArgumentProvider<InputT, OutputT> { + + private FinishBundleContext() { + doFn.super(); + } + + @Override + public PipelineOptions getPipelineOptions() { + return pipelineOptions; + } + + @Override + public PipelineOptions pipelineOptions() { + return pipelineOptions; + } + + @Override + public BoundedWindow window() { + throw new UnsupportedOperationException( + "Cannot access window outside of @ProcessElement and @OnTimer methods."); + } + + @Override + public DoFn<InputT, OutputT>.StartBundleContext startBundleContext( + DoFn<InputT, OutputT> doFn) { + throw new UnsupportedOperationException( + "Cannot access StartBundleContext outside of @StartBundle method."); + } + + @Override + public DoFn<InputT, OutputT>.FinishBundleContext finishBundleContext( + DoFn<InputT, OutputT> doFn) { + return this; + } + + @Override + public DoFn<InputT, OutputT>.ProcessContext processContext(DoFn<InputT, OutputT> doFn) { + throw new UnsupportedOperationException( + "Cannot access ProcessContext outside of @ProcessElement method."); + } + + @Override + public DoFn<InputT, OutputT>.OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) { + throw new UnsupportedOperationException( + "Cannot access OnTimerContext outside of @OnTimer methods."); + } + + @Override + public RestrictionTracker<?> restrictionTracker() { + throw new UnsupportedOperationException( + "Cannot access RestrictionTracker outside of @ProcessElement method."); + } + + @Override + public State state(String stateId) { + throw new UnsupportedOperationException( + "Cannot access state outside of @ProcessElement and @OnTimer methods."); + } + + @Override + public Timer timer(String timerId) { + throw new UnsupportedOperationException( + "Cannot access timers outside of @ProcessElement and @OnTimer methods."); + } + + @Override + public void output(OutputT output, Instant timestamp, BoundedWindow 
window) { + outputTo(mainOutputConsumers, + WindowedValue.of(output, timestamp, window, PaneInfo.NO_FIRING)); + } + + @Override + public <T> void output(TupleTag<T> tag, T output, Instant timestamp, BoundedWindow window) { + Collection<ThrowingConsumer<WindowedValue<T>>> consumers = (Collection) outputMap.get(tag); + if (consumers == null) { + throw new IllegalArgumentException(String.format("Unknown output tag %s", tag)); + } + outputTo(consumers, + WindowedValue.of(output, timestamp, window, PaneInfo.NO_FIRING)); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/f1b4700f/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/PTransformRunnerFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/PTransformRunnerFactory.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/PTransformRunnerFactory.java new file mode 100644 index 0000000..7cf0610 --- /dev/null +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/PTransformRunnerFactory.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.fn.harness; + +import com.google.common.collect.Multimap; +import java.io.IOException; +import java.util.Map; +import java.util.function.Consumer; +import java.util.function.Supplier; +import org.apache.beam.fn.harness.data.BeamFnDataClient; +import org.apache.beam.fn.harness.fn.ThrowingConsumer; +import org.apache.beam.fn.harness.fn.ThrowingRunnable; +import org.apache.beam.sdk.common.runner.v1.RunnerApi; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.util.WindowedValue; + +/** + * A factory able to instantiate an appropriate handler for a given PTransform. + */ +public interface PTransformRunnerFactory<T> { + + /** + * Creates and returns a handler for a given PTransform. Note that the handler must support + * processing multiple bundles. The handler will be discarded if an error is thrown during + * element processing, or during execution of start/finish. + * + * @param pipelineOptions Pipeline options + * @param beamFnDataClient + * @param pTransformId The id of the PTransform. + * @param pTransform The PTransform definition. + * @param processBundleInstructionId A supplier containing the active process bundle instruction + * id. + * @param pCollections A mapping from PCollection id to PCollection definition. + * @param coders A mapping from coder id to coder definition. + * @param pCollectionIdsToConsumers A mapping from PCollection id to a collection of consumers. + * Note that if this handler is a consumer, it should register itself within this multimap under + * the appropriate PCollection ids. Also note that all output consumers needed by this PTransform + * (based on the values of the {@link RunnerApi.PTransform#getOutputsMap()} will have already + * registered within this multimap. + * @param addStartFunction A consumer to register a start bundle handler with. + * @param addFinishFunction A consumer to register a finish bundle handler with. 
+ */ + T createRunnerForPTransform( + PipelineOptions pipelineOptions, + BeamFnDataClient beamFnDataClient, + String pTransformId, + RunnerApi.PTransform pTransform, + Supplier<String> processBundleInstructionId, + Map<String, RunnerApi.PCollection> pCollections, + Map<String, RunnerApi.Coder> coders, + Multimap<String, ThrowingConsumer<WindowedValue<?>>> pCollectionIdsToConsumers, + Consumer<ThrowingRunnable> addStartFunction, + Consumer<ThrowingRunnable> addFinishFunction) throws IOException; + + /** + * A registrar which can return a mapping from {@link RunnerApi.FunctionSpec#getUrn()} to + * a factory capable of instantiating an appropriate handler. + */ + interface Registrar { + /** + * Returns a mapping from {@link RunnerApi.FunctionSpec#getUrn()} to a factory capable of + * instantiating an appropriate handler. + */ + Map<String, PTransformRunnerFactory> getPTransformRunnerFactories(); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/f1b4700f/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java index 2a9cef8..1e73570 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java @@ -34,12 +34,12 @@ import java.util.Set; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; +import org.apache.beam.fn.harness.PTransformRunnerFactory; +import org.apache.beam.fn.harness.PTransformRunnerFactory.Registrar; import org.apache.beam.fn.harness.data.BeamFnDataClient; import org.apache.beam.fn.harness.fn.ThrowingConsumer; import 
org.apache.beam.fn.harness.fn.ThrowingRunnable; import org.apache.beam.fn.v1.BeamFnApi; -import org.apache.beam.runners.core.PTransformRunnerFactory; -import org.apache.beam.runners.core.PTransformRunnerFactory.Registrar; import org.apache.beam.sdk.common.runner.v1.RunnerApi; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.util.WindowedValue; http://git-wip-us.apache.org/repos/asf/beam/blob/f1b4700f/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java deleted file mode 100644 index 9339347..0000000 --- a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java +++ /dev/null @@ -1,173 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.beam.runners.core; - -import static com.google.common.collect.Iterables.getOnlyElement; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.auto.service.AutoService; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Multimap; -import com.google.protobuf.BytesValue; -import java.io.IOException; -import java.util.Collection; -import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.function.Consumer; -import java.util.function.Supplier; -import org.apache.beam.fn.harness.data.BeamFnDataClient; -import org.apache.beam.fn.harness.fn.ThrowingConsumer; -import org.apache.beam.fn.harness.fn.ThrowingRunnable; -import org.apache.beam.fn.v1.BeamFnApi; -import org.apache.beam.runners.dataflow.util.CloudObject; -import org.apache.beam.runners.dataflow.util.CloudObjects; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.common.runner.v1.RunnerApi; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.KV; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Registers as a consumer for data over the Beam Fn API. Multiplexes any received data - * to all consumers in the specified output map. - * - * <p>Can be re-used serially across {@link org.apache.beam.fn.v1.BeamFnApi.ProcessBundleRequest}s. - * For each request, call {@link #registerInputLocation()} to start and call - * {@link #blockTillReadFinishes()} to finish. - */ -public class BeamFnDataReadRunner<OutputT> { - - private static final Logger LOG = LoggerFactory.getLogger(BeamFnDataReadRunner.class); - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - private static final String URN = "urn:org.apache.beam:source:runner:0.1"; - - /** A registrar which provides a factory to handle reading from the Fn Api Data Plane. 
*/ - @AutoService(PTransformRunnerFactory.Registrar.class) - public static class Registrar implements - PTransformRunnerFactory.Registrar { - - @Override - public Map<String, PTransformRunnerFactory> getPTransformRunnerFactories() { - return ImmutableMap.of(URN, new Factory()); - } - } - - /** A factory for {@link BeamFnDataReadRunner}s. */ - static class Factory<OutputT> - implements PTransformRunnerFactory<BeamFnDataReadRunner<OutputT>> { - - @Override - public BeamFnDataReadRunner<OutputT> createRunnerForPTransform( - PipelineOptions pipelineOptions, - BeamFnDataClient beamFnDataClient, - String pTransformId, - RunnerApi.PTransform pTransform, - Supplier<String> processBundleInstructionId, - Map<String, RunnerApi.PCollection> pCollections, - Map<String, RunnerApi.Coder> coders, - Multimap<String, ThrowingConsumer<WindowedValue<?>>> pCollectionIdsToConsumers, - Consumer<ThrowingRunnable> addStartFunction, - Consumer<ThrowingRunnable> addFinishFunction) throws IOException { - - BeamFnApi.Target target = BeamFnApi.Target.newBuilder() - .setPrimitiveTransformReference(pTransformId) - .setName(getOnlyElement(pTransform.getOutputsMap().keySet())) - .build(); - RunnerApi.Coder coderSpec = coders.get(pCollections.get( - getOnlyElement(pTransform.getOutputsMap().values())).getCoderId()); - Collection<ThrowingConsumer<WindowedValue<OutputT>>> consumers = - (Collection) pCollectionIdsToConsumers.get( - getOnlyElement(pTransform.getOutputsMap().values())); - - BeamFnDataReadRunner<OutputT> runner = new BeamFnDataReadRunner<>( - pTransform.getSpec(), - processBundleInstructionId, - target, - coderSpec, - beamFnDataClient, - consumers); - addStartFunction.accept(runner::registerInputLocation); - addFinishFunction.accept(runner::blockTillReadFinishes); - return runner; - } - } - - private final BeamFnApi.ApiServiceDescriptor apiServiceDescriptor; - private final Collection<ThrowingConsumer<WindowedValue<OutputT>>> consumers; - private final Supplier<String> 
processBundleInstructionIdSupplier; - private final BeamFnDataClient beamFnDataClientFactory; - private final Coder<WindowedValue<OutputT>> coder; - private final BeamFnApi.Target inputTarget; - - private CompletableFuture<Void> readFuture; - - BeamFnDataReadRunner( - RunnerApi.FunctionSpec functionSpec, - Supplier<String> processBundleInstructionIdSupplier, - BeamFnApi.Target inputTarget, - RunnerApi.Coder coderSpec, - BeamFnDataClient beamFnDataClientFactory, - Collection<ThrowingConsumer<WindowedValue<OutputT>>> consumers) - throws IOException { - this.apiServiceDescriptor = functionSpec.getParameter().unpack(BeamFnApi.RemoteGrpcPort.class) - .getApiServiceDescriptor(); - this.inputTarget = inputTarget; - this.processBundleInstructionIdSupplier = processBundleInstructionIdSupplier; - this.beamFnDataClientFactory = beamFnDataClientFactory; - this.consumers = consumers; - - @SuppressWarnings("unchecked") - Coder<WindowedValue<OutputT>> coder = - (Coder<WindowedValue<OutputT>>) - CloudObjects.coderFromCloudObject( - CloudObject.fromSpec( - OBJECT_MAPPER.readValue( - coderSpec - .getSpec() - .getSpec() - .getParameter() - .unpack(BytesValue.class) - .getValue() - .newInput(), - Map.class))); - this.coder = coder; - } - - public void registerInputLocation() { - this.readFuture = beamFnDataClientFactory.forInboundConsumer( - apiServiceDescriptor, - KV.of(processBundleInstructionIdSupplier.get(), inputTarget), - coder, - this::multiplexToConsumers); - } - - public void blockTillReadFinishes() throws Exception { - LOG.debug("Waiting for process bundle instruction {} and target {} to close.", - processBundleInstructionIdSupplier.get(), inputTarget); - readFuture.get(); - } - - private void multiplexToConsumers(WindowedValue<OutputT> value) throws Exception { - for (ThrowingConsumer<WindowedValue<OutputT>> consumer : consumers) { - consumer.accept(value); - } - } -} 
http://git-wip-us.apache.org/repos/asf/beam/blob/f1b4700f/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataWriteRunner.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataWriteRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataWriteRunner.java deleted file mode 100644 index c2a996b..0000000 --- a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataWriteRunner.java +++ /dev/null @@ -1,159 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.beam.runners.core; - -import static com.google.common.collect.Iterables.getOnlyElement; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.auto.service.AutoService; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Multimap; -import com.google.protobuf.BytesValue; -import java.io.IOException; -import java.util.Map; -import java.util.function.Consumer; -import java.util.function.Supplier; -import org.apache.beam.fn.harness.data.BeamFnDataClient; -import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer; -import org.apache.beam.fn.harness.fn.ThrowingConsumer; -import org.apache.beam.fn.harness.fn.ThrowingRunnable; -import org.apache.beam.fn.v1.BeamFnApi; -import org.apache.beam.runners.dataflow.util.CloudObject; -import org.apache.beam.runners.dataflow.util.CloudObjects; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.common.runner.v1.RunnerApi; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.KV; - -/** - * Registers as a consumer with the Beam Fn Data Api. Consumes elements and encodes them for - * transmission. - * - * <p>Can be re-used serially across {@link org.apache.beam.fn.v1.BeamFnApi.ProcessBundleRequest}s. - * For each request, call {@link #registerForOutput()} to start and call {@link #close()} to finish. - */ -public class BeamFnDataWriteRunner<InputT> { - - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - private static final String URN = "urn:org.apache.beam:sink:runner:0.1"; - - /** A registrar which provides a factory to handle writing to the Fn Api Data Plane. 
*/ - @AutoService(PTransformRunnerFactory.Registrar.class) - public static class Registrar implements - PTransformRunnerFactory.Registrar { - - @Override - public Map<String, PTransformRunnerFactory> getPTransformRunnerFactories() { - return ImmutableMap.of(URN, new Factory()); - } - } - - /** A factory for {@link BeamFnDataWriteRunner}s. */ - static class Factory<InputT> - implements PTransformRunnerFactory<BeamFnDataWriteRunner<InputT>> { - - @Override - public BeamFnDataWriteRunner<InputT> createRunnerForPTransform( - PipelineOptions pipelineOptions, - BeamFnDataClient beamFnDataClient, - String pTransformId, - RunnerApi.PTransform pTransform, - Supplier<String> processBundleInstructionId, - Map<String, RunnerApi.PCollection> pCollections, - Map<String, RunnerApi.Coder> coders, - Multimap<String, ThrowingConsumer<WindowedValue<?>>> pCollectionIdsToConsumers, - Consumer<ThrowingRunnable> addStartFunction, - Consumer<ThrowingRunnable> addFinishFunction) throws IOException { - BeamFnApi.Target target = BeamFnApi.Target.newBuilder() - .setPrimitiveTransformReference(pTransformId) - .setName(getOnlyElement(pTransform.getInputsMap().keySet())) - .build(); - RunnerApi.Coder coderSpec = coders.get( - pCollections.get(getOnlyElement(pTransform.getInputsMap().values())).getCoderId()); - BeamFnDataWriteRunner<InputT> runner = - new BeamFnDataWriteRunner<>( - pTransform.getSpec(), - processBundleInstructionId, - target, - coderSpec, - beamFnDataClient); - addStartFunction.accept(runner::registerForOutput); - pCollectionIdsToConsumers.put( - getOnlyElement(pTransform.getInputsMap().values()), - (ThrowingConsumer) - (ThrowingConsumer<WindowedValue<InputT>>) runner::consume); - addFinishFunction.accept(runner::close); - return runner; - } - } - - private final BeamFnApi.ApiServiceDescriptor apiServiceDescriptor; - private final BeamFnApi.Target outputTarget; - private final Coder<WindowedValue<InputT>> coder; - private final BeamFnDataClient beamFnDataClientFactory; - private 
final Supplier<String> processBundleInstructionIdSupplier; - - private CloseableThrowingConsumer<WindowedValue<InputT>> consumer; - - BeamFnDataWriteRunner( - RunnerApi.FunctionSpec functionSpec, - Supplier<String> processBundleInstructionIdSupplier, - BeamFnApi.Target outputTarget, - RunnerApi.Coder coderSpec, - BeamFnDataClient beamFnDataClientFactory) - throws IOException { - this.apiServiceDescriptor = functionSpec.getParameter().unpack(BeamFnApi.RemoteGrpcPort.class) - .getApiServiceDescriptor(); - this.beamFnDataClientFactory = beamFnDataClientFactory; - this.processBundleInstructionIdSupplier = processBundleInstructionIdSupplier; - this.outputTarget = outputTarget; - - @SuppressWarnings("unchecked") - Coder<WindowedValue<InputT>> coder = - (Coder<WindowedValue<InputT>>) - CloudObjects.coderFromCloudObject( - CloudObject.fromSpec( - OBJECT_MAPPER.readValue( - coderSpec - .getSpec() - .getSpec() - .getParameter() - .unpack(BytesValue.class) - .getValue() - .newInput(), - Map.class))); - this.coder = coder; - } - - public void registerForOutput() { - consumer = beamFnDataClientFactory.forOutboundConsumer( - apiServiceDescriptor, - KV.of(processBundleInstructionIdSupplier.get(), outputTarget), - coder); - } - - public void close() throws Exception { - consumer.close(); - } - - public void consume(WindowedValue<InputT> value) throws Exception { - consumer.accept(value); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/f1b4700f/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BoundedSourceRunner.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BoundedSourceRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BoundedSourceRunner.java deleted file mode 100644 index 3338c3a..0000000 --- a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BoundedSourceRunner.java +++ /dev/null @@ -1,167 +0,0 @@ -/* - * Licensed to the 
Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.core; - -import com.google.auto.service.AutoService; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Multimap; -import com.google.protobuf.BytesValue; -import com.google.protobuf.InvalidProtocolBufferException; -import java.io.IOException; -import java.util.Collection; -import java.util.Map; -import java.util.function.Consumer; -import java.util.function.Supplier; -import org.apache.beam.fn.harness.data.BeamFnDataClient; -import org.apache.beam.fn.harness.fn.ThrowingConsumer; -import org.apache.beam.fn.harness.fn.ThrowingRunnable; -import org.apache.beam.sdk.common.runner.v1.RunnerApi; -import org.apache.beam.sdk.io.BoundedSource; -import org.apache.beam.sdk.io.Source.Reader; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.util.SerializableUtils; -import org.apache.beam.sdk.util.WindowedValue; - -/** - * A runner which creates {@link Reader}s for each {@link BoundedSource} sent as an input and - * executes the {@link Reader}s read loop. 
- */ -public class BoundedSourceRunner<InputT extends BoundedSource<OutputT>, OutputT> { - - private static final String URN = "urn:org.apache.beam:source:java:0.1"; - - /** A registrar which provides a factory to handle Java {@link BoundedSource}s. */ - @AutoService(PTransformRunnerFactory.Registrar.class) - public static class Registrar implements - PTransformRunnerFactory.Registrar { - - @Override - public Map<String, PTransformRunnerFactory> getPTransformRunnerFactories() { - return ImmutableMap.of(URN, new Factory()); - } - } - - /** A factory for {@link BoundedSourceRunner}. */ - static class Factory<InputT extends BoundedSource<OutputT>, OutputT> - implements PTransformRunnerFactory<BoundedSourceRunner<InputT, OutputT>> { - @Override - public BoundedSourceRunner<InputT, OutputT> createRunnerForPTransform( - PipelineOptions pipelineOptions, - BeamFnDataClient beamFnDataClient, - String pTransformId, - RunnerApi.PTransform pTransform, - Supplier<String> processBundleInstructionId, - Map<String, RunnerApi.PCollection> pCollections, - Map<String, RunnerApi.Coder> coders, - Multimap<String, ThrowingConsumer<WindowedValue<?>>> pCollectionIdsToConsumers, - Consumer<ThrowingRunnable> addStartFunction, - Consumer<ThrowingRunnable> addFinishFunction) { - - ImmutableList.Builder<ThrowingConsumer<WindowedValue<?>>> consumers = ImmutableList.builder(); - for (String pCollectionId : pTransform.getOutputsMap().values()) { - consumers.addAll(pCollectionIdsToConsumers.get(pCollectionId)); - } - - @SuppressWarnings({"rawtypes", "unchecked"}) - BoundedSourceRunner<InputT, OutputT> runner = new BoundedSourceRunner( - pipelineOptions, - pTransform.getSpec(), - consumers.build()); - - // TODO: Remove and replace with source being sent across gRPC port - addStartFunction.accept(runner::start); - - ThrowingConsumer runReadLoop = - (ThrowingConsumer<WindowedValue<InputT>>) runner::runReadLoop; - for (String pCollectionId : pTransform.getInputsMap().values()) { - 
pCollectionIdsToConsumers.put( - pCollectionId, - runReadLoop); - } - - return runner; - } - } - - private final PipelineOptions pipelineOptions; - private final RunnerApi.FunctionSpec definition; - private final Collection<ThrowingConsumer<WindowedValue<OutputT>>> consumers; - - BoundedSourceRunner( - PipelineOptions pipelineOptions, - RunnerApi.FunctionSpec definition, - Collection<ThrowingConsumer<WindowedValue<OutputT>>> consumers) { - this.pipelineOptions = pipelineOptions; - this.definition = definition; - this.consumers = consumers; - } - - /** - * The runner harness is meant to send the source over the Beam Fn Data API which would be - * consumed by the {@link #runReadLoop}. Drop this method once the runner harness sends the - * source instead of unpacking it from the data block of the function specification. - */ - @Deprecated - public void start() throws Exception { - try { - // The representation here is defined as the java serialized representation of the - // bounded source object packed into a protobuf Any using a protobuf BytesValue wrapper. - byte[] bytes = definition.getParameter().unpack(BytesValue.class).getValue().toByteArray(); - @SuppressWarnings("unchecked") - InputT boundedSource = - (InputT) SerializableUtils.deserializeFromByteArray(bytes, definition.toString()); - runReadLoop(WindowedValue.valueInGlobalWindow(boundedSource)); - } catch (InvalidProtocolBufferException e) { - throw new IOException( - String.format("Failed to decode %s, expected %s", - definition.getParameter().getTypeUrl(), BytesValue.getDescriptor().getFullName()), - e); - } - } - - /** - * Creates a {@link Reader} for each {@link BoundedSource} and executes the {@link Reader}s - * read loop. See {@link Reader} for further details of the read loop. - * - * <p>Propagates any exceptions caused during reading or processing via a consumer to the - * caller. 
- */ - public void runReadLoop(WindowedValue<InputT> value) throws Exception { - try (Reader<OutputT> reader = value.getValue().createReader(pipelineOptions)) { - if (!reader.start()) { - // Reader has no data, immediately return - return; - } - do { - // TODO: Should this use the input window as the window for all the outputs? - WindowedValue<OutputT> nextValue = WindowedValue.timestampedValueInGlobalWindow( - reader.getCurrent(), reader.getCurrentTimestamp()); - for (ThrowingConsumer<WindowedValue<OutputT>> consumer : consumers) { - consumer.accept(nextValue); - } - } while (reader.advance()); - } - } - - @Override - public String toString() { - return definition.toString(); - } -}
