zherenyu831 opened a new issue #2041:
URL: https://github.com/apache/hudi/issues/2041


   
   **Describe the problem you faced**
   
   First problem:
   ```
   scala> val df2 = spark.read.format("org.apache.hudi").load("s3://daas-hudi-test/york_test/york_compaction_test2/*")
   java.lang.NoSuchMethodError: org.apache.spark.sql.execution.datasources.PartitionedFile.<init>(Lorg/apache/spark/sql/catalyst/InternalRow;Ljava/lang/String;JJ[Ljava/lang/String;)V
     at org.apache.hudi.MergeOnReadSnapshotRelation.$anonfun$buildFileIndex$1(MergeOnReadSnapshotRelation.scala:145)
     at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
     at scala.collection.Iterator.foreach(Iterator.scala:941)
     at scala.collection.Iterator.foreach$(Iterator.scala:941)
     at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
     at scala.collection.IterableLike.foreach(IterableLike.scala:74)
     at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
     at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
     at scala.collection.TraversableLike.map(TraversableLike.scala:238)
     at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
     at scala.collection.AbstractTraversable.map(Traversable.scala:108)
     at org.apache.hudi.MergeOnReadSnapshotRelation.buildFileIndex(MergeOnReadSnapshotRelation.scala:142)
     at org.apache.hudi.MergeOnReadSnapshotRelation.<init>(MergeOnReadSnapshotRelation.scala:72)
     at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:87)
     at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:51)
     at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:318)
     at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
     at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
     at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
   ```
   
   Second problem:
   ```
   A needed class was not found. This could be due to an error in your runpath. Missing class: org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat
   java.lang.NoClassDefFoundError: org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat
        at java.lang.ClassLoader.defineClass1(Native Method)
        at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
        at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
        at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
        at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at java.lang.ClassLoader.defineClass1(Native Method)
   ```
   When I checked org.apache.hudi.hadoop.HoodieParquetInputFormat, the imports below cannot be resolved in IntelliJ:
   ```
   import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
   import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
   import org.apache.hadoop.hive.ql.plan.TableScanDesc;
   ```
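   Those three classes live in Hive's `hive-exec` artifact, so the IDE can only resolve them when that artifact is on the compile classpath. A minimal sketch of the dependency addition, assuming sbt and the Hive 3.1.2 from the environment section below (the right version for a given cluster is an assumption):
   ```
   // Hypothetical sbt addition: hive-exec provides the org.apache.hadoop.hive.ql.*
   // classes such as MapredParquetInputFormat; 3.1.2 matches the reported Hive
   // version but may need adjusting to the cluster.
   libraryDependencies += "org.apache.hive" % "hive-exec" % "3.1.2" % Provided
   ```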
   
   **To Reproduce**
   
   Steps to reproduce the first problem:
   1. Start the Spark shell:
   ```
   spark-shell --master yarn \
     --executor-cores 2 \
     --packages org.apache.hudi:hudi-spark-bundle_2.12:0.6.0,org.apache.spark:spark-avro_2.12:2.4.4,org.apache.avro:avro:1.8.2 \
     --driver-memory 1G \
     --executor-memory 1G \
     --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
     --conf "spark.dynamicAllocation.minExecutors=1" \
     --conf "spark.dynamicAllocation.maxExecutors=1" \
     --conf "spark.executor.instances=1" \
     --conf spark.sql.hive.convertMetastoreParquet=false
   ```
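   For context: a `NoSuchMethodError` on a Spark-internal constructor such as `PartitionedFile.<init>` is the typical symptom of a Scala binary mismatch, and the pre-built Spark 2.4.4 distribution is compiled against Scala 2.11 while the command above pulls `_2.12` bundles. A hedged variant of the same invocation with matching `_2.11` artifacts, assuming the cluster runs a Scala 2.11 build of Spark:
   ```
   # Only applicable if the cluster's Spark 2.4.4 is a Scala 2.11 build (the
   # default for the pre-built distribution); otherwise keep the _2.12 artifacts.
   spark-shell --master yarn \
     --executor-cores 2 \
     --packages org.apache.hudi:hudi-spark-bundle_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.4,org.apache.avro:avro:1.8.2 \
     --driver-memory 1G \
     --executor-memory 1G \
     --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
     --conf spark.sql.hive.convertMetastoreParquet=false
   ```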
   
   2. Write a MOR table, then read it back:
   ```
   import java.sql.Timestamp
   import org.apache.hudi.DataSourceWriteOptions._
   import org.apache.hudi.common.model.HoodieCleaningPolicy
   import org.apache.hudi.config.HoodieCompactionConfig._
   import org.apache.hudi.config.HoodieWriteConfig.TABLE_NAME
   import org.apache.hudi.hive.MultiPartKeysValueExtractor
   import org.apache.spark.sql.functions.{col, date_format}
   import org.apache.spark.sql.{SaveMode, SparkSession}
   import spark.implicits._
   
   
   val tableName = "york_compaction_test2"
   val hudiOptions = Map[String, String](
     TABLE_NAME -> tableName,
     PRECOMBINE_FIELD_OPT_KEY -> "timestamp_column",
     RECORDKEY_FIELD_OPT_KEY -> "id",
     PARTITIONPATH_FIELD_OPT_KEY -> "date_string",
     TABLE_TYPE_OPT_KEY -> MOR_TABLE_TYPE_OPT_VAL,
     CLEANER_INCREMENTAL_MODE -> "true",
     CLEANER_POLICY_PROP -> HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS.name(),
     CLEANER_FILE_VERSIONS_RETAINED_PROP -> "2",
     "hoodie.compaction.strategy" -> "org.apache.hudi.table.compact.strategy.UnBoundedCompactionStrategy",
     "hoodie.upsert.shuffle.parallelism" -> "50",
     "hoodie.compact.inline" -> "true",
     "hoodie.compact.inline.max.delta.commits" -> "5",
     "hoodie.filesystem.view.incr.timeline.sync.enable" -> "true",
     HIVE_URL_OPT_KEY -> "jdbc:hive2://ip-10-221-36-98.ap-northeast-1.compute.internal:10000",
     HIVE_SYNC_ENABLED_OPT_KEY -> "true",
     HIVE_DATABASE_OPT_KEY -> "default",
     HIVE_TABLE_OPT_KEY -> tableName,
     HIVE_PARTITION_FIELDS_OPT_KEY -> "date_string",
     HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName
   )
   val dateSMap: Map[Int, String] = Map(
     0 -> "2020-07",
     1 -> "2020-08",
     2 -> "2020-09"
   )
   val dateMap: Map[Int, Timestamp] = Map(
     0 -> Timestamp.valueOf("2010-07-01 11:00:15"),
     1 -> Timestamp.valueOf("2010-08-01 11:00:15"),
     2 -> Timestamp.valueOf("2010-09-01 11:00:15")
   )
   var seq = Seq(
       (0, "value", dateMap(0), dateSMap(0))
   )
   for(i <- 1 to 5000) {
       seq :+= (i, "value", dateMap(i % 3), dateSMap(i % 3))
   }
   
   val df = seq.toDF("id", "string_column", "timestamp_column", "date_string")
     .withColumn("timestamp_column", date_format(col("timestamp_column"), "yyyy-MM-dd HH:mm:ss"))

   df.write.format("hudi").options(hudiOptions).mode(SaveMode.Overwrite).save(s"s3a://daas-hudi-test/york_test/$tableName")

   val df2 = spark.read.format("org.apache.hudi").load("s3://daas-hudi-test/york_test/york_compaction_test2/*")
   
   ```
   
   Steps to reproduce the second problem:
   1. Build definition (this is a build.sbt, not a Maven pom):
   ```
   val sparkVersion = "2.4.4"

   libraryDependencies ++= Seq(
     "org.apache.spark" %% "spark-core" % sparkVersion % Provided,
     "org.apache.spark" %% "spark-sql" % sparkVersion % Provided,
     "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion,

     // Needed for Hudi
     // NB: we use a version of Hudi that is published locally and that contains a patch which fixes compaction:
     // https://github.com/apache/hudi/pull/1857
     "org.apache.hudi" %% "hudi-spark-bundle" % "0.6.0" % Provided,
     "org.apache.avro" % "avro" % "1.8.2" % Provided,
     "org.apache.spark" %% "spark-avro" % sparkVersion % Provided
   )
   ```
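   With no Hive-related artifact in the list above, nothing puts `org/apache/hadoop/hive/ql/...` classes on the test runtime classpath, which would explain the `NoClassDefFoundError`. A minimal sketch of one way to supply them, assuming `spark-hive` (whose transitive `hive-exec` fork contains `MapredParquetInputFormat`) is acceptable in the test scope:
   ```
   // Hypothetical addition: Spark's Hive integration transitively provides the
   // org.apache.hadoop.hive.ql.* classes that HoodieParquetInputFormat extends.
   libraryDependencies += "org.apache.spark" %% "spark-hive" % sparkVersion % Test
   ```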
   
   2. Test code:
   ```
   // Note: Constants, BinlogEventType and EventsWriter are helpers from our own project.
   import java.sql.Timestamp

   import org.apache.hudi.DataSourceWriteOptions._
   import org.apache.hudi.config.HoodieWriteConfig.TABLE_NAME
   import org.apache.spark.sql.{SaveMode, SparkSession}

   val tblName: String = "dummy"
   val viewName: String = tblName + "_view"
   val primaryKey: String = "id"
   val preCombineKey: String = Constants.TS_FIELD
   val partitionField: String = "age"
   val basePath: String = "/tmp/hudi-tests/"
   val hudiOptions: Map[String, String] = Map(
     TABLE_NAME -> tblName,
     TABLE_TYPE_OPT_KEY -> MOR_TABLE_TYPE_OPT_VAL,
     RECORDKEY_FIELD_OPT_KEY -> primaryKey,
     PRECOMBINE_FIELD_OPT_KEY -> preCombineKey,
     PARTITIONPATH_FIELD_OPT_KEY -> partitionField,
     "hoodie.insert.shuffle.parallelism" -> "1",
     "hoodie.bulkinsert.shuffle.parallelism" -> "1",
     "hoodie.upsert.shuffle.parallelism" -> "1",
     "hoodie.datasource.compaction.async.enable" -> "true",
     "hoodie.compact.inline.max.delta.commits" -> "1"
   )

   val spark = SparkSession.builder()
     .master("local[*]")
     .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
     .config("spark.sql.shuffle.partitions", "1")
     .config("spark.ui.enabled", "false")
     .appName("hudi-processing-test")
     .getOrCreate()
   spark.sparkContext.setLogLevel("warn")

   import spark.implicits._

   val df = Seq(
     (1, "Bob", 10, Timestamp.valueOf("2020-06-27 00:00:00"), 100L, BinlogEventType.Insert.value),
     (1, "BobTheBuilder", 10, Timestamp.valueOf("2020-06-27 00:00:00"), 101L, BinlogEventType.Update.value)
   ).toDF("id", "name", "age", "created_at", Constants.TS_FIELD, Constants.EVENT_TYPE_FIELD)

   df.write
     .format("hudi")
     .options(hudiOptions + (PAYLOAD_CLASS_OPT_KEY -> EventsWriter.DEFAULT_PAYLOAD_CLASS))
     .mode(SaveMode.Append)
     .save(basePath)

   spark.read
     .format("hudi")
     .load(basePath + "/*/")
     .createOrReplaceTempView(viewName)
   ```
   
   **Expected behavior**
   
   Reading the Hudi table back should succeed in both setups, without the NoSuchMethodError on PartitionedFile or the missing MapredParquetInputFormat class.
   
   **Environment Description**
   
   * Hudi version : 0.6.0
   
   * Spark version : 2.4.4
   
   * Hive version : 3.1.2
   
   * Hadoop version : 3.2.1
   
   * Storage (HDFS/S3/GCS..) : S3
   
   * Running on Docker? (yes/no) : NO
   
   
   
   
   

