bithw1 opened a new issue #2304:
URL: https://github.com/apache/hudi/issues/2304
Hi,
I have the following simple code that does an upsert 100 times (the code is at
the end of the question description), with auto clean disabled during writes.
When the writes are done, there are about 100 parquet files in the hoodie table
folder. After I run `cleans run` in hudi-cli, only 11 parquet files are left in
the hoodie table folder, which means the old file versions have been cleaned
up.
Then I run the `select count(1) from
hudi_hive_read_write_cow_disable_cleaner_1' and the result is still 101, looks
like no data is lost after clean.**I would like to make sure whether there is
no data lost after cleaning old version files. In my case, I have written 101
letters, and still get back 101 letters after clean, which works as my
expectation**
Another observation:
When I run ``select `_hoodie_file_name` from
hudi_hive_read_write_cow_disable_cleaner_1;`` the result shows parquet file
names, but some of those files were deleted by the clean. I am not sure whether
it is a bug that it shows the name of a file that has been deleted (I think it
should show the name of the file where the data resides). This leads to another
question: since the corresponding file has been deleted, where does the data in
the Hive query result come from?
```
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.{HoodieCompactionConfig, HoodieIndexConfig, HoodieWriteConfig}
import org.apache.hudi.index.HoodieIndex
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

// Not shown in the original post; field names are assumed from the
// record key, precombine and partition path options used below.
case class MyOrder(name: String, price: String, creation_date: String, dt: String)

object COWDisableCleanerTest {
  val spark = SparkSession.builder.appName("COWDisableCleanerTest")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.sql.warehouse.dir", "hdfs:///user/hive/warehouse")
    .enableHiveSupport().getOrCreate()

  val hudi_table = "hudi_hive_read_write_cow_disable_cleaner_1"
  val base_path = s"/data/hudi_demo/$hudi_table"

  def run(df: DataFrame, round: Int) = {
    val saveMode = round match {
      case 0 => SaveMode.Overwrite
      case _ => SaveMode.Append
    }
    df.write.format("hudi")
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "name")
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "creation_date")
      .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, "xyz")
      .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, hudi_table)
      .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
      .option(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH, "true")
      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "dt")
      .option(DataSourceWriteOptions.HIVE_URL_OPT_KEY, "jdbc:hive2://10.41.90.208:10000")
      .option(HoodieWriteConfig.TABLE_NAME, hudi_table)
      .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY,
        "org.apache.hudi.hive.MultiPartKeysValueExtractor")
      .option(HoodieIndexConfig.INDEX_TYPE_PROP, HoodieIndex.IndexType.GLOBAL_BLOOM.name())
      .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "dt")
      .option(HoodieWriteConfig.HOODIE_AUTO_COMMIT_PROP, "false")
      // Do not clean during the 100 commits
      .option(HoodieCompactionConfig.AUTO_CLEAN_PROP, "false")
      .option("hoodie.insert.shuffle.parallelism", "2")
      .option("hoodie.upsert.shuffle.parallelism", "2")
      .mode(saveMode)
      .save(base_path)
  }

  def main(args: Array[String]): Unit = {
    import spark.implicits._
    val order = MyOrder("Key-0", "Price-0", "2020-11-18 14:43:00", "2020-11-19")
    // Create table and insert 1 row
    run(spark.createDataset(Seq(order)).toDF(), 1)
    // Run 100 times and insert 100 rows, one row per commit
    (1 to 100).foreach { i =>
      val order = MyOrder("Key-" + i, "Price-" + i, "2020-11-18 14:43:" + i, "2020-11-19")
      val insertData = spark.createDataset(Seq(order)).toDF()
      run(insertData, i)
    }
  }
}
```
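To make the second observation easier to reproduce, here is a rough sketch of how the `_hoodie_file_name` values could be compared against the parquet files that actually remain under the table path after `cleans run`. It is only an illustration: the object name `CheckFileNamesAfterClean` and the helper `missingFiles` are made up for this post, and it assumes the table is readable through Spark SQL as `xyz.hudi_hive_read_write_cow_disable_cleaner_1` with the Hudi bundle on the classpath and `spark.sql.hive.convertMetastoreParquet` set to `false`.

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession

object CheckFileNamesAfterClean {
  // Hypothetical helper: file names referenced by rows but absent from storage.
  def missingFiles(referenced: Set[String], onDisk: Set[String]): Set[String] =
    referenced.diff(onDisk)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("CheckFileNamesAfterClean")
      // Assumption: needed so Spark reads the Hive table through Hudi's input format.
      .config("spark.sql.hive.convertMetastoreParquet", "false")
      .enableHiveSupport().getOrCreate()

    val hudiTable = "hudi_hive_read_write_cow_disable_cleaner_1"
    val basePath = new Path(s"/data/hudi_demo/$hudiTable")

    // File names that the query result reports for its rows.
    val referenced = spark
      .sql(s"select distinct `_hoodie_file_name` from xyz.$hudiTable")
      .collect().map(_.getString(0)).toSet

    // Parquet files physically present under the table path (recursive listing).
    val fs = basePath.getFileSystem(spark.sparkContext.hadoopConfiguration)
    val files = fs.listFiles(basePath, true)
    val onDisk = scala.collection.mutable.Set[String]()
    while (files.hasNext) {
      val name = files.next().getPath.getName
      if (name.endsWith(".parquet")) onDisk += name
    }

    val missing = missingFiles(referenced, onDisk.toSet)
    println(s"referenced=${referenced.size} onDisk=${onDisk.size} missing=${missing.size}")
    missing.toSeq.sorted.foreach(println)
  }
}
```

If `missing` is non-empty after the clean, the printed names are the ones that appear in the query result but no longer exist in the table folder, which is the behaviour I am asking about.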