Hi Hanan,

Could you please run your program on a local cluster, with your dependencies in the lib folder? You can use "./bin/start-local.sh" and try submitting your program to localhost. That would help us find out whether it is a YARN issue.
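For reference, the local check suggested above might look like the following from the root of a Flink 0.9.x distribution (the jar paths are placeholders; the default JobManager port of 6123 is an assumption based on the 0.9.x defaults):

```shell
# Copy the job's dependency jars into the distribution's lib folder
# (placeholder path for the user's own jars):
cp /path/to/dependency-jars/*.jar lib/

# Start a local JobManager and TaskManager:
./bin/start-local.sh

# Submit the program to the local JobManager:
./bin/flink run -m localhost:6123 /path/to/your-job.jar
```

If the job fails locally with the same deserialization error, the problem is the classpath setup rather than YARN.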
Thanks,
Max

On Fri, Sep 25, 2015 at 8:39 AM, Hanan Meyer <ha...@scalabill.it> wrote:
> Hi,
>
> I rechecked that I put all my jars in the lib folder.
> I have also noticed that it fails while loading my first POJO class.
>
> I start the cluster via YARN, using Flink 0.9.1.
>
> Thanks,
>
> Mr Hanan Meyer
>
> On Thu, Sep 24, 2015 at 6:08 PM, Stephan Ewen <se...@apache.org> wrote:
> >
> > My first guess would be that you did not put all jars into the lib folder.
> >
> > To help us understand this, do you start the cluster manually, or via YARN?
> >
> > On Thu, Sep 24, 2015 at 4:59 PM, Hanan Meyer <ha...@scalabill.it> wrote:
> > >
> > > Hi,
> > > Thanks for the fast response.
> > > I have tried the workaround of excluding the jars from the RemoteEnvironment's init line:
> > >
> > > ExecutionEnvironment env =
> > >     ExecutionEnvironment.createRemoteEnvironment(FLINK_URL, FLINK_PORT);
> > >
> > > instead of:
> > >
> > > ExecutionEnvironment env =
> > >     ExecutionEnvironment.createRemoteEnvironment(FLINK_URL, FLINK_PORT, list of Jars ......);
> > >
> > > I copied the jars to Flink's lib folder, and when I submit my job I get
> > > the following exception, which is caused because Flink can't find my jars and types:
> > >
> > > org.apache.flink.client.program.ProgramInvocationException: The program
> > > execution failed: Cannot initialize task 'CHAIN DataSource
> > > (at createInput(ExecutionEnvironment.java:502)
> > > (org.apache.flink.api.java.io.AvroInputFormat)) ->
> > > Filter (Filter at generateCsv(FlinkCSVProducer.java:51)) -> FlatMap
> > > (FlatMap at generateCsv(FlinkCSVProducer.java:78))':
> > > Deserializing the InputFormat (File Input
> > > (hdfs://localhost:9000/data/kpi/38fbbdef-d822-4e13-9031-faff907469df)) failed:
> > > Could not read the user code wrapper: com.scalabill.it.pa.event.Event
> > >     at org.apache.flink.client.program.Client.run(Client.java:413)
> > >     at org.apache.flink.client.program.Client.run(Client.java:356)
> > >     at org.apache.flink.client.program.Client.run(Client.java:349)
> > >     at org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:89)
> > >     at org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:82)
> > >     at org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:71)
> > >     at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:789)
> > >     at org.apache.flink.api.java.DataSet.count(DataSet.java:391)
> > >     at com.scalabill.it.pa.core.FlinkCSVProducer.generateCsv(FlinkCSVProducer.java:70)
> > >     at com.scalabill.it.pa.core.FlinkDriver.generateChannelsCSVsforThisBackendServer(FlinkDriver.java:94)
> > >
> > > Have I done the workaround correctly?
> > > Can you try to reproduce it in your environment?
> > > Thanks for your attention!
> > >
> > > Hanan Meyer
> > >
> > > On Thu, Sep 24, 2015 at 4:58 PM, Till Rohrmann <till.rohrm...@gmail.com> wrote:
> > > >
> > > > Hi Hanan,
> > > >
> > > > you're right that currently, every time you submit a job to the Flink
> > > > cluster, all user code jars are uploaded and overwrite possibly existing
> > > > files. This is not really necessary if they don't change. Maybe we should
> > > > add a check that files already existing on the JobManager are not uploaded
> > > > again by the JobClient. This should improve the performance for your use case.
> > > >
> > > > The corresponding JIRA issue is
> > > > https://issues.apache.org/jira/browse/FLINK-2760.
> > > >
> > > > Cheers,
> > > > Till
> > > >
> > > > On Thu, Sep 24, 2015 at 1:31 PM, Hanan Meyer <ha...@scalabill.it> wrote:
> > > > >
> > > > > Hello All,
> > > > >
> > > > > I use Flink in order to filter data from HDFS and write it back as CSV.
> > > > >
> > > > > I keep getting the "Checking and uploading JAR files" step on every
> > > > > DataSet filtering action or executionEnvironment execution.
> > > > >
> > > > > I use ExecutionEnvironment.createRemoteEnvironment(ip+jars..) because I
> > > > > launch Flink from a J2EE application server.
> > > > >
> > > > > The jar serialization and transportation takes a huge part of the
> > > > > execution time.
> > > > > Is there a way to force Flink to pass the jars only once?
> > > > >
> > > > > Please advise.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Hanan Meyer
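For context, the two submission variants discussed in this thread differ only in whether the jar list is handed to the remote environment. A minimal sketch based on the Flink 0.9 Java API (host, port, and jar paths are placeholders):

```java
import org.apache.flink.api.java.ExecutionEnvironment;

public class RemoteSubmissionSketch {
    public static void main(String[] args) throws Exception {
        // Variant 1: ship the user-code jars with the job. Flink uploads
        // them to the JobManager on every submission ("Checking and
        // uploading JAR files"), which is what slows down repeated runs.
        ExecutionEnvironment envWithJars =
            ExecutionEnvironment.createRemoteEnvironment(
                "jobmanager-host", 6123,
                "/path/to/job.jar", "/path/to/dependency.jar");

        // Variant 2: no jar list. This only works if every user-code class
        // (POJOs, input formats, ...) is already on the cluster's classpath,
        // i.e. the jars sit in the lib folder of each node before startup.
        ExecutionEnvironment envWithoutJars =
            ExecutionEnvironment.createRemoteEnvironment(
                "jobmanager-host", 6123);
    }
}
```

One possible cause of the "Could not read the user code wrapper" error with variant 2: the lib folder is read when the JobManager and TaskManager JVMs start, so jars copied in afterwards are not picked up until the cluster is restarted.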