Thank you, Boyuan, for the explanation! This explains why it did not work: 
Samza does not yet wire in SamzaPipelineRunner when executing in portable mode. 

I will create a ticket to update the Samza runner.

Best,
Ke

> On Feb 4, 2021, at 12:07 PM, Boyuan Zhang <[email protected]> wrote:
> 
> Hi Ke,
> 
>  is it expected that Create.of will be expanded to a SDF
> In the Java SDK, Create.of is expanded into a CreateSource, which is then 
> wrapped in an SDF implementation.
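A toy model may help show why the restriction tracker matters once a source is wrapped as an SDF. This is a minimal sketch under simplifying assumptions: the classes `OffsetRange`, `ToyOffsetTracker` and `ToySourceAsSdf` are hypothetical and are not Beam's API. The wrapped source reads elements only by claiming offsets through its tracker, so calling `processElement` with a null tracker fails immediately.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical restriction: the half-open offset range [from, to).
final class OffsetRange {
  final long from;
  final long to;

  OffsetRange(long from, long to) {
    this.from = from;
    this.to = to;
  }
}

// Hypothetical, simplified restriction tracker: hands out offsets in increasing order.
final class ToyOffsetTracker {
  private final OffsetRange range;
  private long lastClaimed;

  ToyOffsetTracker(OffsetRange range) {
    this.range = range;
    this.lastClaimed = range.from - 1;
  }

  OffsetRange currentRestriction() {
    return range;
  }

  // Returns true if the offset lies inside the range and may be processed.
  boolean tryClaim(long offset) {
    if (offset <= lastClaimed) {
      throw new IllegalArgumentException("offsets must be claimed in increasing order");
    }
    if (offset >= range.to) {
      return false;
    }
    lastClaimed = offset;
    return true;
  }
}

// Hypothetical bounded source (a list) read through an SDF-style processElement.
final class ToySourceAsSdf {
  // Restriction covering the whole source, as a "pair with restriction" step might attach.
  static OffsetRange initialRestriction(List<String> source) {
    return new OffsetRange(0, source.size());
  }

  // Emits every element whose offset the tracker lets us claim.
  // With a null tracker, the first call on it throws NullPointerException.
  static List<String> processElement(List<String> source, ToyOffsetTracker tracker) {
    List<String> out = new ArrayList<>();
    for (long i = tracker.currentRestriction().from; tracker.tryClaim(i); i++) {
      out.add(source.get((int) i));
    }
    return out;
  }
}
```

Because the tracker both defines and enforces the range, a runner can hand different sub-ranges (for example `new OffsetRange(1, 3)`) to different workers; the DoFn itself has no fallback when no tracker is supplied.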
> 
>  with regular pardo:v1 urn?
> No, the runner should run SplittableParDoExpander [1] to expand the SDF into 
> transforms with the URNs SPLITTABLE_PAIR_WITH_RESTRICTION_URN, 
> SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN, and 
> SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN.  
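As a rough sketch of the point above, assuming a heavily simplified model: the enum `ToyUrn` and the classes `ToyTransform` and `ToySdfExpander` are hypothetical and do not reflect SplittableParDoExpander's real signature or the actual URN strings. It only illustrates that the URNs the SDK harness receives depend on whether the runner applies the expansion.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-ins for the transform URNs named above.
enum ToyUrn {
  PAR_DO,
  SPLITTABLE_PAIR_WITH_RESTRICTION,
  SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS,
  SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS
}

// Hypothetical transform: a URN plus whether its DoFn is splittable.
final class ToyTransform {
  final ToyUrn urn;
  final boolean splittable;

  ToyTransform(ToyUrn urn, boolean splittable) {
    this.urn = urn;
    this.splittable = splittable;
  }
}

final class ToySdfExpander {
  // Returns the URNs the SDK harness would see for this transform.
  static List<ToyUrn> expand(ToyTransform t, boolean runnerAppliesExpansion) {
    if (t.urn == ToyUrn.PAR_DO && t.splittable && runnerAppliesExpansion) {
      // The splittable ParDo is replaced by three stages.
      return Arrays.asList(
          ToyUrn.SPLITTABLE_PAIR_WITH_RESTRICTION,
          ToyUrn.SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS,
          ToyUrn.SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS);
    }
    // Without the expansion, the SDF still arrives under the plain ParDo URN.
    return Collections.singletonList(t.urn);
  }
}
```

In this model a non-splittable ParDo is left unchanged either way; only a splittable one is affected by whether the runner runs the expansion.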
> 
> I do see SamzaPipelineRunner running the expansion [2]. Can you double-check 
> whether your job invokes that code path?
> [1] https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/SplittableParDoExpander.java
> [2] https://github.com/apache/beam/blob/master/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineRunner.java#L42-L47
> 
> On Thu, Feb 4, 2021 at 11:31 AM Ke Wu <[email protected]> wrote:
> Hello Beamers,
> 
> I am trying out a simple pipeline to be executed on PortableRunner:
> 
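> ```
> import org.apache.beam.runners.portability.PortableRunner;
> import org.apache.beam.sdk.Pipeline;
> import org.apache.beam.sdk.options.PipelineOptionsFactory;
> import org.apache.beam.sdk.options.PortablePipelineOptions;
> import org.apache.beam.sdk.transforms.Create;
> import org.apache.beam.sdk.transforms.DoFn;
> import org.apache.beam.sdk.transforms.ParDo;
> 
> PortablePipelineOptions options =
>     PipelineOptionsFactory.fromArgs(args).as(PortablePipelineOptions.class);
> options.setJobEndpoint(some_url); // job service address (host:port)
> options.setDefaultEnvironmentType("LOOPBACK"); // SDK harness runs in this process
> options.setRunner(PortableRunner.class);
> 
> Pipeline pipeline = Pipeline.create(options);
> 
> pipeline
>     .apply(Create.of("1", "2", "3"))
>     // Print each element to the console.
>     .apply(ParDo.of(new DoFn<String, Void>() {
>       @ProcessElement
>       public void processElement(@Element String element) {
>         System.out.println(element);
>       }
>     }));
> 
> pipeline.run().waitUntilFinish();
> ```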
> 
> This pipeline works with runners such as SamzaRunner; however, it does not 
> work in portable mode. 
> 
> I did some debugging, and it turns out that it fails because, when 
> Read.BoundedSourceAsSDFWrapperFn.processElement() is invoked, the corresponding 
> RestrictionTracker is null. This seems to be caused by the expanded SDF transform 
> having the URN "beam:transform:pardo:v1", in which case FnApiDoFnRunner is 
> created with 
> 
> ```
> mainInputConsumer = this::processElementForParDo; 
> ```
> 
> which does not create a tracker at all. I do see that other processing 
> methods, such as 
> processElementForSplitRestriction()
> processElementForWindowObservingSplitRestriction()
> processElementForTruncateRestriction() 
> 
> etc., create trackers properly before invoking the DoFn; however, they 
> require a different URN for the transform.
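The dispatch described above can be modeled roughly as follows. This is a hypothetical sketch, not FnApiDoFnRunner's code: `ToyDispatch`, its `Urn` enum and its `Tracker` class are invented for illustration, and a real restriction tracker does much more than hold an offset range. It shows why a splittable DoFn that arrives under the plain ParDo URN ends up being invoked with a null tracker.

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of URN-based dispatch; not FnApiDoFnRunner's real code.
final class ToyDispatch {
  enum Urn { PAR_DO, SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS }

  // Stand-in tracker: just the offset range [from, to) it may claim.
  static final class Tracker {
    final int from;
    final int to;

    Tracker(int from, int to) {
      this.from = from;
      this.to = to;
    }
  }

  // Stand-in for an SDF-wrapped source: it must dereference the tracker.
  static void sdfProcessElement(List<String> source, Tracker tracker, List<String> out) {
    for (int i = tracker.from; i < tracker.to; i++) { // NullPointerException if tracker is null
      out.add(source.get(i));
    }
  }

  // Chooses the main-input consumer from the transform URN.
  static Consumer<List<String>> mainInputConsumer(Urn urn, List<String> out) {
    switch (urn) {
      case PAR_DO:
        // Like processElementForParDo: invokes the DoFn without creating a tracker.
        return source -> sdfProcessElement(source, null, out);
      case SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS:
        // Splittable path: creates a tracker for the restriction before invoking the DoFn.
        return source -> sdfProcessElement(source, new Tracker(0, source.size()), out);
      default:
        throw new IllegalArgumentException("unknown URN: " + urn);
    }
  }
}
```

In this model the DoFn is identical on both paths; only the URN decides whether a tracker exists, which matches the symptom of the tracker being null under `pardo:v1`.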
> 
> My questions are: did I miss anything? Is it expected that Create.of 
> will be expanded to an SDF with a regular pardo:v1 URN? If yes, what is the 
> expected behavior when FnApiDoFnRunner invokes 
> Read.BoundedSourceAsSDFWrapperFn?
> 
> Best,
> Ke
>  
> 
