[ https://issues.apache.org/jira/browse/BEAM-2939?focusedWorklogId=115561&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-115561 ]
ASF GitHub Bot logged work on BEAM-2939: ---------------------------------------- Author: ASF GitHub Bot Created on: 25/Jun/18 17:50 Start Date: 25/Jun/18 17:50 Worklog Time Spent: 10m Work Description: lukecwik commented on a change in pull request #5566: [BEAM-2939, BEAM-3743, BEAM-3833] Supports SDF in Reference Runner URL: https://github.com/apache/beam/pull/5566#discussion_r197878475 ########## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java ########## @@ -94,399 +47,145 @@ import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; -import org.apache.beam.sdk.transforms.windowing.WindowMappingFn; -import org.apache.beam.sdk.util.CombineFnUtil; -import org.apache.beam.sdk.util.DoFnInfo; -import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder; -import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; -import org.apache.beam.sdk.values.WindowingStrategy; import org.joda.time.Instant; /** - * A {@link DoFnRunner} specific to integrating with the Fn Api. This is to remove the layers - * of abstraction caused by StateInternals/TimerInternals since they model state and timer - * concepts differently. + * A {@link DoFnRunner} specific to integrating with the Fn Api. This is to remove the layers of + * abstraction caused by StateInternals/TimerInternals since they model state and timer concepts + * differently. 
*/ -public class FnApiDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT> { - - private final ProcessBundleContext processContext; - private final FinishBundleContext finishBundleContext; - private StartBundleContext startBundleContext; - - /** - * A registrar which provides a factory to handle Java {@link DoFn}s. - */ +public class FnApiDoFnRunner<InputT, OutputT> + implements DoFnPTransformRunnerFactory.DoFnPTransformRunner<InputT> { + /** A registrar which provides a factory to handle Java {@link DoFn}s. */ @AutoService(PTransformRunnerFactory.Registrar.class) - public static class Registrar implements - PTransformRunnerFactory.Registrar { - + public static class Registrar implements PTransformRunnerFactory.Registrar { @Override public Map<String, PTransformRunnerFactory> getPTransformRunnerFactories() { - return ImmutableMap.of( - PTransformTranslation.PAR_DO_TRANSFORM_URN, new NewFactory(), - ParDoTranslation.CUSTOM_JAVA_DO_FN_URN, new Factory()); + return ImmutableMap.of(PTransformTranslation.PAR_DO_TRANSFORM_URN, new Factory()); } } - /** A factory for {@link FnApiDoFnRunner}. 
*/ static class Factory<InputT, OutputT> - implements PTransformRunnerFactory<DoFnRunner<InputT, OutputT>> { - + extends DoFnPTransformRunnerFactory< + InputT, InputT, OutputT, FnApiDoFnRunner<InputT, OutputT>> { @Override - public DoFnRunner<InputT, OutputT> createRunnerForPTransform( - PipelineOptions pipelineOptions, - BeamFnDataClient beamFnDataClient, - BeamFnStateClient beamFnStateClient, - String ptransformId, - PTransform pTransform, - Supplier<String> processBundleInstructionId, - Map<String, PCollection> pCollections, - Map<String, RunnerApi.Coder> coders, - Map<String, RunnerApi.WindowingStrategy> windowingStrategies, - Multimap<String, FnDataReceiver<WindowedValue<?>>> pCollectionIdsToConsumers, - Consumer<ThrowingRunnable> addStartFunction, - Consumer<ThrowingRunnable> addFinishFunction) { - - // For every output PCollection, create a map from output name to Consumer - ImmutableListMultimap.Builder<TupleTag<?>, FnDataReceiver<WindowedValue<?>>> - tagToOutputMapBuilder = ImmutableListMultimap.builder(); - for (Map.Entry<String, String> entry : pTransform.getOutputsMap().entrySet()) { - tagToOutputMapBuilder.putAll( - new TupleTag<>(entry.getKey()), - pCollectionIdsToConsumers.get(entry.getValue())); - } - ListMultimap<TupleTag<?>, FnDataReceiver<WindowedValue<?>>> tagToOutputMap = - tagToOutputMapBuilder.build(); - - // Get the DoFnInfo from the serialized blob. 
- ByteString serializedFn = pTransform.getSpec().getPayload(); - @SuppressWarnings({"unchecked", "rawtypes"}) - DoFnInfo<InputT, OutputT> doFnInfo = (DoFnInfo) SerializableUtils.deserializeFromByteArray( - serializedFn.toByteArray(), "DoFnInfo"); - - @SuppressWarnings({"unchecked", "rawtypes"}) - DoFnRunner<InputT, OutputT> runner = - new FnApiDoFnRunner<>( - pipelineOptions, - beamFnStateClient, - ptransformId, - processBundleInstructionId, - doFnInfo.getDoFn(), - doFnInfo.getInputCoder(), - (Collection<FnDataReceiver<WindowedValue<OutputT>>>) - (Collection) tagToOutputMap.get(doFnInfo.getMainOutput()), - tagToOutputMap, - ImmutableMap.of(), - doFnInfo.getWindowingStrategy()); - - registerHandlers( - runner, - pTransform, - ImmutableSet.of(), - addStartFunction, - addFinishFunction, - pCollectionIdsToConsumers); - return runner; + public FnApiDoFnRunner<InputT, OutputT> createRunner(Context<InputT, OutputT> context) { + return new FnApiDoFnRunner<>(context); } } - static class NewFactory<InputT, OutputT> - implements PTransformRunnerFactory<DoFnRunner<InputT, OutputT>> { - - @Override - public DoFnRunner<InputT, OutputT> createRunnerForPTransform( - PipelineOptions pipelineOptions, - BeamFnDataClient beamFnDataClient, - BeamFnStateClient beamFnStateClient, - String ptransformId, - RunnerApi.PTransform pTransform, - Supplier<String> processBundleInstructionId, - Map<String, RunnerApi.PCollection> pCollections, - Map<String, RunnerApi.Coder> coders, - Map<String, RunnerApi.WindowingStrategy> windowingStrategies, - Multimap<String, FnDataReceiver<WindowedValue<?>>> pCollectionIdsToConsumers, - Consumer<ThrowingRunnable> addStartFunction, - Consumer<ThrowingRunnable> addFinishFunction) { - - DoFn<InputT, OutputT> doFn; - TupleTag<OutputT> mainOutputTag; - Coder<InputT> inputCoder; - WindowingStrategy<InputT, ?> windowingStrategy; - - ImmutableMap.Builder<TupleTag<?>, SideInputSpec> tagToSideInputSpecMap = - ImmutableMap.builder(); - ParDoPayload parDoPayload; - try { 
- RehydratedComponents rehydratedComponents = RehydratedComponents.forComponents( - RunnerApi.Components.newBuilder() - .putAllCoders(coders).putAllWindowingStrategies(windowingStrategies).build()); - parDoPayload = ParDoPayload.parseFrom(pTransform.getSpec().getPayload()); - doFn = (DoFn) ParDoTranslation.getDoFn(parDoPayload); - mainOutputTag = (TupleTag) ParDoTranslation.getMainOutputTag(parDoPayload); - String mainInputTag = Iterables.getOnlyElement(Sets.difference( - pTransform.getInputsMap().keySet(), parDoPayload.getSideInputsMap().keySet())); - RunnerApi.PCollection mainInput = - pCollections.get(pTransform.getInputsOrThrow(mainInputTag)); - inputCoder = (Coder<InputT>) rehydratedComponents.getCoder( - mainInput.getCoderId()); - windowingStrategy = (WindowingStrategy) rehydratedComponents.getWindowingStrategy( - mainInput.getWindowingStrategyId()); - - // Build the map from tag id to side input specification - for (Map.Entry<String, RunnerApi.SideInput> entry - : parDoPayload.getSideInputsMap().entrySet()) { - String sideInputTag = entry.getKey(); - RunnerApi.SideInput sideInput = entry.getValue(); - checkArgument( - Materializations.MULTIMAP_MATERIALIZATION_URN.equals( - sideInput.getAccessPattern().getUrn()), - "This SDK is only capable of dealing with %s materializations " - + "but was asked to handle %s for PCollectionView with tag %s.", - Materializations.MULTIMAP_MATERIALIZATION_URN, - sideInput.getAccessPattern().getUrn(), - sideInputTag); - - RunnerApi.PCollection sideInputPCollection = - pCollections.get(pTransform.getInputsOrThrow(sideInputTag)); - WindowingStrategy sideInputWindowingStrategy = - rehydratedComponents.getWindowingStrategy( - sideInputPCollection.getWindowingStrategyId()); - tagToSideInputSpecMap.put( - new TupleTag<>(entry.getKey()), - SideInputSpec.create( - rehydratedComponents.getCoder(sideInputPCollection.getCoderId()), - sideInputWindowingStrategy.getWindowFn().windowCoder(), - 
PCollectionViewTranslation.viewFnFromProto(entry.getValue().getViewFn()), - PCollectionViewTranslation.windowMappingFnFromProto( - entry.getValue().getWindowMappingFn()))); - } - } catch (InvalidProtocolBufferException exn) { - throw new IllegalArgumentException("Malformed ParDoPayload", exn); - } catch (IOException exn) { - throw new IllegalArgumentException("Malformed ParDoPayload", exn); - } - - ImmutableListMultimap.Builder<TupleTag<?>, FnDataReceiver<WindowedValue<?>>> - tagToConsumerBuilder = ImmutableListMultimap.builder(); - for (Map.Entry<String, String> entry : pTransform.getOutputsMap().entrySet()) { - tagToConsumerBuilder.putAll( - new TupleTag<>(entry.getKey()), pCollectionIdsToConsumers.get(entry.getValue())); - } - ListMultimap<TupleTag<?>, FnDataReceiver<WindowedValue<?>>> tagToConsumer = - tagToConsumerBuilder.build(); - - @SuppressWarnings({"unchecked", "rawtypes"}) - DoFnRunner<InputT, OutputT> runner = new FnApiDoFnRunner<>( - pipelineOptions, - beamFnStateClient, - ptransformId, - processBundleInstructionId, - doFn, - inputCoder, - (Collection<FnDataReceiver<WindowedValue<OutputT>>>) (Collection) - tagToConsumer.get(mainOutputTag), - tagToConsumer, - tagToSideInputSpecMap.build(), - windowingStrategy); - registerHandlers( - runner, - pTransform, - parDoPayload.getSideInputsMap().keySet(), - addStartFunction, - addFinishFunction, - pCollectionIdsToConsumers); - return runner; - } - } - - private static <InputT, OutputT> void registerHandlers( - DoFnRunner<InputT, OutputT> runner, - RunnerApi.PTransform pTransform, - Set<String> sideInputLocalNames, - Consumer<ThrowingRunnable> addStartFunction, - Consumer<ThrowingRunnable> addFinishFunction, - Multimap<String, FnDataReceiver<WindowedValue<?>>> pCollectionIdsToConsumers) { - // Register the appropriate handlers. 
- addStartFunction.accept(runner::startBundle); - for (String localInputName - : Sets.difference(pTransform.getInputsMap().keySet(), sideInputLocalNames)) { - pCollectionIdsToConsumers.put( - pTransform.getInputsOrThrow(localInputName), - (FnDataReceiver) (FnDataReceiver<WindowedValue<InputT>>) runner::processElement); - } - addFinishFunction.accept(runner::finishBundle); - } - ////////////////////////////////////////////////////////////////////////////////////////////////// - private final PipelineOptions pipelineOptions; - private final BeamFnStateClient beamFnStateClient; - private final String ptransformId; - private final Supplier<String> processBundleInstructionId; - private final DoFn<InputT, OutputT> doFn; - private final Coder<InputT> inputCoder; + private final Context<InputT, OutputT> context; private final Collection<FnDataReceiver<WindowedValue<OutputT>>> mainOutputConsumers; - private final Multimap<TupleTag<?>, FnDataReceiver<WindowedValue<?>>> outputMap; - private final Map<TupleTag<?>, SideInputSpec> sideInputSpecMap; - private final Map<StateKey, Object> stateKeyObjectCache; - private final WindowingStrategy windowingStrategy; + private FnApiStateAccessor stateAccessor; private final DoFnSignature doFnSignature; private final DoFnInvoker<InputT, OutputT> doFnInvoker; - private final StateBinder stateBinder; - private final Collection<ThrowingRunnable> stateFinalizers; - /** - * The lifetime of this member is only valid during {@link #processElement} - * and is null otherwise. - */ + private final DoFn<InputT, OutputT>.StartBundleContext startBundleContext; + private final ProcessBundleContext processContext; + private final DoFn<InputT, OutputT>.FinishBundleContext finishBundleContext; + + /** Only valid during {@link #processElement}, null otherwise. */ private WindowedValue<InputT> currentElement; - /** - * The lifetime of this member is only valid during {@link #processElement} - * and is null otherwise. 
Review comment: Please keep this comment. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 115561) > Fn API streaming SDF support > ---------------------------- > > Key: BEAM-2939 > URL: https://issues.apache.org/jira/browse/BEAM-2939 > Project: Beam > Issue Type: Improvement > Components: beam-model > Reporter: Henning Rohde > Assignee: Eugene Kirpichov > Priority: Major > Labels: portability > Time Spent: 1h 40m > Remaining Estimate: 0h > > The Fn API should support streaming SDF. Detailed design TBD. > Once design is ready, expand subtasks similarly to BEAM-2822. -- This message was sent by Atlassian JIRA (v7.6.3#76005)