harishchanderramesh opened a new issue #2089: URL: https://github.com/apache/hudi/issues/2089
**Describe the problem you faced**

My MOR Hudi table is on S3. I need to query the table frequently (say, once every minute) from Spark SQL. I use both of the methods described at https://hudi.apache.org/docs/querying_data.html#spark-sql and https://hudi.apache.org/docs/querying_data.html#hive, as I register my Hudi table to Hive as well. I am facing the following problem intermittently (most of the time):

```
ERROR: org.apache.hudi.exception.HoodieIOException: IOException when reading log file
```

**To Reproduce**

_Steps to reproduce the behavior:_

**1. Create a MOR table on S3 with the following configuration.**

```python
df.write.format("org.apache.hudi") \
    .option("hoodie.table.name", tableName) \
    .option("hoodie.datasource.write.table.name", tableName) \
    .option("hoodie.datasource.write.operation", "upsert") \
    .option("hoodie.datasource.write.table.type", "MERGE_ON_READ") \
    .option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.ComplexKeyGenerator") \
    .option("hoodie.datasource.write.recordkey.field", "callguid,meeting_uuid") \
    .option("hoodie.datasource.write.partitionpath.field", "creation_date") \
    .option("hoodie.datasource.write.precombine.field", "cdctimestamp") \
    .option("hoodie.datasource.hive_sync.jdbcurl", "jdbc:hive2://ip-10-11-4-248.corp.bluejeans.com:10000") \
    .option("hoodie.datasource.hive_sync.enable", "true") \
    .option("hoodie.datasource.hive_sync.assume_date_partitioning", "false") \
    .option("hoodie.datasource.hive_sync.database", "default") \
    .option("hoodie.datasource.hive_sync.partition_extractor_class", "org.apache.hudi.hive.MultiPartKeysValueExtractor") \
    .option("hoodie.datasource.hive_sync.partition_fields", "creation_date") \
    .option("hoodie.datasource.hive_sync.table", tableName) \
    .option("hoodie.datasource.write.payload.class", "com.bluejeans.bi.hudi.nonnull.NonNullMergePayload") \
    .option("hoodie.datasource.write.hive_style_partitioning", "true") \
    .option("hoodie.compact.inline.max.delta.commits", "20") \
    .option("hoodie.upsert.shuffle.parallelism", "1500") \
    .option("hoodie.insert.shuffle.parallelism", "1500") \
    .option("hoodie.index.type", "BLOOM") \
    .option("hoodie.logfile.to.parquet.compression.ratio", "0.35") \
    .option("DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY", "true") \
    .mode("append").save(basePath)
```

**2. Try to read it from the pyspark shell.**

**Step 1 -** Launch the pyspark shell with the following command:

```
pyspark --jars /usr/lib/spark/external/lib/spark-avro.jar,/usr/share/aws/aws-java-sdk/aws-java-sdk-glue-1.11.759.jar,/usr/share/aws/hmclient/lib/aws-glue-datacatalog-hive2-client-1.12.0.jar,s3://bjnbi-emr-bootstrap/bi-nonnull-merge-1.0.1-SNAPSHOT.jar --packages org.apache.hudi:hudi-spark-bundle_2.11:0.5.3 --conf spark.sql.hive.convertMetastoreParquet=false --num-executors 10 --driver-memory 7g --executor-memory 2g --master yarn-client
```

**Step 2 -** Run the following in the shell:

```python
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark import StorageLevel
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext  # SQLContext lives in pyspark.sql, not pyspark
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from datetime import datetime
import time
from pyspark.sql.functions import lit
from pyspark.sql import Row
from pyspark.sql import HiveContext

sc_conf = SparkConf()
# the pyspark shell already provides a SparkContext, so reuse it
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

# this works
sqlContext.sql("show tables").show(truncate=False)

# this throws the error
sqlContext.sql("select count(*) from endpoints_rt where creation_date='2020-09-14'").show(truncate=False)
```

3. If the query above succeeds, run it again after a couple of minutes. It fails by the second or third run.

4. The code above uses Spark SQL. I tried the same with `HiveContext` too; there was no difference in the behaviour.
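For reference, a minimal sketch of the other read path mentioned below (a snapshot query through the Hudi datasource instead of the Hive-synced `_rt` table). This is illustrative only, not a verified fix: the option key is taken from the Hudi 0.5.x `DataSourceReadOptions`, and `base_path` is a placeholder standing in for the `basePath` used at write time.

```python
# Sketch only (not a verified fix): snapshot read via the Hudi datasource.
# "hoodie.datasource.query.type" is the Hudi 0.5.x read-option key;
# base_path is a placeholder for the table's S3 basePath.
base_path = "s3://some-bucket/endpoints_rt"  # placeholder path

read_options = {
    "hoodie.datasource.query.type": "snapshot",
}

# With an active SparkSession `spark`, the read would look like:
# df = (spark.read.format("org.apache.hudi")
#         .options(**read_options)
#         .load(base_path + "/*/*"))  # glob spans the creation_date=... partitions
# df.createOrReplaceTempView("endpoints_rt_snapshot")
```

The glob depth in `load()` must match the partition depth of the table (one level here, since `creation_date` is the only partition field).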
**Expected behavior**

The query should run against the latest snapshot of the data and return results without error.

**Environment Description**

* Hudi version : 0.5.3
* Spark version : 2.4.4
* Hive version : 2.3.6-amzn-0
* Hadoop version : 2.8.5-amzn-5
* Storage (HDFS/S3/GCS..) : S3
* Running on Docker? (yes/no) : no

**Additional context**

I was getting duplicates while reading the table as a Hudi dataframe, registering a temp view, and running SQL on it; that is why I am trying this way of reading the Hudi table. If there is a more optimal way to read it, please suggest one.

**Stacktrace**

```
>>> sqlContext.sql("select count(*) from endpoints_rt where creation_date='2020-09-14'").show(truncate = False)
20/09/14 17:34:01 WARN Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
20/09/14 17:34:01 WARN HiveConf: HiveConf of name hive.server2.thrift.url does not exist
[Stage 0:> (0 + 24) / 288]20/09/14 17:37:16 WARN TaskSetManager: Lost task 13.0 in stage 0.0 (TID 13, ip-10-11-4-110.corp.bluejeans.com, executor 8): org.apache.hudi.exception.HoodieIOException: IOException when reading log file
  at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.scan(AbstractHoodieLogRecordScanner.java:244)
  at org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner.<init>(HoodieMergedLogRecordScanner.java:81)
  at org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader.getMergedLogRecordScanner(RealtimeCompactedRecordReader.java:69)
  at org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader.<init>(RealtimeCompactedRecordReader.java:52)
  at org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader.constructRecordReader(HoodieRealtimeRecordReader.java:69)
  at org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader.<init>(HoodieRealtimeRecordReader.java:47)
  at org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat.getRecordReader(HoodieParquetRealtimeInputFormat.java:253)
  at org.apache.spark.rdd.HadoopRDD$$anon$1.liftedTree1$1(HadoopRDD.scala:267)
  at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:266)
  at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:224)
  at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:95)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
  at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:105)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
  at org.apache.spark.scheduler.Task.run(Task.scala:123)
  at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)
```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: [email protected]
