[ https://issues.apache.org/jira/browse/HUDI-1770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17458431#comment-17458431 ]
sivabalan narayanan commented on HUDI-1770:
-------------------------------------------
We already have a patch that fixes this scenario: [https://github.com/apache/hudi/pull/3946]
> Deltastreamer throws errors when not running frequently
> -------------------------------------------------------
>
> Key: HUDI-1770
> URL: https://issues.apache.org/jira/browse/HUDI-1770
> Project: Apache Hudi
> Issue Type: Bug
> Components: DeltaStreamer
> Affects Versions: 0.7.0, 0.8.0
> Reporter: Vinoth Govindarajan
> Priority: Major
> Labels: sev:high
>
> When DeltaStreamer uses HoodieIncrSource to read incrementally from another
> (parent) Hudi table, it fails with the following error if the pipeline is not
> run frequently:
>
> {code:java}
> User class threw exception: org.apache.spark.sql.AnalysisException: Path does not exist: hdfs:///tmp/delta_streamer_test/datestr=2021-03-30/f64f3420-4e03-4835-ab06-5d73cb953aa9-0_3-4-91_20210402163524.parquet;
> at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:558)
> at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:545)
> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> at scala.collection.immutable.List.foreach(List.scala:392)
> at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
> at scala.collection.immutable.List.flatMap(List.scala:355)
> at org.apache.spark.sql.execution.datasources.DataSource.org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary(DataSource.scala:545)
> at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:359)
> at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
> at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
> at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:641)
> at org.apache.hudi.IncrementalRelation.buildScan(IncrementalRelation.scala:151)
> at org.apache.spark.sql.execution.datasources.DataSourceStrategy.apply(DataSourceStrategy.scala:306)
> at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:63)
> at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:63)
> at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
> at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
> at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
> at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
> at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
> at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1334)
> at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:75)
> at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:67)
> at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
> at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
> at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
> at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
> at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
> at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1334)
> at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:75)
> at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:67)
> at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
> at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
> at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
> at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
> at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
> at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1334)
> at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:75)
> at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:67)
> at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
> at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
> at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:79)
> at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:75)
> at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:85)
> at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:84)
> at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:106)
> at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:106)
> at org.apache.hudi.HoodieSparkUtils$.createRdd(HoodieSparkUtils.scala:104)
> at org.apache.hudi.HoodieSparkUtils$.createRdd(HoodieSparkUtils.scala:95)
> at org.apache.hudi.HoodieSparkUtils.createRdd(HoodieSparkUtils.scala)
> at org.apache.hudi.utilities.deltastreamer.DeltaSync.lambda$readFromSource$4(DeltaSync.java:380)
> at org.apache.hudi.common.util.Option.map(Option.java:107)
> at org.apache.hudi.utilities.deltastreamer.DeltaSync.readFromSource(DeltaSync.java:380)
> at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:263)
> at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$2(HoodieDeltaStreamer.java:170)
> at org.apache.hudi.common.util.Option.ifPresent(Option.java:96)
> at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:168)
> at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:470)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:690)
> {code}
>
> I suspect this is caused by an inconsistency between the cleaner's
> commits-retained setting and the archival process that cleans up the commit
> files ([source|https://github.com/apache/hudi/blob/fe16d0de7c76105775c887b700751241bc82624c/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java#L123]):
> {code:java}
> private static final String DEFAULT_CLEANER_FILE_VERSIONS_RETAINED = "3";
> private static final String DEFAULT_CLEANER_COMMITS_RETAINED = "10";
> private static final String DEFAULT_MAX_COMMITS_TO_KEEP = "30";
> private static final String DEFAULT_MIN_COMMITS_TO_KEEP = "20";
> {code}
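>
> With these defaults the timeline and the data files can drift apart: archival
> keeps at least 20 commit instants in the .hoodie folder, while the cleaner
> retains data files for only the last 10 commits. A rough sketch of the
> resulting window (the numbers follow the defaults above; the property names
> are the standard Hudi writer configs):
> {code}
> # Writer-side defaults on the parent table, per HoodieCompactionConfig above:
> hoodie.cleaner.commits.retained=10   # data files survive only the last 10 commits
> hoodie.keep.min.commits=20           # but >= 20 commit instants stay on the timeline
> hoodie.keep.max.commits=30
>
> # => the oldest ~10 instants still on the timeline may reference data files
> #    the cleaner has already deleted; an incremental read that resumes from
> #    a checkpoint in that window fails with "Path does not exist".
> {code}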
>
> This file-not-found error occurs for commits that are still present in the
> .hoodie folder but whose data file versions have already been removed by the
> cleaner: DeltaStreamer lists all the commits and selects the latest commit
> file, but when the corresponding data file no longer exists, it throws this error.
>
> To reproduce this error, read from a Hudi table with HoodieIncrSource and
> schedule your pipeline to run only twice a day (provided the cleaner removes
> commits within that interval); see the sketch below.
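>
> A minimal, illustrative invocation (the paths, table names, and ordering
> field are placeholders; the source class and flags are the standard
> DeltaStreamer ones):
> {code:bash}
> spark-submit \
>   --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
>   hudi-utilities-bundle.jar \
>   --table-type COPY_ON_WRITE \
>   --source-class org.apache.hudi.utilities.sources.HoodieIncrSource \
>   --source-ordering-field ts \
>   --target-base-path hdfs:///tmp/delta_streamer_child \
>   --target-table delta_streamer_child \
>   --hoodie-conf hoodie.deltastreamer.source.hoodieincr.path=hdfs:///tmp/delta_streamer_test
> {code}
> Run it once, wait long enough for the parent table's cleaner to pass the
> saved checkpoint, then run it again.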
>
> *Possible Solution:*
> Merge these two retention configs into one, so the timeline and the data
> files cannot get into this kind of inconsistent state on HDFS.
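>
> Until then, a possible interim workaround (the values below are illustrative,
> not a recommendation) is to raise the cleaner retention on the parent table
> so it covers the longest expected gap between runs of the incremental pipeline:
> {code}
> # Parent-table writer configs; sized here for a reader that runs twice a day
> # against a writer committing roughly every 30 minutes:
> hoodie.cleaner.commits.retained=48   # keep data files for ~1 day of commits
> hoodie.keep.min.commits=58           # archival must keep more instants than the cleaner retains
> hoodie.keep.max.commits=68
> {code}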
>
>