t0il3ts0ap commented on issue #2934:
URL: https://github.com/apache/hudi/issues/2934#issuecomment-850312591


   @n3nash Sorry for the late reply, I was away on vacation.
   1. We run one instance of the DeltaStreamer job on the source table every 2 hours. Each run sources at most 6 million records, capped with the `--source-limit` parameter.
   2. About 12 DeltaStreamer runs. I am not sure whether that results in 12 commits (please share some knowledge or documentation on this; I apologize that I don't know it).
   3. Please assume the defaults here. I have not modified the cleaner policy for DeltaStreamer.
   4. This fails in the very first run. I checked manually: there are no files for the oldest commits.
   
   A sample case:
   Let's say I open the `.hoodie` directory of a non-partitioned COW table:
   <img width="1172" alt="Screenshot 2021-05-28 at 3 29 26 PM" src="https://user-images.githubusercontent.com/8509512/119967062-9be8f780-bfc9-11eb-9bca-87cb8fc4fd07.png">
   Now I open the commit file `20210524175014.commit` and find the following data:
   ```
   {
     "partitionToWriteStats" : {
       "default" : [ {
         "fileId" : "890de7c3-7f0d-4586-8e66-83a9f8d9c106-0",
         "path" : "default/890de7c3-7f0d-4586-8e66-83a9f8d9c106-0_0-23-13521_20210524175014.parquet",
         "prevCommit" : "20210524160842",
         "numWrites" : 325368,
         "numDeletes" : 0,
         "numUpdateWrites" : 0,
         "numInserts" : 115,
         "totalWriteBytes" : 32817083,
         "totalWriteErrors" : 0,
         "tempPath" : null,
         "partitionPath" : "default",
         "totalLogRecords" : 0,
         "totalLogFilesCompacted" : 0,
         "totalLogSizeCompacted" : 0,
         "totalUpdatedRecordsCompacted" : 0,
         "totalLogBlocks" : 0,
         "totalCorruptLogBlock" : 0,
         "totalRollbackBlocks" : 0,
         "fileSizeInBytes" : 32817083
       } ]
     },
     "compacted" : false,
     "extraMetadata" : {
       "schema" : "{\"type\":\"record\",\"name\":\"hoodie_source\",\"namespace\":\"hoodie.source\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"created_at\",\"type\":[\"string\",\"null\"]},{\"name\":\"created_by\",\"type\":[\"string\",\"null\"]},{\"name\":\"last_modified_by\",\"type\":[\"string\",\"null\"]},{\"name\":\"version\",\"type\":[\"long\",\"null\"]},{\"name\":\"address_reference_id\",\"type\":[\"string\",\"null\"]},{\"name\":\"city\",\"type\":[\"string\",\"null\"]},{\"name\":\"current\",\"type\":[\"boolean\",\"null\"]},{\"name\":\"line_one\",\"type\":[\"string\",\"null\"]},{\"name\":\"line_two\",\"type\":[\"string\",\"null\"]},{\"name\":\"permanent\",\"type\":[\"boolean\",\"null\"]},{\"name\":\"pin_code\",\"type\":[\"string\",\"null\"]},{\"name\":\"source\",\"type\":[\"string\",\"null\"]},{\"name\":\"state\",\"type\":[\"string\",\"null\"]},{\"name\":\"type\",\"type\":[\"string\",\"null\"]},{\"name\":\"email_address\",\"type\":[\"string\",\"null\"]},{\"name\":\"collection_case_id\",\"type\":[\"long\",\"null\"]},{\"name\":\"updated_date\",\"type\":[\"string\",\"null\"]},{\"name\":\"updated_at\",\"type\":[\"string\",\"null\"]},{\"name\":\"latitude\",\"type\":[\"double\",\"null\"]},{\"name\":\"longitude\",\"type\":[\"double\",\"null\"]},{\"name\":\"__lsn\",\"type\":[\"long\",\"null\"]},{\"name\":\"_hoodie_is_deleted\",\"type\":[\"boolean\",\"null\"]}]}",
       "deltastreamer.checkpoint.key" : "collection_service.public.addresses,0:1066373,1:1064984,2:1065562,3:1070617,4:1067374,5:1066110"
     },
     "operationType" : "UPSERT",
     "fileIdAndRelativePaths" : {
         "890de7c3-7f0d-4586-8e66-83a9f8d9c106-0" : "default/890de7c3-7f0d-4586-8e66-83a9f8d9c106-0_0-23-13521_20210524175014.parquet"
     },
     "totalRecordsDeleted" : 0,
     "totalLogRecordsCompacted" : 0,
     "totalLogFilesCompacted" : 0,
     "totalCompactedRecordsUpdated" : 0,
     "totalLogFilesSize" : 0,
     "totalScanTime" : 0,
     "totalCreateTime" : 0,
     "totalUpsertTime" : 5089
   }
   ```
   
   Now, the file `default/890de7c3-7f0d-4586-8e66-83a9f8d9c106-0_0-23-13521_20210524175014.parquet` referenced in this commit is missing. The error is raised only when running an [incremental query](https://github.com/apache/hudi/blob/master/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java).
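The manual check from point 4 can be scripted. A minimal sketch, assuming the `.commit` file is plain JSON shaped like the one above and that the table's base path is on a local (or locally mounted) filesystem; `missing_base_files` is a hypothetical helper, not part of Hudi:

```python
import json
import os


def missing_base_files(commit_file, base_path):
    """Return the relative paths listed in a commit file that do not exist under base_path.

    Assumes the layout shown above: "partitionToWriteStats" maps each
    partition to a list of write stats, each carrying a relative "path".
    """
    with open(commit_file) as f:
        metadata = json.load(f)
    missing = []
    for stats in metadata.get("partitionToWriteStats", {}).values():
        for stat in stats:
            rel_path = stat.get("path")
            # Only flag entries that name a file and where that file is absent.
            if rel_path and not os.path.exists(os.path.join(base_path, rel_path)):
                missing.append(rel_path)
    return missing
```

For example, `missing_base_files("<base_path>/.hoodie/20210524175014.commit", "<base_path>")` (with `<base_path>` standing in for the table location) would list the Parquet file above if it has been deleted.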
   
   However, if I search for just the prefix `890de7c3-7f0d-4586-8e66-83a9f8d9c106`, I can see many Parquet files.
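Files sharing that prefix are versions of the same file group: in a copy-on-write table, each commit that updates a file group writes a new base file with the same file ID and the new commit's instant time (the `prevCommit` field above points at the previous version). Older versions are eventually removed by the cleaner, which may explain why the file for an older commit is gone while newer ones remain. Judging from the name in the commit above, base files follow `<fileId>_<writeToken>_<instantTime>.parquet`. A minimal sketch, assuming that pattern, that groups file names by file ID so you can see which instant times still have a file on disk; `versions_by_file_id` is a hypothetical helper:

```python
import re
from collections import defaultdict

# Assumed naming pattern, inferred from the path in the commit metadata:
# <fileId>_<writeToken>_<instantTime>.parquet
BASE_FILE_RE = re.compile(
    r"^(?P<file_id>.+)_(?P<write_token>[^_]+)_(?P<instant>\d+)\.parquet$"
)


def versions_by_file_id(file_names):
    """Map each file ID to the sorted instant times of its base-file versions."""
    versions = defaultdict(list)
    for name in file_names:
        m = BASE_FILE_RE.match(name)
        if m:  # skip names that do not follow the assumed pattern
            versions[m.group("file_id")].append(m.group("instant"))
    return {fid: sorted(instants) for fid, instants in versions.items()}
```

Passing it `os.listdir("<base_path>/default")` would show, per file ID, the commits whose versions survive; an instant missing from that list has had its version cleaned.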
   
   
   For now, I have overridden `HoodieIncrSource.java` in the DeltaStreamer job for the target table to make it work.
   I added logic along these lines:
   - If no checkpoint is found (i.e. this is the first run), do a snapshot query and save the latest commit as the checkpoint.
   - If a checkpoint is found, carry on with the incremental query as usual.
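The fallback above can be sketched as follows. This is not the actual `HoodieIncrSource` override: the Spark/Hudi reads are replaced by hypothetical `snapshot_read` and `incremental_read` callables, and the source table's timeline is modeled as an ascending list of completed instant times (fixed-width `yyyyMMddHHmmss` strings, so string comparison orders them correctly).

```python
from typing import Callable, List, Optional, Tuple


def next_batch(
    checkpoint: Optional[str],
    completed_instants: List[str],
    snapshot_read: Callable[[], list],
    incremental_read: Callable[[str, str], list],
) -> Tuple[list, Optional[str]]:
    """Return (records, new_checkpoint) using the snapshot-then-incremental fallback.

    snapshot_read and incremental_read are hypothetical stand-ins for the real
    reads; incremental_read(begin, end) is assumed to return records from
    commits after `begin` up to and including `end`.
    """
    if not completed_instants:
        return [], checkpoint  # nothing committed yet; keep the old checkpoint
    latest = completed_instants[-1]
    if checkpoint is None:
        # First run: read the full table and checkpoint at the latest commit, so
        # later incremental reads only touch newer commits whose files still exist.
        return snapshot_read(), latest
    if checkpoint >= latest:
        return [], checkpoint  # already caught up
    # Later runs: incremental read of the commits after the checkpoint.
    return incremental_read(checkpoint, latest), latest
```

One design caveat: if a commit lands between listing the timeline and the snapshot read, the snapshot may already contain it and the next incremental read would replay it, which an upsert-based target should tolerate.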
   
   The benefit is that we always run incremental queries on newer commits rather than older ones, and the Parquet files for newer commits are still present. It also takes the snapshot automatically when reading from a Hudi table for the first time.
   

