JoshuaZhuCN edited a comment on issue #3981: URL: https://github.com/apache/hudi/issues/3981#issuecomment-971393339
@xushiyan Hi, here is my test code: ``` import com.leqee.sparktool.date.DateUtil import com.leqee.sparktool.hoodie.HoodieProp import com.leqee.sparktool.spark.SparkTool import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, DataSourceOptionsHelper} import org.apache.spark.SparkConf import org.apache.spark.sql.{Row, SaveMode, SparkSession} import org.apache.spark.sql.functions.{col, lit} import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType, TimestampType} import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER import org.apache.hudi.common.config.HoodieMetadataConfig import org.apache.hudi.common.model.HoodieCleaningPolicy import org.apache.hudi.config._ import org.apache.hudi.index.HoodieIndex import org.apache.hudi.keygen.constant.KeyGeneratorOptions object Test4 { def main(args: Array[String]): Unit = { val AD_DATE = "1980-01-01 00:00:00" val spark = SparkSession .builder() .config( new SparkConf() .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") . 
set("spark.master","local[2]") ) .getOrCreate() val data = Seq( Row(1, "A", 10, DateUtil.now()), Row(2, "B", 20, DateUtil.now()), Row(3, "C", 30, DateUtil.now())) val schema = StructType(List( StructField("id", IntegerType), StructField("name", StringType), StructField("age", IntegerType), StructField("dt", StringType))) val df = spark.createDataFrame(spark.sparkContext.makeRDD(data), schema) df.show(false) df.write.format("org.apache.hudi") .option(DataSourceWriteOptions.RECORDKEY_FIELD.key(), "id") .option(DataSourceWriteOptions.PRECOMBINE_FIELD.key(), "id") .option(HoodieWriteConfig.TBL_NAME.key(), "tb_hbase_test") .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL) .option("hoodie.datasource.write.table.type", "MERGE_ON_READ") .option("hoodie.index.type", "HBASE") .option("hoodie.index.hbase.zkport", "2181") .option("hoodie.hbase.index.update.partition.path", "true") .option("hoodie.index.hbase.max.qps.fraction", "10000") .option("hoodie.index.hbase.min.qps.fraction", "1000") .option("hoodie.index.hbase.table", "hudi:tb_hbase_test") .option("hoodie.index.hbase.zknode.path", "/hbase") .option("hoodie.index.hbase.get.batch.size", "1000") .option("hoodie.index.hbase.zkquorum", "127.0.0.1") .option("hoodie.index.hbase.sleep.ms.for.get.batch", "100") .option("hoodie.index.hbase.sleep.ms.for.get.batch", "10") .option("hoodie.index.hbase.max.qps.per.region.server", "1000") .option("hoodie.index.hbase.zk.session_timeout_ms", "5000") .option("hoodie.index.hbase.desired_puts_time_in_secs", "3600") .option(HoodieProp.INDEX_HBASE_ZKPORT_PROP, HoodieProp.INDEX_HBASE_ZKPORT_VALUE_DEFAULT) .option(HoodieProp.INDEX_HBASE_UPDATE_PARTITION_PATH_ENABLE_PROP, HoodieProp.INDEX_HBASE_UPDATE_PARTITION_PATH_ENABLE_VALUE_DEFAULT) .option(HoodieProp.INDEX_HBASE_QPS_ALLOCATOR_CLASS_NAME_PROP, HoodieProp.INDEX_HBASE_QPS_ALLOCATOR_CLASS_NAME_VALUE_DEFAULT) .option(HoodieProp.INDEX_HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE_PROP, 
HoodieProp.INDEX_HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE_VALUE_DEFAULT) .option(HoodieProp.INDEX_HBASE_ROLLBACK_SYNC_ENABLE_PROP, HoodieProp.INDEX_HBASE_ROLLBACK_SYNC_ENABLE_VALUE_DEFAULT) .option(HoodieProp.INDEX_HBASE_GET_BATCH_SIZE_PROP, HoodieProp.INDEX_HBASE_GET_BATCH_SIZE_VALUE_DEFAULT) .option(HoodieProp.INDEX_HBASE_ZKPATH_QPS_ROOT_PROP, HoodieProp.INDEX_HBASE_ZKPATH_QPS_ROOT_VALUE_DEFAULT) .option(HoodieProp.INDEX_HBASE_MAX_QPS_PER_REGION_SERVER_PROP, HoodieProp.INDEX_HBASE_MAX_QPS_PER_REGION_SERVER_VALUE_DEFAULT) .option(HoodieProp.INDEX_HBASE_MAX_QPS_FRACTION_PROP, HoodieProp.INDEX_HABSE_MAX_QPS_FRACTION_VALUE_DEFAULT) .option(HoodieProp.INDEX_HABSE_MIN_QPS_FRACTION_PROP, HoodieProp.INDEX_HBASE_MIN_QPS_FRACTION_VALUE_DEFAULT) .option(HoodieProp.INDEX_HBASE_ZK_CONNECTION_TIMEOUT_MS_PROP, HoodieProp.INDEX_HBASE_ZK_CONNECTION_TIMEOUT_MS_VALUE_DEFAULT) .option(HoodieProp.INDEX_HBASE_COMPUTE_QPS_DYNAMICALLY_PROP, HoodieProp.INDEX_HBASE_COMPUTE_QPS_DYNAMICALLY_VALUE_DEFAULT) .option(HoodieProp.INDEX_HBASE_QPS_FRACTION_PROP, HoodieProp.INDEX_HBASE_QPS_FRACTION_VALUE_DEFAULT) .option(HoodieProp.INDEX_HBASE_ZK_SESSION_TIMEOUT_MS_PROP, HoodieProp.INDEX_HBASE_ZK_SESSION_TIMEOUT_MS_VALUE_DEFAULT) .option(HoodieProp.INDEX_HBASE_PUT_BATCH_SIZE_PROP, HoodieProp.INDEX_HBASE_PUT_BATCH_SIZE_VALUE_DEFAULT) .option(HoodieProp.INDEX_HBASE_DESIRED_PUTS_TIME_IN_SECONDS_PROP, HoodieProp.INDEX_HBASE_DESIRED_PUTS_TIME_IN_SECONDS_VALUE_DEFAULT) .mode(SaveMode.Overwrite) .save("hdfs://localhost:9000/hoodie/tb_hbase_test") println("===================Snapshot Read==============================") spark.read .format("hudi") .option("mergeSchema", "true") .option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL) .load("hdfs://localhost:9000/hoodie/tb_hbase_test/default/*") .show(false) println("==============================================================") println("===================Incremental Read==============================") spark.read 
.format("hudi") .option("mergeSchema", "true") .option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key(), AD_DATE) .load("hdfs://localhost:9000/hoodie/tb_hbase_test/default/*") .show(false) println("==============================================================") spark.close() } } ``` ***The output is as follows: ``` com.leqee.ontariosync.function.Test4 Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties +---+----+---+-----------------------+ |id |name|age|dt | +---+----+---+-----------------------+ |1 |A |10 |2021-11-17 17:21:40.508| |2 |B |20 |2021-11-17 17:21:40.508| |3 |C |30 |2021-11-17 17:21:40.508| +---+----+---+-----------------------+ ===================Snapshot Read============================== +-------------------+--------------------+------------------+----------------------+-----------------+---+----+---+---+ |_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name|id |name|age|dt | +-------------------+--------------------+------------------+----------------------+-----------------+---+----+---+---+ +-------------------+--------------------+------------------+----------------------+-----------------+---+----+---+---+ ============================================================== ===================Incremental Read============================== +-------------------+--------------------+------------------+----------------------+--------------------------------------+---+----+---+-----------------------+ |_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name |id |name|age|dt | +-------------------+--------------------+------------------+----------------------+--------------------------------------+---+----+---+-----------------------+ |20211117172143 |20211117172143_0_1 |1 |default |f66c7e31-56ac-49cc-b147-9a1f27908c7f-0|1 |A |10 
|2021-11-17 17:21:40.508| |20211117172143 |20211117172143_0_2 |2 |default |f66c7e31-56ac-49cc-b147-9a1f27908c7f-0|2 |B |20 |2021-11-17 17:21:40.508| |20211117172143 |20211117172143_0_3 |3 |default |f66c7e31-56ac-49cc-b147-9a1f27908c7f-0|3 |C |30 |2021-11-17 17:21:40.508| +-------------------+--------------------+------------------+----------------------+--------------------------------------+---+----+---+-----------------------+ ============================================================== ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org