Re: OptionalDataException in spark
Hi,

We are encountering a java.io.OptionalDataException in one of our Spark jobs. All the tasks of a stage pass (or at least we do not see any error), but the stage fails with the exception below while fetching the task results, and the exception is printed on the driver. Any pointers in this regard would be helpful.

Here is the stack trace:

java.io.OptionalDataException
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1555)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
    at scala.collection.mutable.HashMap$$anonfun$readObject$1.apply(HashMap.scala:174)
    at scala.collection.mutable.HashMap$$anonfun$readObject$1.apply(HashMap.scala:174)
    at scala.collection.mutable.HashTable$class.init(HashTable.scala:109)
    at scala.collection.mutable.HashMap.init(HashMap.scala:40)
    at scala.collection.mutable.HashMap.readObject(HashMap.scala:174)
    at sun.reflect.GeneratedMethodAccessor22.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2136)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
    at org.apache.spark.scheduler.DirectTaskResult$$anonfun$readExternal$1$$anonfun$apply$mcV$sp$2.apply(TaskResult.scala:67)
    at org.apache.spark.scheduler.DirectTaskResult$$anonfun$readExternal$1$$anonfun$apply$mcV$sp$2.apply(TaskResult.scala:66)
    at scala.collection.immutable.Range.foreach(Range.scala:160)
    at org.apache.spark.scheduler.DirectTaskResult$$anonfun$readExternal$1.apply$mcV$sp(TaskResult.scala:66)
    at org.apache.spark.scheduler.DirectTaskResult$$anonfun$readExternal$1.apply(TaskResult.scala:55)
    at org.apache.spark.scheduler.DirectTaskResult$$anonfun$readExternal$1.apply(TaskResult.scala:55)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1326)
    at org.apache.spark.scheduler.DirectTaskResult.readExternal(TaskResult.scala:55)
    at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:2076)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2025)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:108)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:64)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$1.apply(TaskResultGetter.scala:63)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$1.apply(TaskResultGetter.scala:63)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$3.run(TaskResultGetter.scala:62)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Thanks

Ravi Aggarwal
Computer Scientist
E11-355, San Jose, CA, 95110, US
408.536.6719 (tel)
669.214.1491 (cell)
raagg...@adobe.com
Adobe.com<http://www.adobe.com/>
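For context on the trace: ObjectInputStream throws OptionalDataException when readObject() finds primitive (block) data in the stream where it expected an object, i.e. when the bytes being read no longer line up with the calls the reader makes. In this trace it surfaces inside scala.collection.mutable.HashMap.readObject while DirectTaskResult.readExternal deserializes the task result on the driver. The sketch below is a minimal, Spark-free reproduction of the same exception, assuming a hypothetical class (Inconsistent) whose writeObject announces two entries but writes only one, followed by an int. It only illustrates the mechanism; it is not a diagnosis of this job.

```scala
import java.io._

// Hypothetical class: what writeObject produces does not match what readObject expects.
class Inconsistent extends Serializable {
  private def writeObject(out: ObjectOutputStream): Unit = {
    out.writeInt(2)      // announce two entries...
    out.writeObject("a") // ...but write only one object
    out.writeInt(7)      // followed by primitive (block) data
  }

  private def readObject(in: ObjectInputStream): Unit = {
    val n = in.readInt()
    // The second in.readObject() meets the block data holding the int 7
    // instead of an object, and throws OptionalDataException.
    for (_ <- 0 until n) in.readObject()
  }
}

object OptionalDataDemo {
  /** Round-trips `obj` through Java serialization.
    * Returns Some((length, eof)) if an OptionalDataException is thrown, None otherwise. */
  def roundTrip(obj: AnyRef): Option[(Int, Boolean)] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()

    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try {
      in.readObject()
      None
    } catch {
      // length: bytes of primitive data available; eof: true only at the end of custom data
      case e: OptionalDataException => Some((e.length, e.eof))
    } finally in.close()
  }

  def main(args: Array[String]): Unit =
    println(roundTrip(new Inconsistent)) // prints Some((4,false))
}
```

A consistent object (for example a plain String) round-trips without error, so the exception points at a mismatch between the bytes that were written and what the reading side expects.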
RE: OutOfMemory when doing joins in spark 2.0 while same code runs fine in spark 1.5.2
Hi Ian,

Thanks for the information. I think you are referring to this post: http://apache-spark-user-list.1001560.n3.nabble.com/How-spark-decides-whether-to-do-BroadcastHashJoin-or-SortMergeJoin-td27369.html

Yes, I could work around the issue above by setting spark.sql.autoBroadcastJoinThreshold=-1, so that Spark always chooses a sort-merge join instead of a BroadcastHashJoin. The ideal fix for me, however, is to compute the size of my custom default source (BaseRelation's sizeInBytes) correctly, so that the Spark planner makes the appropriate decision on its own.

Thanks
Ravi

From: ianoconn...@gmail.com [mailto:ianoconn...@gmail.com] On Behalf Of Ian O'Connell
Sent: Wednesday, July 20, 2016 11:05 PM
To: Ravi Aggarwal <raagg...@adobe.com>
Cc: Ted Yu <yuzhih...@gmail.com>; user <user@spark.apache.org>
Subject: Re: OutOfMemory when doing joins in spark 2.0 while same code runs fine in spark 1.5.2

Ravi, did your issue ever get solved? I think I've been hitting the same thing: it looks like the spark.sql.autoBroadcastJoinThreshold setting isn't kicking in as expected. If I set it to -1, the computation proceeds successfully.

On Tue, Jun 14, 2016 at 12:28 AM, Ravi Aggarwal <raagg...@adobe.com<mailto:raagg...@adobe.com>> wrote:

Hi,

Is there any breakthrough here? I had one more observation while debugging the issue.

Here are the 4 types of data I had:
Da -> stored in parquet
Di -> stored in parquet
Dl1 -> parquet version of lookup
Dl2 -> hbase version of lookup

Joins performed and the type of join Spark chose:
Da and Di: Sort-Merge, failed (OOM)
Da and Dl1: Broadcast-Hash, passed
Da and Dl2: Sort-Merge, passed
Di and Dl1: Broadcast-Hash, passed
Di and Dl2: Sort-Merge, failed (OOM)

From these entries I can deduce that the problem is with the sort-merge join involving Di. So HBase is out of the equation; it is not the culprit.

In the physical plan I could see that only two operations are performed in addition in sort-merge compared to broadcast-hash:
==> Exchange hashpartitioning
==> Sort
and finally the sort-merge join.

Can we deduce anything from this?

Thanks
Ravi
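The workaround discussed in this reply (spark.sql.autoBroadcastJoinThreshold=-1) hinges on how the planner compares each side's estimated size, which for a custom source comes from BaseRelation.sizeInBytes, against that threshold (10 MB by default). The sketch below is a deliberately simplified, Spark-free model of that comparison; it ignores broadcast hints, join types and the other rules in Spark's join selection, and the object and function names are hypothetical.

```scala
// Simplified model of the broadcast-vs-sort-merge decision; not Spark's implementation.
object JoinChoiceModel {
  sealed trait Strategy
  case object BroadcastHash extends Strategy
  case object SortMerge extends Strategy

  // Default of spark.sql.autoBroadcastJoinThreshold: 10 MB
  val DefaultThreshold: Long = 10L * 1024 * 1024

  /** A side qualifies for broadcast only if its estimated size is in [0, threshold].
    * With threshold = -1 no size qualifies, so broadcasting is effectively disabled. */
  def canBroadcast(estimatedBytes: Long, threshold: Long): Boolean =
    estimatedBytes >= 0 && estimatedBytes <= threshold

  /** Equi-join: broadcast the smaller side if it qualifies, otherwise fall back to sort-merge. */
  def choose(leftBytes: Long, rightBytes: Long, threshold: Long = DefaultThreshold): Strategy =
    if (canBroadcast(math.min(leftBytes, rightBytes), threshold)) BroadcastHash else SortMerge
}

object JoinChoiceDemo {
  def main(args: Array[String]): Unit = {
    val big = 50L << 30 // 50 GiB
    println(JoinChoiceModel.choose(big, 1L << 20))                 // small side reported as 1 MiB
    println(JoinChoiceModel.choose(big, 1L << 20, threshold = -1)) // broadcasting disabled
  }
}
```

Under this model the decision depends only on the reported size, so a relation whose sizeInBytes is far below its real size is still picked for broadcast; reporting a realistic size, or setting the threshold to -1, changes the outcome.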
RE: OutOfMemory when doing joins in spark 2.0 while same code runs fine in spark 1.5.2
Hi,

Is there any breakthrough here? I had one more observation while debugging the issue.

Here are the 4 types of data I had:
Da -> stored in parquet
Di -> stored in parquet
Dl1 -> parquet version of lookup
Dl2 -> hbase version of lookup

Joins performed and the type of join Spark chose:
Da and Di: Sort-Merge, failed (OOM)
Da and Dl1: Broadcast-Hash, passed
Da and Dl2: Sort-Merge, passed
Di and Dl1: Broadcast-Hash, passed
Di and Dl2: Sort-Merge, failed (OOM)

From these entries I can deduce that the problem is with the sort-merge join involving Di. So HBase is out of the equation; it is not the culprit.

In the physical plan I could see that only two operations are performed in addition in sort-merge compared to broadcast-hash:
==> Exchange hashpartitioning
==> Sort
and finally the sort-merge join.

Can we deduce anything from this?

Thanks
Ravi

From: Ravi Aggarwal
Sent: Friday, June 10, 2016 12:31 PM
To: 'Ted Yu' <yuzhih...@gmail.com>
Cc: user <user@spark.apache.org>
Subject: RE: OutOfMemory when doing joins in spark 2.0 while same code runs fine in spark 1.5.2

Hi Ted,

Thanks for the reply. Here is the code. By the way, df.count runs fine on the DataFrame generated from this default source. I think it is something in the combination of the join and the HBase data source that is causing the issue, but I'm not entirely sure. I have also dumped the physical plans of both approaches (the s3a/s3a join and the s3a/hbase join); let me know in case you want them.
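For readers less familiar with the plan operators named in the June 14 message: in a sort-merge join, Exchange hashpartitioning shuffles both inputs so that rows with the same join key land in the same partition, Sort orders each partition by key, and the join then merges the two sorted streams. The sketch below is a toy, in-memory model of those three steps over plain Scala collections, assuming hypothetical names and using the key's hashCode as a stand-in hash; Spark's real operators work very differently internally (shuffle files, external sorting). One thing the model does make visible is that all rows sharing one key on one side are buffered together during the merge, so heavily repeated keys may require a lot of memory.

```scala
// Toy model of: Exchange hashpartitioning -> Sort -> merge join. Not Spark's implementation.
object SortMergeJoinModel {

  /** Exchange hashpartitioning: route each row to partition floorMod(key.hashCode, n). */
  def hashPartition[K, V](rows: Seq[(K, V)], n: Int): Vector[Seq[(K, V)]] = {
    val byPartition = rows.groupBy { case (k, _) => Math.floorMod(k.hashCode, n) }
    Vector.tabulate(n)(p => byPartition.getOrElse(p, Seq.empty))
  }

  /** Merge two key-sorted sequences, emitting every (key, left, right) combination for equal keys. */
  def merge[K, A, B](left: IndexedSeq[(K, A)], right: IndexedSeq[(K, B)])
                    (implicit ord: Ordering[K]): Seq[(K, A, B)] = {
    val out = Vector.newBuilder[(K, A, B)]
    var i = 0
    var j = 0
    while (i < left.length && j < right.length) {
      val c = ord.compare(left(i)._1, right(j)._1)
      if (c < 0) i += 1
      else if (c > 0) j += 1
      else {
        val key = left(i)._1
        // Buffer every right-side row with this key (a hot key makes this buffer large)
        var jEnd = j
        while (jEnd < right.length && ord.compare(right(jEnd)._1, key) == 0) jEnd += 1
        val matches = right.slice(j, jEnd)
        // Pair each left-side row with this key against the buffered right-side rows
        while (i < left.length && ord.compare(left(i)._1, key) == 0) {
          matches.foreach { case (_, b) => out += ((key, left(i)._2, b)) }
          i += 1
        }
        j = jEnd
      }
    }
    out.result()
  }

  /** Partition both sides the same way, sort each partition by key, then merge per partition. */
  def join[K, A, B](left: Seq[(K, A)], right: Seq[(K, B)], n: Int)
                   (implicit ord: Ordering[K]): Seq[(K, A, B)] = {
    val lp = hashPartition(left, n)
    val rp = hashPartition(right, n)
    (0 until n).flatMap { p =>
      merge(lp(p).sortBy(_._1).toIndexedSeq, rp(p).sortBy(_._1).toIndexedSeq)
    }
  }
}
```

Because both sides are partitioned with the same function, matching keys always meet in the same partition, which is why the plan needs the Exchange before the Sort and the merge.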
RE: OutOfMemory when doing joins in spark 2.0 while same code runs fine in spark 1.5.2
Hi Ted,

Thanks for the reply. Here is the code. By the way, df.count runs fine on the DataFrame generated from this default source. I think it is something in the combination of the join and the HBase data source that is causing the issue, but I'm not entirely sure. I have also dumped the physical plans of both approaches (the s3a/s3a join and the s3a/hbase join); let me know in case you want them.

import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase._
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
import org.apache.spark.sql.execution.datasources.{OutputWriterFactory, FileFormat}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.slf4j.LoggerFactory

class DefaultSource extends SchemaRelationProvider with FileFormat {
  override def createRelation(sqlContext: SQLContext, parameters: Map[String, String], schema: StructType) = {
    new HBaseRelation(schema, parameters)(sqlContext)
  }

  def inferSchema(sparkSession: SparkSession, options: Map[String, String], files: Seq[FileStatus]): Option[StructType] = ???

  def prepareWrite(sparkSession: SparkSession, job: Job, options: Map[String, String], dataSchema: StructType): OutputWriterFactory = ???
}

object HBaseConfigurationUtil {
  lazy val logger = LoggerFactory.getLogger("HBaseConfigurationUtil")

  val hbaseConfiguration = (tableName: String, hbaseQuorum: String) => {
    val conf = HBaseConfiguration.create()
    conf.set(TableInputFormat.INPUT_TABLE, tableName)
    conf.set("hbase.mapred.outputtable", tableName)
    conf.set("hbase.zookeeper.quorum", hbaseQuorum)
    conf
  }
}

class HBaseRelation(val schema: StructType, parameters: Map[String, String])
                   (@transient val sqlContext: SQLContext) extends BaseRelation with TableScan {

  import sqlContext.sparkContext

  override def buildScan(): RDD[Row] = {
    val bcDataSchema = sparkContext.broadcast(schema)

    val tableName = parameters.get("path") match {
      case Some(t) => t
      case _ => throw new RuntimeException("Table name (path) not provided in parameters")
    }

    val hbaseQuorum = parameters.get("hbaseQuorum") match {
      case Some(s: String) => s
      case _ => throw new RuntimeException("hbaseQuorum not provided in options")
    }

    val rdd = sparkContext.newAPIHadoopRDD(
      HBaseConfigurationUtil.hbaseConfiguration(tableName, hbaseQuorum),
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result]
    )

    val rowRdd = rdd
      .map(tuple => tuple._2)
      .map { record =>
        val cells: java.util.List[Cell] = record.listCells()

        // Row key first, followed by the value of every cell
        val splitRec = cells.toArray.foldLeft(Array(CellUtil.cloneRow(cells.get(0)))) { (a, b) =>
          a :+ CellUtil.cloneValue(b.asInstanceOf[Cell])
        }

        val keyFieldName = bcDataSchema.value.fields.filter(e =>
          e.metadata.contains("isPrimary") && e.metadata.getBoolean("isPrimary"))(0).name

        // Key field name first, followed by the qualifier (column name) of every cell
        val schemaArr = cells.toArray.foldLeft(Array(keyFieldName)) { (a, b) =>
          val fieldCell = b.asInstanceOf[Cell]
          a :+ new String(fieldCell.getQualifierArray).substring(
            fieldCell.getQualifierOffset,
            fieldCell.getQualifierLength + fieldCell.getQualifierOffset)
        }

        val res = Map(schemaArr.zip(splitRec).toArray: _*)

        // recordFields follows the Map's iteration order, which is not guaranteed
        // to match the field order of `schema`
        val recordFields = res.map(value => {
          val colDataType = try {
            bcDataSchema.value.fields.filter(_.name == value._1)(0).dataType
          } catch {
            case e: ArrayIndexOutOfBoundsException =>
              throw new RuntimeException("Schema doesn't contain the fieldname")
          }
          CatalystTypeConverters.convertToScala(
            Cast(Literal(value._2), colDataType).eval(), colDataType)
        }).toArray

        Row(recordFields: _*)
      }

    rowRdd
  }
}

Thanks
Ravi

From: Ted Yu [mailto:yuzhih...@gmail.com]
Sent: Thursday, June 9, 2016 7:56 PM
To: Ravi Aggarwal <raagg...@adobe.com>
Cc: user <user@spark.apache.org>
Subject: Re: OutOfMemory when doing joins in spark 2.0 while same code runs fine in spark 1.5.2

bq. Read data from hbase using custom DefaultSource (implemented using TableScan)

Did you use the DefaultSource from the hbase-spark module in the HBase master branch? If you wrote your own, would you mind sharing the related code?

Thanks

On Thu, Jun 9, 2016 at 2:53 AM, raaggarw <raagg...@adobe.com<mailto:raagg...@adobe.com>> wrote:
Hi, I was