[ https://issues.apache.org/jira/browse/BEAM-3587?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16352390#comment-16352390 ]

Jean-Baptiste Onofré commented on BEAM-3587:
--------------------------------------------

Using the 2.3.0-SNAPSHOT artifacts provided by Maven, the following pipeline
works without problems with the Flink runner:

{code}
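import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;

// Class name is illustrative; the original snippet showed only main().
public class ArtistLabelCount {
    public static void main(String[] args) throws Exception {
        // Runner and its settings come from the command line, e.g. --runner=FlinkRunner.
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
        Pipeline pipeline = Pipeline.create(options);

        pipeline
                .apply(TextIO.read().from("/home/jbonofre/artists.csv"))
                // Emit the second CSV column; assumes every line has at least two fields.
                .apply(ParDo.of(new DoFn<String, String>() {
                    @ProcessElement
                    public void processElement(ProcessContext processContext) {
                        String element = processContext.element();
                        String[] split = element.split(",");
                        processContext.output(split[1]);
                    }
                }))
                // Count occurrences of each distinct value.
                .apply(Count.<String>perElement())
                // Format each (value, count) pair as a one-entry JSON object.
                .apply(MapElements.via(new SimpleFunction<KV<String, Long>, String>() {
                    @Override
                    public String apply(KV<String, Long> element) {
                        return "{\"" + element.getKey() + "\": \"" + element.getValue() + "\"}";
                    }
                }))
                // Writes sharded output files using this path as the filename prefix.
                .apply(TextIO.write().to("/home/jbonofre/label.json"));

        pipeline.run();
    }
}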
{code}

I'm now testing with artifacts built by Gradle.

> User reports TextIO failure in FlinkRunner on master
> ----------------------------------------------------
>
>                 Key: BEAM-3587
>                 URL: https://issues.apache.org/jira/browse/BEAM-3587
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>            Reporter: Kenneth Knowles
>            Assignee: Ben Sidhom
>            Priority: Blocker
>             Fix For: 2.3.0
>
>
> Reported here: 
> [https://lists.apache.org/thread.html/47b16c94032392782505415e010970fd2a9480891c55c2f7b5de92bd@%3Cuser.beam.apache.org%3E]
> "I'm trying to run a pipeline containing just a TextIO.read() step on a Flink
> cluster, using the latest Beam git revision (ff37337). The job fails to start
> with the exception:
>   {{java.lang.UnsupportedOperationException: The transform  is currently not supported.}}
> It does work with Beam 2.2.0, though. All code, logs, and reproduction steps are at
> [https://github.com/pelletier/beam-flink-example]."
> My initial guess is that this is related to the switch to running from a
> portable pipeline representation: it looks like a non-composite transform has
> an empty URN, and the translator threw an unhelpful error message.
> We can try to find the root cause, but we may also mitigate it short-term by
> removing the round-trip through the pipeline proto for now.
> What is curious is that the ValidatesRunner tests and WordCountIT pass. They
> only run on a local Flink, yet this looks like a translation issue that would
> occur for both local and distributed runs.
> We certainly need to run this repro on the RC if we don't get to the bottom of
> it quickly.
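The empty-URN hypothesis in the issue description would also explain the double space in the reported message. As a purely illustrative sketch (the class `UrnTranslatorSketch`, its URN registry, the example URN string, and the message format are all assumptions, not the Flink runner's actual translator code), a translator that dispatches on a transform's URN and interpolates that URN into its error message prints exactly `The transform  is currently not supported.` when the URN is empty:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical sketch only: NOT the FlinkRunner's real translator.
public class UrnTranslatorSketch {
    // Assumed design: translators registered per transform URN.
    private final Map<String, Consumer<String>> translators = new HashMap<>();

    public void register(String urn, Consumer<String> translator) {
        translators.put(urn, translator);
    }

    // Looks up a translator by URN; unknown URNs, including "", are rejected.
    public void translate(String transformName, String urn) {
        Consumer<String> translator = translators.get(urn);
        if (translator == null) {
            // With urn == "", this leaves two spaces between "transform" and "is".
            throw new UnsupportedOperationException(
                "The transform " + urn + " is currently not supported.");
        }
        translator.accept(transformName);
    }

    // A clearer alternative message that names the transform and flags the missing URN.
    public static String betterMessage(String transformName, String urn) {
        return urn.isEmpty()
            ? "Transform '" + transformName + "' has no URN; it cannot be translated."
            : "The transform " + urn + " is currently not supported.";
    }

    public static void main(String[] args) {
        UrnTranslatorSketch sketch = new UrnTranslatorSketch();
        // Hypothetical URN, used only for this example.
        sketch.register("urn:example:translated:v1",
            name -> System.out.println("translated " + name));
        try {
            sketch.translate("TextIO.Read", "");
        } catch (UnsupportedOperationException e) {
            System.out.println(e.getMessage());
        }
        System.out.println(betterMessage("TextIO.Read", ""));
    }
}
```

Running `main` prints the same double-spaced message the user reported, followed by the more explicit alternative that names the offending transform.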



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
