Hey Friends,

I’m working on a POC that involves reading and writing Parquet files mid-DAG.
Writes are working, but I’m struggling to get reads working due to
serialization issues. I’ve got code that works with master=local but not on
YARN. So here are my questions.


  1.  Is there an easy way to tell whether a particular function in Spark will
be run on the driver or on an executor?  My current rule of thumb is that if
the function uses the Spark session it runs on the driver, but…
  2.  Where does FileFormat.buildReaderWithPartitionValues(..) run?  On the
driver or on the executor?  Because it takes the Spark session, I suspected
that it runs on the driver and that the resulting iterator is then sent to the
executor to perform the read, but I’ve been running into serialization issues.
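
For question 1, one check that may help: TaskContext.get() returns null when it is called outside a running task (that is, on the driver) and a non-null context inside a task. The sketch below assumes a local master so it can run standalone, and WhereDoesItRun is a hypothetical name used only for illustration. It shows where a given piece of code is executing at runtime; it is not a rule for predicting where every Spark API runs.

```scala
import org.apache.spark.TaskContext
import org.apache.spark.sql.SparkSession

// Hypothetical helper for illustration only.
object WhereDoesItRun {
  // True when the calling thread is executing inside a Spark task.
  def insideTask: Boolean = TaskContext.get() != null

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("where-does-it-run")
      .master("local[2]") // assumption: local mode, so the sketch runs standalone
      .getOrCreate()
    try {
      // Evaluated directly in main, i.e. on the driver.
      println(s"driver, inside task: $insideTask")

      // The closure passed to map is serialized and evaluated inside tasks.
      val flags = spark.sparkContext
        .parallelize(1 to 4, numSlices = 2)
        .map(_ => WhereDoesItRun.insideTask)
        .collect()
      println(s"closure, inside task: ${flags.mkString(", ")}")
    } finally spark.stop()
  }
}
```

Logging the result of such a check from inside a suspect function is a cheap way to confirm where it is actually invoked.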

19/04/11 12:35:29 ERROR org.apache.spark.internal.Logging$class TaskSetManager: Failed to serialize task 26, not attempting to retry it.
java.io.NotSerializableException: scala.collection.Iterator$$anon$12
Serialization stack:
        - object not serializable (class: scala.collection.Iterator$$anon$12, value: non-empty iterator)
        - writeObject data (class: scala.collection.immutable.List$SerializationProxy)
        - object (class scala.collection.immutable.List$SerializationProxy, scala.collection.immutable.List$SerializationProxy@6993864a)
        - writeReplace data (class: scala.collection.immutable.List$SerializationProxy)
        - object (class scala.collection.immutable.$colon$colon, List(non-empty iterator))
        - field (class: com.amazon.horizon.azulene.datasource.AzuleneSplit, name: readers, type: class scala.collection.immutable.List)

Is there something I’m missing here?
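
The trace points at the readers field of AzuleneSplit: a List that holds a live iterator. Iterators produced by operations such as map or flatMap in the Scala standard library do not implement java.io.Serializable, so an object that carries one cannot be shipped inside a task. The Spark-free sketch below reproduces that with plain Java serialization. SplitWithReaders and SplitWithPaths are hypothetical stand-ins, not the real AzuleneSplit.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a split that holds already-opened iterators.
case class SplitWithReaders(path: String, readers: List[Iterator[Int]])

// Hypothetical alternative: the split only describes what to read.
case class SplitWithPaths(paths: List[String])

object SerializationCheck {
  // Returns Right(byte count) if obj survives Java serialization,
  // or Left(message) naming the class that could not be serialized.
  def trySerialize(obj: AnyRef): Either[String, Int] = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try {
      out.writeObject(obj)
      out.flush()
      Right(buffer.size())
    } catch {
      case e: NotSerializableException => Left(e.getMessage)
    } finally out.close()
  }

  def main(args: Array[String]): Unit = {
    // A mapped iterator is an anonymous, non-serializable class.
    val liveIterator = List(1, 2, 3).iterator.map(_ * 2)
    println(trySerialize(SplitWithReaders("some-file", List(liveIterator))))
    println(trySerialize(SplitWithPaths(List("some-file"))))
  }
}
```

One way around this is for the split to carry only serializable descriptions (paths, offsets, schemas) and for the iterator to be opened inside the task. Whether that fits AzuleneSplit is an assumption.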

Here’s the code I’m using to read records.

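import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.ColumnarBatch

import scala.collection.JavaConverters._

def read(
    path: String,
    spark: SparkSession,
    fileSchema: StructType,
    requiredSchema: StructType): Iterator[InternalRow] = {
  // This should be empty for non-partitioned files.
  val partitionSchema = StructType(Seq.empty)

  val hadoopConf = spark.sparkContext.hadoopConfiguration
  // Alternative: relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options)

  // Look up the file length through the Hadoop FileSystem API.
  val hadoopPath = new Path(path)
  val status = hadoopPath.getFileSystem(hadoopConf).getFileStatus(hadoopPath)

  val pFile = new PartitionedFile(
    partitionValues = InternalRow.empty, // empty for non-partitioned files
    filePath = path,
    start = 0,
    length = status.getLen
  )

  // Declared as Iterator[Any] because the reader may yield either
  // InternalRow or ColumnarBatch values at runtime.
  val readFile: PartitionedFile => Iterator[Any] =
    new ParquetFileFormat().buildReaderWithPartitionValues(
      sparkSession = spark,
      dataSchema = fileSchema,
      partitionSchema = partitionSchema,
      requiredSchema = requiredSchema,
      filters = Seq.empty,
      options = Map.empty,
      hadoopConf = hadoopConf
    )

  // Flatten columnar batches into individual rows.
  val rows = readFile(pFile).flatMap {
    case r: InternalRow => Seq(r)
    case b: ColumnarBatch => b.rowIterator().asScala
  }

  rows
}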


Cheers,
Andrew
