[
https://issues.apache.org/jira/browse/HUDI-2275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17425756#comment-17425756
]
Dave Hagman commented on HUDI-2275:
-----------------------------------
[~shivnarayan] I received the same error (above) even when running from the jar
built off of your branch. First I see many errors (logged as warnings) about the
timeline being inconsistent (caused by the deltastreamer making changes), and
then the job ultimately fails due to the call to _*checkArgument*_:
Error Details:
{code:java}
java.lang.IllegalArgumentException: Last known instant from client was
20211007191403 but server has the following timeline
[[20211005201655__rollback__COMPLETED], [20211005202809__rollback__COMPLETED],
[20211006132623__rollback__COMPLETED], [20211007164718__rollback__COMPLETED],
[20211007165545__rollback__COMPLETED], [20211007170747__rollback__COMPLETED],
[20211007190436__commit__COMPLETED], [20211007190515__commit__COMPLETED],
[20211007190542__clean__COMPLETED], [20211007190551__commit__COMPLETED],
[20211007190614__clean__COMPLETED], [20211007190624__commit__COMPLETED],
[20211007190646__clean__COMPLETED], [20211007190656__commit__COMPLETED],
[20211007190719__clean__COMPLETED], [20211007190730__commit__COMPLETED],
[20211007190753__clean__COMPLETED], [20211007190804__commit__COMPLETED],
[20211007190828__clean__COMPLETED], [20211007190838__commit__COMPLETED],
[20211007190900__clean__COMPLETED], [20211007190910__commit__COMPLETED],
[20211007190933__clean__COMPLETED], [20211007190943__commit__COMPLETED],
[20211007191009__clean__COMPLETED], [20211007191020__commit__COMPLETED],
[20211007191046__clean__COMPLETED], [20211007191056__commit__COMPLETED],
[20211007191118__clean__COMPLETED], [20211007191129__commit__COMPLETED],
[20211007191151__clean__COMPLETED], [20211007191201__commit__COMPLETED],
[20211007191223__clean__COMPLETED], [20211007191233__commit__COMPLETED],
[20211007191257__clean__COMPLETED], [20211007191307__commit__COMPLETED],
[20211007191329__clean__COMPLETED], [20211007191339__commit__COMPLETED],
[20211007191403__clean__COMPLETED], [20211007191413__commit__COMPLETED],
[20211007191438__clean__COMPLETED], [20211007191448__commit__COMPLETED],
[20211007191510__clean__COMPLETED], [20211007191511__rollback__COMPLETED],
[20211007191522__commit__COMPLETED], [20211007191546__clean__COMPLETED],
[20211007191556__commit__COMPLETED], [20211007191624__clean__COMPLETED],
[20211007191634__commit__COMPLETED], [20211007191704__clean__COMPLETED]]
at org.apache.hudi.common.util.ValidationUtils.checkArgument(ValidationUtils.java:40)
at org.apache.hudi.timeline.service.RequestHandler$ViewHandler.handle(RequestHandler.java:510)
at io.javalin.security.SecurityUtil.noopAccessManager(SecurityUtil.kt:22)
at io.javalin.Javalin.lambda$addHandler$0(Javalin.java:606)
at io.javalin.core.JavalinServlet$service$2$1.invoke(JavalinServlet.kt:46)
at io.javalin.core.JavalinServlet$service$2$1.invoke(JavalinServlet.kt:17)
at io.javalin.core.JavalinServlet$service$1.invoke(JavalinServlet.kt:143)
at io.javalin.core.JavalinServlet$service$2.invoke(JavalinServlet.kt:41)
at io.javalin.core.JavalinServlet.service(JavalinServlet.kt:107)
at io.javalin.core.util.JettyServerUtil$initialize$httpHandler$1.doHandle(JettyServerUtil.kt:72)
at org.apache.hudi.org.apache.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:203)
at org.apache.hudi.org.apache.jetty.servlet.ServletHandler.doScope(ServletHandler.java:480)
at org.apache.hudi.org.apache.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1668)
at org.apache.hudi.org.apache.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:201)
at org.apache.hudi.org.apache.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1247)
at org.apache.hudi.org.apache.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:144)
at org.apache.hudi.org.apache.jetty.server.handler.HandlerList.handle(HandlerList.java:61)
at org.apache.hudi.org.apache.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:174)
at org.apache.hudi.org.apache.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
at org.apache.hudi.org.apache.jetty.server.Server.handle(Server.java:502)
at org.apache.hudi.org.apache.jetty.server.HttpChannel.handle(HttpChannel.java:370)
at org.apache.hudi.org.apache.jetty.server.HttpConnection.onFillable(HttpConnection.java:267)
at org.apache.hudi.org.apache.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:305)
at org.apache.hudi.org.apache.jetty.io.FillInterest.fillable(FillInterest.java:103)
at org.apache.hudi.org.apache.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:117)
at org.apache.hudi.org.apache.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:333)
at org.apache.hudi.org.apache.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:310)
at org.apache.hudi.org.apache.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:168)
at org.apache.hudi.org.apache.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:126)
at org.apache.hudi.org.apache.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:366)
at org.apache.hudi.org.apache.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:765)
at org.apache.hudi.org.apache.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:683)
at java.lang.Thread.run(Thread.java:748)
{code}
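The IllegalArgumentException above is the timeline server rejecting a request whose client-side view of the timeline has gone stale. As a rough illustration only (not Hudi's actual source), _*checkArgument*_ follows the usual Guava-style precondition contract; the instants below are taken from the error message, and the message text is paraphrased:

```java
// Sketch of a Guava-style precondition guard; the real
// org.apache.hudi.common.util.ValidationUtils may differ in detail.
public final class ValidationUtilsSketch {

    public static void checkArgument(boolean expression, String message) {
        if (!expression) {
            throw new IllegalArgumentException(message);
        }
    }

    public static void main(String[] args) {
        // Hypothetical stale-view check: the client's last known instant
        // lags behind the latest instant the timeline server holds.
        String clientInstant = "20211007191403";
        String serverInstant = "20211007191704";
        try {
            checkArgument(clientInstant.equals(serverInstant),
                "Last known instant from client was " + clientInstant
                    + " but server has moved on to " + serverInstant);
        } catch (IllegalArgumentException e) {
            System.out.println("Request rejected: " + e.getMessage());
        }
    }
}
```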
and then the job fails with:
{code:java}
Exception in thread "main" java.lang.IllegalArgumentException
at org.apache.hudi.common.util.ValidationUtils.checkArgument(ValidationUtils.java:31)
at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionState(HoodieActiveTimeline.java:450)
at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.transitionState(HoodieActiveTimeline.java:431)
at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.saveAsComplete(HoodieActiveTimeline.java:153)
at org.apache.hudi.client.AbstractHoodieWriteClient.commit(AbstractHoodieWriteClient.java:219)
at org.apache.hudi.client.AbstractHoodieWriteClient.commitStats(AbstractHoodieWriteClient.java:191)
at org.apache.hudi.client.SparkRDDWriteClient.commit(SparkRDDWriteClient.java:124)
at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:620)
at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:274)
at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:164)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:185)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:223)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:220)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:181)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133)
at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)
{code}
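For context, the HoodieDeltaStreamerException quoted in the issue description below is thrown when the checkpoint lookup on the latest commit comes back empty. A hypothetical sketch of that lookup (names are illustrative, not Hudi's actual API), assuming the deltastreamer stores its source offsets in commit extra metadata under a checkpoint key, which a plain spark datasource commit does not carry:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of the deltastreamer checkpoint lookup; the key
// name and method shapes are assumptions for illustration only.
public final class CheckpointLookupSketch {

    static final String CHECKPOINT_KEY = "deltastreamer.checkpoint.key"; // assumed key name

    // The deltastreamer resumes from the checkpoint stored in the latest
    // commit's extra metadata; a datasource-written commit has none, so
    // the lookup is empty and the job throws.
    static Optional<String> resolveCheckpoint(Map<String, String> extraMetadata) {
        return Optional.ofNullable(extraMetadata.get(CHECKPOINT_KEY));
    }

    public static void main(String[] args) {
        Map<String, String> deltastreamerCommit = new HashMap<>();
        deltastreamerCommit.put(CHECKPOINT_KEY, "topic,0:1234");

        Map<String, String> datasourceCommit = new HashMap<>(); // no checkpoint key

        System.out.println(resolveCheckpoint(deltastreamerCommit).isPresent());
        // Empty here maps to "Unable to find previous checkpoint" in the report.
        System.out.println(resolveCheckpoint(datasourceCommit).isPresent());
    }
}
```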
> HoodieDeltaStreamerException when using OCC and a second concurrent writer
> --------------------------------------------------------------------------
>
> Key: HUDI-2275
> URL: https://issues.apache.org/jira/browse/HUDI-2275
> Project: Apache Hudi
> Issue Type: Bug
> Components: DeltaStreamer, Spark Integration, Writer Core
> Affects Versions: 0.9.0
> Reporter: Dave Hagman
> Assignee: Sagar Sumit
> Priority: Critical
> Labels: sev:critical
> Fix For: 0.10.0
>
>
> I am trying to utilize [Optimistic Concurrency
> Control|https://hudi.apache.org/docs/concurrency_control] in order to allow
> two writers to update a single table simultaneously. The two writers are:
> * Writer A: Deltastreamer job consuming continuously from Kafka
> * Writer B: A spark datasource-based writer that is consuming parquet files
> out of S3
> * Table Type: Copy on Write
>
> After a few commits from each writer the deltastreamer will fail with the
> following exception:
>
> {code:java}
> org.apache.hudi.exception.HoodieDeltaStreamerException: Unable to find
> previous checkpoint. Please double check if this table was indeed built via
> delta streamer. Last Commit :Option{val=[20210803165741__commit__COMPLETED]},
> Instants :[[20210803165741__commit__COMPLETED]], CommitMetadata={
> "partitionToWriteStats" : {
> ...{code}
>
> What appears to be happening is a lack of commit isolation between the two
> writers. Writer B (the spark datasource writer) lands commits that are
> eventually picked up by Writer A (the Delta Streamer). This is an issue
> because the Delta Streamer needs checkpoint information, which the spark
> datasource of course does not include in its commits. My understanding was
> that OCC was built for this very purpose (among others).
> OCC config for Delta Streamer:
> {code:java}
> hoodie.write.concurrency.mode=optimistic_concurrency_control
> hoodie.cleaner.policy.failed.writes=LAZY
>
> hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider
> hoodie.write.lock.zookeeper.url=<zk_host>
> hoodie.write.lock.zookeeper.port=2181
> hoodie.write.lock.zookeeper.lock_key=writer_lock
> hoodie.write.lock.zookeeper.base_path=/hudi-write-locks{code}
>
> OCC config for spark datasource:
> {code:java}
> // Multi-writer concurrency
> .option("hoodie.cleaner.policy.failed.writes", "LAZY")
> .option("hoodie.write.concurrency.mode", "optimistic_concurrency_control")
> .option(
> "hoodie.write.lock.provider",
>
> org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider.class.getCanonicalName()
> )
> .option("hoodie.write.lock.zookeeper.url", jobArgs.zookeeperHost)
> .option("hoodie.write.lock.zookeeper.port", jobArgs.zookeeperPort)
> .option("hoodie.write.lock.zookeeper.lock_key", "writer_lock")
> .option("hoodie.write.lock.zookeeper.base_path", "/hudi-write-locks"){code}
> h3. Steps to Reproduce:
> * Start a deltastreamer job against some table Foo
> * In parallel, start writing to the same table Foo using spark datasource
> writer
> * Note that after a few commits from each writer, the deltastreamer is likely
> to fail with the above exception when the datasource writer creates
> non-isolated inflight commits
> NOTE: I have not tested this with two writers of the same type (e.g. two
> deltastreamer jobs).
> NOTE 2: Another detail that may be relevant is that the two writers run on
> completely different spark clusters, but I assumed this shouldn't be an issue
> since we're locking via Zookeeper.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)