maheshguptags commented on issue #12738:
URL: https://github.com/apache/hudi/issues/12738#issuecomment-2684762042

   Thanks to @cshuo for connecting over a call and offering suggestions.
   
   I spoke with @cshuo, and he recommended removing `hoodie.write.lock.provider` while keeping these two configurations as they are, `'metadata.enabled'='true'` and `'hoodie.write.concurrency.mode'='SINGLE_WRITER'`, on a fresh table. However, it seems that `hoodie.write.lock.provider` is still required when these two configurations are included at table creation.
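   In WITH-clause terms, the combination under test boils down to the following (a minimal sketch; the column list is elided and the path is illustrative, not my actual bucket):
   
   ```sql
   -- Sketch of the suggested setup: metadata table on, single writer,
   -- and no explicit lock provider configured.
   CREATE TABLE IF NOT EXISTS hudi_table (...) WITH (
     'connector' = 'hudi',
     'path' = 's3a://<bucket>/hudi_table/',
     'metadata.enabled' = 'true',
     'hoodie.write.concurrency.mode' = 'SINGLE_WRITER'
     -- 'hoodie.write.lock.provider' deliberately NOT set
   );
   ```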
   
   @cshuo, one question arises: if `metadata.enabled` and the concurrency mode are enabled by default, why isn't this error occurring in [your example](https://gist.github.com/cshuo/f93144ac1527fb039fa1242eab172835)?
   
   **Another approach I could try is to remove the MDT-related config entirely, as you did (keeping the defaults), and then run the test again.**
   
   Also attaching the table creation DDL, as discussed:
   
   ```sql
   app.streamingDdlQuery=CREATE TABLE IF NOT EXISTS hudi_table (
     address STRING,
     x STRING,
     company_name STRING,
     first_active_date TIMESTAMP(3),
     last_active_date TIMESTAMP(3),
     y STRING,
     ts TIMESTAMP(3),
     user_id STRING
   ) PARTITIONED BY (`client_id`) WITH (
     'connector' = 'hudi',
     'write.task.max.size' = '2048',
     'write.merge.max_memory' = '1024',
     'path' = 's3a:/tmp/hudi_table/',
     'table.type' = 'COPY_ON_WRITE',
     'hoodie.datasource.write.recordkey.field' = 'x,y',
     'payload.class' = 'com.gupshup.cdp.poc',
     'precombine.field' = 'ts',
     'hoodie.clean.async' = 'true',
     'hoodie.cleaner.policy' = 'KEEP_LATEST_COMMITS',
     'hoodie.clean.automatic' = 'true',
     'hoodie.clean.max.commits' = '8',
     'hoodie.clean.trigger.strategy' = 'NUM_COMMITS',
     'hoodie.cleaner.parallelism' = '100',
     'hoodie.cleaner.commits.retained' = '6',
     'hoodie.index.type' = 'BUCKET',
     'hoodie.index.bucket.engine' = 'SIMPLE',
     'hoodie.bucket.index.num.buckets' = '16',
     'hoodie.bucket.index.hash.field' = 'y',
     'hoodie.parquet.small.file.limit' = '104857600',
     'hoodie.parquet.compression.codec' = 'snappy',
     'hoodie.schema.on.read.enable' = 'true',
     'hoodie.archive.automatic' = 'true',
     'hoodie.keep.max.commits' = '45',
     'hoodie.keep.min.commits' = '30',
     'metadata.enabled' = 'true',
     'hoodie.write.concurrency.mode' = 'SINGLE_WRITER'
   )
   ```
   
   Error after enabling the above configuration:
   ```
    ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Fatal error occurred in the cluster entrypoint.
   org.apache.flink.util.FlinkRuntimeException: Failed to start the operator coordinators
        at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:170) ~[flink-dist-1.18.1.jar:1.18.1]
        at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startAllOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:82) ~[flink-dist-1.18.1.jar:1.18.1]
        at org.apache.flink.runtime.scheduler.adaptive.CreatingExecutionGraph.handleExecutionGraphCreation(CreatingExecutionGraph.java:127) ~[flink-dist-1.18.1.jar:1.18.1]
        at org.apache.flink.runtime.scheduler.adaptive.CreatingExecutionGraph.lambda$null$0(CreatingExecutionGraph.java:84) ~[flink-dist-1.18.1.jar:1.18.1]
        at org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler.runIfState(AdaptiveScheduler.java:1246) ~[flink-dist-1.18.1.jar:1.18.1]
        at org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler.lambda$runIfState$29(AdaptiveScheduler.java:1261) ~[flink-dist-1.18.1.jar:1.18.1]
        at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
        at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
        at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRunAsync$4(PekkoRpcActor.java:451) ~[flink-rpc-akka93f416ab-a5be-42bf-ac88-6e0e6f6adc3b.jar:1.18.1]
        at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-dist-1.18.1.jar:1.18.1]
        at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRunAsync(PekkoRpcActor.java:451) ~[flink-rpc-akka93f416ab-a5be-42bf-ac88-6e0e6f6adc3b.jar:1.18.1]
        at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:218) ~[flink-rpc-akka93f416ab-a5be-42bf-ac88-6e0e6f6adc3b.jar:1.18.1]
        at org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85) ~[flink-rpc-akka93f416ab-a5be-42bf-ac88-6e0e6f6adc3b.jar:1.18.1]
        at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168) ~[flink-rpc-akka93f416ab-a5be-42bf-ac88-6e0e6f6adc3b.jar:1.18.1]
        at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33) [flink-rpc-akka93f416ab-a5be-42bf-ac88-6e0e6f6adc3b.jar:1.18.1]
        at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29) [flink-rpc-akka93f416ab-a5be-42bf-ac88-6e0e6f6adc3b.jar:1.18.1]
        at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) [flink-rpc-akka93f416ab-a5be-42bf-ac88-6e0e6f6adc3b.jar:1.18.1]
        at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) [flink-rpc-akka93f416ab-a5be-42bf-ac88-6e0e6f6adc3b.jar:1.18.1]
        at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29) [flink-rpc-akka93f416ab-a5be-42bf-ac88-6e0e6f6adc3b.jar:1.18.1]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) [flink-rpc-akka93f416ab-a5be-42bf-ac88-6e0e6f6adc3b.jar:1.18.1]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) [flink-rpc-akka93f416ab-a5be-42bf-ac88-6e0e6f6adc3b.jar:1.18.1]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) [flink-rpc-akka93f416ab-a5be-42bf-ac88-6e0e6f6adc3b.jar:1.18.1]
        at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547) [flink-rpc-akka93f416ab-a5be-42bf-ac88-6e0e6f6adc3b.jar:1.18.1]
        at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545) [flink-rpc-akka93f416ab-a5be-42bf-ac88-6e0e6f6adc3b.jar:1.18.1]
        at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229) [flink-rpc-akka93f416ab-a5be-42bf-ac88-6e0e6f6adc3b.jar:1.18.1]
        at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590) [flink-rpc-akka93f416ab-a5be-42bf-ac88-6e0e6f6adc3b.jar:1.18.1]
        at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557) [flink-rpc-akka93f416ab-a5be-42bf-ac88-6e0e6f6adc3b.jar:1.18.1]
        at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280) [flink-rpc-akka93f416ab-a5be-42bf-ac88-6e0e6f6adc3b.jar:1.18.1]
        at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241) [flink-rpc-akka93f416ab-a5be-42bf-ac88-6e0e6f6adc3b.jar:1.18.1]
        at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253) [flink-rpc-akka93f416ab-a5be-42bf-ac88-6e0e6f6adc3b.jar:1.18.1]
        at java.util.concurrent.ForkJoinTask.doExec(Unknown Source) [?:?]
        at java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source) [?:?]
        at java.util.concurrent.ForkJoinPool.scan(Unknown Source) [?:?]
        at java.util.concurrent.ForkJoinPool.runWorker(Unknown Source) [?:?]
        at java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source) [?:?]
   Caused by: org.apache.hudi.exception.HoodieException: Unable to instantiate class org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider
        at org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:75) ~[blob_p-4954105f83fcf5d060f3341be827874d18f194b3-9c85dbe3de62902f41325b74625911ff:?]
        at org.apache.hudi.client.transaction.lock.LockManager.getLockProvider(LockManager.java:111) ~[blob_p-4954105f83fcf5d060f3341be827874d18f194b3-9c85dbe3de62902f41325b74625911ff:?]
        at org.apache.hudi.client.transaction.lock.LockManager.unlock(LockManager.java:98) ~[blob_p-4954105f83fcf5d060f3341be827874d18f194b3-9c85dbe3de62902f41325b74625911ff:?]
        at org.apache.hudi.client.HoodieFlinkTableServiceClient.initMetadataTable(HoodieFlinkTableServiceClient.java:211) ~[blob_p-4954105f83fcf5d060f3341be827874d18f194b3-9c85dbe3de62902f41325b74625911ff:?]
        at org.apache.hudi.client.HoodieFlinkWriteClient.initMetadataTable(HoodieFlinkWriteClient.java:319) ~[blob_p-4954105f83fcf5d060f3341be827874d18f194b3-9c85dbe3de62902f41325b74625911ff:?]
        at org.apache.hudi.sink.StreamWriteOperatorCoordinator.initMetadataTable(StreamWriteOperatorCoordinator.java:352) ~[blob_p-4954105f83fcf5d060f3341be827874d18f194b3-9c85dbe3de62902f41325b74625911ff:?]
        at org.apache.hudi.sink.StreamWriteOperatorCoordinator.start(StreamWriteOperatorCoordinator.java:197) ~[blob_p-4954105f83fcf5d060f3341be827874d18f194b3-9c85dbe3de62902f41325b74625911ff:?]
        at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.start(OperatorCoordinatorHolder.java:185) ~[flink-dist-1.18.1.jar:1.18.1]
        at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:165) ~[flink-dist-1.18.1.jar:1.18.1]
        ... 34 more
   Caused by: java.lang.reflect.InvocationTargetException
        at jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:?]
        at jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source) ~[?:?]
        at jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source) ~[?:?]
        at java.lang.reflect.Constructor.newInstance(Unknown Source) ~[?:?]
        at org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:73) ~[blob_p-4954105f83fcf5d060f3341be827874d18f194b3-9c85dbe3de62902f41325b74625911ff:?]
        at org.apache.hudi.client.transaction.lock.LockManager.getLockProvider(LockManager.java:111) ~[blob_p-4954105f83fcf5d060f3341be827874d18f194b3-9c85dbe3de62902f41325b74625911ff:?]
        at org.apache.hudi.client.transaction.lock.LockManager.unlock(LockManager.java:98) ~[blob_p-4954105f83fcf5d060f3341be827874d18f194b3-9c85dbe3de62902f41325b74625911ff:?]
        at org.apache.hudi.client.HoodieFlinkTableServiceClient.initMetadataTable(HoodieFlinkTableServiceClient.java:211) ~[blob_p-4954105f83fcf5d060f3341be827874d18f194b3-9c85dbe3de62902f41325b74625911ff:?]
        at org.apache.hudi.client.HoodieFlinkWriteClient.initMetadataTable(HoodieFlinkWriteClient.java:319) ~[blob_p-4954105f83fcf5d060f3341be827874d18f194b3-9c85dbe3de62902f41325b74625911ff:?]
        at org.apache.hudi.sink.StreamWriteOperatorCoordinator.initMetadataTable(StreamWriteOperatorCoordinator.java:352) ~[blob_p-4954105f83fcf5d060f3341be827874d18f194b3-9c85dbe3de62902f41325b74625911ff:?]
        at org.apache.hudi.sink.StreamWriteOperatorCoordinator.start(StreamWriteOperatorCoordinator.java:197) ~[blob_p-4954105f83fcf5d060f3341be827874d18f194b3-9c85dbe3de62902f41325b74625911ff:?]
        at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.start(OperatorCoordinatorHolder.java:185) ~[flink-dist-1.18.1.jar:1.18.1]
        at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:165) ~[flink-dist-1.18.1.jar:1.18.1]
        ... 34 more
   Caused by: org.apache.hudi.exception.HoodieLockException: Unsupported scheme :s3a, since this fs can not support atomic creation
        at org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider.<init>(FileSystemBasedLockProvider.java:89) ~[blob_p-4954105f83fcf5d060f3341be827874d18f194b3-9c85dbe3de62902f41325b74625911ff:?]
        at jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:?]
        at jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source) ~[?:?]
        at jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source) ~[?:?]
        at java.lang.reflect.Constructor.newInstance(Unknown Source) ~[?:?]
        at org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:73) ~[blob_p-4954105f83fcf5d060f3341be827874d18f194b3-9c85dbe3de62902f41325b74625911ff:?]
        at org.apache.hudi.client.transaction.lock.LockManager.getLockProvider(LockManager.java:111) ~[blob_p-4954105f83fcf5d060f3341be827874d18f194b3-9c85dbe3de62902f41325b74625911ff:?]
        at org.apache.hudi.client.transaction.lock.LockManager.unlock(LockManager.java:98) ~[blob_p-4954105f83fcf5d060f3341be827874d18f194b3-9c85dbe3de62902f41325b74625911ff:?]
        at org.apache.hudi.client.HoodieFlinkTableServiceClient.initMetadataTable(HoodieFlinkTableServiceClient.java:211) ~[blob_p-4954105f83fcf5d060f3341be827874d18f194b3-9c85dbe3de62902f41325b74625911ff:?]
        at org.apache.hudi.client.HoodieFlinkWriteClient.initMetadataTable(HoodieFlinkWriteClient.java:319) ~[blob_p-4954105f83fcf5d060f3341be827874d18f194b3-9c85dbe3de62902f41325b74625911ff:?]
        at org.apache.hudi.sink.StreamWriteOperatorCoordinator.initMetadataTable(StreamWriteOperatorCoordinator.java:352) ~[blob_p-4954105f83fcf5d060f3341be827874d18f194b3-9c85dbe3de62902f41325b74625911ff:?]
        at org.apache.hudi.sink.StreamWriteOperatorCoordinator.start(StreamWriteOperatorCoordinator.java:197) ~[blob_p-4954105f83fcf5d060f3341be827874d18f194b3-9c85dbe3de62902f41325b74625911ff:?]
        at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.start(OperatorCoordinatorHolder.java:185) ~[flink-dist-1.18.1.jar:1.18.1]
        at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:165) ~[flink-dist-1.18.1.jar:1.18.1]
        ... 34 more
   ```
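   For context, the root cause above is `FileSystemBasedLockProvider` rejecting the `s3a` scheme, since S3 does not support atomic file creation. One workaround sketch (not verified against my setup; class names should be checked against the Hudi version in use) is to point `hoodie.write.lock.provider` at a provider that does not depend on atomic file creation:
   
   ```sql
   -- Sketch only: candidate WITH-clause options. Verify the classes exist
   -- in your Hudi bundle before relying on either.
   
   -- Option A: single writer within one JVM process
   'hoodie.write.lock.provider' = 'org.apache.hudi.client.transaction.lock.InProcessLockProvider'
   
   -- Option B: external lock for S3 deployments (requires the hudi-aws
   -- bundle on the classpath and a DynamoDB table for locks)
   'hoodie.write.lock.provider' = 'org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider'
   ```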
   
   

