ankit0811 commented on issue #11742:
URL: https://github.com/apache/hudi/issues/11742#issuecomment-2285378620
Hey, since this is just a POC, the total spend is under $100.
However, the storage cost is only about half of the listing cost, so we want to
understand whether we have an incorrect config somewhere or whether this ratio is
generally expected.
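For context on why listing could dominate: with a 2-minute trigger the job runs roughly 720 micro-batches a day, and each batch can issue several storage LIST calls per partition. A rough back-of-the-envelope sketch (the per-request price, partition count, and per-partition listing count below are illustrative assumptions, not measured values):

```java
public class ListingCostSketch {
    // Assumption: 120-second processing-time trigger, as in the writer config below.
    static final int BATCHES_PER_DAY = 86_400 / 120; // 720 micro-batches/day

    // Estimate the daily cost of LIST requests.
    // partitionsTouched, listsPerPartition, and pricePerThousandUsd are assumptions.
    static double dailyListCostUsd(int partitionsTouched, int listsPerPartition,
                                   double pricePerThousandUsd) {
        long requests = (long) BATCHES_PER_DAY * partitionsTouched * listsPerPartition;
        return requests / 1000.0 * pricePerThousandUsd;
    }

    public static void main(String[] args) {
        // e.g. 30 date partitions scanned, 5 LIST calls each,
        // $0.005 per 1k requests (S3-like pricing, assumed)
        System.out.printf("~%.2f USD/day in LIST requests%n",
                dailyListCostUsd(30, 5, 0.005));
    }
}
```

Even with modest per-batch numbers, the request count scales with trigger frequency times partitions touched, which is why we suspect the listing pattern rather than the data volume.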
> Are you running this on AWS Glue or EMR?
We have our own local clusters and deploy spark using spark-operator (k8s)
> What version of Apache Hudi are you using?
0.15.0
> Which version of Spark are you working with?
3.4.3
> Could you provide the full script, including the code and configurations?
This is the hoodie writer config
```java
df = sparkSession
    .readStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka-host-port")
    .option("subscribe", "topicName")
    .option("auto.offset.reset", "latest")
    .option("failOnDataLoss", "false")
    .load();

df = df
    .selectExpr("CAST(value AS STRING) as value")
    .select(
        col("value"),
        from_json(col("value"), schema).alias("__rows"))
    .select(
        col("__rows.time_stamp").alias("time_stamp"),
        col("..."),
        col("..."),
        col("..."),
        explode(col("__rows.nestedColumn")).alias("__data"))
    .select(
        col("time_stamp"),
        col("...."),
        col("...."),
        col("...."));

DataStreamWriter<Row> streamWriter = df.writeStream()
    .format("hudi")
    .option("hoodie.insert.shuffle.parallelism", "2")
    .option("hoodie.upsert.shuffle.parallelism", "2")
    .option("hoodie.delete.shuffle.parallelism", "2")
    .option(EMBEDDED_TIMELINE_SERVER_ENABLE.key(), "true")
    .option(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "col1,col2,col3,col4")
    .option(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), "time_stamp")
    // Clustering + compaction config
    .option(ASYNC_CLUSTERING_ENABLE.key(), "true")
    .option("hoodie.clustering.plan.strategy.max.bytes.per.group", "524288000")
    // Metadata config
    .option(ENABLE_METADATA_INDEX_COLUMN_STATS.key(), "true")
    .option(ASYNC_CLEAN.key(), "true")
    .option(HoodieWriteConfig.TBL_NAME.key(), newTName)
    .option(HoodieTableConfig.TYPE.key(), HoodieTableType.COPY_ON_WRITE.name())
    .option("hoodie.datasource.write.operation", WriteOperationType.INSERT.value())
    .option(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "ts_date")
    .option("checkpointLocation", newCheckPoint)
    .outputMode(OutputMode.Append());

streamWriter.trigger(new ProcessingTimeTrigger(120000))
    .start(newBasePath)
    .awaitTermination();
```
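One thing we could double-check on our side (a guess, not something we have confirmed for our deployment): Hudi's metadata table is what allows file listings to be served from the table's own `.hoodie/metadata` rather than direct storage LIST calls. A fragment showing how it could be set explicitly next to the existing metadata options (`hoodie.metadata.enable` is a real Hudi config and defaults to enabled in recent releases; whether it is actually taking effect here is what we would need to verify):

```java
// Assumption: making the metadata table explicit, so file listings come from
// the metadata table instead of per-partition LIST calls against storage.
.option("hoodie.metadata.enable", "true")
.option(ENABLE_METADATA_INDEX_COLUMN_STATS.key(), "true")
```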
I have pasted only the relevant piece of code. Let me know if this helps.
> What type of data are you working with, and which indexing option are you
currently using?
The source is a Kafka stream carrying IoT-related data; each device sends
some data at least twice a minute.
The current volume for the above topic is ~11 GB of compressed data per day.
(FYI, this is dev data only; we expect at least 50x that volume in prod.)
We plan to partition this by date. As of now we haven't configured any
indexing and are using the default options only (pending your suggestion).
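To make the prod projection concrete (the 50x multiplier is our estimate from above, and the per-partition figure assumes one date partition per day as planned):

```java
public class VolumeProjection {
    // Dev volume observed on the topic above, compressed.
    static final double DEV_GB_PER_DAY = 11.0;
    // Expected prod multiplier (our estimate, not a measurement).
    static final int PROD_MULTIPLIER = 50;

    // Projected prod volume landing in each daily date partition.
    static double prodGbPerDay() {
        return DEV_GB_PER_DAY * PROD_MULTIPLIER;
    }

    public static void main(String[] args) {
        // 11 GB/day * 50 = 550 GB/day of compressed data per daily partition
        System.out.printf("~%.0f GB/day in prod%n", prodGbPerDay());
    }
}
```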
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]