rangareddy commented on issue #12931:
URL: https://github.com/apache/hudi/issues/12931#issuecomment-2717687846
Hi @robbik
I tried to reproduce this issue using the following code, but the problem
did not occur in my environment.
```sh
# Launch spark-shell with the Hudi Spark bundle and the configuration Hudi
# requires: Kryo serialization, the Hudi catalog, SQL extensions, and the
# Hudi Kryo registrator. Each option sits on its own continued line so the
# command can be copy-pasted directly.
export SPARK_VERSION=3.5
spark-shell \
  --packages org.apache.hudi:hudi-spark$SPARK_VERSION-bundle_2.12:1.0.1 \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
  --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
  --conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar'
```
```scala
// Reproduction script for apache/hudi#12931: create a MERGE_ON_READ table with
// the record-level index (plus column/partition stats and bloom filter metadata
// indexes) enabled, then exercise insert, delete, and upsert paths.
//
// NOTE: JavaConversions (implicit conversions) is deprecated; the explicit
// JavaConverters (.asScala / .asJava) is the supported replacement in 2.12.
import scala.collection.JavaConverters._
import org.apache.spark.sql.SaveMode._
// col() is auto-imported by spark-shell but NOT by spark-submit or :load;
// import it explicitly so the script runs in either context.
import org.apache.spark.sql.functions.col
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.table.HoodieTableConfig._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.keygen.constant.KeyGeneratorOptions._
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.config.HoodieCleanConfig
import org.apache.hudi.config.HoodieIndexConfig
import org.apache.hudi.config.HoodieCompactionConfig
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.hive.HiveSyncConfig
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.common.config.HoodieStorageConfig
import org.apache.hudi.index.HoodieIndex
import org.apache.hudi.common.model.HoodieTableType
import org.apache.hudi.sync.common.HoodieSyncConfig
import spark.implicits._

val tableName = "trips_table"
val basePath = "s3a://warehouse/trips_table"

// Sample trip records: (ts, uuid, rider, driver, fare, city).
val columns = Seq("ts", "uuid", "rider", "driver", "fare", "city")
val data = Seq(
  (1695159649087L, "334e26e9-8355-45cc-97c6-c31daf0df330", "rider-A", "driver-K", 19.10, "san_francisco"),
  (1695091554788L, "e96c4396-3fad-413a-a942-4cb36106d721", "rider-C", "driver-M", 27.70, "san_francisco"),
  (1695046462179L, "9909a8b1-2d15-4d3d-8ec9-efc48c536a00", "rider-D", "driver-L", 33.90, "san_francisco"),
  (1695516137016L, "e3cf430c-889d-4015-bc98-59bdce1e530c", "rider-F", "driver-P", 34.15, "sao_paulo"),
  (1695115999911L, "c8abbe79-8d89-47ea-b4ce-4d224bae5bfa", "rider-J", "driver-T", 17.85, "chennai"))

// Writer configuration shared by all three writes below: MOR table,
// record-level index as the write index, all metadata indexes enabled,
// inline compaction every 20 delta commits, meta sync disabled.
val hudiOptions = Map(
  "hoodie.write.record.merge.mode" -> "COMMIT_TIME_ORDERING",
  "hoodie.datasource.write.partitionpath.field" -> "city",
  "hoodie.table.name" -> tableName,
  "hoodie.datasource.write.table.type" -> "MERGE_ON_READ",
  "hoodie.metadata.enable" -> "true",
  "hoodie.metadata.index.column.stats.enable" -> "true",
  "hoodie.metadata.index.partition.stats.enable" -> "true",
  "hoodie.metadata.index.bloom.filter.enable" -> "true",
  "hoodie.metadata.record.index.enable" -> "true",
  "hoodie.metadata.record.index.max.filegroup.count" -> "500000000",
  "hoodie.metadata.record.index.min.filegroup.count" -> "100",
  HoodieIndexConfig.INDEX_TYPE.key() -> HoodieIndex.IndexType.RECORD_INDEX.name(),
  HoodieIndexConfig.RECORD_INDEX_UPDATE_PARTITION_PATH_ENABLE.key() -> "true",
  HoodieIndexConfig.RECORD_INDEX_USE_CACHING.key() -> "true",
  HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH_ENABLE.key() -> "true",
  HoodieIndexConfig.BLOOM_INDEX_USE_CACHING.key() -> "true",
  HoodieIndexConfig.BLOOM_INDEX_USE_METADATA.key() -> "true",
  HoodieCleanConfig.CLEANER_POLICY.key() -> "KEEP_LATEST_COMMITS",
  HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key() -> "10",
  HoodieCompactionConfig.INLINE_COMPACT.key() -> "true",
  HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key() -> "20",
  HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key() -> "0",
  HoodieMetadataConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.key() -> "true",
  HoodieSyncConfig.META_SYNC_ENABLED.key() -> "false",
  HiveSyncConfig.HIVE_SYNC_ENABLED.key() -> "false",
  HoodieTableConfig.TYPE.key() -> HoodieTableType.MERGE_ON_READ.name(),
  HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME.key() -> "snappy"
)

// 1) Initial write: Overwrite mode (re)creates the table.
// (val, not var: the DataFrame reference is never reassigned.)
val insertsDF = spark.createDataFrame(data).toDF(columns: _*)
insertsDF.write.format("hudi").
  options(hudiOptions).
  mode(Overwrite).
  save(basePath)
spark.read.format("hudi").load(basePath).show()

// 2) Delete the rider-F record via the "delete" write operation.
val deletesDF = spark.read.format("hudi").load(basePath).filter($"rider" === "rider-F")
deletesDF.write.format("hudi").
  option("hoodie.datasource.write.operation", "delete").
  options(hudiOptions).
  mode(Append).
  save(basePath)
spark.read.format("hudi").load(basePath).show()

// 3) Upsert: multiply rider-D's fare by 10 and write it back.
val updatesDf = spark.read.format("hudi").load(basePath).
  filter($"rider" === "rider-D").
  withColumn("fare", col("fare") * 10)
updatesDf.write.format("hudi").
  option("hoodie.datasource.write.operation", "upsert").
  options(hudiOptions).
  mode(Append).
  save(basePath)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]