JoshuaZhuCN edited a comment on issue #3981:
URL: https://github.com/apache/hudi/issues/3981#issuecomment-971393339
@xushiyan Hi, here is my test code:
```
import com.leqee.sparktool.date.DateUtil
import com.leqee.sparktool.hoodie.HoodieProp
import com.leqee.sparktool.spark.SparkTool
import org.apache.hudi.{DataSourceOptionsHelper, DataSourceReadOptions, DataSourceWriteOptions}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType, TimestampType}
import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.model.HoodieCleaningPolicy
import org.apache.hudi.config._
import org.apache.hudi.index.HoodieIndex
import org.apache.hudi.keygen.constant.KeyGeneratorOptions

object Test4 {
  def main(args: Array[String]): Unit = {
    val AD_DATE = "1980-01-01 00:00:00"

    val spark = SparkSession
      .builder()
      .config(
        new SparkConf()
          .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
          .set("spark.master", "local[2]")
      )
      .getOrCreate()

    // Build a three-row test DataFrame
    val data = Seq(
      Row(1, "A", 10, DateUtil.now()),
      Row(2, "B", 20, DateUtil.now()),
      Row(3, "C", 30, DateUtil.now()))
    val schema = StructType(List(
      StructField("id", IntegerType),
      StructField("name", StringType),
      StructField("age", IntegerType),
      StructField("dt", StringType)))
    val df = spark.createDataFrame(spark.sparkContext.makeRDD(data), schema)
    df.show(false)

    // Upsert into a MERGE_ON_READ table using the HBase index
    df.write.format("org.apache.hudi")
      .option(DataSourceWriteOptions.RECORDKEY_FIELD.key(), "id")
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD.key(), "id")
      .option(HoodieWriteConfig.TBL_NAME.key(), "tb_hbase_test")
      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
      .option("hoodie.datasource.write.table.type", "MERGE_ON_READ")
      .option("hoodie.index.type", "HBASE")
      .option("hoodie.index.hbase.zkport", "2181")
      .option("hoodie.hbase.index.update.partition.path", "true")
      .option("hoodie.index.hbase.max.qps.fraction", "10000")
      .option("hoodie.index.hbase.min.qps.fraction", "1000")
      .option("hoodie.index.hbase.table", "hudi:tb_hbase_test")
      .option("hoodie.index.hbase.zknode.path", "/hbase")
      .option("hoodie.index.hbase.get.batch.size", "1000")
      .option("hoodie.index.hbase.zkquorum", "127.0.0.1")
      .option("hoodie.index.hbase.sleep.ms.for.get.batch", "10")
      .option("hoodie.index.hbase.max.qps.per.region.server", "1000")
      .option("hoodie.index.hbase.zk.session_timeout_ms", "5000")
      .option("hoodie.index.hbase.desired_puts_time_in_secs", "3600")
      .option(HoodieProp.INDEX_HBASE_ZKPORT_PROP, HoodieProp.INDEX_HBASE_ZKPORT_VALUE_DEFAULT)
      .option(HoodieProp.INDEX_HBASE_UPDATE_PARTITION_PATH_ENABLE_PROP, HoodieProp.INDEX_HBASE_UPDATE_PARTITION_PATH_ENABLE_VALUE_DEFAULT)
      .option(HoodieProp.INDEX_HBASE_QPS_ALLOCATOR_CLASS_NAME_PROP, HoodieProp.INDEX_HBASE_QPS_ALLOCATOR_CLASS_NAME_VALUE_DEFAULT)
      .option(HoodieProp.INDEX_HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE_PROP, HoodieProp.INDEX_HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE_VALUE_DEFAULT)
      .option(HoodieProp.INDEX_HBASE_ROLLBACK_SYNC_ENABLE_PROP, HoodieProp.INDEX_HBASE_ROLLBACK_SYNC_ENABLE_VALUE_DEFAULT)
      .option(HoodieProp.INDEX_HBASE_GET_BATCH_SIZE_PROP, HoodieProp.INDEX_HBASE_GET_BATCH_SIZE_VALUE_DEFAULT)
      .option(HoodieProp.INDEX_HBASE_ZKPATH_QPS_ROOT_PROP, HoodieProp.INDEX_HBASE_ZKPATH_QPS_ROOT_VALUE_DEFAULT)
      .option(HoodieProp.INDEX_HBASE_MAX_QPS_PER_REGION_SERVER_PROP, HoodieProp.INDEX_HBASE_MAX_QPS_PER_REGION_SERVER_VALUE_DEFAULT)
      .option(HoodieProp.INDEX_HBASE_MAX_QPS_FRACTION_PROP, HoodieProp.INDEX_HABSE_MAX_QPS_FRACTION_VALUE_DEFAULT)
      .option(HoodieProp.INDEX_HABSE_MIN_QPS_FRACTION_PROP, HoodieProp.INDEX_HBASE_MIN_QPS_FRACTION_VALUE_DEFAULT)
      .option(HoodieProp.INDEX_HBASE_ZK_CONNECTION_TIMEOUT_MS_PROP, HoodieProp.INDEX_HBASE_ZK_CONNECTION_TIMEOUT_MS_VALUE_DEFAULT)
      .option(HoodieProp.INDEX_HBASE_COMPUTE_QPS_DYNAMICALLY_PROP, HoodieProp.INDEX_HBASE_COMPUTE_QPS_DYNAMICALLY_VALUE_DEFAULT)
      .option(HoodieProp.INDEX_HBASE_QPS_FRACTION_PROP, HoodieProp.INDEX_HBASE_QPS_FRACTION_VALUE_DEFAULT)
      .option(HoodieProp.INDEX_HBASE_ZK_SESSION_TIMEOUT_MS_PROP, HoodieProp.INDEX_HBASE_ZK_SESSION_TIMEOUT_MS_VALUE_DEFAULT)
      .option(HoodieProp.INDEX_HBASE_PUT_BATCH_SIZE_PROP, HoodieProp.INDEX_HBASE_PUT_BATCH_SIZE_VALUE_DEFAULT)
      .option(HoodieProp.INDEX_HBASE_DESIRED_PUTS_TIME_IN_SECONDS_PROP, HoodieProp.INDEX_HBASE_DESIRED_PUTS_TIME_IN_SECONDS_VALUE_DEFAULT)
      .mode(SaveMode.Overwrite)
      .save("hdfs://localhost:9000/hoodie/tb_hbase_test")

    println("===================Snapshot Read==============================")
    spark.read
      .format("hudi")
      .option("mergeSchema", "true")
      .option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
      .load("hdfs://localhost:9000/hoodie/tb_hbase_test/default/*")
      .show(false)
    println("==============================================================")

    println("===================Incremental Read==============================")
    spark.read
      .format("hudi")
      .option("mergeSchema", "true")
      .option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
      .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key(), AD_DATE)
      .load("hdfs://localhost:9000/hoodie/tb_hbase_test/default/*")
      .show(false)
    println("==============================================================")

    spark.close()
  }
}
```
**The output is as follows:**
```
com.leqee.ontariosync.function.Test4
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
+---+----+---+-----------------------+
|id |name|age|dt |
+---+----+---+-----------------------+
|1 |A |10 |2021-11-17 17:21:40.508|
|2 |B |20 |2021-11-17 17:21:40.508|
|3 |C |30 |2021-11-17 17:21:40.508|
+---+----+---+-----------------------+
===================Snapshot Read==============================
+-------------------+--------------------+------------------+----------------------+-----------------+---+----+---+---+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name|id |name|age|dt |
+-------------------+--------------------+------------------+----------------------+-----------------+---+----+---+---+
+-------------------+--------------------+------------------+----------------------+-----------------+---+----+---+---+
==============================================================
===================Incremental Read==============================
+-------------------+--------------------+------------------+----------------------+--------------------------------------+---+----+---+-----------------------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name                     |id |name|age|dt                     |
+-------------------+--------------------+------------------+----------------------+--------------------------------------+---+----+---+-----------------------+
|20211117172143     |20211117172143_0_1  |1                 |default               |f66c7e31-56ac-49cc-b147-9a1f27908c7f-0|1  |A   |10 |2021-11-17 17:21:40.508|
|20211117172143     |20211117172143_0_2  |2                 |default               |f66c7e31-56ac-49cc-b147-9a1f27908c7f-0|2  |B   |20 |2021-11-17 17:21:40.508|
|20211117172143     |20211117172143_0_3  |3                 |default               |f66c7e31-56ac-49cc-b147-9a1f27908c7f-0|3  |C   |30 |2021-11-17 17:21:40.508|
+-------------------+--------------------+------------------+----------------------+--------------------------------------+---+----+---+-----------------------+
==============================================================
```
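So the snapshot read comes back empty while the incremental read returns all three rows. For comparison, below is a minimal sketch of the same snapshot query issued against the table base path instead of the `default/*` partition glob; whether the glob is what makes the difference here is an assumption on my part, since path-globbing behavior for snapshot queries has varied across Hudi releases:
```
// Minimal sketch (assumption to verify): snapshot-read the table base path
// directly, without the "default/*" partition glob used in the repro above.
spark.read
  .format("hudi")
  .option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
  .load("hdfs://localhost:9000/hoodie/tb_hbase_test")
  .show(false)
```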