stackfun opened a new issue #2362:
URL: https://github.com/apache/hudi/issues/2362
Incremental query fails on a MOR table via Hive SQL when some partitions
have no incremental changes.
Environment:
- Hudi 0.6.0
- GCP Dataproc 1.4
- PySpark 2.4.5
- Also reproducible in a local Docker-based setup with HDFS
Repro Code:
```python
# Reproduction for a Hive-SQL incremental query on the read-optimized (_ro)
# view of a MERGE_ON_READ Hudi table. The query fails when some partitions
# have no commits after the configured start timestamp.
#
# NOTE(review): assumes this runs in a pyspark shell/notebook where `sc`
# (SparkContext) and `spark` (SparkSession) already exist and the Hudi
# bundle is on the classpath — confirm for your environment.
import time
from datetime import datetime

# HiveContext is deprecated since Spark 2.0 (SparkSession with Hive support
# replaces it); it is kept here because it is what the report used.
from pyspark.sql import HiveContext, functions

output_path = "hdfs://namenode:8020/usr/hive/warehouse/hudi_demo"

# Generate 10 sample trip records with Hudi's QuickstartUtils data generator.
dataGen = sc._jvm.org.apache.hudi.QuickstartUtils.DataGenerator()
inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(
    dataGen.generateInserts(10)
)
df = spark.read.json(sc.parallelize(inserts, 2))

# Insert in Hudi format (MERGE_ON_READ) and sync the table to the Hive
# metastore.
# NOTE(review): the Hive table in the reported error is partitioned by
# continent/country/city, but these options set no
# hoodie.datasource.hive_sync.partition_fields. The original run presumably
# configured them; confirm before relying on this exact option set.
hudi_options = {
    "hoodie.table.name": "hudi_trips",
    "hoodie.datasource.write.recordkey.field": "uuid",
    "hoodie.datasource.write.partitionpath.field": "partitionpath",
    "hoodie.datasource.write.table.name": "hudi_trips",
    "hoodie.datasource.write.table.type": "MERGE_ON_READ",
    "hoodie.datasource.write.precombine.field": "ts",
    "hoodie.compact.inline": True,
    "hoodie.datasource.hive_sync.enable": True,
    "hoodie.datasource.hive_sync.database": "default",
    "hoodie.datasource.hive_sync.table": "hudi_trips",
    "hoodie.datasource.hive_sync.username": "hive",
    "hoodie.datasource.hive_sync.password": "hive",
    "hoodie.datasource.hive_sync.jdbcurl": "jdbc:hive2://hiveserver:10000",
}
df.write.format("hudi").options(**hudi_options).mode("overwrite").save(output_path)

# Update part of the data and upsert. Only a single record is modified, so
# only that record's partition receives a new commit. The other partitions
# have no incremental changes, which is the condition that triggers the
# failure.
before_update = datetime.now()
# The start timestamp below is formatted to whole seconds, so sleep to keep
# the upsert's commit time strictly after it.
time.sleep(1)
df = df.limit(1)
df = df.withColumn("fare", functions.col("fare") + 1)
df.write.format("hudi").options(**hudi_options).mode("append").save(output_path)

# Incremental query through Hive SQL on the read-optimized (_ro) table.
hiveCtx = HiveContext(sc)
hiveCtx.sql("set hoodie.hudi_trips.consume.mode=INCREMENTAL")
start_timestamp = before_update.strftime("%Y%m%d%H%M%S")
hiveCtx.sql(f"set hoodie.hudi_trips.consume.start.timestamp={start_timestamp}")
hiveCtx.sql("select * from default.hudi_trips_ro limit 5").show()  # Fails here
```
Here's the error:
```
Py4JJavaError: An error occurred while calling o3474.showString.
: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute,
tree:
Exchange SinglePartition
+- *(1) LocalLimit 5
+- Scan hive default.hudi_trips_ro [_hoodie_commit_time#6501,
_hoodie_commit_seqno#6502, _hoodie_record_key#6503,
_hoodie_partition_path#6504, _hoodie_file_name#6505, begin_lat#6506,
begin_lon#6507, driver#6508, end_lat#6509, end_lon#6510, fare#6511,
partitionpath#6512, rider#6513, ts#6514, uuid#6515, continent#6516,
country#6517, city#6518], HiveTableRelation `default`.`hudi_trips_ro`,
org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe,
[_hoodie_commit_time#6501, _hoodie_commit_seqno#6502, _hoodie_record_key#6503,
_hoodie_partition_path#6504, _hoodie_file_name#6505, begin_lat#6506,
begin_lon#6507, driver#6508, end_lat#6509, end_lon#6510, fare#6511,
partitionpath#6512, rider#6513, ts#6514, uuid#6515], [continent#6516,
country#6517, city#6518]
at
org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
at
org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at
org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:391)
at
org.apache.spark.sql.execution.BaseLimitExec$class.inputRDDs(limit.scala:62)
at
org.apache.spark.sql.execution.GlobalLimitExec.inputRDDs(limit.scala:108)
at
org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:43)
at
org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at
org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
at
org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:339)
at
org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
at
org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3389)
at
org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2550)
at
org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2550)
at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3370)
at
org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
at
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
at
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
at
org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withAction(Dataset.scala:3369)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2550)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2764)
at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
at sun.reflect.GeneratedMethodAccessor284.invoke(Unknown Source)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: Can not create a Path from an empty string
at org.apache.hadoop.fs.Path.checkPathArg(Path.java:126)
at org.apache.hadoop.fs.Path.<init>(Path.java:134)
at org.apache.hadoop.util.StringUtils.stringToPath(StringUtils.java:245)
at org.apache.hadoop.mapred.FileInputFormat.setInputPaths(FileInputFormat.java:411)
at org.apache.hudi.hadoop.HoodieParquetInputFormat.listStatusForIncrementalMode(HoodieParquetInputFormat.java:152)
at org.apache.hudi.hadoop.HoodieParquetInputFormat.listStatus(HoodieParquetInputFormat.java:95)
at
org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:315)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:204)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
at
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
at
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:84)
at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:84)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:296)
at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:84)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
at
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
at
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
at
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:94)
at
org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$.prepareShuffleDependency(ShuffleExchangeExec.scala:323)
at
org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:91)
at
org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128)
at
org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
at
org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
... 43 more
```
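I'm not very familiar with the Hudi code base, but I think
`HoodieInputFormatUtils.getAffectedPartitions` should return `Option.empty()` when
the partition string is empty. The trace shows
`HoodieParquetInputFormat.listStatusForIncrementalMode` passing an empty string to
`FileInputFormat.setInputPaths`. Presumably this happens for a Hive partition that
has no commits after the start timestamp. Spark lists each Hive partition
separately, which is why a `UnionRDD` appears in the trace.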
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]