[ https://issues.apache.org/jira/browse/HUDI-2772?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

sivabalan narayanan updated HUDI-2772:
--------------------------------------
    Description: 
Even after setting the right config to copy over the deltastreamer 
checkpoint, deltastreamer fails to read the checkpoint from the previous 
commit metadata. This does not happen in general. In this case, in 
continuous mode, there is no data in the source (parquet DFS) folder, so 
deltastreamer continuously checks the source folder and also loads the last 
checkpoint from timeline metadata. With this setup, when a write from 
spark-datasource is triggered, deltastreamer immediately fails to read the 
checkpoint from the completed spark-writer commit. But if deltastreamer is 
restarted, the exception is not seen and it picks up the checkpoint. 

When I induced a 1-second delay in continuous mode, things were fine as well. 

 

Setup:

Deltastreamer ran in continuous mode. The source folder did not have any 
data, so deltastreamer kept checking the source folder and fetching the 
latest checkpoint from commit metadata in quick succession. 

Then a concurrent write from spark-datasource was triggered. 

 

I inspected the last completed commit instant (the one reported by 
deltastreamer) made by the spark writer, and it looks okay to me. 
{code:java}
grep "checkpoint" 
/tmp/hudi-deltastreamer-gh-mw/.hoodie/20211116074129737.deltacommit
    "deltastreamer.checkpoint.key" : "1637066483000" {code}
But after hitting the below exception, if I restart deltastreamer, it runs 
just fine, which is very strange. I was able to reproduce this 2 times out 
of 5.  

Here is the checkpoint from the last delta commit by deltastreamer (which 
matches the entry found in the spark writer's delta commit above):
{code:java}
grep "checkpoint" 
/tmp/hudi-deltastreamer-gh-mw/.hoodie/20211116074123384.deltacommit
    "deltastreamer.checkpoint.key" : "1637066483000" {code}
 

I also checked the deltastreamer code, and we look only at completed 
instants and the completed commit metadata. So I am not sure why this is 
happening. 
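To make the suspected race concrete, here is a minimal simulation. This is plain Python with illustrative stand-ins, not actual Hudi classes: `read_checkpoint` and the timeline dicts only mimic what the logs suggest DeltaSync does, i.e. take the last COMPLETED instant's extraMetadata. If a concurrent spark-datasource commit becomes visible before its checkpoint entry is readable (the `= []` in the warnings below), the lookup fails; once the same instant reads back with the copied key, it succeeds.

```python
import copy

# Illustrative stand-ins only -- these are NOT Hudi APIs. Each dict models a
# completed deltacommit on the timeline with its "extraMetadata" map.
CHECKPOINT_KEY = "deltastreamer.checkpoint.key"

def read_checkpoint(timeline):
    """Mimic the lookup: read the last COMPLETED instant's extraMetadata."""
    completed = [i for i in timeline if i["state"] == "COMPLETED"]
    if not completed:
        return None
    last = completed[-1]
    if CHECKPOINT_KEY not in last["extraMetadata"]:
        raise RuntimeError(
            "Unable to find previous checkpoint. Last Commit: " + last["instant"])
    return last["extraMetadata"][CHECKPOINT_KEY]

# What the logs suggest deltastreamer saw mid-race: the spark-writer commit
# is COMPLETED, but its extraMetadata reads back empty ("= []").
timeline_during_race = [
    {"instant": "20211116105105578", "state": "COMPLETED",
     "extraMetadata": {CHECKPOINT_KEY: "1637066483000"}},
    {"instant": "20211116105112814", "state": "COMPLETED",
     "extraMetadata": {}},
]

try:
    read_checkpoint(timeline_during_race)
except RuntimeError as e:
    print("mid-race:", e)  # fails, matching the exception in the stacktrace

# After a restart, the same instant reads back with the copied checkpoint
# (as the grep output shows), and the lookup succeeds.
timeline_after_restart = copy.deepcopy(timeline_during_race)
timeline_after_restart[-1]["extraMetadata"] = {CHECKPOINT_KEY: "1637066483000"}
print("after restart:", read_checkpoint(timeline_after_restart))
```

If this model is right, the bug is not in the lookup logic itself but in when a just-completed instant's metadata becomes readable; the fact that a 1-second delay masks the issue also points at a read racing a freshly finished write.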

stacktrace: 
{code:java}
21/11/16 10:51:15 WARN HoodieDeltaStreamer: Next round 
21/11/16 10:51:15 WARN DeltaSync: Extra metadata :: 20211116105105578, 
20211116105105578.deltacommit, = [schema, deltastreamer.checkpoint.key]
21/11/16 10:51:15 WARN HoodieDeltaStreamer: Next round 
21/11/16 10:51:15 WARN DeltaSync: Extra metadata :: 20211116105105578, 
20211116105105578.deltacommit, = [schema, deltastreamer.checkpoint.key]
21/11/16 10:51:15 WARN HoodieDeltaStreamer: Next round 
21/11/16 10:51:15 WARN DeltaSync: Extra metadata :: 20211116105112814, 
20211116105112814.deltacommit, = []
21/11/16 10:51:15 ERROR HoodieDeltaStreamer: Shutting down delta-sync due to 
exception
org.apache.hudi.utilities.exception.HoodieDeltaStreamerException: Unable to 
find previous checkpoint. Please double check if this table was indeed built 
via delta streamer. Last Commit 
:Option{val=[20211116105112814__deltacommit__COMPLETED]}, Instants 
:[[20211116104228269__deltacommit__COMPLETED], 
[20211116104553080__deltacommit__COMPLETED], 
[20211116104759622__deltacommit__COMPLETED], 
[20211116105105578__deltacommit__COMPLETED], 
[20211116105112814__deltacommit__COMPLETED]], CommitMetadata={
  "partitionToWriteStats" : { },
  "compacted" : false,
  "extraMetadata" : { },
  "operationType" : "UNKNOWN",
  "fileIdAndRelativePaths" : { },
  "totalRecordsDeleted" : 0,
  "totalLogRecordsCompacted" : 0,
  "totalLogFilesCompacted" : 0,
  "totalCompactedRecordsUpdated" : 0,



{code}

  was:
Even after setting the right config to copy over the deltastreamer 
checkpoint, deltastreamer fails to read the checkpoint from the previous 
commit metadata. This does not happen in general. In this case, in 
continuous mode, there is no data in the source (parquet DFS) folder, so 
deltastreamer continuously checks the source folder and also loads the last 
checkpoint from timeline metadata. With this setup, when a write from 
spark-datasource is triggered, deltastreamer immediately fails to read the 
checkpoint from the completed spark-writer commit. But if deltastreamer is 
restarted, the exception is not seen and it picks up the checkpoint. 

When I induced a 1-second delay in continuous mode, things were fine as well. 

 

Setup:

Deltastreamer ran in continuous mode. The source folder did not have any 
data, so deltastreamer kept checking the source folder and fetching the 
latest checkpoint from commit metadata in quick succession. 

Then a concurrent write from spark-datasource was triggered. 

 

I inspected the last completed commit instant (the one reported by 
deltastreamer) made by the spark writer, and it looks okay to me. 
{code:java}
grep "checkpoint" 
/tmp/hudi-deltastreamer-gh-mw/.hoodie/20211116074129737.deltacommit
    "deltastreamer.checkpoint.key" : "1637066483000" {code}
But after hitting the below exception, if I restart deltastreamer, it runs 
just fine, which is very strange. I was able to reproduce this 2 times out 
of 5.  

Here is the checkpoint from the last delta commit by deltastreamer (which 
matches the entry found in the spark writer's delta commit above):
{code:java}
grep "checkpoint" 
/tmp/hudi-deltastreamer-gh-mw/.hoodie/20211116074123384.deltacommit
    "deltastreamer.checkpoint.key" : "1637066483000" {code}
 

I also checked the deltastreamer code, and we look only at completed 
instants and the completed commit metadata. So I am not sure why this is 
happening. 

stacktrace: 
{code:java}
21/11/16 07:41:31 ERROR HoodieDeltaStreamer: Shutting down delta-sync due to 
exception
org.apache.hudi.utilities.exception.HoodieDeltaStreamerException: Unable to 
find previous checkpoint. Please double check if this table was indeed built 
via delta streamer. Last Commit 
:Option{val=[20211116074129737__deltacommit__COMPLETED]}, Instants 
:[[20211116072710523__deltacommit__COMPLETED], 
[20211116072748906__deltacommit__COMPLETED], 
[20211116072910768__deltacommit__COMPLETED], 
[20211116074114874__deltacommit__COMPLETED], 
[20211116074123384__deltacommit__COMPLETED], 
[20211116074129737__deltacommit__COMPLETED]], CommitMetadata={
  "partitionToWriteStats" : { },
  "compacted" : false,
  "extraMetadata" : { },
  "operationType" : "UNKNOWN",
  "fileIdAndRelativePaths" : { },
  "totalRecordsDeleted" : 0,
  "totalLogRecordsCompacted" : 0,
  "totalLogFilesCompacted" : 0,
  "totalCompactedRecordsUpdated" : 0,
  "totalLogFilesSize" : 0,
  "totalScanTime" : 0,
  "totalCreateTime" : 0,
  "totalUpsertTime" : 0,
  "minAndMaxEventTime" : {
    "Optional.empty" : {
      "val" : null,
      "present" : false
    }
  },
  "writePartitionPaths" : [ ]
}
        at 
org.apache.hudi.utilities.deltastreamer.DeltaSync.readFromSource(DeltaSync.java:346)
        at 
org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:281)
        at 
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.lambda$startService$0(HoodieDeltaStreamer.java:634)
        at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
21/11/16 07:41:31 WARN HoodieDeltaStreamer: Gracefully shutting down compactor
21/11/16 07:41:39 WARN SparkRDDWriteClient: Slept for 20 secs, proceeding 
21/11/16 07:41:40 ERROR HoodieAsyncService: Monitor noticed one or more threads 
failed. Requesting graceful shutdown of other threads
java.util.concurrent.ExecutionException: 
org.apache.hudi.exception.HoodieException: Unable to find previous checkpoint. 
Please double check if this table was indeed built via delta streamer. Last 
Commit :Option{val=[20211116074129737__deltacommit__COMPLETED]}, Instants 
:[[20211116072710523__deltacommit__COMPLETED], 
[20211116072748906__deltacommit__COMPLETED], 
[20211116072910768__deltacommit__COMPLETED], 
[20211116074114874__deltacommit__COMPLETED], 
[20211116074123384__deltacommit__COMPLETED], 
[20211116074129737__deltacommit__COMPLETED]], CommitMetadata={{code}


> Deltastreamer fails to read checkpoint from previous commit metadata by spark 
> writer on continuous mode where there is no data in source
> ----------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: HUDI-2772
>                 URL: https://issues.apache.org/jira/browse/HUDI-2772
>             Project: Apache Hudi
>          Issue Type: Sub-task
>            Reporter: sivabalan narayanan
>            Assignee: sivabalan narayanan
>            Priority: Critical
>



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
