vortual opened a new issue #3919:
URL: https://github.com/apache/hudi/issues/3919
**Describe the problem you faced**
Hudi version: 0.9.0
Table type: merge on read
When the log files have not been compacted yet, no base file has been generated, and a Spark query against the table fails with the following error.
Error log:
```
org.apache.hudi.exception.HoodieException: Error obtaining data file/log file grouping
	at org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils.getRealtimeSplits(HoodieRealtimeInputFormatUtils.java:156)
	at org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat.getSplits(HoodieParquetRealtimeInputFormat.java:69)
	at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:199)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
	at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:84)
	at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:84)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.immutable.List.foreach(List.scala:381)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.immutable.List.map(List.scala:285)
	at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:84)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:314)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:2861)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2150)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2150)
	at org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:2842)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2841)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2150)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2363)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:241)
	at org.apache.spark.sql.Dataset.show(Dataset.scala:637)
	at org.apache.spark.sql.Dataset.show(Dataset.scala:596)
	at org.apache.spark.sql.Dataset.show(Dataset.scala:605)
	... 48 elided
Caused by: java.lang.NullPointerException
	at org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils.lambda$null$8(HoodieRealtimeInputFormatUtils.java:132)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
	at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
	at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
	at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
	at org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils.lambda$getRealtimeSplits$9(HoodieRealtimeInputFormatUtils.java:129)
	at java.util.HashMap$KeySet.forEach(HashMap.java:933)
	at org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils.getRealtimeSplits(HoodieRealtimeInputFormatUtils.java:102)
	... 100 more
```
**To Reproduce**
1. Create the Kafka source table:
```sql
CREATE TABLE kafka_source (
  user_id STRING,
  order_amount BIGINT,
  log_ts TIMESTAMP(3),
  part STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'flink_on_hudi_zrm',
  'properties.bootstrap.servers' = 'node2:6667',
  'scan.startup.mode' = 'earliest-offset',
  'properties.group.id' = 'testGroup',
  'format' = 'json'
);
```
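For reference, the topic can be seeded with a few test rows directly from Flink SQL, since the Kafka connector also works as a sink. This is a minimal sketch, not part of the original report, and the values are made-up sample data:
```sql
-- Hypothetical sample rows to seed the topic (illustrative values only)
INSERT INTO kafka_source VALUES
  ('user_1', 100, TIMESTAMP '2021-11-01 10:00:00', 'par1'),
  ('user_2', 200, TIMESTAMP '2021-11-01 10:01:00', 'par2');
```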
2. Sync to Hudi from Kafka:
```sql
CREATE TABLE kafka_source_hudi (
  user_id VARCHAR(20),
  order_amount BIGINT,
  log_ts TIMESTAMP(3),
  `part` VARCHAR(20)
)
PARTITIONED BY (`part`)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs:///apps/hive/warehouse/test.db/kafka_source_hudi',
  'table.type' = 'MERGE_ON_READ',
  'write.bucket_assign.tasks' = '2',
  'write.precombine.field' = 'log_ts',
  'write.tasks' = '2',
  'hive_sync.enable' = 'true',
  'hive_sync.mode' = 'hms',
  'hive_sync.metastore.uris' = 'thrift://node7:9083',
  'hoodie.datasource.write.recordkey.field' = 'user_id',
  'compaction.tasks' = '4',
  'compaction.delta_commits' = '3'
);

INSERT INTO kafka_source_hudi SELECT * FROM kafka_source;
```
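As an assumed sanity check (not in the original report), the Hudi table can be queried from the Flink SQL client itself before compaction; the Flink reader merges log files for MERGE_ON_READ tables, so this should return the ingested rows even while the Hive/Spark path fails:
```sql
-- Sanity check from the Flink SQL client: should return rows before compaction
SELECT * FROM kafka_source_hudi;
```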
3. Create the external Hive table and add its partitions:
```sql
CREATE EXTERNAL TABLE test.kafka_source_hudi_spark (
  `_hoodie_commit_time` string,
  `_hoodie_commit_seqno` string,
  `_hoodie_record_key` string,
  `_hoodie_partition_path` string,
  `_hoodie_file_name` string,
  user_id string,
  order_amount bigint,
  log_ts bigint)
PARTITIONED BY (
  `part` string)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
  'org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  'hdfs:///apps/hive/warehouse/test.db/kafka_source_hudi';

ALTER TABLE test.kafka_source_hudi_spark ADD IF NOT EXISTS
  PARTITION (`part` = 'par1') LOCATION 'hdfs:///apps/hive/warehouse/test.db/kafka_source_hudi/par1';
ALTER TABLE test.kafka_source_hudi_spark ADD IF NOT EXISTS
  PARTITION (`part` = 'par2') LOCATION 'hdfs:///apps/hive/warehouse/test.db/kafka_source_hudi/par2';
```
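To confirm that both partitions were registered (an assumed verification step, not in the original report):
```sql
-- Verify the partitions are visible to the metastore
SHOW PARTITIONS test.kafka_source_hudi_spark;
```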
4. Query the external table from Spark:
```sql
select * from test.kafka_source_hudi_spark limit 10;
```
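Note that with `'hive_sync.enable' = 'true'`, Hudi's hive sync typically registers read-optimized and real-time views as separate tables (by default suffixed `_ro` and `_rt`). Querying the read-optimized view reads only base files, so it should avoid this code path, at the cost of not seeing data that is still only in log files. The table name below assumes the default suffix convention and is not taken from the original report:
```sql
-- Assumed workaround: query the read-optimized view synced by Hudi
-- (name assumes the default '_ro' suffix; returns no rows until compaction runs)
SELECT * FROM test.kafka_source_hudi_ro LIMIT 10;
```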
**Expected behavior**
The query should succeed and return the latest records by merging the log files, even before compaction has produced base files.
**Environment Description**
* Hudi version: 0.9.0
* Spark version: 2.2.0
* Hive version: 1.2.1000
* Hadoop version: 2.7.3
* Storage (HDFS/S3/GCS..): HDFS
* Running on Docker? (yes/no): no