Thanks a ton for the help!

Is there a standardized way of converting an InternalRow to a Row?

I’ve tried this, but I’m getting an exception:

val encoder = RowEncoder(df.schema)
val rows = readFile(pFile).flatMap(_ match {
  case r: InternalRow => Seq(r)
  case b: ColumnarBatch => b.rowIterator().asScala
})
  .map(encoder.fromRow(_))
  .toList

java.lang.RuntimeException: Error while decoding: 
java.lang.UnsupportedOperationException: Cannot evaluate expression: 
getcolumnbyordinal(0, IntegerType)
createexternalrow(getcolumnbyordinal(0, IntegerType), getcolumnbyordinal(1, IntegerType), getcolumnbyordinal(2, StringType).toString, StructField(pk,IntegerType,false), StructField(ordering,IntegerType,false), StructField(col_a,StringType,true))

                at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.fromRow(ExpressionEncoder.scala:305)
                at com.amazon.horizon.azulene.ParquetReadTests$$anonfun$2.apply(ParquetReadTests.scala:100)
                at com.amazon.horizon.azulene.ParquetReadTests$$anonfun$2.apply(ParquetReadTests.scala:100)
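
My best guess so far at what's missing: resolving and binding the encoder before calling fromRow. Sketch only (assuming Spark 2.4, and reusing df, readFile, and pFile from the code further down this thread):

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.vectorized.ColumnarBatch

import scala.collection.JavaConverters._

// Assumption: resolveAndBind() replaces the unresolved getcolumnbyordinal
// expressions from the error above with bound ordinals that fromRow can evaluate.
val encoder = RowEncoder(df.schema).resolveAndBind()

val rows: List[Row] = readFile(pFile).flatMap(_ match {
  case r: InternalRow => Seq(r)
  case b: ColumnarBatch => b.rowIterator().asScala
})
  .map(encoder.fromRow(_))
  .toList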

From: Ryan Blue <rb...@netflix.com.INVALID>
Reply-To: "rb...@netflix.com" <rb...@netflix.com>
Date: Thursday, March 21, 2019 at 3:32 PM
To: "Long, Andrew" <loand...@amazon.com.invalid>
Cc: "dev@spark.apache.org" <dev@spark.apache.org>, "u...@spark.apache.org" 
<u...@spark.apache.org>, "horizon-...@amazon.com" <horizon-...@amazon.com>
Subject: Re: Manually reading parquet files.

You're getting InternalRow instances. They probably have the data you want, but 
InternalRow's toString representation doesn't show it the way a Row would.
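
For example (rough sketch, assuming the pk/ordering/col_a schema from your stack trace), you can read the fields positionally to check that the data is really there:

import org.apache.spark.sql.catalyst.InternalRow

// InternalRow is accessed by ordinal with explicit types; strings come back
// as UTF8String, which is why toString doesn't look like a normal Row.
def show(r: InternalRow): Unit =
  println(s"pk=${r.getInt(0)} ordering=${r.getInt(1)} col_a=${r.getUTF8String(2)}")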

On Thu, Mar 21, 2019 at 3:28 PM Long, Andrew <loand...@amazon.com.invalid> 
wrote:
Hello Friends,

I’m working on a performance improvement that reads additional parquet files in 
the middle of a lambda, and I'm running into some issues. This is what I'd like to do:


ds.mapPartitions(x => {
  // read an additional parquet file and perform an operation with x
})


Here’s my current POC code but I’m getting nonsense back from the row reader.

import java.nio.file.Files

import com.amazon.horizon.azulene.util.SparkFileUtils._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.ColumnarBatch

spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")

case class TestRow(pk: Int, ordering: Int, col_a: String)

val data = List(
  TestRow(1,1,"asdf"),
  TestRow(2,1,"asdf"),
  TestRow(3,1,"asdf"),
  TestRow(4,1,"asdf")
)

val df = spark.createDataFrame(data)

val folder = Files.createTempDirectory("azulene-test")

val folderPath = folder.toAbsolutePath.toString + "/"
df.write.mode("overwrite").parquet(folderPath)

val files = spark.fs.listStatus(folder.toUri)

val file = files(1) // skip the _SUCCESS file

val partitionSchema = StructType(Seq.empty)
val dataSchema = df.schema
val fileFormat = new ParquetFileFormat()

val path = file.getPath

val status = spark.fs.getFileStatus(path)

val pFile = new PartitionedFile(
  partitionValues = InternalRow.empty, // this should be empty for non-partitioned values
  filePath = path.toString,
  start = 0,
  length = status.getLen
)

val readFile: (PartitionedFile) => Iterator[Any] = //Iterator[InternalRow]
  fileFormat.buildReaderWithPartitionValues(
    sparkSession = spark,
    dataSchema = dataSchema,
    partitionSchema = partitionSchema, // this should be empty for non-partitioned fields
    requiredSchema = dataSchema,
    filters = Seq.empty,
    options = Map.empty,
    hadoopConf = spark.sparkContext.hadoopConfiguration // relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options)
  )

import scala.collection.JavaConverters._

val rows = readFile(pFile).flatMap(_ match {
  case r: InternalRow => Seq(r)

  // This doesn't work: the vectorized reader is doing something screwy
  case b: ColumnarBatch => b.rowIterator().asScala
}).toList

println(rows)
//List([0,1,5b,2000000004,66647361])
// ?? this looks wrong to me ??
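
One more thing I'm unsure about: I believe the reader reuses the same row buffer between records, so holding the rows in a List without copying them might also be part of the problem. The defensive version I'm considering (sketch only, assuming that reuse actually happens here):

val copied = readFile(pFile).flatMap(_ match {
  case r: InternalRow => Seq(r.copy()) // copy in case the reader reuses its buffer
  case b: ColumnarBatch => b.rowIterator().asScala.map(_.copy())
}).toList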

Has anyone attempted something similar?

Cheers Andrew



--
Ryan Blue
Software Engineer
Netflix
