bkosuru commented on issue #9172:
URL: https://github.com/apache/hudi/issues/9172#issuecomment-1644703216
Hi @ad1happy2go, Did you use dataproc serverless(batches)?
It failed with incremental query(set the following option on the reader)
reader.option(BEGIN_INSTANTTIME_OPT_KEY, "20220123")
Also, you need a big dataset; you cannot reproduce this with a few MB of data. It
worked fine for me on a dataset of size 420 MB.
My dataset size is 66 GB. The data was loaded into Hudi on 2022-09-06, so I am
loading the entire dataset using an incremental query.
The dataset has 4 columns(s,p,o,g). The data is all Strings. 2 columns are
used as partitions. There is only one partition for one column and 100
partitions for the second column.
Here are the Hudi options used:
// Soft-delete marker column; also used as the Hudi precombine field in save().
val DELETED_COL = "isDeleted"

// Approx bytes of our average record, contra the Hudi default assumption of 1024.
private val AVG_RECORD_SIZE = 256

// One gigabyte in bytes; used for the Parquet file size & block size.
private val ONE_GIGABYTE = 1024 * 1024 * 1024

// Cap for the dynamic bloom filter: half a Parquet file's worth of records
// at AVG_RECORD_SIZE bytes each (1 GiB / (2 * 256) = 2,097,152 entries).
private val BLOOM_MAX_ENTRIES = ONE_GIGABYTE / (2 * AVG_RECORD_SIZE)
/**
 * Writes `df` to a Hudi table at the `output` path.
 *
 * @param df          data to write
 * @param operation   Hudi write operation (insert, bulk_insert, upsert, or delete)
 * @param output      target base path of the Hudi table
 * @param tableName   Hudi table name
 * @param parallelism write parallelism for the chosen operation
 * @param saveMode    Spark save mode (append/overwrite/...)
 */
def save(
    df: DataFrame,
    operation: Operation,
    output: String,
    tableName: String,
    parallelism: Int,
    saveMode: SaveMode
): Unit = {
  df.write
    .format(HUDI_FORMAT)
    // DataSourceWriteOptions
    .option(operation.parallelismOption, parallelism)
    .options(
      // Drop duplicate records only for the dedup-ing insert variant.
      if (operation == InsertDedup)
        Map(INSERT_DROP_DUPS_OPT_KEY -> true.toString)
      else Map[String, String]()
    )
    .option(HIVE_STYLE_PARTITIONING_OPT_KEY, true)
    .option(KEYGENERATOR_CLASS_OPT_KEY, classOf[SpoKeyGenerator].getName)
    .option(
      OPERATION_OPT_KEY,
      operation.hudiOp
    ) // insert, bulk_insert, upsert, or delete
    .option(PARTITIONPATH_FIELD_OPT_KEY, "g, p")
    .option(PRECOMBINE_FIELD_OPT_KEY, DELETED_COL)
    .option(RECORDKEY_FIELD_OPT_KEY, "s, o")
    .option(URL_ENCODE_PARTITIONING_OPT_KEY, true)
    // HoodieIndexConfig
    .option(HOODIE_BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES, BLOOM_MAX_ENTRIES)
    .option(BLOOM_INDEX_FILTER_TYPE, BloomFilterTypeCode.DYNAMIC_V0.name)
    // HoodieCompactionConfig
    // For the first commit to a hudi table, determines how many records can
    // fit into a data file. Useful for hudi copy; can be tweaked if the
    // file count differs from the source; default 1024.
    .option(COPY_ON_WRITE_TABLE_RECORD_SIZE_ESTIMATE, 64)
    // Commit history; MIN should be less than MAX; CLEANER should be less
    // than MIN.
    .option(CLEANER_COMMITS_RETAINED_PROP, Integer.MAX_VALUE - 2)
    .option(MIN_COMMITS_TO_KEEP_PROP, Integer.MAX_VALUE - 1)
    .option(MAX_COMMITS_TO_KEEP_PROP, Integer.MAX_VALUE)
    // HoodieStorageConfig
    .option(LOGFILE_SIZE_MAX_BYTES, ONE_GIGABYTE / 0.35)
    .option(PARQUET_BLOCK_SIZE_BYTES, ONE_GIGABYTE)
    .option(
      PARQUET_FILE_MAX_BYTES,
      ONE_GIGABYTE
    ) // Current hadoop cfg uses 256MB block size.
    // HoodieWriteConfig
    .option(EMBEDDED_TIMELINE_SERVER_ENABLED, false)
    .option(HoodieWriteConfig.TABLE_NAME, tableName)
    .option("hoodie.metadata.enable", false)
    .option("hoodie.index.type", "BLOOM")
    .mode(saveMode)
    // BUG FIX: original called .save(path), but no `path` is in scope here —
    // the destination parameter of this method is `output`.
    .save(output)
}
/**
 * Hudi key generator that derives the record key from the "s" and "o"
 * columns by hashing their concatenation with 128-bit MurmurHash3.
 * Partition-path handling is inherited from [[ComplexKeyGenerator]].
 */
class SpoKeyGenerator(props: TypedProperties)
    extends ComplexKeyGenerator(props) {

  /** 128-bit MurmurHash3 of `s`, rendered as the two 64-bit halves in decimal. */
  def hash128(s: String): String = {
    val h: Array[Long] = MurmurHash3.hash128(s.getBytes)
    h(0).toString + h(1).toString
  }

  override def getRecordKey(record: GenericRecord): String = {
    val s = HoodieAvroUtils.getNestedFieldValAsString(record, "s", false, false)
    val o = HoodieAvroUtils.getNestedFieldValAsString(record, "o", false, false)
    genKey(s, o)
  }

  // NOTE(review): separator-less concatenation means e.g. ("ab","c") and
  // ("a","bc") hash to the same key. Deliberately NOT changed here — adding a
  // separator would re-key all existing data in the table.
  private def genKey(s: String, o: String): String = hash128(s + o)

  override def getRecordKey(row: Row): String = {
    // FIX: fetch the key columns by NAME, consistent with the Avro overload
    // above. The original used positional getAs(0)/getAs(1), which silently
    // produces wrong keys whenever the row's column order is not (s, o, ...).
    val s = row.getAs[Any]("s").toString
    val o = row.getAs[Any]("o").toString
    genKey(s, o)
  }
}
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]