spyzzz opened a new issue #2175:
URL: https://github.com/apache/hudi/issues/2175
Hello,
Quick explanation of the situation: I have multiple Kafka topics (one per
table) containing CDC events sent by Debezium.
I need to read those changes as a stream and update the corresponding table
in Hive (1.2).
Tables can be huge (200M+ events), but the CDC volume is not: let's say a
few thousand events per day per table at most.
So the first sync could be painful, but once it's done, the CDC load should
be pretty "light".
I first tried DeltaStreamer, but I need to do specific operations, such as
filtering data and converting dates, so I'd rather do it in custom Spark
code to get more flexibility.
I decided to use Structured Streaming to connect to all my topics (two
choices here: one stream subscribed to several topics, or one stream per topic).
1 -> This solution needs a group-by on topic to be able to save data to the
corresponding table (not simple):
```
spark.readStream.format("kafka").options(xxxxx).option("subscribe","all-topics")
```
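With a single multi-topic stream, each micro-batch then has to be demultiplexed on the Kafka `topic` column before writing to the right table. A minimal pure-Scala sketch of that routing logic (plain collections stand in for the DataFrame; `topicToTable` and the topic names are hypothetical):

```scala
// Stand-in for one micro-batch row: (topic, payload) as delivered by Kafka.
case class Record(topic: String, value: String)

// Hypothetical mapping from Kafka topic to target table name.
val topicToTable: Map[String, String] = Map(
  "cdc.users"  -> "users",
  "cdc.orders" -> "orders"
)

// The groupBy that, in Spark, would become a per-table filter on col("topic")
// inside foreachBatch, each group written out with its own Hudi options.
def routeBatch(batch: Seq[Record]): Map[String, Seq[Record]] =
  batch.groupBy(r => topicToTable(r.topic))

val routed = routeBatch(Seq(
  Record("cdc.users", "u1"),
  Record("cdc.orders", "o1"),
  Record("cdc.users", "u2")
))
// routed("users") holds two records, routed("orders") one
```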
2 -> This solution is easier to manage, but it creates lots of streams
(more vCPUs):
```
for (table <- tables) {
  spark.readStream.format("kafka").options(xxxxx).option("subscribe", table)
}
```
After this, I use writeStream in Hudi format every 2 minutes to write the
received data to the corresponding table:
```
writeStream
  .trigger(Trigger.ProcessingTime("120 seconds"))
  .foreachBatch((df, id) => {
    df.write.format("org.apache.hudi")
      .options(HudiUtils.getHudiOptions(table))
      .options(HudiUtils.getHiveSyncOptions(table.name))
      .options(HudiUtils.getCompactionOptions)
      .mode(SaveMode.Append)
      .save(config.pathConf.outputPath + "/out/" + table.name)
  })
  .option("checkpointLocation", config.pathConf.outputPath + "/checkpoint/" + table.name)
  .start()
```
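At a 120-second processing-time trigger, each non-empty micro-batch produces one Hudi commit per table. A quick back-of-the-envelope sketch (assuming every trigger actually has data to commit):

```scala
// Commits per table per day at a fixed processing-time trigger,
// assuming every micro-batch contains data and therefore commits.
def commitsPerDay(triggerSeconds: Int): Int = 86400 / triggerSeconds

// 120-second trigger -> 720 commits per table per day
val daily = commitsPerDay(120)
```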
Here is my configuration:
For Hudi:
```
Map(
  TABLE_TYPE_OPT_KEY -> "COPY_ON_WRITE",
  PRECOMBINE_FIELD_OPT_KEY -> "ts_ms",
  RECORDKEY_FIELD_OPT_KEY -> table.pk,
  OPERATION_OPT_KEY -> "upsert",
  KEYGENERATOR_CLASS_OPT_KEY -> "org.apache.hudi.keygen.NonpartitionedKeyGenerator",
  TABLE_NAME_OPT_KEY -> ("hudi_" + table.name),
  "hoodie.table.name" -> ("hudi_" + table.name),
  "hoodie.upsert.shuffle.parallelism" -> "2"
)
```
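For context on the `PRECOMBINE_FIELD_OPT_KEY -> "ts_ms"` setting: when a batch carries several events for the same record key, Hudi keeps only the event with the largest precombine value before upserting. A pure-Scala sketch of that semantics (the field and class names are illustrative):

```scala
// Illustrative CDC event: primary key, Debezium ts_ms, and a payload.
case class CdcEvent(pk: String, tsMs: Long, payload: String)

// Per record key, retain only the event with the highest ts_ms --
// this mirrors Hudi's precombine step ahead of the upsert.
def precombine(events: Seq[CdcEvent]): Map[String, CdcEvent] =
  events.groupBy(_.pk).map { case (pk, es) => pk -> es.maxBy(_.tsMs) }
```

So two updates to the same primary key within one micro-batch collapse to the one with the later `ts_ms`, which is why `ts_ms` is a sensible precombine field for Debezium CDC.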
For Compaction :
```
Map(
  "hoodie.compact.inline" -> "true",
  "hoodie.compact.inline.max.delta.commits" -> "1",
  "hoodie.cleaner.commits.retained" -> "1",
  "hoodie.cleaner.fileversions.retained" -> "1",
  "hoodie.clean.async" -> "false",
  "hoodie.clean.automatic" -> "true",
  "hoodie.parquet.compression.codec" -> "snappy"
)
```
For Spark :
```
.config("spark.executor.cores", "3")
.config("spark.executor.instances","5")
.config("spark.executor.memory", "2g")
.config("spark.rdd.compress","true")
.config("spark.shuffle.service.enabled","true")
.config("spark.sql.hive.convertMetastoreParquet","false")
.config("spark.kryoserializer.buffer.max","512m")
.config("spark.driver.memoryOverhead","1024")
.config("spark.executor.memoryOverhead","3072")
.config("spark.max.executor.failures","100")
```
**Expected behavior**
I tried this code with a single topic containing 24K records, and it takes
more than 5 minutes to write to HDFS.
With multiple topics it hangs and can take pretty long...


**Environment Description**
* Hudi version : 0.6.0
* Spark version : 2.4.6
* Hive version : 1.2
* Hadoop version : 2.7
* Storage (HDFS/S3/GCS..) : HDFS
* Running on Docker? (yes/no) : no
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]