nmahmood630 opened a new issue #2987:
URL: https://github.com/apache/hudi/issues/2987


   **Describe the problem you faced**
   
   I am trying to run an incremental query to get the changes to my Hudi table since the last commit, but when I run:
   
   ```python
   commits = list(map(lambda row: row[0], spark.sql(
       "select distinct(_hoodie_commit_time) as commitTime from "
       "aggregated_device_geo_ips_snapshot order by commitTime").limit(
       50).collect()))
   begin_time = commits[len(commits) - 2]
   ```
   
   this only returns the latest commit (`commits: ['20210525033453']`), though I have upserted to the table 8 times. The .hoodie folder in S3 shows 8 commits:
   ![Screen Shot 2021-05-24 at 10 57 07 PM](https://user-images.githubusercontent.com/6695264/119437445-bf6b2400-bce3-11eb-9eb9-3a96a3eb1a0b.png)
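Since the `.hoodie` folder already lists every commit, one option is to take the commit times from the timeline instead of from the data rows. A minimal sketch, assuming the Hudi 0.x timeline layout in which a completed copy-on-write commit is a file named `<instant>.commit` (`<instant>.deltacommit` on a merge-on-read table) and pending instants end in `.requested` or `.inflight`. The function `completed_commit_times` and the sample file names are hypothetical; the real input would be a listing of `s3://<bucket>/aggregated_device_geo_ip/.hoodie`.

```python
import re
from typing import Iterable, List

# Assumed Hudi 0.x naming: "<instant>.commit" (copy-on-write) or
# "<instant>.deltacommit" (merge-on-read) marks a completed commit.
# Names like "<instant>.commit.requested" or "<instant>.inflight" are pending.
_COMPLETED = re.compile(r"^(\d+)\.(commit|deltacommit)$")


def completed_commit_times(file_names: Iterable[str]) -> List[str]:
    """Return the sorted, de-duplicated instant times of completed commits."""
    times = set()
    for name in file_names:
        # Keep only the last path component so full S3 keys also work.
        match = _COMPLETED.match(name.rsplit("/", 1)[-1])
        if match:
            times.add(match.group(1))
    return sorted(times)


# Hypothetical listing of a .hoodie folder.
listing = [
    "hoodie.properties",
    "20210524120000.commit.requested",
    "20210524120000.inflight",
    "20210524120000.commit",
    "20210525033453.commit",
]
print(completed_commit_times(listing))  # ['20210524120000', '20210525033453']
```

Sorting the strings works because instant times of equal length order lexicographically the same way they order in time.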
   
   
   This is my code to read from and write to the Hudi table:
   
   ```python
   # HUDI_TABLE_S3_BUCKET_NAME is a job-argument key defined elsewhere (not shown).

   def write_aggregated_data_to_hudi_table(data_frame, args):
       hudi_table_name = 'aggregated_device_geo_ip'
       hudi_table_path = 's3://' + args[HUDI_TABLE_S3_BUCKET_NAME] + '/' + hudi_table_name

       hudi_options = {
           'hoodie.table.name': hudi_table_name,
           'hoodie.datasource.write.recordkey.field': 'customer_id, device_serial_number, device_type',
           'hoodie.datasource.write.partitionpath.field': 'last_event_date',
           'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.ComplexKeyGenerator',
           'hoodie.datasource.write.table.name': hudi_table_path,
           'hoodie.datasource.write.operation': 'upsert',
           'hoodie.datasource.write.precombine.field': 'last_event_timestamp',
           'hoodie.upsert.shuffle.parallelism': 20,
           'hoodie.insert.shuffle.parallelism': 20
       }

       # Upsert the aggregated rows, matching records on the composite key above.
       (
           data_frame
               .write
               .format("hudi")
               .options(**hudi_options)
               .mode("Append")
               .save(hudi_table_path)
       )

   def get_updated_device_geoips(spark, args):
       hudi_table_name = 'aggregated_device_geo_ip'
       hudi_table_path = 's3://' + args[HUDI_TABLE_S3_BUCKET_NAME] + '/' + hudi_table_name

       # Snapshot read of the table, exposed to Spark SQL as a temp view.
       spark. \
           read. \
           format("hudi"). \
           load(hudi_table_path + "/*"). \
           createOrReplaceTempView("aggregated_device_geo_ips_snapshot")

       # Collect the distinct commit times present in the snapshot.
       commits = list(map(lambda row: row[0], spark.sql(
           "select distinct(_hoodie_commit_time) as commitTime from "
           "aggregated_device_geo_ips_snapshot order by commitTime").limit(
           50).collect()))
       print(commits)
       begin_time = commits[len(commits) - 2]
       print(begin_time)

       # incrementally query data
       incremental_read_options = {
           'hoodie.datasource.query.type': 'incremental',
           'hoodie.datasource.read.begin.instanttime': begin_time,
       }

       aggregated_device_geo_ip_incremental_df = spark.read.format("hudi"). \
           options(**incremental_read_options). \
           load(hudi_table_path)

       # print("UPDATED_DEVICE_GEOIPS" + aggregated_device_geo_ip_incremental_df.to_string())
       return aggregated_device_geo_ip_incremental_df
   ```
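When `commits` has a single entry, `commits[len(commits) - 2]` evaluates to `commits[-1]`, so `begin_time` silently becomes the latest commit. Because an incremental query returns commits after the begin instant, that read would presumably come back empty. Below is a hedged sketch of a safer selection; `choose_begin_time` is a hypothetical helper, and falling back to `"000"` (the begin time the Hudi quickstart uses to mean all commits) is an assumption of this sketch.

```python
from typing import List

# "000" is the begin time the Hudi quickstart uses for "all commits";
# using it as the fallback here is an assumption of this sketch.
EARLIEST_INSTANT = "000"


def choose_begin_time(commit_times: List[str]) -> str:
    """Pick a begin instant so the incremental read covers the latest commit.

    commit_times must be sorted ascending. The begin instant is exclusive,
    so returning the second-latest commit selects only the latest changes.
    """
    if len(commit_times) < 2:
        # No earlier instant to start after: read from the start of the timeline.
        return EARLIEST_INSTANT
    return commit_times[-2]


print(choose_begin_time(["20210525033453"]))  # 000
```

The list passed in can be the `commits` list above or, if the snapshot only exposes one commit time, the commit times read from the `.hoodie` timeline.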
   
   **Expected behavior**
   
   I expect all the commits listed in the .hoodie folder to be returned, so that I can pick the previous commit as the begin time for an incremental query.
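A likely reason the distinct query returns a single value, offered as a hypothesis rather than a diagnosis: `_hoodie_commit_time` holds the instant of the commit that last wrote each record, not a history of every commit. If each of the 8 upserts rewrote every record key (plausible for an aggregated table, but not confirmed here), every row in the snapshot carries the latest commit time. The toy model below is plain Python, not Hudi; `upsert`, `distinct_commit_times`, and the commit times are hypothetical and only illustrate that behavior.

```python
from typing import Dict, Iterable, List, Tuple

Key = Tuple[str, str, str]  # (customer_id, device_serial_number, device_type)


def upsert(table: Dict[Key, str], keys: Iterable[Key], commit_time: str) -> None:
    """Toy upsert: every written key takes the commit time of this write.

    Keys not written in this commit keep their previous commit time,
    mimicking a snapshot row that carries the commit that last wrote it.
    """
    for key in keys:
        table[key] = commit_time


def distinct_commit_times(table: Dict[Key, str]) -> List[str]:
    """Toy equivalent of SELECT DISTINCT _hoodie_commit_time ... ORDER BY."""
    return sorted(set(table.values()))


if __name__ == "__main__":
    all_keys = [("c1", "s1", "t1"), ("c2", "s2", "t2")]
    table: Dict[Key, str] = {}
    # Eight hypothetical commits, each rewriting every key.
    for i in range(8):
        upsert(table, all_keys, f"2021052503345{i}")
    print(distinct_commit_times(table))  # ['20210525033457']
```

If only some keys were written per commit, older commit times would survive on the untouched rows and the distinct query would return more than one value.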
   
   **Environment Description**
   
   * Hudi version :
   
   * Spark version :
   
   * Hive version :
   
   * Hadoop version :
   
   * Storage (HDFS/S3/GCS..) : S3
   
   * Running on Docker? (yes/no) : no
   
   
   
   

