Can you minimize the code snippet to the smallest piece that still reproduces this `NotSerializableException`?
Thanks,
-----
Prashant Sharma
Spark Technology Center
http://www.spark.tc/
--

On Sun, Jan 1, 2017 at 9:36 AM, khyati <khyati.s...@guavus.com> wrote:

> Getting error for the following code snippet:
>
> object SparkTaskTry extends Logging {
>   /**
>    * Extends the normal Try constructor to allow TaskKilledExceptions to propagate
>    */
>   def apply[T](r: => T): Try[T] =
>     try scala.util.Success(r) catch {
>       case e: TaskKilledException => throw e
>       case NonFatal(e) =>
>         logInfo("Caught and Ignored Exception: " + e.toString)
>         e.printStackTrace()
>         Failure(e)
>     }
> }
>
> override def buildScan(
>     requiredColumns: Array[String],
>     filters: Array[Filter],
>     inputFiles: Array[FileStatus],
>     broadcastedConf: Broadcast[SerializableConfiguration]): RDD[Row] = {
>   val useMetadataCache = sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA)
>   val parquetFilterPushDown = sqlContext.conf.parquetFilterPushDown
>   val assumeBinaryIsString = sqlContext.conf.isParquetBinaryAsString
>   val assumeInt96IsTimestamp = sqlContext.conf.isParquetINT96AsTimestamp
>   val followParquetFormatSpec = sqlContext.conf.followParquetFormatSpec
>
>   // When merging schemas is enabled and the column of the given filter does not exist,
>   // Parquet emits an exception which is an issue of Parquet (PARQUET-389).
>   val safeParquetFilterPushDown = !shouldMergeSchemas && parquetFilterPushDown
>
>   // Parquet row group size. We will use this value as the value for
>   // mapreduce.input.fileinputformat.split.minsize and mapred.min.split.size if the value
>   // of these flags are smaller than the parquet row group size.
>   val parquetBlockSize = ParquetOutputFormat.getLongBlockSize(broadcastedConf.value.value)
>
>   // Create the function to set variable Parquet confs at both driver and executor side.
>   val initLocalJobFuncOpt =
>     ParquetRelation.initializeLocalJobFunc(
>       requiredColumns,
>       filters,
>       dataSchema,
>       parquetBlockSize,
>       useMetadataCache,
>       safeParquetFilterPushDown,
>       assumeBinaryIsString,
>       assumeInt96IsTimestamp,
>       followParquetFormatSpec) _
>
>   // Create the function to set input paths at the driver side.
>   val setInputPaths =
>     ParquetRelation.initializeDriverSideJobFunc(inputFiles, parquetBlockSize) _
>
>   Utils.withDummyCallSite(sqlContext.sparkContext) {
>     new RDD[Try[InternalRow]](sqlContext.sparkContext, Nil) with Logging {
>
>       override def getPartitions: Array[SparkPartition] = internalRDD.getPartitions
>
>       override def getPreferredLocations(split: SparkPartition): Seq[String] =
>         internalRDD.getPreferredLocations(split)
>
>       override def checkpoint() {
>         // Do nothing. Hadoop RDD should not be checkpointed.
>       }
>
>       override def persist(storageLevel: StorageLevel): this.type = {
>         super.persist(storageLevel)
>       }
>
>       val internalRDD: SqlNewHadoopRDD[InternalRow] = new SqlNewHadoopRDD(
>         sc = sqlContext.sparkContext,
>         broadcastedConf = broadcastedConf,
>         initDriverSideJobFuncOpt = Some(setInputPaths),
>         initLocalJobFuncOpt = Some(initLocalJobFuncOpt),
>         inputFormatClass = if (isSplittable) {
>           classOf[ParquetInputFormat[InternalRow]]
>         } else {
>           classOf[ParquetRowInputFormatIndivisible]
>         },
>         valueClass = classOf[InternalRow]) {
>
>         val cacheMetadata = useMetadataCache
>
>         @transient val cachedStatuses = inputFiles.map { f =>
>           // In order to encode the authority of a Path containing special characters such as '/'
>           // (which does happen in some S3N credentials), we need to use the string returned by the
>           // URI of the path to create a new Path.
>           val pathWithEscapedAuthority = escapePathUserInfo(f.getPath)
>           new FileStatus(
>             f.getLen, f.isDirectory, f.getReplication, f.getBlockSize, f.getModificationTime,
>             f.getAccessTime, f.getPermission, f.getOwner, f.getGroup, pathWithEscapedAuthority)
>         }.toSeq
>
>         private def escapePathUserInfo(path: Path): Path = {
>           val uri = path.toUri
>           new Path(new URI(
>             uri.getScheme, uri.getRawUserInfo, uri.getHost, uri.getPort, uri.getPath,
>             uri.getQuery, uri.getFragment))
>         }
>
>         // Overridden so we can inject our own cached files statuses.
>         override def getPartitions: Array[SparkPartition] = {
>           val inputFormat = new ParquetInputFormat[InternalRow] {
>             override def listStatus(jobContext: JobContext): JList[FileStatus] = {
>               if (cacheMetadata) cachedStatuses else super.listStatus(jobContext)
>             }
>           }
>
>           val jobContext = newJobContext(getConf(isDriverSide = true), jobId)
>           val rawSplits = inputFormat.getSplits(jobContext)
>
>           Array.tabulate[SparkPartition](rawSplits.size) { i =>
>             new SqlNewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
>           }
>         }
>       }
>
>       override def compute(part: SparkPartition, context: TaskContext):
>           InterruptibleIterator[Try[InternalRow]] = {
>         val iter: Iterator[InternalRow] = internalRDD.constructIter(part, context)
>         val tryIter = new Iterator[Try[InternalRow]] {
>           override def next(): Try[InternalRow] = {
>             val readAttempt = SparkTaskTry(iter.next())
>             readAttempt
>           }
>
>           override def hasNext: Boolean = {
>             SparkTaskTry[Boolean](iter.hasNext) match {
>               case scala.util.Success(r) => r
>               case _ => false
>             }
>           }
>         }
>         new InterruptibleIterator[Try[InternalRow]](context, tryIter)
>       }
>
>     }.filter(_.isSuccess).map(_.get)
>       .asInstanceOf[RDD[Row]] // type erasure hack to pass RDD[InternalRow] as RDD[Row]
>   }
> }
>
> Error StackTrace:
> Caused by: java.io.NotSerializableException:
>     org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
> Serialization stack:
>   - object not serializable (class:
>     org.apache.spark.sql.execution.datasources.parquet.ParquetRelation,
>     value: ParquetRelation)
>   - field (class:
>     org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$buildInternalScan$1,
>     name: $outer, type: class
>     org.apache.spark.sql.execution.datasources.parquet.ParquetRelation)
>   - object (class
>     org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$buildInternalScan$1,
>     <function0>)
>   - field (class:
>     org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$buildInternalScan$1$$anon$2,
>     name: $outer, type: class
>     org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$buildInternalScan$1)
>   - object (class
>     org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$buildInternalScan$1$$anon$2,
>     $anonfun$buildInternalScan$1$$anon$2[2] at )
>   - field (class: org.apache.spark.NarrowDependency, name: _rdd,
>     type: class org.apache.spark.rdd.RDD)
>   - object (class org.apache.spark.OneToOneDependency,
>     org.apache.spark.OneToOneDependency@47293a0b)
>   - writeObject data (class: scala.collection.immutable.$colon$colon)
>   - object (class scala.collection.immutable.$colon$colon,
>     List(org.apache.spark.OneToOneDependency@47293a0b))
>   - field (class: org.apache.spark.rdd.RDD,
>     name: org$apache$spark$rdd$RDD$$dependencies_, type: interface scala.collection.Seq)
>   - object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[4] at )
>   - field (class: org.apache.spark.rdd.MapPartitionsRDD, name: prev,
>     type: class org.apache.spark.rdd.RDD)
>   - object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[5] at )
>   - field (class: org.apache.spark.sql.execution.PhysicalRDD, name: rdd,
>     type: class org.apache.spark.rdd.RDD)
>   - object (class org.apache.spark.sql.execution.PhysicalRDD,
>     Scan ParquetRelation[_1#0] InputPaths: hdfs://CRUX2-SETUP:9000/data/testdir/data1.parquet)
>   - field (class: org.apache.spark.sql.execution.ConvertToSafe,
>     name: child, type: class org.apache.spark.sql.execution.SparkPlan)
>   - object (class org.apache.spark.sql.execution.ConvertToSafe,
>     ConvertToSafe
>     +- Scan ParquetRelation[_1#0] InputPaths: hdfs://CRUX2-SETUP:9000/data/testdir/data1.parquet)
>   - field (class: org.apache.spark.sql.execution.ConvertToSafe$$anonfun$2,
>     name: $outer, type: class org.apache.spark.sql.execution.ConvertToSafe)
>   - object (class org.apache.spark.sql.execution.ConvertToSafe$$anonfun$2, <function1>)
>   at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
>   at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
>   at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
>   at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
>   ... 78 more
>
> Please help!
>
> --
> View this message in context:
> http://apache-spark-developers-list.1001551.n3.nabble.com/Task-not-Serializable-Exception-tp20417.html
> Sent from the Apache Spark Developers List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
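For what it is worth, the serialization stack above boils down to a small pattern that can be reproduced without any of the Parquet code. The sketch below uses only hypothetical names (NonSerializableRelation, SimplePartition, numElements, NotSerializableRepro); it illustrates the failure mode, it is not code from the snippet above. An anonymous RDD subclass is created inside a method of a class that is not Serializable, and its body reads a member of that class, so the compiler gives the anonymous class a $outer field pointing back at the enclosing instance, the same $$anon$2 -> $outer -> ParquetRelation chain shown in the stack. When an action forces the task binary (and with it the RDD lineage) to be serialized, Java serialization reaches the outer instance and throws NotSerializableException.

import org.apache.spark.{Partition, SparkConf, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// Minimal serializable partition so the sketch does not depend on Spark internals.
class SimplePartition(override val index: Int) extends Partition

// Hypothetical stand-in for ParquetRelation: a class that is NOT java.io.Serializable.
class NonSerializableRelation(sc: SparkContext) {
  private val numElements = 10 // instance member; reading it below forces the $outer capture

  def buildScan(): RDD[Int] = {
    // Anonymous RDD subclass defined inside a method of the relation. Because its body
    // reads `numElements` from the enclosing instance, it is compiled with a $outer field
    // that points at NonSerializableRelation.
    val base = new RDD[Int](sc, Nil) {
      override def compute(split: Partition, context: TaskContext): Iterator[Int] =
        Iterator.range(0, numElements)
      override protected def getPartitions: Array[Partition] =
        Array[Partition](new SimplePartition(0))
    }
    // The downstream MapPartitionsRDDs keep `base` in their lineage, so serializing the
    // task binary drags the relation in through the $outer field.
    base.filter(_ % 2 == 0).map(_ + 1)
  }
}

object NotSerializableRepro {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("repro").setMaster("local[2]"))
    try {
      // Expected to fail when Spark serializes the task binary, with
      // java.io.NotSerializableException naming NonSerializableRelation.
      new NonSerializableRelation(sc).buildScan().count()
    } finally {
      sc.stop()
    }
  }
}

Running NotSerializableRepro should fail at the count() with a java.io.NotSerializableException naming NonSerializableRelation, which is the same shape as the reported error.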
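Looking at the posted buildScan with that pattern in mind, one likely suspect (a guess, not a verified diagnosis) is that the anonymous RDD's body reaches back into the relation, for example sqlContext.sparkContext and isSplittable are read while internalRDD is being built, and any such member access is enough to force the $outer capture. The stock ParquetRelation code sidesteps this by copying everything the anonymous class needs into local vals of the method first, so the generated class captures plain values only. Below is a sketch of that shape, reusing the hypothetical names and SimplePartition from the sketch above, and assuming the anonymous class body touches nothing else on the enclosing instance.

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// Same hypothetical relation, but the anonymous RDD only ever sees local values,
// so it should not need a $outer reference to the non-serializable enclosing instance.
class RelationWithLocalCopies(sc: SparkContext) {
  private val numElements = 10

  def buildScan(): RDD[Int] = {
    val localNumElements = numElements // copied to a local val, captured by value
    new RDD[Int](sc, Nil) {
      override def compute(split: Partition, context: TaskContext): Iterator[Int] =
        Iterator.range(0, localNumElements)
      override protected def getPartitions: Array[Partition] =
        Array[Partition](new SimplePartition(0)) // SimplePartition from the sketch above
    }.filter(_ % 2 == 0).map(_ + 1)
  }
}

If the capture cannot be avoided, the other usual options are to mark the offending field @transient on a Serializable class, or to pull the anonymous class out into a standalone, serializable class that takes exactly the values it needs as constructor parameters.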