bithw1 opened a new issue, #6425:
URL: https://github.com/apache/hudi/issues/6425
Hi,
I am working with Hudi 0.9.0 and have the following code, which writes 11
records (i = 0 to 10, one record per Spark job) to a MOR Hudi table. There are
11 commits in total.
However, when I look at the files written to disk, there are `NO` log files,
only 11 parquet files. It looks as if I am writing to a COW table, and I am not
sure where the problem is.
```
package org.example
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig}
import org.apache.hudi.index.HoodieIndex
import org.apache.spark.sql.{SaveMode, SparkSession}
/**
 * A single demo order row.
 *
 * In the demo writer, `name` is used as the Hudi record key, while `creation_date`
 * serves as both the precombine field and the partition-path field.
 */
case class Order(name: String, price: String, creation_date: String)
/**
 * Demo that writes one `Order` record per Spark job to a Hudi MERGE_ON_READ table.
 *
 * NOTE(review): every call to `write_data` uses a distinct record key (`order_<i>`), so each
 * commit is a pure insert and no existing key is ever updated. Hudi MOR tables are generally
 * expected to put inserts into new parquet base files and to use log files for updates to
 * existing keys (a bloom-type index cannot index log files). That would explain seeing only
 * parquet files here. Confirm against the Hudi 0.9.0 documentation.
 */
object Hudi003_Demo {
  val spark: SparkSession = SparkSession.builder
    .appName(this.getClass.getSimpleName)
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .enableHiveSupport()
    .master("local[1]")
    .getOrCreate()

  /**
   * Appends a single `Order` named `order_<i>` to the demo table as its own Spark job / Hudi commit.
   *
   * @param i suffix used in the record key (`name`) and embedded in the `price` value
   */
  def write_data(i: Int): Unit = {
    // NOTE(review): for a Scala `object` this is "Hudi003_Demo$" (trailing `$`), so both the
    // table name and the base path contain `$`. Kept as-is so the existing table path is unchanged.
    val hudiTableName = this.getClass.getSimpleName
    val basePath = s"/data/hudi_demo/$hudiTableName"

    import spark.implicits._
    val order = Order(name = s"order_$i", price = s"$i-11.3", creation_date = "date-0")
    val insertData = spark.createDataset(Seq(order))

    // DataFrame write. The writer is configured once and never reassigned, so a `val` suffices.
    val writer = insertData.write.format("hudi")
      .option(DataSourceWriteOptions.RECORDKEY_FIELD.key(), "name")
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "creation_date")
      .option(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH, "true")
      .option(HoodieIndexConfig.INDEX_TYPE_PROP, HoodieIndex.IndexType.GLOBAL_BLOOM.name())
      .option("hoodie.insert.shuffle.parallelism", "1")
      .option("hoodie.upsert.shuffle.parallelism", "1")
      .option(HoodieWriteConfig.TABLE_NAME, hudiTableName)
      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "creation_date")
      // Write to a MERGE_ON_READ table.
      .option(
        DataSourceWriteOptions.TABLE_TYPE.key(),
        DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
      // NOTE(review): inline compaction is disabled, so the max-delta-commits threshold below
      // presumably has no effect on these batch writes. Confirm.
      .option("hoodie.compact.inline", "false")
      .option("hoodie.compact.inline.max.delta.commits", "1")

    writer.mode(SaveMode.Append).save(basePath)
  }

  /** Performs 11 independent writes, for i = 0 through 10 inclusive (one commit each). */
  def test1(): Unit = (0 to 10).foreach(i => write_data(i))

  def main(args: Array[String]): Unit = {
    test1()
  }
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]