Hello Beamers,

I am trying out a simple pipeline to be executed on PortableRunner:

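```
import org.apache.beam.runners.portability.PortableRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PortablePipelineOptions;
import org.apache.beam.sdk.transforms.Create;

PortablePipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).as(PortablePipelineOptions.class);
options.setJobEndpoint(some_url);
options.setDefaultEnvironmentType("LOOPBACK");
options.setRunner(PortableRunner.class);

Pipeline pipeline = Pipeline.create(options);

pipeline
    .apply(Create.of("1", "2", "3"))
    .apply(…print to console...);

pipeline.run();
```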

This pipeline works with runners such as SamzaRunner; however, it does not work
in portable mode.

I did some debugging, and it turns out that it fails because when
Read.BoundedSourceAsSDFWrapperFn#processElement() is invoked, the corresponding
RestrictionTracker is null. This seems to be caused by the expanded SDF transform
having the URN "beam:transform:pardo:v1", in which case FnApiDoFnRunner is created
with

```
mainInputConsumer = this::processElementForParDo; 
```

which does not create a tracker at all. I do see that the other processing
methods, such as

- processElementForSplitRestriction()
- processElementForWindowObservingSplitRestriction()
- processElementForTruncateRestriction()

and so on, create trackers properly before invoking the DoFn; however, they
require a different URN for the transform.
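To make the dispatch described above concrete, here is a hypothetical, heavily simplified Java model of it. It is not Beam's FnApiDoFnRunner: the class UrnDispatchSketch, the Tracker type, sdfBody, and processElementWithTracker are invented for illustration, and the SDF URN string used as the second map key is an assumption. It only shows why a DoFn body that expects a tracker receives null when the transform is routed through the plain pardo:v1 consumer.

```java
import java.util.Map;
import java.util.function.Function;

/**
 * Hypothetical, simplified model of URN-based consumer selection.
 * NOT Beam's FnApiDoFnRunner; all names here are illustrative assumptions.
 */
public class UrnDispatchSketch {

    /** Stand-in for a restriction tracker; null means "no tracker was created". */
    static final class Tracker {
        final String restriction;
        Tracker(String restriction) { this.restriction = restriction; }
    }

    /** Stand-in for an SDF-style DoFn body that requires a tracker. */
    static String sdfBody(String element, Tracker tracker) {
        if (tracker == null) {
            // Mirrors the symptom in the post: the DoFn is invoked without a tracker.
            throw new IllegalStateException("RestrictionTracker is null for element " + element);
        }
        return "processed " + element + " over " + tracker.restriction;
    }

    // Plain ParDo path: invokes the DoFn directly and never builds a tracker.
    static String processElementForParDo(String element) {
        return sdfBody(element, null);
    }

    // Hypothetical SDF path: builds a tracker for a fixed restriction, then invokes the DoFn.
    static String processElementWithTracker(String element) {
        return sdfBody(element, new Tracker("[0, 1)"));
    }

    // The consumer is chosen purely from the transform's URN.
    static final Map<String, Function<String, String>> CONSUMERS = Map.of(
        "beam:transform:pardo:v1", UrnDispatchSketch::processElementForParDo,
        // Assumed URN for the SDF processing step; labeled as an assumption.
        "beam:transform:sdf_process_sized_element_and_restrictions:v1",
            UrnDispatchSketch::processElementWithTracker);

    public static String run(String urn, String element) {
        Function<String, String> consumer = CONSUMERS.get(urn);
        if (consumer == null) {
            throw new IllegalArgumentException("Unknown URN: " + urn);
        }
        return consumer.apply(element);
    }

    public static void main(String[] args) {
        System.out.println(run("beam:transform:sdf_process_sized_element_and_restrictions:v1", "1"));
        try {
            run("beam:transform:pardo:v1", "1");
        } catch (IllegalStateException e) {
            System.out.println("pardo:v1 path failed: " + e.getMessage());
        }
    }
}
```

In this model the failure is not in the DoFn itself but in which consumer the URN selects, which matches the question of whether Create.of should be expanded to a transform carrying pardo:v1 at all.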

My questions are: Did I miss anything? Is it expected that Create.of will be
expanded to an SDF with the regular pardo:v1 URN? If yes, what is the expected
behavior when FnApiDoFnRunner invokes Read.BoundedSourceAsSDFWrapperFn?

Best,
Ke