My first guess would be that you did not put all jars into the lib folder.
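
One quick way to check this is to scan the lib folder for the class named in the exception. The following is only a rough sketch using the plain JDK, not anything Flink ships: the class name `FindClassInJars`, the method `jarsContaining`, and the command-line arguments are all made up for illustration. It reports which jars in a directory contain a given class.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.jar.JarFile;

// Hypothetical helper, not part of Flink: lists the jars in a folder
// that contain a given fully qualified class name.
public class FindClassInJars {

    /** Returns every *.jar directly under libDir that has an entry for className. */
    static List<Path> jarsContaining(Path libDir, String className) throws IOException {
        // com.example.Foo is stored in a jar as com/example/Foo.class
        String entryName = className.replace('.', '/') + ".class";
        List<Path> hits = new ArrayList<>();
        try (DirectoryStream<Path> jars = Files.newDirectoryStream(libDir, "*.jar")) {
            for (Path jar : jars) {
                try (JarFile jarFile = new JarFile(jar.toFile())) {
                    if (jarFile.getEntry(entryName) != null) {
                        hits.add(jar);
                    }
                }
            }
        }
        return hits;
    }

    public static void main(String[] args) throws IOException {
        // Assumed arguments: <path to the lib folder> <fully qualified class name>
        Path libDir = Paths.get(args[0]);
        String className = args[1];
        List<Path> hits = jarsContaining(libDir, className);
        if (hits.isEmpty()) {
            System.out.println(className + " not found in any jar under " + libDir);
        } else {
            hits.forEach(jar -> System.out.println(className + " found in " + jar));
        }
    }
}
```

Running it against the lib folder on each machine with `com.scalabill.it.pa.event.Event` as the class name should show whether the jar containing your types actually ended up there. Note that it only looks at jars directly inside the folder, not in subdirectories or nested jars.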

To help us understand this, do you start the cluster manually, or via YARN?

On Thu, Sep 24, 2015 at 4:59 PM, Hanan Meyer <ha...@scalabill.it> wrote:

> Hi
> Thanks for the fast response
> I have tried the workaround by leaving the jars out of the
> RemoteEnvironment's init line:
> ExecutionEnvironment env =
> ExecutionEnvironment.createRemoteEnvironment(FLINK_URL, FLINK_PORT);
> instead of:
> ExecutionEnvironment env =
> ExecutionEnvironment.createRemoteEnvironment(FLINK_URL, FLINK_PORT,  list
> of Jars ......);
> I copied the jars to Flink's lib folder, and when I submit my job I get
> the following exception, which is caused by
> Flink not finding my jars and types:
> org.apache.flink.client.program.ProgramInvocationException: The program
> execution failed: Cannot initialize task 'CHAIN DataSource
> (at createInput(ExecutionEnvironment.java:502)
> (org.apache.flink.api.java.io.AvroInputFormat)) ->
> Filter (Filter at generateCsv(FlinkCSVProducer.java:51)) -> FlatMap
> (FlatMap at generateCsv(FlinkCSVProducer.java:78))':
> Deserializing the InputFormat (File Input
> (hdfs://localhost:9000/data/kpi/38fbbdef-d822-4e13-9031-faff907469df))
> failed:
> Could not read the user code wrapper: com.scalabill.it.pa.event.Event
> at org.apache.flink.client.program.Client.run(Client.java:413)
> at org.apache.flink.client.program.Client.run(Client.java:356)
> at org.apache.flink.client.program.Client.run(Client.java:349)
> at org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:89)
> at org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:82)
> at org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:71)
> at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:789)
> at org.apache.flink.api.java.DataSet.count(DataSet.java:391)
> at com.scalabill.it.pa.core.FlinkCSVProducer.generateCsv(FlinkCSVProducer.java:70)
> at com.scalabill.it.pa.core.FlinkDriver.generateChannelsCSVsforThisBackendServer(FlinkDriver.java:94)
> Have I applied the workaround correctly?
> Can you try to reproduce it in your environment?
> Thanks for your attention!
> Hanan Meyer
>
> On Thu, Sep 24, 2015 at 4:58 PM, Till Rohrmann <till.rohrm...@gmail.com>
> wrote:
>
> > Hi Hanan,
> >
> > you're right that currently every time you submit a job to the Flink
> > cluster, all user code jars are uploaded and overwrite possibly existing
> > files. This is not really necessary if they don't change. Maybe we should
> > add a check that already existing files on the JobManager are not uploaded
> > again by the JobClient. This should improve the performance for your use
> > case.
> >
> > The corresponding JIRA issue is
> > https://issues.apache.org/jira/browse/FLINK-2760.
> >
> > Cheers,
> > Till
> >
> > On Thu, Sep 24, 2015 at 1:31 PM, Hanan Meyer <ha...@scalabill.it> wrote:
> >
> > > Hello All
> > >
> > > I use Flink to filter data from HDFS and write it back as CSV.
> > >
> > > I keep getting the "Checking and uploading JAR files" message on every
> > > DataSet filtering action or ExecutionEnvironment execution.
> > >
> > > I use ExecutionEnvironment.createRemoteEnvironment(ip+jars..) because I
> > > launch Flink from a J2EE Application Server.
> > >
> > > Serializing and transferring the jars takes up a large part of the
> > > execution time.
> > > Is there a way to force Flink to pass the jars only once?
> > >
> > > Please advise
> > >
> > > Thanks,
> > >
> > > Hanan Meyer
> > >
> >
>
