adamjoneill commented on issue #1325: presto - querying nested object in parquet file created by hudi URL: https://github.com/apache/incubator-hudi/issues/1325#issuecomment-586284798 @vinothchandar I've managed to reproduce this with a simple sc.parallelize() example. test.scala ``` import org.apache.spark._ import org.apache.spark.sql._ import org.apache.spark.sql.functions.{month, year, col, dayofmonth} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.{Milliseconds, StreamingContext} import org.apache.spark.streaming.Duration import org.apache.spark.streaming.kinesis._ import org.apache.spark.streaming.kinesis.KinesisInputDStream import org.apache.spark.streaming.kinesis.KinesisInitialPositions.Latest import org.apache.hudi.DataSourceWriteOptions import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.hive.MultiPartKeysValueExtractor import org.apache.spark.sql.types._ import org.apache.spark.sql.DataFrame object HudiScalaStreamHelloWorld { def main(args: Array[String]): Unit = { val appName = "ScalaStreamExample" val batchInterval = Milliseconds(2000) val spark = SparkSession .builder() .appName(appName) .getOrCreate() val sparkContext = spark.sparkContext val streamingContext = new StreamingContext(sparkContext, batchInterval) import spark.implicits._ val sc = sparkContext case class Bar(id: Int, name: String) // Uncomment following section based on example // START - Simple object included in array item // case class Foo(id: Int, bar: Bar) // foo with simple object // END - Simple object included in array item // START - Simple object not present in array item // missing the id: Int property case class Foo(bar: Bar) // foo without simple object // END - Simple object not present in array item case class Root(id: Int, foos: Array[Foo]) // Uncomment following section based on example // START - Simple object included in array item // with simple item on foo in array // val dataFrame = sc.parallelize(Seq(Root(1, 
Array(Foo(1, Bar(1, "OneBar")))))).toDF() // END - Simple object included in array item // START - Simple object not present in array item // without simple item on foo in array val dataFrame = sc.parallelize(Seq(Root(1, Array(Foo(Bar(1, "OneBar")))))).toDF() // END - Simple object not present in array item dataFrame.printSchema() val hudiTableName = "order" val hudiTablePath = "s3://xxx-xxxx/path/" + hudiTableName // Set up our Hudi Data Source Options val hudiOptions = Map[String,String]( DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id", HoodieWriteConfig.TABLE_NAME -> hudiTableName, DataSourceWriteOptions.OPERATION_OPT_KEY -> DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL, DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "id") dataFrame.write.format("org.apache.hudi").options(hudiOptions).mode(SaveMode.Overwrite).save(hudiTablePath) } } ``` deploy.sh ``` sbt clean sbt package aws s3 cp ./target/scala-2.11/simple-project_2.11-1.0.jar s3://xxxx.xxxx/simple-project_2.11-1.0.jar aws emr add-steps --cluster-id j-AZQBZK81NAFT --steps Type=spark,Name=SimpleHudiTest,Args=[\ --deploy-mode,cluster,\ --master,yarn,\ --packages,\'org.apache.hudi:hudi-spark-bundle:0.5.0-incubating,org.apache.spark:spark-avro_2.11:2.4.4\',\ --conf,spark.yarn.submit.waitAppCompletion=false,\ --conf,yarn.log-aggregation-enable=true,\ --conf,spark.dynamicAllocation.enabled=true,\ --conf,spark.cores.max=4,\ --conf,spark.network.timeout=300,\ --conf,spark.serializer=org.apache.spark.serializer.KryoSerializer,\ --conf,spark.sql.hive.convertMetastoreParquet=false,\ --class,HudiScalaStreamHelloWorld,\ s3://xxxx.xxx/simple-project_2.11-1.0.jar\ ],ActionOnFailure=CONTINUE ``` build.sbt ``` name := "Simple Project" version := "1.0" scalaVersion := "2.11.12" libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.4" libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.4.4" libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "2.4.4" 
libraryDependencies += "org.apache.hudi" % "hudi-spark-bundle" % "0.5.0-incubating" scalacOptions := Seq("-unchecked", "-deprecation") ``` An AWS Glue job runs over the output S3 directory. From the Presto EMR instance, this is the result when the array item includes the simple (id: Int) property alongside the nested bar object: ``` presto:schema> select * from default; _hoodie_commit_time | _hoodie_commit_seqno | _hoodie_record_key | _hoodie_partition_path | _hoodie_file_name | id | foos ---------------------+----------------------+--------------------+------------------------+---------------------------------------------------------------------+----+----------------------------------- 20200214112552 | 20200214112552_0_1 | 1 | default | 90190f46-d064-4c8b-ab6a-89ecc9b3ced4-0_0-5-8_20200214112552.parquet | 1 | [{id=1, bar={id=1, name=OneBar}}] (1 row) Query 20200214_130009_00002_hej8h, FINISHED, 1 node http://xx-xx-xxx-xx-xxx.x-xxxxx.compute.amazonaws.com:8889/ui/query.html?20200214_130009_00002_hej8h Splits: 17 total, 17 done (100.00%) CPU Time: 0.0s total, 23 rows/s, 25.6KB/s, 16% active Per Node: 0.1 parallelism, 1 rows/s, 2.09KB/s Parallelism: 0.1 Peak Memory: 0B 0:01 [1 rows, 1.1KB] [1 rows/s, 2.09KB/s] ``` Error message when running the same query against the output of the example code where the array item has no simple property (only the nested bar object): ``` presto:schema> select * from default; Query 20200214_131047_00005_hej8h, FAILED, 1 node http://xxx-xx-xxx-xx-xxx.xx-xxxx.compute.amazonaws.com:8889/ui/query.html?20200214_131047_00005_hej8h Splits: 17 total, 0 done (0.00%) CPU Time: 0.0s total, 0 rows/s, 0B/s, 0% active Per Node: 0.0 parallelism, 0 rows/s, 0B/s Parallelism: 0.0 Peak Memory: 0B 0:01 [0 rows, 0B] [0 rows/s, 0B/s] Query 20200214_131047_00005_hej8h failed: No value present java.util.NoSuchElementException: No value present at java.util.Optional.get(Optional.java:135) at com.facebook.presto.parquet.reader.ParquetReader.readArray(ParquetReader.java:156) at com.facebook.presto.parquet.reader.ParquetReader.readColumnChunk(ParquetReader.java:282) at 
com.facebook.presto.parquet.reader.ParquetReader.readBlock(ParquetReader.java:268) at com.facebook.presto.hive.parquet.ParquetPageSource$ParquetBlockLoader.load(ParquetPageSource.java:247) at com.facebook.presto.hive.parquet.ParquetPageSource$ParquetBlockLoader.load(ParquetPageSource.java:225) at com.facebook.presto.spi.block.LazyBlock.assureLoaded(LazyBlock.java:283) at com.facebook.presto.spi.block.LazyBlock.getLoadedBlock(LazyBlock.java:274) at com.facebook.presto.spi.Page.getLoadedPage(Page.java:261) at com.facebook.presto.operator.TableScanOperator.getOutput(TableScanOperator.java:254) at com.facebook.presto.operator.Driver.processInternal(Driver.java:379) at com.facebook.presto.operator.Driver.lambda$processFor$8(Driver.java:283) at com.facebook.presto.operator.Driver.tryWithLock(Driver.java:675) at com.facebook.presto.operator.Driver.processFor(Driver.java:276) at com.facebook.presto.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:1077) at com.facebook.presto.execution.executor.PrioritizedSplitRunner.process(PrioritizedSplitRunner.java:162) at com.facebook.presto.execution.executor.TaskExecutor$TaskRunner.run(TaskExecutor.java:483) at com.facebook.presto.$gen.Presto_0_227____20200211_134743_1.run(Unknown Source) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) ```
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services