[ https://issues.apache.org/jira/browse/FLINK-2617?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14730694#comment-14730694 ]

Fabian Hueske commented on FLINK-2617:
--------------------------------------

Yes, that could be a fix.
But I think it would be good to know why this happens, because it might bite us 
in other places as well. Maybe we need to guard all calls into Hadoop classes 
with locks.
I looked into the code and the cause is not obvious: all HadoopInputFormats 
(Flink's wrappers) are configured with distinct Configuration objects.
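
A minimal sketch of the lock idea (hypothetical, not a committed fix): take one
JVM-wide lock around every place that copies a shared Hadoop Configuration, so
no task clones it while another thread is mutating it:

{code}
import org.apache.hadoop.conf.Configuration;

public final class GuardedConfCopy {

    // Hypothetical helper, not Flink API: one JVM-wide lock shared by
    // every reader that copies the Configuration.
    private static final Object CONF_LOCK = new Object();

    public static Configuration copy(Configuration shared) {
        // Configuration's copy constructor iterates the source's internal
        // collections; concurrent modification of those collections is
        // exactly what the reported stack trace shows blowing up inside
        // HashMap$HashIterator.
        synchronized (CONF_LOCK) {
            return new Configuration(shared);
        }
    }
}
{code}

This only helps if the code paths that mutate the shared Configuration take the
same lock, which HCatalog's internals do not, so it may be an argument for
cloning the configuration up front instead.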

> ConcurrentModificationException when using HCatRecordReader to access a Hive table
> -----------------------------------------------------------------------------------
>
>                 Key: FLINK-2617
>                 URL: https://issues.apache.org/jira/browse/FLINK-2617
>             Project: Flink
>          Issue Type: Bug
>          Components: Hadoop Compatibility
>            Reporter: Arnaud Linz
>            Priority: Critical
>
> I don't know if it's an HCat or a Flink problem, but when reading a Hive 
> table in a cluster with many slots (20 threads per container), I 
> systematically run into a {{ConcurrentModificationException}} in the copy 
> constructor of a {{Configuration}} object that is modified concurrently 
> with the copy.
> From what I understand, this object comes from 
> {{TaskAttemptContext.getConfiguration()}}, created by 
> {{HadoopUtils.instantiateTaskAttemptContext(configuration, new 
> TaskAttemptID());}} 
> Maybe the {{job.Configuration}} object passed to the constructor of 
> {{HadoopInputFormatBase}} should be cloned somewhere?
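> A minimal sketch of that cloning idea (hypothetical, names assumed): copy 
> the configuration once, during single-threaded job setup, so each input 
> format owns a private instance and the per-task copies can no longer race:
> {code}
> // Hypothetical: clone the shared configuration before any parallel task
> // starts. Configuration's copy constructor duplicates all entries, and at
> // this point no other thread is mutating the source yet.
> final org.apache.hadoop.conf.Configuration privateConf =
>     new org.apache.hadoop.conf.Configuration(job.getConfiguration());
> {code}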
> Stack trace is:
> {code}
> org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
> org.apache.flink.client.program.Client.run(Client.java:413)
> org.apache.flink.client.program.Client.run(Client.java:356)
> org.apache.flink.client.program.Client.run(Client.java:349)
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63)
> com.bouygtel.kuberasdk.main.ApplicationBatch.execCluster(ApplicationBatch.java:73)
> com.bouygtel.kubera.main.segment.ApplicationGeoSegment.batchExec(ApplicationGeoSegment.java:69)
> com.bouygtel.kuberasdk.main.ApplicationBatch.exec(ApplicationBatch.java:50)
> com.bouygtel.kuberasdk.main.Application.mainMethod(Application.java:88)
> com.bouygtel.kubera.main.segment.MainGeoSmooth.main(MainGeoSmooth.java:44)
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> java.lang.reflect.Method.invoke(Method.java:606)
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
> org.apache.flink.client.program.Client.run(Client.java:315)
> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:582)
> org.apache.flink.client.CliFrontend.run(CliFrontend.java:288)
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:878)
> org.apache.flink.client.CliFrontend.main(CliFrontend.java:920)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314)
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> org.apache.flink.yarn.ApplicationMasterActor$$anonfun$receiveYarnMessages$1.applyOrElse(ApplicationMasterActor.scala:100)
> scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
> org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
> akka.actor.Actor$class.aroundReceive(Actor.scala:465)
> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
> akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> akka.actor.ActorCell.invoke(ActorCell.scala:487)
> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
> akka.dispatch.Mailbox.run(Mailbox.scala:221)
> akka.dispatch.Mailbox.exec(Mailbox.scala:231)
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.util.ConcurrentModificationException
> java.util.HashMap$HashIterator.nextEntry(HashMap.java:926)
> java.util.HashMap$KeyIterator.next(HashMap.java:960)
> java.util.AbstractCollection.addAll(AbstractCollection.java:341)
> java.util.HashSet.<init>(HashSet.java:117)
> org.apache.hadoop.conf.Configuration.<init>(Configuration.java:554)
> org.apache.hadoop.mapred.JobConf.<init>(JobConf.java:439)
> org.apache.hive.hcatalog.common.HCatUtil.getJobConfFromContext(HCatUtil.java:637)
> org.apache.hive.hcatalog.mapreduce.HCatRecordReader.createBaseRecordReader(HCatRecordReader.java:112)
> org.apache.hive.hcatalog.mapreduce.HCatRecordReader.initialize(HCatRecordReader.java:91)
> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormatBase.open(HadoopInputFormatBase.java:182)
> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormatBase.open(HadoopInputFormatBase.java:56)
> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:151)
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> java.lang.Thread.run(Thread.java:744)
> {code}
> Flink "user" code looks like:
> {code}
> import java.io.IOException;
> import java.io.Serializable;
> import org.apache.flink.api.common.functions.FlatMapFunction;
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.api.common.functions.RichFlatMapFunction;
> import org.apache.flink.api.common.functions.RichMapFunction;
> import org.apache.flink.api.common.io.FileOutputFormat;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.core.fs.FileSystem.WriteMode;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.sink.SinkFunction;
> import org.apache.flink.util.Collector;
> import org.apache.hadoop.io.NullWritable;
> import org.apache.hadoop.io.compress.CompressionCodec;
> import org.apache.hadoop.mapreduce.InputFormat;
> import org.apache.hadoop.mapreduce.Job;
> import org.apache.hive.hcatalog.data.DefaultHCatRecord;
> import org.apache.hive.hcatalog.data.schema.HCatSchema;
> import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
> (...)
>         final Job job = Job.getInstance();
>         // Wrap HCatInputFormat so Flink can read the Hive table as a DataSet.
>         @SuppressWarnings({ "unchecked", "rawtypes" })
>         final HadoopInputFormat<NullWritable, DefaultHCatRecord> inputFormat =
>             new HadoopInputFormat<NullWritable, DefaultHCatRecord>(
>                 (InputFormat) HCatInputFormat.setInput(job, dbName, tableName, filter),
>                 NullWritable.class,
>                 DefaultHCatRecord.class,
>                 job);
>         final HCatSchema inputSchema = HCatInputFormat.getTableSchema(job.getConfiguration());
>         @SuppressWarnings("serial")
>         final DataSet<T> dataSet = cluster
>             .createInput(inputFormat)
>             .flatMap(new FlatMapFunction<Tuple2<NullWritable, DefaultHCatRecord>, T>() {
>                 @Override
>                 public void flatMap(Tuple2<NullWritable, DefaultHCatRecord> value,
>                         Collector<T> out) throws Exception { // NOPMD
>                     final T record = createBean(value.f1, inputSchema);
>                     out.collect(record);
>                 }
>             }).returns(beanClass);
> (...)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
