Can you minimize the code snippet with which we can get this
`NotSerializableException`
exception?

Thanks,


-----
Prashant Sharma
Spark Technology Center
http://www.spark.tc/
--


On Sun, Jan 1, 2017 at 9:36 AM, khyati <khyati.s...@guavus.com> wrote:

> Getting error for the following code snippet:
>
> object SparkTaskTry extends Logging {
>   63   /**
>   64    * Extends the normal Try constructor to allow TaskKilledExceptions
> to propagate
>   65    */
>   66   def apply[T](r: => T): Try[T] =
>   67     try scala.util.Success(r) catch {
>   68       case e: TaskKilledException => throw e
>   69       case NonFatal(e) =>
>   70         logInfo("Caught and Ignored Exception: " + e.toString)
>   71         e.printStackTrace()
>   72         Failure(e)
>   73     }
>   74 }
>
> override def buildScan(
>  349       requiredColumns: Array[String],
>  350       filters: Array[Filter],
>  351       inputFiles: Array[FileStatus],
>  352       broadcastedConf: Broadcast[SerializableConfiguration]):
> RDD[Row]
> = {
>  353     val useMetadataCache =
> sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA)
>  354     val parquetFilterPushDown = sqlContext.conf.parquetFilterPushDown
>  355     val assumeBinaryIsString = sqlContext.conf.
> isParquetBinaryAsString
>  356     val assumeInt96IsTimestamp =
> sqlContext.conf.isParquetINT96AsTimestamp
>  357     val followParquetFormatSpec =
> sqlContext.conf.followParquetFormatSpec
>  358
>  359     // When merging schemas is enabled and the column of the given
> filter does not exist,
>  360     // Parquet emits an exception which is an issue of Parquet
> (PARQUET-389).
>  361     val safeParquetFilterPushDown = !shouldMergeSchemas &&
> parquetFilterPushDown
>  362
>  363     // Parquet row group size. We will use this value as the value for
>  364     // mapreduce.input.fileinputformat.split.minsize and
> mapred.min.split.size if the value
>  365     // of these flags are smaller than the parquet row group size.
>  366     val parquetBlockSize =
> ParquetOutputFormat.getLongBlockSize(broadcastedConf.value.value)
>  367
>  368     // Create the function to set variable Parquet confs at both
> driver
> and executor side.
>  369     val initLocalJobFuncOpt =
>  370       ParquetRelation.initializeLocalJobFunc(
>  371         requiredColumns,
>  372         filters,
>  373         dataSchema,
>  374         parquetBlockSize,
>  375         useMetadataCache,
>  376         safeParquetFilterPushDown,
>  377         assumeBinaryIsString,
>  378         assumeInt96IsTimestamp,
>  379         followParquetFormatSpec) _
>  380
>  381     // Create the function to set input paths at the driver side.
>  382     val setInputPaths =
>  383       ParquetRelation.initializeDriverSideJobFunc(inputFiles,
> parquetBlockSize) _
>  384
>  385     Utils.withDummyCallSite(sqlContext.sparkContext) {
>  386       new RDD[Try[InternalRow]](sqlContext.sparkContext, Nil) with
> Logging {
>  387
>  388         override def getPartitions: Array[SparkPartition] =
> internalRDD.getPartitions
>  389
>  390         override def getPreferredLocations(split: SparkPartition):
> Seq[String] =
>  391           internalRDD.getPreferredLocations(split)
>  392
>  393         override def checkpoint() {
>  394           // Do nothing. Hadoop RDD should not be checkpointed.
>  395         }
>  396
>  397         override def persist(storageLevel: StorageLevel): this.type =
> {
>  398           super.persist(storageLevel)
>  399         }
>  400
>  401         val internalRDD: SqlNewHadoopRDD[InternalRow] = new
> SqlNewHadoopRDD(
>  402         sc = sqlContext.sparkContext,
>  403         broadcastedConf = broadcastedConf,
>  404         initDriverSideJobFuncOpt = Some(setInputPaths),
>  405         initLocalJobFuncOpt = Some(initLocalJobFuncOpt),
>  406         inputFormatClass = if (isSplittable) {
>  407           classOf[ParquetInputFormat[InternalRow]]
>  408         } else {
>  409           classOf[ParquetRowInputFormatIndivisible]
>  410         },
>  411         valueClass = classOf[InternalRow]) {
>  412
>  413         val cacheMetadata = useMetadataCache
>  414
>  415         @transient val cachedStatuses = inputFiles.map { f =>
>  416           // In order to encode the authority of a Path containing
> special characters such as '/'
>  417           // (which does happen in some S3N credentials), we need to
> use the string returned by the
>  418           // URI of the path to create a new Path.
>  419           val pathWithEscapedAuthority = escapePathUserInfo(f.getPath)
>  420           new FileStatus(
>  421             f.getLen, f.isDirectory, f.getReplication, f.getBlockSize,
> f.getModificationTime,
>  422             f.getAccessTime, f.getPermission, f.getOwner, f.getGroup,
> pathWithEscapedAuthority)
>  423         }.toSeq
>  424
>  425         private def escapePathUserInfo(path: Path): Path = {
>  426           val uri = path.toUri
>  427           new Path(new URI(
>  428             uri.getScheme, uri.getRawUserInfo, uri.getHost,
> uri.getPort, uri.getPath,
>  429             uri.getQuery, uri.getFragment))
>  430         }
>  431
>  432         // Overridden so we can inject our own cached files statuses.
>  433         override def getPartitions: Array[SparkPartition] = {
>  434           val inputFormat = new ParquetInputFormat[InternalRow] {
>  435             override def listStatus(jobContext: JobContext):
> JList[FileStatus] = {
>  436               if (cacheMetadata) cachedStatuses else
> super.listStatus(jobContext)
>  437             }
>  438           }
>  439
>  440           val jobContext = newJobContext(getConf(isDriverSide =
> true),
> jobId)
>  441           val rawSplits = inputFormat.getSplits(jobContext)
>  442
>  443           Array.tabulate[SparkPartition](rawSplits.size) { i =>
>  444             new SqlNewHadoopPartition(id, i,
> rawSplits(i).asInstanceOf[InputSplit with Writable])
>  445           }
>  446         }
>  447       }
>  448
>  449         override def compute(part: SparkPartition, context:
> TaskContext):
>  450         InterruptibleIterator[Try[InternalRow]] = {
>  451           val iter: Iterator[InternalRow] =
> internalRDD.constructIter(part, context)
>  452           val tryIter = new Iterator[Try[InternalRow]] {
>  453             override def next(): Try[InternalRow] = {
>  454               val readAttempt = SparkTaskTry(iter.next())
>  455               readAttempt
>  456             }
>  457
>  458             override def hasNext: Boolean = {
>  459               SparkTaskTry[Boolean](iter.hasNext) match {
>  460                 case scala.util.Success(r) => r
>  461                 case _ => false
>  462               }
>  463             }
>  464           }
>  465           new InterruptibleIterator[Try[InternalRow]](context,
> tryIter)
>  466         }
>  467
>  468       }.filter(_.isSuccess).map(_.get)
>  469         .asInstanceOf[RDD[Row]]  // type erasure hack to pass
> RDD[InternalRow] as RDD[Row]
>  470     }
>  471   }
>
> Error StackTrace :
> Caused by: java.io.NotSerializableException:
> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
> Serialization stack:
>         - object not serializable (class:
> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation, value:
> ParquetRelation)
>         - field (class:
> org.apache.spark.sql.execution.datasources.parquet.
> ParquetRelation$$anonfun$buildInternalScan$1,
> name: $outer, type: class
> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation)
>         - object (class
> org.apache.spark.sql.execution.datasources.parquet.
> ParquetRelation$$anonfun$buildInternalScan$1,
> <function0>)
>         - field (class:
> org.apache.spark.sql.execution.datasources.parquet.
> ParquetRelation$$anonfun$buildInternalScan$1$$anon$2,
> name: $outer, type: class
> org.apache.spark.sql.execution.datasources.parquet.
> ParquetRelation$$anonfun$buildInternalScan$1)
>         - object (class
> org.apache.spark.sql.execution.datasources.parquet.
> ParquetRelation$$anonfun$buildInternalScan$1$$anon$2,
> $anonfun$buildInternalScan$1$$anon$2[2] at )
>         - field (class: org.apache.spark.NarrowDependency, name: _rdd,
> type: class
> org.apache.spark.rdd.RDD)
>         - object (class org.apache.spark.OneToOneDependency,
> org.apache.spark.OneToOneDependency@47293a0b)
>         - writeObject data (class: scala.collection.immutable.$
> colon$colon)
>         - object (class scala.collection.immutable.$colon$colon,
> List(org.apache.spark.OneToOneDependency@47293a0b))
>         - field (class: org.apache.spark.rdd.RDD, name:
> org$apache$spark$rdd$RDD$$dependencies_, type: interface
> scala.collection.Seq)
>         - object (class org.apache.spark.rdd.MapPartitionsRDD,
> MapPartitionsRDD[4]
> at )
>         - field (class: org.apache.spark.rdd.MapPartitionsRDD, name:
> prev, type:
> class org.apache.spark.rdd.RDD)
>         - object (class org.apache.spark.rdd.MapPartitionsRDD,
> MapPartitionsRDD[5]
> at )
>         - field (class: org.apache.spark.sql.execution.PhysicalRDD, name:
> rdd,
> type: class org.apache.spark.rdd.RDD)
>         - object (class org.apache.spark.sql.execution.PhysicalRDD, Scan
> ParquetRelation[_1#0] InputPaths:
> hdfs://CRUX2-SETUP:9000/data/testdir/data1.parquet
> )
>         - field (class: org.apache.spark.sql.execution.ConvertToSafe,
> name: child,
> type: class org.apache.spark.sql.execution.SparkPlan)
>         - object (class org.apache.spark.sql.execution.ConvertToSafe,
> ConvertToSafe
> +- Scan ParquetRelation[_1#0] InputPaths:
> hdfs://CRUX2-SETUP:9000/data/testdir/data1.parquet
> )
>         - field (class: org.apache.spark.sql.execution.ConvertToSafe$$
> anonfun$2,
> name: $outer, type: class org.apache.spark.sql.execution.ConvertToSafe)
>         - object (class org.apache.spark.sql.execution.ConvertToSafe$$
> anonfun$2,
> <function1>)
>         at
> org.apache.spark.serializer.SerializationDebugger$.improveException(
> SerializationDebugger.scala:40)
>         at
> org.apache.spark.serializer.JavaSerializationStream.
> writeObject(JavaSerializer.scala:47)
>         at
> org.apache.spark.serializer.JavaSerializerInstance.
> serialize(JavaSerializer.scala:101)
>         at
> org.apache.spark.util.ClosureCleaner$.ensureSerializable(
> ClosureCleaner.scala:301)
>         ... 78 more
>
> Please help!
>
>
>
> --
> View this message in context: http://apache-spark-
> developers-list.1001551.n3.nabble.com/Task-not-Serializable-Exception-
> tp20417.html
> Sent from the Apache Spark Developers List mailing list archive at
> Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
>

Reply via email to