bithw1 commented on issue #2276:
URL: https://github.com/apache/hudi/issues/2276#issuecomment-733441100
The code that creates/upserts the table is as follows; I have explicitly set the following two options to disable compaction:
```
.option(HoodieCompactionConfig.INLINE_COMPACT_PROP, "false")
.option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE_OPT_KEY, "false")
```
I am not sure how to exercise the compaction feature from code. Could you please help? @bvaradar, thanks!
```scala
package org.example.hudi

import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.{HoodieCompactionConfig, HoodieIndexConfig, HoodieWriteConfig}
import org.apache.hudi.index.HoodieIndex
import org.apache.spark.sql.{SaveMode, SparkSession}

case class MyOrder(
    name: String,
    price: String,
    creation_date: String,
    dt: String)

object MORWorkTest {

  // Initial batch, written with SaveMode.Overwrite.
  val overwrite1Data = Seq(
    MyOrder("A", "1", "2020-11-18 14:43:32", "2020-11-19"),
    MyOrder("B", "1", "2020-11-18 14:42:21", "2020-11-19"),
    MyOrder("C", "1", "2020-11-18 14:47:19", "2020-11-19"),
    MyOrder("D", "1", "2020-11-18 14:46:50", "2020-11-19")
  )

  // First upsert batch: same record keys, newer creation_date.
  val insertUpdate1Data = Seq(
    MyOrder("A", "2", "2020-11-18 14:50:32", "2020-11-19"),
    MyOrder("B", "2", "2020-11-18 14:50:21", "2020-11-19"),
    MyOrder("C", "2", "2020-11-18 14:50:19", "2020-11-19"),
    MyOrder("D", "2", "2020-11-18 14:50:50", "2020-11-19")
  )

  // Second upsert batch.
  val insertUpdate2Data = Seq(
    MyOrder("A", "3", "2020-11-18 14:53:32", "2020-11-19"),
    MyOrder("B", "3", "2020-11-18 14:52:21", "2020-11-19"),
    MyOrder("C", "3", "2020-11-18 14:57:19", "2020-11-19"),
    MyOrder("D", "3", "2020-11-18 14:56:50", "2020-11-19")
  )

  val spark = SparkSession.builder.appName("MORTest")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.sql.warehouse.dir", "hdfs:///user/hive/warehouse")
    .enableHiveSupport()
    .getOrCreate()

  val hudi_table = "hudi_hive_read_write_mor_5"
  val base_path = s"/data/hudi_demo/$hudi_table"

  // Writes one batch to the MOR table and syncs it to Hive.
  def run(op: Int): Unit = {
    val (data, saveMode) = op match {
      case 1 => (overwrite1Data, SaveMode.Overwrite)
      case 2 => (insertUpdate1Data, SaveMode.Append)
      case 3 => (insertUpdate2Data, SaveMode.Append)
    }
    import spark.implicits._
    val insertData = spark.createDataset(data)
    insertData.write.format("hudi")
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "name")
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "creation_date")
      .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, "xyz")
      .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, hudi_table)
      .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
      .option(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH, "true")
      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "dt")
      // table type: MOR
      .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY,
        DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
      // disable async compaction
      .option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE_OPT_KEY, "false")
      .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP, "100")
      // disable inline compaction
      .option(HoodieCompactionConfig.INLINE_COMPACT_PROP, "false")
      .option(DataSourceWriteOptions.HIVE_URL_OPT_KEY, "jdbc:hive2://10.41.90.208:10000")
      .option(HoodieWriteConfig.TABLE_NAME, hudi_table)
      .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY,
        "org.apache.hudi.hive.MultiPartKeysValueExtractor")
      .option(HoodieIndexConfig.INDEX_TYPE_PROP, HoodieIndex.IndexType.GLOBAL_BLOOM.name())
      .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "dt")
      .option("hoodie.insert.shuffle.parallelism", "2")
      .option("hoodie.upsert.shuffle.parallelism", "2")
      .mode(saveMode)
      .save(base_path)
  }

  def main(args: Array[String]): Unit = {
    // do overwrite
    run(1)
    // do upsert
    run(2)
    // do upsert again
    run(3)
    println("===MOR is done=====")
  }
}
```
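If it helps, here is a minimal sketch of what I imagine the compaction-enabled variant would look like, reusing `insertData`, `hudi_table`, and `base_path` from the code above. The option values here are my own guess (inline compaction on, threshold lowered from 100 to 2 delta commits), not something I have verified works:

```scala
// Hypothetical variant of the write above that tries to trigger inline
// compaction: flip INLINE_COMPACT_PROP to "true" and lower the delta-commit
// threshold so compaction should run after every 2 delta commits.
// (Values are guesses for illustration, not a verified configuration.)
insertData.write.format("hudi")
  .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "name")
  .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "creation_date")
  .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "dt")
  .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY,
    DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
  // enable inline compaction instead of disabling it
  .option(HoodieCompactionConfig.INLINE_COMPACT_PROP, "true")
  // compact after every 2 delta commits instead of 100
  .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP, "2")
  .option(HoodieWriteConfig.TABLE_NAME, hudi_table)
  .mode(SaveMode.Append)
  .save(base_path)
```

My understanding is that with inline compaction enabled, the writer should schedule and execute compaction once the number of delta commits reaches the configured threshold. Is that right?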