Hi,

I am using the Hudi datasource writer to write data as Parquet. I created a
test table; I am now reading that table and writing it out as a new table,
test2, with "driver" as the record-level key. I am running this process
twice, and on the second run it puts all the log files in the directory
2015/03/16.

After running the code twice, my directory structure looks like this:

*2015/03/16*
.34146665-f851-488c-a71b-6a7d93097652_20190527124832.log.1
.5a7b4fff-43b2-49f7-a920-73ae693f6bac_20190527123959.log.1
.7972ab32-f7e1-425d-bf11-f51237159a86_20190527124832.log.1
.hoodie_partition_metadata
5a7b4fff-43b2-49f7-a920-73ae693f6bac_1_20190527123959.parquet

*2015/03/17*
.hoodie_partition_metadata
7972ab32-f7e1-425d-bf11-f51237159a86_2_20190527123959.parquet

*2016/03/15*
.hoodie_partition_metadata
34146665-f851-488c-a71b-6a7d93097652_0_20190527123959.parquet

Because of this, I am hitting a "parquet file not found" error while running
compaction. I am including my code below for reference. Thanks.

import com.uber.hoodie.DataSourceWriteOptions
import com.uber.hoodie.config.HoodieWriteConfig
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.{SaveMode, SparkSession}

object write {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .master("local")
      .appName("KafkaHTrial")
      .enableHiveSupport()
      .getOrCreate()

    val fields: List[String] = List("begin_lat", "begin_lon", "driver",
      "end_lat", "end_lon", "fare", "partition", "rider", "timestamp")
    val cols = fields.map(col)

    // Read the existing MOR table's read-optimized view
    val hoodieROViewDF = spark.read.format("com.uber.hoodie")
      .load("hdfs://a.com:9000/user/hive/warehouse/test/*/*/*/*")

    val l = hoodieROViewDF.select(cols: _*)

    // Write it back out as a new MOR table, test2
    l.write.format("com.uber.hoodie")
      .option("hoodie.compact.inline", "false")
      .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_STORAGE_TYPE_OPT_VAL)
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "driver")
      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "partition")
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "timestamp")
      .option(HoodieWriteConfig.TABLE_NAME, "test2")
      .mode(SaveMode.Append)
      .save("hdfs://a.com:9000/user/hive/warehouse/test2")
  }
}
