vortual opened a new issue #3919:
URL: https://github.com/apache/hudi/issues/3919


   **Describe the problem you faced**

   Hudi version: 0.9.0

   Table type: MERGE_ON_READ

   When the log files have not been compacted yet, so no base file has been generated, querying the table from Spark fails with the error below.
   
   Error log:
   ```
   org.apache.hudi.exception.HoodieException: Error obtaining data file/log file grouping
     at org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils.getRealtimeSplits(HoodieRealtimeInputFormatUtils.java:156)
     at org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat.getSplits(HoodieParquetRealtimeInputFormat.java:69)
     at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:199)
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
     at scala.Option.getOrElse(Option.scala:121)
     at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
     at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
     at scala.Option.getOrElse(Option.scala:121)
     at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
     at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
     at scala.Option.getOrElse(Option.scala:121)
     at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
     at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:84)
     at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:84)
     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
     at scala.collection.immutable.List.foreach(List.scala:381)
     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
     at scala.collection.immutable.List.map(List.scala:285)
     at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:84)
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
     at scala.Option.getOrElse(Option.scala:121)
     at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
     at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
     at scala.Option.getOrElse(Option.scala:121)
     at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
     at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
     at scala.Option.getOrElse(Option.scala:121)
     at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
     at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:314)
     at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
     at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:2861)
     at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2150)
     at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2150)
     at org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:2842)
     at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
     at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2841)
     at org.apache.spark.sql.Dataset.head(Dataset.scala:2150)
     at org.apache.spark.sql.Dataset.take(Dataset.scala:2363)
     at org.apache.spark.sql.Dataset.showString(Dataset.scala:241)
     at org.apache.spark.sql.Dataset.show(Dataset.scala:637)
     at org.apache.spark.sql.Dataset.show(Dataset.scala:596)
     at org.apache.spark.sql.Dataset.show(Dataset.scala:605)
     ... 48 elided
   Caused by: java.lang.NullPointerException
     at org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils.lambda$null$8(HoodieRealtimeInputFormatUtils.java:132)
     at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
     at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
     at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
     at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
     at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
     at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
     at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
     at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
     at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
     at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
     at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
     at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
     at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
     at org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils.lambda$getRealtimeSplits$9(HoodieRealtimeInputFormatUtils.java:129)
     at java.util.HashMap$KeySet.forEach(HashMap.java:933)
     at org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils.getRealtimeSplits(HoodieRealtimeInputFormatUtils.java:102)
     ... 100 more
   ```
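   The `NullPointerException` at `HoodieRealtimeInputFormatUtils.java:132` occurs inside the step that groups base files with their log files, which suggests the grouping assumes every file group already has a base (parquet) file. As a purely illustrative sketch (this is not Hudi's actual code; the class and method names below are hypothetical), the difference between the unsafe assumption and a defensive fallback for a log-only file slice looks like:

```java
import java.util.Optional;

// Hypothetical illustration: before the first compaction, a MERGE_ON_READ
// file slice can contain only log files, so its base file is absent.
class FileSliceDemo {
    // Unsafe pattern: assumes a base file always exists; get() throws
    // when the slice is log-only.
    static String baseFileNameUnsafe(Optional<String> baseFile) {
        return baseFile.get();
    }

    // Defensive pattern: fall back to a marker for log-only slices
    // instead of failing the whole split computation.
    static String baseFileNameSafe(Optional<String> baseFile) {
        return baseFile.orElse("<no-base-file-yet>");
    }
}
```

   Triggering a compaction (so a base file exists) would also sidestep the failing path, but the grouping itself should tolerate log-only slices.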
   
   **To Reproduce**
   1. Create the Kafka source table:
   ```sql
   CREATE TABLE kafka_source (
     user_id STRING,
     order_amount BIGINT,
     log_ts TIMESTAMP(3),
     part STRING
   ) WITH (
     'connector' = 'kafka',
     'topic' = 'flink_on_hudi_zrm',
     'properties.bootstrap.servers' = 'node2:6667',
     'scan.startup.mode' = 'earliest-offset',
     'properties.group.id' = 'testGroup',
     'format' = 'json'
   );
   ```
   2. Sync from Kafka to Hudi:
   ```sql
   CREATE TABLE kafka_source_hudi (
     user_id VARCHAR(20),
     order_amount BIGINT,
     log_ts TIMESTAMP(3),
     `part` VARCHAR(20)
   )
   PARTITIONED BY (`part`)
   WITH (
     'connector' = 'hudi',
     'path' = 'hdfs:///apps/hive/warehouse/test.db/kafka_source_hudi',
     'table.type' = 'MERGE_ON_READ',
     'write.bucket_assign.tasks' = '2',
     'write.precombine.field' = 'log_ts',
     'write.tasks' = '2',
     'hive_sync.enable' = 'true',
     'hive_sync.mode' = 'hms',
     'hive_sync.metastore.uris' = 'thrift://node7:9083',
     'hoodie.datasource.write.recordkey.field' = 'user_id',
     'compaction.tasks' = '4',
     'compaction.delta_commits' = '3'
   );

   INSERT INTO kafka_source_hudi SELECT * FROM kafka_source;
   ```
   3. Create the external table in Hive:
   ```sql
   CREATE EXTERNAL TABLE test.kafka_source_hudi_spark (
     `_hoodie_commit_time` string,
     `_hoodie_commit_seqno` string,
     `_hoodie_record_key` string,
     `_hoodie_partition_path` string,
     `_hoodie_file_name` string,
     user_id string,
     order_amount BIGINT,
     log_ts bigint
   )
   PARTITIONED BY (`part` string)
   ROW FORMAT SERDE
     'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
   STORED AS INPUTFORMAT
     'org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat'
   OUTPUTFORMAT
     'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
   LOCATION
     'hdfs:///apps/hive/warehouse/test.db/kafka_source_hudi';

   ALTER TABLE test.kafka_source_hudi_spark ADD IF NOT EXISTS PARTITION (`part` = 'par1')
     LOCATION 'hdfs:///apps/hive/warehouse/test.db/kafka_source_hudi/par1';
   ALTER TABLE test.kafka_source_hudi_spark ADD IF NOT EXISTS PARTITION (`part` = 'par2')
     LOCATION 'hdfs:///apps/hive/warehouse/test.db/kafka_source_hudi/par2';
   ```
   4. Query the external table from Spark:
   ```sql
   SELECT * FROM test.kafka_source_hudi_spark LIMIT 10;
   ```
   
   **Expected behavior**

   The query should succeed and return the latest data by merging the log files, even before compaction has produced a base file.
   
   **Environment Description**

   * Hudi version: 0.9.0

   * Spark version: 2.2.0

   * Hive version: 1.2.1000

   * Hadoop version: 2.7.3

   * Storage (HDFS/S3/GCS..): HDFS

   * Running on Docker? (yes/no): no
   
   
   
   

