Hi, I had the same issue before. The problem is that the save and the read run on different threads. If the read thread opens the file before the save has completely finished, you get a "parquet file not found" error. Adding Thread.sleep(1000) between the save and the read can work around the problem, though it is admittedly a hack.
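A minimal sketch of where the sleep would go, using plain java.nio file I/O as a stand-in for the Hudi save and read calls (the file name and contents here are illustrative only, not part of your job):

```scala
import java.nio.file.Files

object SleepBetweenSaveAndRead {
  def main(args: Array[String]): Unit = {
    // stand-in for l.write...save(...): write some bytes to a temp file
    val path = Files.createTempFile("hudi-demo", ".parquet")
    Files.write(path, "records".getBytes("UTF-8"))

    // crude wait so the subsequent read does not race the save
    Thread.sleep(1000)

    // stand-in for spark.read...load(...): read the bytes back
    val contents = new String(Files.readAllBytes(path), "UTF-8")
    println(contents)
  }
}
```

In your job the same pattern would be `save(...)`, then `Thread.sleep(1000)`, then the `spark.read...load(...)` call. A more robust fix is to wait on an explicit completion signal rather than a fixed sleep.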
On Mon, May 27, 2019 at 12:44 AM Jaimin Shah <[email protected]> wrote:

> Hi,
>
> I am using the Hudi datasource writer to write data as parquet. I created a
> test table; I am reading that table, using driver as the record-level key,
> and creating a new table test2. I run this process twice, and the second
> run puts all log files in the directory 2015/03/16.
>
> After running the code twice, my directory structure looks like this:
>
> *2015/03/16*
> .34146665-f851-488c-a71b-6a7d93097652_20190527124832.log.1
> .5a7b4fff-43b2-49f7-a920-73ae693f6bac_20190527123959.log.1
> .7972ab32-f7e1-425d-bf11-f51237159a86_20190527124832.log.1
> .hoodie_partition_metadata
> 5a7b4fff-43b2-49f7-a920-73ae693f6bac_1_20190527123959.parquet
>
> *2015/03/17*
> .hoodie_partition_metadata
> 7972ab32-f7e1-425d-bf11-f51237159a86_2_20190527123959.parquet
>
> *2016/03/15*
> .hoodie_partition_metadata
> 34146665-f851-488c-a71b-6a7d93097652_0_20190527123959.parquet
>
> Due to this, I am facing a "parquet file not found" error while running
> compaction.
> I am including my code here for your reference. Thanks.
>
> object write {
>   def main(args: Array[String]): Unit = {
>     val spark = SparkSession.builder()
>       .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>       .master("local")
>       .appName("KafkaHTrial")
>       .enableHiveSupport()
>       .getOrCreate()
>
>     val fields: List[String] = List("begin_lat", "begin_lon", "driver",
>       "end_lat", "end_lon", "fare", "partition", "rider", "timestamp")
>     val cols = fields.map(col)
>
>     val hoodieROViewDF = spark.read.format("com.uber.hoodie")
>       .load("hdfs://a.com:9000/user/hive/warehouse/test/*/*/*/*")
>
>     val l = hoodieROViewDF.select(cols: _*)
>
>     l.write.format("com.uber.hoodie")
>       .option("hoodie.compact.inline", "false")
>       .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_STORAGE_TYPE_OPT_VAL)
>       .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "driver")
>       .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "partition")
>       .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "timestamp")
>       .option(HoodieWriteConfig.TABLE_NAME, "test2")
>       .mode(SaveMode.Append)
>       .save("hdfs://a.com:9000/user/hive/warehouse/test2")
>   }
> }
