harishchanderramesh opened a new issue #2089:
URL: https://github.com/apache/hudi/issues/2089


   **Describe the problem you faced**
   
   My MOR Hudi table is on S3.
   I need to query the table frequently (say, once every minute) from Spark SQL.
   I use both the https://hudi.apache.org/docs/querying_data.html#spark-sql and https://hudi.apache.org/docs/querying_data.html#hive methods to read, since I register my Hudi table in Hive as well.
   I am facing the following problem intermittently (mostly).
   
   ERROR: org.apache.hudi.exception.HoodieIOException: IOException when reading 
log file
   
   **To Reproduce**
   
   **_Steps to reproduce the behavior:_**
   
   **1. Create a MOR table on S3 with the following configurations.**
   ```
   df.write.format("org.apache.hudi") \
       .option("hoodie.table.name", tableName) \
       .option("hoodie.datasource.write.table.name", tableName) \
       .option("hoodie.datasource.write.operation", "upsert") \
       .option("hoodie.datasource.write.table.type", "MERGE_ON_READ") \
       .option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.ComplexKeyGenerator") \
       .option("hoodie.datasource.write.recordkey.field", "callguid,meeting_uuid") \
       .option("hoodie.datasource.write.partitionpath.field", "creation_date") \
       .option("hoodie.datasource.write.precombine.field", "cdctimestamp") \
       .option("hoodie.datasource.hive_sync.jdbcurl", "jdbc:hive2://ip-10-11-4-248.corp.bluejeans.com:10000") \
       .option("hoodie.datasource.hive_sync.enable", "true") \
       .option("hoodie.datasource.hive_sync.assume_date_partitioning", "false") \
       .option("hoodie.datasource.hive_sync.database", "default") \
       .option("hoodie.datasource.hive_sync.partition_extractor_class", "org.apache.hudi.hive.MultiPartKeysValueExtractor") \
       .option("hoodie.datasource.hive_sync.partition_fields", "creation_date") \
       .option("hoodie.datasource.hive_sync.table", tableName) \
       .option("hoodie.datasource.write.payload.class", "com.bluejeans.bi.hudi.nonnull.NonNullMergePayload") \
       .option("hoodie.datasource.write.hive_style_partitioning", "true") \
       .option("hoodie.compact.inline.max.delta.commits", "20") \
       .option("hoodie.upsert.shuffle.parallelism", "1500") \
       .option("hoodie.insert.shuffle.parallelism", "1500") \
       .option("hoodie.index.type", "BLOOM") \
       .option("hoodie.logfile.to.parquet.compression.ratio", "0.35") \
       .option("DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY", "true") \
       .mode("append").save(basePath)
   ```
   **2. Try to read it from the pyspark shell.**
   **Step 1 -** To launch pyspark shell, following command.
   ```
   pyspark --jars /usr/lib/spark/external/lib/spark-avro.jar,/usr/share/aws/aws-java-sdk/aws-java-sdk-glue-1.11.759.jar,/usr/share/aws/hmclient/lib/aws-glue-datacatalog-hive2-client-1.12.0.jar,s3://bjnbi-emr-bootstrap/bi-nonnull-merge-1.0.1-SNAPSHOT.jar \
     --packages org.apache.hudi:hudi-spark-bundle_2.11:0.5.3 \
     --conf spark.sql.hive.convertMetastoreParquet=false \
     --num-executors 10 --driver-memory 7g --executor-memory 2g \
     --master yarn-client
   ```
   **Step 2 -** 
   ```
   from pyspark import SparkContext
   from pyspark import SparkConf
   from pyspark import StorageLevel
   from pyspark.sql import SparkSession
   from pyspark.sql import SQLContext
   from pyspark.sql import HiveContext
   from pyspark.sql import Row
   from pyspark.sql import functions as F
   from pyspark.sql.functions import lit
   from pyspark.sql.types import *
   from pyspark.sql.window import Window
   from datetime import datetime
   import time
   
   sc_conf = SparkConf()
   # reuse the shell's existing context rather than creating a second one
   sc = SparkContext.getOrCreate(sc_conf)
   sqlContext = SQLContext(sc)
   
   # this works
   sqlContext.sql("show tables").show(truncate = False)
   # this throws an error
   sqlContext.sql("select count(*) from endpoints_rt where creation_date='2020-09-14'").show(truncate = False)
   ```
   3. If the above line of code succeeds, run it again after a couple of minutes. It fails on the second or third attempt.
   4. The above code uses Spark SQL. I tried the same with HiveContext too; there was no difference in behaviour.
   
   
   **Expected behavior**
   
   I need the query to run against the latest snapshot of the data and return the output without error.
   
   **Environment Description**
   
   * Hudi version : 0.5.3
   
   * Spark version : 2.4.4
   
   * Hive version : 2.3.6-amzn-0
   
   * Hadoop version : 2.8.5-amzn-5
   
   * Storage (HDFS/S3/GCS..) : S3
   
   * Running on Docker? (yes/no) : no
   
   
   **Additional context**
   
   I was getting duplicates when reading the table as a Hudi dataframe, registering a temp view, and running SQL on it; that is why I am trying this way of reading the Hudi table.
   If there is a more optimal way to read it, please suggest one.
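   The duplicates described above are what the precombine field configured on the write side (`cdctimestamp`) is meant to resolve: among rows sharing a record key (`callguid`, `meeting_uuid`), the row with the larger `cdctimestamp` should win. A plain-Python sketch of that "keep latest per key" logic (the sample rows are illustrative, not taken from the table):
   ```python
   # Sketch of precombine semantics: for duplicate record keys
   # (callguid, meeting_uuid), keep the row with the largest cdctimestamp.
   rows = [
       {"callguid": "a", "meeting_uuid": "m1", "cdctimestamp": 1, "status": "old"},
       {"callguid": "a", "meeting_uuid": "m1", "cdctimestamp": 2, "status": "new"},
       {"callguid": "b", "meeting_uuid": "m2", "cdctimestamp": 5, "status": "only"},
   ]

   latest = {}
   for row in rows:
       key = (row["callguid"], row["meeting_uuid"])
       if key not in latest or row["cdctimestamp"] > latest[key]["cdctimestamp"]:
           latest[key] = row

   deduped = list(latest.values())  # one row per record key, latest version
   ```
   In PySpark the equivalent workaround would be a window partitioned by the record-key columns, ordered by `cdctimestamp` descending, keeping only `row_number() == 1`.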
   
   **Stacktrace**
   
   ```
   >>> sqlContext.sql("select count(*) from endpoints_rt where creation_date='2020-09-14'").show(truncate = False)
   20/09/14 17:34:01 WARN Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
   20/09/14 17:34:01 WARN HiveConf: HiveConf of name hive.server2.thrift.url does not exist
   [Stage 0:>                                                       (0 + 24) / 288]20/09/14 17:37:16 WARN TaskSetManager: Lost task 13.0 in stage 0.0 (TID 13, ip-10-11-4-110.corp.bluejeans.com, executor 8): org.apache.hudi.exception.HoodieIOException: IOException when reading log file
           at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.scan(AbstractHoodieLogRecordScanner.java:244)
           at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:81)
           at org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader.getMergedLogRecordScanner(RealtimeCompactedRecordReader.java:69)
           at org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader.<init>(RealtimeCompactedRecordReader.java:52)
           at org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader.constructRecordReader(HoodieRealtimeRecordReader.java:69)
           at org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader.<init>(HoodieRealtimeRecordReader.java:47)
           at org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat.getRecordReader(HoodieParquetRealtimeInputFormat.java:253)
           at org.apache.spark.rdd.HadoopRDD$$anon$1.liftedTree1$1(HadoopRDD.scala:267)
           at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:266)
           at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:224)
           at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:95)
           at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
           at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
           at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
           at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
           at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
           at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
           at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
           at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
           at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:105)
           at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
           at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
           at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
           at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
           at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
           at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
           at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
           at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
           at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
           at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
           at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
           at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
           at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
           at org.apache.spark.scheduler.Task.run(Task.scala:123)
           at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
           at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405)
           at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
           at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
           at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
           at java.lang.Thread.run(Thread.java:748)
   ```
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]
