[ 
https://issues.apache.org/jira/browse/HUDI-1770?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vinoth Govindarajan updated HUDI-1770:
--------------------------------------
    Description: 
When delta streamer is using HoodieIncrSource from another parent Hudi table, 
it runs into this error, when you are not running your delta streamer pipeline 
frequently.

 
{code:java}
User class threw exception: org.apache.spark.sql.AnalysisException: Path does 
not exist: 
hdfs:///tmp/delta_streamer_test/datestr=2021-03-30/f64f3420-4e03-4835-ab06-5d73cb953aa9-0_3-4-91_20210402163524.parquet;
        at 
org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:558)
        at 
org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:545)
        at 
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at 
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at 
scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
        at scala.collection.immutable.List.flatMap(List.scala:355)
        at 
org.apache.spark.sql.execution.datasources.DataSource.org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary(DataSource.scala:545)
        at 
org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:359)
        at 
org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
        at 
org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:641)
        at 
org.apache.hudi.IncrementalRelation.buildScan(IncrementalRelation.scala:151)
        at 
org.apache.spark.sql.execution.datasources.DataSourceStrategy.apply(DataSourceStrategy.scala:306)
        at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:63)
        at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:63)
        at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
        at 
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
        at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
        at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
        at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at 
scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
        at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1334)
        at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:75)
        at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:67)
        at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
        at 
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
        at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
        at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
        at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at 
scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
        at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1334)
        at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:75)
        at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:67)
        at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
        at 
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
        at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
        at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
        at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at 
scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
        at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1334)
        at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:75)
        at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:67)
        at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
        at 
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
        at 
org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:79)
        at 
org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:75)
        at 
org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:85)
        at 
org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:84)
        at 
org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:106)
        at 
org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:106)
        at 
org.apache.hudi.HoodieSparkUtils$.createRdd(HoodieSparkUtils.scala:104)
        at 
org.apache.hudi.HoodieSparkUtils$.createRdd(HoodieSparkUtils.scala:95)
        at org.apache.hudi.HoodieSparkUtils.createRdd(HoodieSparkUtils.scala)
        at 
org.apache.hudi.utilities.deltastreamer.DeltaSync.lambda$readFromSource$4(DeltaSync.java:380)
        at org.apache.hudi.common.util.Option.map(Option.java:107)
        at 
org.apache.hudi.utilities.deltastreamer.DeltaSync.readFromSource(DeltaSync.java:380)
        at 
org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:263)
        at 
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$2(HoodieDeltaStreamer.java:170)
        at org.apache.hudi.common.util.Option.ifPresent(Option.java:96)
        at 
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:168)
        at 
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:470)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:690)
{code}
 

I guess this is because of the inconsistency with the cleaner commit retained 
and archival process which cleans up the commit files 
([source|https://github.com/apache/hudi/blob/fe16d0de7c76105775c887b700751241bc82624c/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java#L123]):
{code:java}
private static final String DEFAULT_CLEANER_FILE_VERSIONS_RETAINED = "3";
private static final String DEFAULT_CLEANER_COMMITS_RETAINED = "10";
private static final String DEFAULT_MAX_COMMITS_TO_KEEP = "30";
private static final String DEFAULT_MIN_COMMITS_TO_KEEP = "20";
{code}
 

These file not found error is for the commits which are present in .hoodie 
folder but the actual version is cleaned by the cleaner, delta streamer is 
doing a listing of all the commits and selects the latest commit file, but when 
the actual committed version file is not there, it throws this error.

 

To recreate this error, you need to read from a hoodie table with 
HoodieIncrSource and schedule your pipeline to run twice a day (provided the 
cleaner cleans the commits in that interval).

 

*Possible Solution:*

Merge these two configs into one, and avoid this kind of inconsistent state in 
the HDFS.

 

 

  was:
When delta streamer is using HoodieIncrSource from another parent Hudi table, 
it runs into this error:

 
{code:java}
User class threw exception: org.apache.spark.sql.AnalysisException: Path does 
not exist: 
hdfs:///tmp/delta_streamer_test/datestr=2021-03-30/f64f3420-4e03-4835-ab06-5d73cb953aa9-0_3-4-91_20210402163524.parquet;
        at 
org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:558)
        at 
org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:545)
        at 
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at 
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at 
scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
        at scala.collection.immutable.List.flatMap(List.scala:355)
        at 
org.apache.spark.sql.execution.datasources.DataSource.org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary(DataSource.scala:545)
        at 
org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:359)
        at 
org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
        at 
org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:641)
        at 
org.apache.hudi.IncrementalRelation.buildScan(IncrementalRelation.scala:151)
        at 
org.apache.spark.sql.execution.datasources.DataSourceStrategy.apply(DataSourceStrategy.scala:306)
        at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:63)
        at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:63)
        at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
        at 
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
        at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
        at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
        at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at 
scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
        at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1334)
        at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:75)
        at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:67)
        at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
        at 
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
        at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
        at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
        at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at 
scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
        at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1334)
        at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:75)
        at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:67)
        at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
        at 
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
        at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
        at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
        at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at 
scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
        at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1334)
        at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:75)
        at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:67)
        at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
        at 
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
        at 
org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:79)
        at 
org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:75)
        at 
org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:85)
        at 
org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:84)
        at 
org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:106)
        at 
org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:106)
        at 
org.apache.hudi.HoodieSparkUtils$.createRdd(HoodieSparkUtils.scala:104)
        at 
org.apache.hudi.HoodieSparkUtils$.createRdd(HoodieSparkUtils.scala:95)
        at org.apache.hudi.HoodieSparkUtils.createRdd(HoodieSparkUtils.scala)
        at 
org.apache.hudi.utilities.deltastreamer.DeltaSync.lambda$readFromSource$4(DeltaSync.java:380)
        at org.apache.hudi.common.util.Option.map(Option.java:107)
        at 
org.apache.hudi.utilities.deltastreamer.DeltaSync.readFromSource(DeltaSync.java:380)
        at 
org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:263)
        at 
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$2(HoodieDeltaStreamer.java:170)
        at org.apache.hudi.common.util.Option.ifPresent(Option.java:96)
        at 
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:168)
        at 
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:470)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:690)
{code}
 

I guess this is because of the inconsistency with the cleaner commit retained 
and archival process which cleans up the commit files 
([source|https://github.com/apache/hudi/blob/fe16d0de7c76105775c887b700751241bc82624c/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java#L123]):
{code:java}
private static final String DEFAULT_CLEANER_FILE_VERSIONS_RETAINED = "3";
private static final String DEFAULT_CLEANER_COMMITS_RETAINED = "10";
private static final String DEFAULT_MAX_COMMITS_TO_KEEP = "30";
private static final String DEFAULT_MIN_COMMITS_TO_KEEP = "20";
{code}
 

These file not found error is for the commits which are present in .hoodie 
folder but the actual version is cleaned by the cleaner, delta streamer is 
doing a listing of all the commits and selects the latest commit file, but when 
the actual committed version file is not there, it throws this error.

 

To recreate this error, you need to read from a hoodie table with 
HoodieIncrSource and schedule your pipeline to run twice a day (provided the 
cleaner cleans the commits in that interval).

 

*Possible Solution:*

Merge these two configs into one, and avoid this kind of inconsistent state in 
the HDFS.

 

 


> Deltastreamer throws errors when not running frequently
> -------------------------------------------------------
>
>                 Key: HUDI-1770
>                 URL: https://issues.apache.org/jira/browse/HUDI-1770
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: DeltaStreamer
>            Reporter: Vinoth Govindarajan
>            Priority: Major
>
> When delta streamer is using HoodieIncrSource from another parent Hudi table, 
> it runs into this error, when you are not running your delta streamer 
> pipeline frequently.
>  
> {code:java}
> User class threw exception: org.apache.spark.sql.AnalysisException: Path does 
> not exist: 
> hdfs:///tmp/delta_streamer_test/datestr=2021-03-30/f64f3420-4e03-4835-ab06-5d73cb953aa9-0_3-4-91_20210402163524.parquet;
>       at 
> org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:558)
>       at 
> org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:545)
>       at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>       at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>       at scala.collection.immutable.List.foreach(List.scala:392)
>       at 
> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
>       at scala.collection.immutable.List.flatMap(List.scala:355)
>       at 
> org.apache.spark.sql.execution.datasources.DataSource.org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary(DataSource.scala:545)
>       at 
> org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:359)
>       at 
> org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
>       at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
>       at 
> org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:641)
>       at 
> org.apache.hudi.IncrementalRelation.buildScan(IncrementalRelation.scala:151)
>       at 
> org.apache.spark.sql.execution.datasources.DataSourceStrategy.apply(DataSourceStrategy.scala:306)
>       at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:63)
>       at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:63)
>       at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
>       at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
>       at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
>       at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
>       at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
>       at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
>       at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>       at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>       at 
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>       at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1334)
>       at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:75)
>       at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:67)
>       at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
>       at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
>       at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
>       at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
>       at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
>       at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>       at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>       at 
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>       at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1334)
>       at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:75)
>       at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:67)
>       at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
>       at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
>       at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
>       at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
>       at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
>       at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>       at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>       at 
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>       at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1334)
>       at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:75)
>       at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:67)
>       at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
>       at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
>       at 
> org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
>       at 
> org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:79)
>       at 
> org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:75)
>       at 
> org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:85)
>       at 
> org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:84)
>       at 
> org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:106)
>       at 
> org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:106)
>       at 
> org.apache.hudi.HoodieSparkUtils$.createRdd(HoodieSparkUtils.scala:104)
>       at 
> org.apache.hudi.HoodieSparkUtils$.createRdd(HoodieSparkUtils.scala:95)
>       at org.apache.hudi.HoodieSparkUtils.createRdd(HoodieSparkUtils.scala)
>       at 
> org.apache.hudi.utilities.deltastreamer.DeltaSync.lambda$readFromSource$4(DeltaSync.java:380)
>       at org.apache.hudi.common.util.Option.map(Option.java:107)
>       at 
> org.apache.hudi.utilities.deltastreamer.DeltaSync.readFromSource(DeltaSync.java:380)
>       at 
> org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:263)
>       at 
> org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$2(HoodieDeltaStreamer.java:170)
>       at org.apache.hudi.common.util.Option.ifPresent(Option.java:96)
>       at 
> org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:168)
>       at 
> org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:470)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at 
> org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:690)
> {code}
>  
> I guess this is because of the inconsistency with the cleaner commit retained 
> and archival process which cleans up the commit files 
> ([source|https://github.com/apache/hudi/blob/fe16d0de7c76105775c887b700751241bc82624c/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java#L123]):
> {code:java}
> private static final String DEFAULT_CLEANER_FILE_VERSIONS_RETAINED = "3";
> private static final String DEFAULT_CLEANER_COMMITS_RETAINED = "10";
> private static final String DEFAULT_MAX_COMMITS_TO_KEEP = "30";
> private static final String DEFAULT_MIN_COMMITS_TO_KEEP = "20";
> {code}
>  
> These file not found error is for the commits which are present in .hoodie 
> folder but the actual version is cleaned by the cleaner, delta streamer is 
> doing a listing of all the commits and selects the latest commit file, but 
> when the actual committed version file is not there, it throws this error.
>  
> To recreate this error, you need to read from a hoodie table with 
> HoodieIncrSource and schedule your pipeline to run twice a day (provided the 
> cleaner cleans the commits in that interval).
>  
> *Possible Solution:*
> Merge these two configs into one, and avoid this kind of inconsistent state 
> in the HDFS.
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to