JoshuaZhuCN edited a comment on issue #3981:
URL: https://github.com/apache/hudi/issues/3981#issuecomment-971393339


   Hi @xushiyan, here is my test code:
   
   ```
   import com.leqee.sparktool.date.DateUtil
   import com.leqee.sparktool.hoodie.HoodieProp
   import com.leqee.sparktool.spark.SparkTool
    import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, DataSourceOptionsHelper}
   import org.apache.spark.SparkConf
   import org.apache.spark.sql.{Row, SaveMode, SparkSession}
   import org.apache.spark.sql.functions.{col, lit}
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType, TimestampType}
   import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER
   import org.apache.hudi.common.config.HoodieMetadataConfig
   import org.apache.hudi.common.model.HoodieCleaningPolicy
   import org.apache.hudi.config._
   import org.apache.hudi.index.HoodieIndex
   import org.apache.hudi.keygen.constant.KeyGeneratorOptions
   
   object Test4 {
       def main(args: Array[String]): Unit = {
           val AD_DATE = "1980-01-01 00:00:00"
           val spark = SparkSession
               .builder()
               .config(
                   new SparkConf()
                       .set("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer")
                   .   set("spark.master","local[2]")
               )
               .getOrCreate()
   
           val data = Seq(
               Row(1, "A", 10, DateUtil.now()),
               Row(2, "B", 20, DateUtil.now()),
               Row(3, "C", 30, DateUtil.now()))
   
           val schema = StructType(List(
               StructField("id", IntegerType),
               StructField("name", StringType),
               StructField("age", IntegerType),
               StructField("dt", StringType)))
   
            val df = spark.createDataFrame(spark.sparkContext.makeRDD(data), schema)
   
           df.show(false)
   
           df.write.format("org.apache.hudi")
               .option(DataSourceWriteOptions.RECORDKEY_FIELD.key(), "id")
               .option(DataSourceWriteOptions.PRECOMBINE_FIELD.key(), "id")
               .option(HoodieWriteConfig.TBL_NAME.key(), "tb_hbase_test")
                .option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
               .option("hoodie.datasource.write.table.type", "MERGE_ON_READ")
               .option("hoodie.index.type", "HBASE")
               .option("hoodie.index.hbase.zkport", "2181")
               .option("hoodie.hbase.index.update.partition.path", "true")
               .option("hoodie.index.hbase.max.qps.fraction", "10000")
               .option("hoodie.index.hbase.min.qps.fraction", "1000")
               .option("hoodie.index.hbase.table", "hudi:tb_hbase_test")
               .option("hoodie.index.hbase.zknode.path", "/hbase")
               .option("hoodie.index.hbase.get.batch.size", "1000")
               .option("hoodie.index.hbase.zkquorum", "127.0.0.1")
               .option("hoodie.index.hbase.sleep.ms.for.get.batch", "100")
               .option("hoodie.index.hbase.sleep.ms.for.get.batch", "10")
               .option("hoodie.index.hbase.max.qps.per.region.server", "1000")
               .option("hoodie.index.hbase.zk.session_timeout_ms", "5000")
               .option("hoodie.index.hbase.desired_puts_time_in_secs", "3600")
                .option(HoodieProp.INDEX_HBASE_ZKPORT_PROP, HoodieProp.INDEX_HBASE_ZKPORT_VALUE_DEFAULT)
                .option(HoodieProp.INDEX_HBASE_UPDATE_PARTITION_PATH_ENABLE_PROP, HoodieProp.INDEX_HBASE_UPDATE_PARTITION_PATH_ENABLE_VALUE_DEFAULT)
                .option(HoodieProp.INDEX_HBASE_QPS_ALLOCATOR_CLASS_NAME_PROP, HoodieProp.INDEX_HBASE_QPS_ALLOCATOR_CLASS_NAME_VALUE_DEFAULT)
                .option(HoodieProp.INDEX_HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE_PROP, HoodieProp.INDEX_HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE_VALUE_DEFAULT)
                .option(HoodieProp.INDEX_HBASE_ROLLBACK_SYNC_ENABLE_PROP, HoodieProp.INDEX_HBASE_ROLLBACK_SYNC_ENABLE_VALUE_DEFAULT)
                .option(HoodieProp.INDEX_HBASE_GET_BATCH_SIZE_PROP, HoodieProp.INDEX_HBASE_GET_BATCH_SIZE_VALUE_DEFAULT)
                .option(HoodieProp.INDEX_HBASE_ZKPATH_QPS_ROOT_PROP, HoodieProp.INDEX_HBASE_ZKPATH_QPS_ROOT_VALUE_DEFAULT)
                .option(HoodieProp.INDEX_HBASE_MAX_QPS_PER_REGION_SERVER_PROP, HoodieProp.INDEX_HBASE_MAX_QPS_PER_REGION_SERVER_VALUE_DEFAULT)
                .option(HoodieProp.INDEX_HBASE_MAX_QPS_FRACTION_PROP, HoodieProp.INDEX_HBASE_MAX_QPS_FRACTION_VALUE_DEFAULT)
                .option(HoodieProp.INDEX_HBASE_MIN_QPS_FRACTION_PROP, HoodieProp.INDEX_HBASE_MIN_QPS_FRACTION_VALUE_DEFAULT)
                .option(HoodieProp.INDEX_HBASE_ZK_CONNECTION_TIMEOUT_MS_PROP, HoodieProp.INDEX_HBASE_ZK_CONNECTION_TIMEOUT_MS_VALUE_DEFAULT)
                .option(HoodieProp.INDEX_HBASE_COMPUTE_QPS_DYNAMICALLY_PROP, HoodieProp.INDEX_HBASE_COMPUTE_QPS_DYNAMICALLY_VALUE_DEFAULT)
                .option(HoodieProp.INDEX_HBASE_QPS_FRACTION_PROP, HoodieProp.INDEX_HBASE_QPS_FRACTION_VALUE_DEFAULT)
                .option(HoodieProp.INDEX_HBASE_ZK_SESSION_TIMEOUT_MS_PROP, HoodieProp.INDEX_HBASE_ZK_SESSION_TIMEOUT_MS_VALUE_DEFAULT)
                .option(HoodieProp.INDEX_HBASE_PUT_BATCH_SIZE_PROP, HoodieProp.INDEX_HBASE_PUT_BATCH_SIZE_VALUE_DEFAULT)
                .option(HoodieProp.INDEX_HBASE_DESIRED_PUTS_TIME_IN_SECONDS_PROP, HoodieProp.INDEX_HBASE_DESIRED_PUTS_TIME_IN_SECONDS_VALUE_DEFAULT)
               .mode(SaveMode.Overwrite)
               .save("hdfs://localhost:9000/hoodie/tb_hbase_test")
   
           println("===================Snapshot 
Read==============================")
           spark.read
               .format("hudi")
               .option("mergeSchema", "true")
                .option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
               .load("hdfs://localhost:9000/hoodie/tb_hbase_test/default/*")
                .show(false)
            println("==============================================================")
           println("===================Incremental 
Read==============================")
           spark.read
               .format("hudi")
               .option("mergeSchema", "true")
                .option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
               .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key(), AD_DATE)
               .load("hdfs://localhost:9000/hoodie/tb_hbase_test/default/*")
                .show(false)
            println("==============================================================")
           
           spark.close()
       }
   }
   
   ```
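   
   As a side check, a snapshot read against the table base path (no `/default/*` glob) may also be worth comparing. This is only a sketch, assuming Hudi 0.9+ where snapshot queries can resolve partitions from the base path on their own; it reuses the `spark` session and imports from the test above:
   
   ```
   // Hypothetical variant: snapshot-read the MOR table from its base path,
   // without the "/default/*" glob used in the test above.
   spark.read
       .format("hudi")
       .option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
       .load("hdfs://localhost:9000/hoodie/tb_hbase_test")
       .show(false)
   ```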
   
   **The output is as follows:**
   ```
   com.leqee.ontariosync.function.Test4
    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
   +---+----+---+-----------------------+
   |id |name|age|dt                     |
   +---+----+---+-----------------------+
   |1  |A   |10 |2021-11-17 17:21:40.508|
   |2  |B   |20 |2021-11-17 17:21:40.508|
   |3  |C   |30 |2021-11-17 17:21:40.508|
   +---+----+---+-----------------------+
   
    ===================Snapshot Read==============================
    +-------------------+--------------------+------------------+----------------------+-----------------+---+----+---+---+
    |_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name|id |name|age|dt |
    +-------------------+--------------------+------------------+----------------------+-----------------+---+----+---+---+
    +-------------------+--------------------+------------------+----------------------+-----------------+---+----+---+---+
   
   ==============================================================
    ===================Incremental Read==============================
    +-------------------+--------------------+------------------+----------------------+--------------------------------------+---+----+---+-----------------------+
    |_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name                     |id |name|age|dt                     |
    +-------------------+--------------------+------------------+----------------------+--------------------------------------+---+----+---+-----------------------+
    |20211117172143     |20211117172143_0_1  |1                 |default               |f66c7e31-56ac-49cc-b147-9a1f27908c7f-0|1  |A   |10 |2021-11-17 17:21:40.508|
    |20211117172143     |20211117172143_0_2  |2                 |default               |f66c7e31-56ac-49cc-b147-9a1f27908c7f-0|2  |B   |20 |2021-11-17 17:21:40.508|
    |20211117172143     |20211117172143_0_3  |3                 |default               |f66c7e31-56ac-49cc-b147-9a1f27908c7f-0|3  |C   |30 |2021-11-17 17:21:40.508|
    +-------------------+--------------------+------------------+----------------------+--------------------------------------+---+----+---+-----------------------+
   
   ==============================================================
   
   
   ```

