adamjoneill edited a comment on issue #1325: presto - querying nested object in 
parquet file created by hudi
URL: https://github.com/apache/incubator-hudi/issues/1325#issuecomment-586284798
 
 
   @vinothchandar I've managed to reproduce the issue with a simple 
sc.parallelize() example.
   
   test.scala
   ```
   import org.apache.spark._
   import org.apache.spark.sql._
   import org.apache.spark.sql.functions.{month, year, col, dayofmonth}
   import org.apache.spark.storage.StorageLevel
   import org.apache.spark.streaming.{Milliseconds, StreamingContext}
   import org.apache.spark.streaming.Duration
   import org.apache.spark.streaming.kinesis._
   import org.apache.spark.streaming.kinesis.KinesisInputDStream
   import org.apache.spark.streaming.kinesis.KinesisInitialPositions.Latest
   import org.apache.hudi.DataSourceWriteOptions
   import org.apache.hudi.config.HoodieWriteConfig
   import org.apache.hudi.hive.MultiPartKeysValueExtractor
   
   import org.apache.spark.sql.types._
   import org.apache.spark.sql.DataFrame
   
   /**
    * Minimal reproduction: builds a one-row DataFrame whose `foos` column is an
    * array of structs and writes it to a Hudi table through the Spark datasource.
    *
    * Two variants of `Foo` (and of the matching `dataFrame`) are provided; keep
    * exactly one of each active:
    *  - `Foo(id, bar)`: a simple field next to the nested `bar` (commented out here);
    *  - `Foo(bar)`: only the nested `bar` field (active here).
    */
   object HudiScalaStreamHelloWorld {

       /** Entry point. `args` is not read. */
       def main(args: Array[String]): Unit = {
           val appName = "ScalaStreamExample"
           val batchInterval = Milliseconds(2000)
           val spark = SparkSession
               .builder()
               .appName(appName)
               .getOrCreate()

           val sparkContext = spark.sparkContext
           // The streaming context is created but never referenced again in this example.
           val streamingContext = new StreamingContext(sparkContext, batchInterval)

           // Brings `.toDF()` on RDDs of case classes into scope.
           import spark.implicits._
           val sc = sparkContext

           case class Bar(id: Int, name: String)

           // Uncomment following section based on example

           // START - Simple object included in array item
           // case class Foo(id: Int, bar: Bar) // foo with simple object
           // END - Simple object included in array item

           // START - Simple object not present in array item
           // missing the id: Int property
           case class Foo(bar: Bar) // foo without simple object
           // END - Simple object not present in array item

           case class Root(id: Int, foos: Array[Foo])

           // Uncomment following section based on example

           // START - Simple object included in array item
           // with simple item on foo in array
           // val dataFrame = sc.parallelize(Seq(Root(1, Array(Foo(1, Bar(1, "OneBar")))))).toDF()
           // END - Simple object included in array item

           // START - Simple object not present in array item
           // without simple item on foo in array
           val dataFrame = sc.parallelize(Seq(Root(1, Array(Foo(Bar(1, "OneBar")))))).toDF()
           // END - Simple object not present in array item

           dataFrame.printSchema()

           val hudiTableName = "order"
           val hudiTablePath = "s3://xxx-xxxx/path/" + hudiTableName

           // Set up our Hudi Data Source Options:
           // `id` serves as both the record key and the precombine field,
           // and rows are written with the insert operation.
           val hudiOptions = Map[String,String](
               DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id",
               HoodieWriteConfig.TABLE_NAME -> hudiTableName,
               DataSourceWriteOptions.OPERATION_OPT_KEY ->
                   DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
               DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "id")

           // Overwrite mode replaces whatever already exists at the table path.
           dataFrame.write
               .format("org.apache.hudi")
               .options(hudiOptions)
               .mode(SaveMode.Overwrite)
               .save(hudiTablePath)
       }
   }
   ```
   
   deploy.sh
   ```
   # Build the application jar.
   sbt clean
   sbt package

   # Upload the jar to S3.
   aws s3 cp ./target/scala-2.11/simple-project_2.11-1.0.jar s3://xxxx.xxxx/simple-project_2.11-1.0.jar

   # Submit the jar as a Spark step on the EMR cluster.
   # The Hudi bundle and spark-avro are pulled in at submit time via --packages.
   aws emr add-steps --cluster-id j-AZQBZK81NAFT --steps Type=spark,Name=SimpleHudiTest,Args=[\
   --deploy-mode,cluster,\
   --master,yarn,\
   --packages,\'org.apache.hudi:hudi-spark-bundle:0.5.0-incubating,org.apache.spark:spark-avro_2.11:2.4.4\',\
   --conf,spark.yarn.submit.waitAppCompletion=false,\
   --conf,yarn.log-aggregation-enable=true,\
   --conf,spark.dynamicAllocation.enabled=true,\
   --conf,spark.cores.max=4,\
   --conf,spark.network.timeout=300,\
   --conf,spark.serializer=org.apache.spark.serializer.KryoSerializer,\
   --conf,spark.sql.hive.convertMetastoreParquet=false,\
   --class,HudiScalaStreamHelloWorld,\
   s3://xxxx.xxx/simple-project_2.11-1.0.jar\
   ],ActionOnFailure=CONTINUE
   ```
   
   build.sbt
   ```
   // Project identity and Scala toolchain.
   name := "Simple Project"
   version := "1.0"
   scalaVersion := "2.11.12"

   // All Spark modules share one version; they are cross-built (%%),
   // while the Hudi bundle is referenced as a plain artifact (%).
   val sparkVersion = "2.4.4"

   libraryDependencies ++= Seq(
     "org.apache.spark" %% "spark-sql" % sparkVersion,
     "org.apache.spark" %% "spark-streaming" % sparkVersion,
     "org.apache.spark" %% "spark-streaming-kinesis-asl" % sparkVersion,
     "org.apache.hudi" % "hudi-spark-bundle" % "0.5.0-incubating"
   )

   scalacOptions := Seq("-unchecked", "-deprecation")
   
   ```
   
   An AWS Glue job runs over the output S3 directory.
   
   From the Presto EMR instance, the query result when a simple property is 
included on the array item:
   ```
   presto:schema> select * from default;
    _hoodie_commit_time | _hoodie_commit_seqno | _hoodie_record_key | 
_hoodie_partition_path |                          _hoodie_file_name             
             | id |               foos                
   
---------------------+----------------------+--------------------+------------------------+---------------------------------------------------------------------+----+-----------------------------------
    20200214112552      | 20200214112552_0_1   | 1                  | default   
             | 
90190f46-d064-4c8b-ab6a-89ecc9b3ced4-0_0-5-8_20200214112552.parquet |  1 | 
[{id=1, bar={id=1, name=OneBar}}] 
   (1 row)
   
   Query 20200214_130009_00002_hej8h, FINISHED, 1 node
   
http://xx-xx-xxx-xx-xxx.x-xxxxx.compute.amazonaws.com:8889/ui/query.html?20200214_130009_00002_hej8h
   Splits: 17 total, 17 done (100.00%)
   CPU Time: 0.0s total,    23 rows/s, 25.6KB/s, 16% active
   Per Node: 0.1 parallelism,     1 rows/s, 2.09KB/s
   Parallelism: 0.1
   Peak Memory: 0B
   0:01 [1 rows, 1.1KB] [1 rows/s, 2.09KB/s]
   ```
   
   Error message when running the query against the example code without a 
simple property on the array item:
   ```
   presto:schema> select * from default;
   
   Query 20200214_131047_00005_hej8h, FAILED, 1 node
   
http://xxx-xx-xxx-xx-xxx.xx-xxxx.compute.amazonaws.com:8889/ui/query.html?20200214_131047_00005_hej8h
   Splits: 17 total, 0 done (0.00%)
   CPU Time: 0.0s total,     0 rows/s,     0B/s, 0% active
   Per Node: 0.0 parallelism,     0 rows/s,     0B/s
   Parallelism: 0.0
   Peak Memory: 0B
   0:01 [0 rows, 0B] [0 rows/s, 0B/s]
   
   Query 20200214_131047_00005_hej8h failed: No value present
   java.util.NoSuchElementException: No value present
        at java.util.Optional.get(Optional.java:135)
        at 
com.facebook.presto.parquet.reader.ParquetReader.readArray(ParquetReader.java:156)
        at 
com.facebook.presto.parquet.reader.ParquetReader.readColumnChunk(ParquetReader.java:282)
        at 
com.facebook.presto.parquet.reader.ParquetReader.readBlock(ParquetReader.java:268)
        at 
com.facebook.presto.hive.parquet.ParquetPageSource$ParquetBlockLoader.load(ParquetPageSource.java:247)
        at 
com.facebook.presto.hive.parquet.ParquetPageSource$ParquetBlockLoader.load(ParquetPageSource.java:225)
        at 
com.facebook.presto.spi.block.LazyBlock.assureLoaded(LazyBlock.java:283)
        at 
com.facebook.presto.spi.block.LazyBlock.getLoadedBlock(LazyBlock.java:274)
        at com.facebook.presto.spi.Page.getLoadedPage(Page.java:261)
        at 
com.facebook.presto.operator.TableScanOperator.getOutput(TableScanOperator.java:254)
        at com.facebook.presto.operator.Driver.processInternal(Driver.java:379)
        at 
com.facebook.presto.operator.Driver.lambda$processFor$8(Driver.java:283)
        at com.facebook.presto.operator.Driver.tryWithLock(Driver.java:675)
        at com.facebook.presto.operator.Driver.processFor(Driver.java:276)
        at 
com.facebook.presto.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:1077)
        at 
com.facebook.presto.execution.executor.PrioritizedSplitRunner.process(PrioritizedSplitRunner.java:162)
        at 
com.facebook.presto.execution.executor.TaskExecutor$TaskRunner.run(TaskExecutor.java:483)
        at 
com.facebook.presto.$gen.Presto_0_227____20200211_134743_1.run(Unknown Source)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
   ```
   
   The difference between the 2 records can be highlighted in the following JSON:
   
   Success:
   ```
   {
     "id": 1,
     "foos": [
       {
         "id": 1, // <-- simple value on the array item
         "bar": {
           "id": 1,
           "name": "OneBar"
         }
       }
     ]
   }
   ```
   Fails:
   ```
   {
     "id": 1,
     "foos": [
       {
         // missing simple property --> "id": 1,
         "bar": {
           "id": 1,
           "name": "OneBar"
         }
       }
     ]
   }
   ```
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to