stackfun opened a new issue #2362:
URL: https://github.com/apache/hudi/issues/2362


   Incremental query fails on a MOR table when queried via Hive SQL if some 
partitions have no incremental changes.
   
   Environment:
   Hudi 0.6.0
   GCP Dataproc 1.4 
   Pyspark 2.4.5
   
   Also reproducible in a local Docker-based setup with HDFS.
   
   
   Repro Code:
   ```python
   output_path = "hdfs://namenode:8020/usr/hive/warehouse/hudi_demo"
   
   import time
   from datetime import datetime
   from pyspark.sql import functions
   
   # Generate Hudi Trips Data
   dataGen = sc._jvm.org.apache.hudi.QuickstartUtils.DataGenerator()
   inserts = 
sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateInserts(10))
   df = spark.read.json(sc.parallelize(inserts, 2))
   
   # Insert in Hudi Format (MOR)
   hudi_options = {
       "hoodie.table.name": "hudi_trips",
       "hoodie.datasource.write.recordkey.field": "uuid",
       "hoodie.datasource.write.partitionpath.field": "partitionpath",
       "hoodie.datasource.write.table.name": "hudi_trips",
       "hoodie.datasource.write.table.type": "MERGE_ON_READ",
       "hoodie.datasource.write.precombine.field": "ts",
       "hoodie.compact.inline": True,
       "hoodie.datasource.hive_sync.enable": True,
       "hoodie.datasource.hive_sync.database": "default",
       "hoodie.datasource.hive_sync.table": "hudi_trips",
       "hoodie.datasource.hive_sync.username": "hive",
       "hoodie.datasource.hive_sync.password": "hive",
       "hoodie.datasource.hive_sync.jdbcurl": "jdbc:hive2://hiveserver:10000",
   }
   
df.write.format("hudi").options(**hudi_options).mode("overwrite").save(output_path)
   
   # Update Part of the Data and Upsert
   before_update = datetime.now()
   time.sleep(1)
   df = df.limit(1)
   df = df.withColumn("fare", functions.col("fare") + 1)
   
df.write.format("hudi").options(**hudi_options).mode("append").save(output_path)
   
   # Incremental Query
   hiveCtx = HiveContext(sc)
   hiveCtx.sql('set hoodie.hudi_trips.consume.mode=INCREMENTAL')
   hiveCtx.sql(f'set 
hoodie.hudi_trips.consume.start.timestamp={before_update.strftime("%Y%m%d%H%M%S")}')
   hiveCtx.sql('select * from default.hudi_trips_ro limit 5').show() # Fails 
here
   ```
   
   Here's the error
   ```
   Py4JJavaError: An error occurred while calling o3474.showString.
   : org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, 
tree:
   Exchange SinglePartition
   +- *(1) LocalLimit 5
      +- Scan hive default.hudi_trips_ro [_hoodie_commit_time#6501, 
_hoodie_commit_seqno#6502, _hoodie_record_key#6503, 
_hoodie_partition_path#6504, _hoodie_file_name#6505, begin_lat#6506, 
begin_lon#6507, driver#6508, end_lat#6509, end_lon#6510, fare#6511, 
partitionpath#6512, rider#6513, ts#6514, uuid#6515, continent#6516, 
country#6517, city#6518], HiveTableRelation `default`.`hudi_trips_ro`, 
org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe, 
[_hoodie_commit_time#6501, _hoodie_commit_seqno#6502, _hoodie_record_key#6503, 
_hoodie_partition_path#6504, _hoodie_file_name#6505, begin_lat#6506, 
begin_lon#6507, driver#6508, end_lat#6509, end_lon#6510, fare#6511, 
partitionpath#6512, rider#6513, ts#6514, uuid#6515], [continent#6516, 
country#6517, city#6518]
   
        at 
org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
        at 
org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
        at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
        at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
        at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
        at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at 
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
        at 
org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:391)
        at 
org.apache.spark.sql.execution.BaseLimitExec$class.inputRDDs(limit.scala:62)
        at 
org.apache.spark.sql.execution.GlobalLimitExec.inputRDDs(limit.scala:108)
        at 
org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:43)
        at 
org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
        at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
        at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
        at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
        at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at 
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
        at 
org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
        at 
org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:339)
        at 
org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
        at 
org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3389)
        at 
org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2550)
        at 
org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2550)
        at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3370)
        at 
org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
        at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
        at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
        at 
org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withAction(Dataset.scala:3369)
        at org.apache.spark.sql.Dataset.head(Dataset.scala:2550)
        at org.apache.spark.sql.Dataset.take(Dataset.scala:2764)
        at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
        at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
        at sun.reflect.GeneratedMethodAccessor284.invoke(Unknown Source)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)
   Caused by: java.lang.IllegalArgumentException: Can not create a Path from an 
empty string
        at org.apache.hadoop.fs.Path.checkPathArg(Path.java:126)
        at org.apache.hadoop.fs.Path.<init>(Path.java:134)
        at org.apache.hadoop.util.StringUtils.stringToPath(StringUtils.java:245)
        at 
org.apache.hadoop.mapred.FileInputFormat.setInputPaths(FileInputFormat.java:411)
        at 
org.apache.hudi.hadoop.HoodieParquetInputFormat.listStatusForIncrementalMode(HoodieParquetInputFormat.java:152)
        at 
org.apache.hudi.hadoop.HoodieParquetInputFormat.listStatus(HoodieParquetInputFormat.java:95)
        at 
org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:315)
        at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:204)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
        at 
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
        at 
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
        at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:84)
        at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:84)
        at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.immutable.List.map(List.scala:296)
        at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:84)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
        at 
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
        at 
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
        at 
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
        at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:94)
        at 
org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$.prepareShuffleDependency(ShuffleExchangeExec.scala:323)
        at 
org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:91)
        at 
org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128)
        at 
org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
        at 
org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
        ... 43 more
   ```
   
   I'm not too familiar with the Hudi code base, but I think 
`HoodieInputFormatUtils.getAffectedPartitions` should return `Option.empty()` 
when the partition string is empty.
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to