bithw1 opened a new issue #2297:
URL: https://github.com/apache/hudi/issues/2297
Hi,
I have the following code snippet, which writes to a Hudi table 101 times: the
first write uses Overwrite mode and the remaining 100 use Append mode.
I have three observations below; the second one is totally unexpected and looks
like a bug. Could someone help take a look?
Hive sync is enabled during the write. After the writes are done, I run two
queries in Hive:
1. `select * from xyz.hudi_hive_read_write_cow_cleaner_1`
The answer is correct: 101 rows are returned.
2. `select count(1) from xyz.hudi_hive_read_write_cow_cleaner_1`
The answer is 1056, which is definitely wrong; it should be 104.
3. When I look at the HDFS files, there are only 11 parquet files, although I
expected 101. I guess small files are being merged during the write? If so,
could you please point me to the Hudi source code where small-file merging
happens?
```
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig}
import org.apache.hudi.index.HoodieIndex
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

// Order record; field names match the key/precombine/partition options below.
case class MyOrder(name: String, price: String, creation_date: String, dt: String)

object COWCleanerTest {
  val spark = SparkSession.builder.appName("COWCleanerTest")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.sql.warehouse.dir", "hdfs:///user/hive/warehouse")
    .enableHiveSupport().getOrCreate()

  val hudi_table = "hudi_hive_read_write_cow_cleaner_1"
  val base_path = s"/data/hudi_demo/$hudi_table"

  def run(df: DataFrame, round: Int): Unit = {
    // Round 0 creates the table with Overwrite; every later round appends.
    val saveMode = round match {
      case 0 => SaveMode.Overwrite
      case _ => SaveMode.Append
    }
    df.write.format("hudi")
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "name")
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "creation_date")
      .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, "xyz")
      .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, hudi_table)
      .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
      .option(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH, "true")
      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "dt")
      .option(DataSourceWriteOptions.HIVE_URL_OPT_KEY,
        "jdbc:hive2://10.41.90.208:10000")
      .option(HoodieWriteConfig.TABLE_NAME, hudi_table)
      .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY,
        "org.apache.hudi.hive.MultiPartKeysValueExtractor")
      .option(HoodieIndexConfig.INDEX_TYPE_PROP,
        HoodieIndex.IndexType.GLOBAL_BLOOM.name())
      .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "dt")
      .option(HoodieWriteConfig.HOODIE_AUTO_COMMIT_PROP, "false")
      .option("hoodie.insert.shuffle.parallelism", "2")
      .option("hoodie.upsert.shuffle.parallelism", "2")
      .mode(saveMode)
      .save(base_path)
  }

  def main(args: Array[String]): Unit = {
    import spark.implicits._
    // Round 0: create the table and insert 1 row.
    val order = MyOrder("Key-0", "Price-0", "2020-11-18 14:43:00", "2020-11-19")
    run(spark.createDataset(Seq(order)).toDF(), 0)
    // Rounds 1..100: insert 100 more rows, one row per commit.
    (1 to 100).foreach { i =>
      val order = MyOrder("Key-" + i, "Price-" + i, "2020-11-18 14:43:" + i,
        "2020-11-19")
      val insertData = spark.createDataset(Seq(order)).toDF()
      run(insertData, i)
    }
  }
}
```
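On observation 3, my understanding so far: I believe the merging I am seeing is Hudi's small-file handling on upsert/insert, where new records are first routed into existing file groups that are below `hoodie.parquet.small.file.limit` before any new files are created (I think this lives in `UpsertPartitioner` in hudi-client, but please correct me). A simplified, self-contained sketch of that bin-packing idea as I understand it (this is NOT Hudi's actual code, and the size limits below are just the documented defaults I assume):

```scala
// Simplified sketch (not Hudi's code) of small-file bin-packing:
// inserts fill up existing "small" files before new files are created.
object SmallFileBinPacking {
  // Assumed defaults: hoodie.parquet.small.file.limit = 100 MB,
  // hoodie.parquet.max.file.size = 120 MB.
  val smallFileLimit: Long = 104857600L
  val maxFileSize: Long    = 125829120L

  /** For each existing file below the small-file limit, estimate how many
    * new records it can still absorb, given an average record size in bytes. */
  def assignableRecords(existingFileSizes: Seq[Long], avgRecordSize: Long): Seq[Long] =
    existingFileSizes
      .filter(_ < smallFileLimit)                            // only small files qualify
      .map(sz => math.max(0L, (maxFileSize - sz) / avgRecordSize))
}
```

If that is roughly right, it would explain why 101 single-row commits end up in only 11 parquet files: each commit keeps rewriting the same still-small file group rather than creating a new file.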
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]