[
https://issues.apache.org/jira/browse/HUDI-7763?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Ethan Guo updated HUDI-7763:
------------------------------
Fix Version/s: 1.0.0
> Fix that the JMX reporter cannot be initialized when the metadata table is enabled
> ----------------------------------------------------------------------------------
>
> Key: HUDI-7763
> URL: https://issues.apache.org/jira/browse/HUDI-7763
> Project: Apache Hudi
> Issue Type: Bug
> Components: metrics
> Environment: hudi0.14.1, Spark3.2
> Reporter: Jihwan Lee
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.0.0
>
>
> If the JMX metrics option is enabled, the port setting can be given as a range.
>
> Because the metadata table is also written as a Hudi table, multiple metrics
> instances are required (which triggers the exception below). However, each
> JmxReporterServer can only use a single port.
> So the JMX server should be able to initialize on multiple ports of the range.
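One possible direction, sketched below as a hypothetical standalone example (the class and method names are illustrative, not Hudi's actual implementation): when a port range is configured, try each port in turn and keep the first one on which an RMI registry can be exported. A second metrics instance in the same JVM then falls through to the next free port instead of colliding on the registry's well-known ObjID, which is what produces the `ExportException: internal error: ObjID already in use` seen in the log.

```java
import java.rmi.RemoteException;
import java.rmi.registry.LocateRegistry;
import java.rmi.registry.Registry;

// Hypothetical sketch, not Hudi's actual code: walk a configured port
// range and return a registry on the first port that can be exported.
public class PortRangeRegistry {
    public static Registry createOnFreePort(int from, int to) throws RemoteException {
        RemoteException last = null;
        for (int port = from; port <= to; port++) {
            try {
                // Throws ExportException ("ObjID already in use") if a
                // registry already exists on this port in the same JVM,
                // or a bind failure if another process holds the port.
                return LocateRegistry.createRegistry(port);
            } catch (RemoteException e) {
                last = e; // port taken; try the next one in the range
            }
        }
        throw last != null ? last
            : new RemoteException("no ports in range " + from + "-" + to);
    }

    public static void main(String[] args) throws Exception {
        // Data-table and metadata-table metrics each end up on their own port.
        Registry dataTable = createOnFreePort(9889, 9899);
        Registry metadataTable = createOnFreePort(9889, 9899);
        System.out.println("both registries exported");
    }
}
```

In the same JVM, two registries on different ports are exported over different transports, so the fixed registry ObjID no longer collides; only a second registry on the *same* port fails.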
>
> Error log (the JMX reporter for the metadata table is initialized first; the
> reporter for the data table then fails with the exception):
> {code:java}
> 24/05/13 20:28:27 INFO table.HoodieTableMetaClient: Loading
> HoodieTableMetaClient from
> /data/feeder/affiliate/book/affiliate_feeder_book_svc
> 24/05/13 20:28:27 INFO table.HoodieTableConfig: Loading table properties from
> /data/feeder/affiliate/book/affiliate_feeder_book_svc/.hoodie/hoodie.properties
> 24/05/13 20:28:27 INFO table.HoodieTableMetaClient: Finished Loading Table of
> type MERGE_ON_READ(version=1, baseFileFormat=PARQUET) from
> /data/feeder/affiliate/book/affiliate_feeder_book_svc
> 24/05/13 20:28:27 INFO table.HoodieTableMetaClient: Loading
> HoodieTableMetaClient from
> /data/feeder/affiliate/book/affiliate_feeder_book_svc
> 24/05/13 20:28:27 INFO table.HoodieTableConfig: Loading table properties from
> /data/feeder/affiliate/book/affiliate_feeder_book_svc/.hoodie/hoodie.properties
> 24/05/13 20:28:27 INFO table.HoodieTableMetaClient: Finished Loading Table of
> type MERGE_ON_READ(version=1, baseFileFormat=PARQUET) from
> /data/feeder/affiliate/book/affiliate_feeder_book_svc
> 24/05/13 20:28:27 INFO timeline.HoodieActiveTimeline: Loaded instants upto :
> Option{val=[==>20240513195519782__deltacommit__REQUESTED__20240513195521160]}
> 24/05/13 20:28:28 INFO config.HoodieWriteConfig: Automatically set
> hoodie.cleaner.policy.failed.writes=LAZY since optimistic concurrency control
> is used
> 24/05/13 20:28:28 INFO metrics.JmxMetricsReporter: Started JMX server on port
> 9889.
> 24/05/13 20:28:28 INFO metrics.JmxMetricsReporter: Configured JMXReporter
> with {port:9889}
> 24/05/13 20:28:28 INFO embedded.EmbeddedTimelineService: Overriding hostIp to
> (feeder-affiliate-book-svc-sink-09c3c08f71b47a5d-driver-svc.csp.svc) found in
> spark-conf. It was null
> 24/05/13 20:28:28 INFO view.FileSystemViewManager: Creating View Manager with
> storage type :MEMORY
> 24/05/13 20:28:28 INFO view.FileSystemViewManager: Creating in-memory based
> Table View
> 24/05/13 20:28:28 INFO util.log: Logging initialized @53678ms to
> org.apache.hudi.org.apache.jetty.util.log.Slf4jLog
> 24/05/13 20:28:28 INFO javalin.Javalin:
> __ __ _ __ __
> / /____ _ _ __ ____ _ / /(_)____ / // /
> __ / // __ `/| | / // __ `// // // __ \ / // /_
> / /_/ // /_/ / | |/ // /_/ // // // / / / /__ __/
> \____/ \__,_/ |___/ \__,_//_//_//_/ /_/ /_/
> https://javalin.io/documentation
> 24/05/13 20:28:28 INFO javalin.Javalin: Starting Javalin ...
> 24/05/13 20:28:28 INFO javalin.Javalin: You are running Javalin 4.6.7
> (released October 24, 2022. Your Javalin version is 567 days old. Consider
> checking for a newer version.).
> 24/05/13 20:28:28 INFO server.Server: jetty-9.4.48.v20220622; built:
> 2022-06-21T20:42:25.880Z; git: 6b67c5719d1f4371b33655ff2d047d24e171e49a; jvm
> 11.0.20.1+1
> 24/05/13 20:28:28 INFO server.Server: Started @54065ms
> 24/05/13 20:28:28 INFO javalin.Javalin: Listening on http://localhost:35071/
> 24/05/13 20:28:28 INFO javalin.Javalin: Javalin started in 177ms \o/
> 24/05/13 20:28:28 INFO service.TimelineService: Starting Timeline server on
> port :35071
> 24/05/13 20:28:28 INFO embedded.EmbeddedTimelineService: Started embedded
> timeline server at
> feeder-affiliate-book-svc-sink-09c3c08f71b47a5d-driver-svc.csp.svc:35071
> 24/05/13 20:28:28 INFO client.BaseHoodieClient: Timeline Server already
> running. Not restarting the service
> 24/05/13 20:28:28 INFO hudi.HoodieSparkSqlWriterInternal:
> Config.inlineCompactionEnabled ? true
> 24/05/13 20:28:28 INFO hudi.HoodieSparkSqlWriterInternal:
> Config.asyncClusteringEnabled ? false
> 24/05/13 20:28:28 INFO codegen.CodeGenerator: Code generated in 77.42891 ms
> 24/05/13 20:28:28 INFO table.HoodieTableMetaClient: Loading
> HoodieTableMetaClient from
> /data/feeder/affiliate/book/affiliate_feeder_book_svc
> 24/05/13 20:28:28 INFO table.HoodieTableConfig: Loading table properties from
> /data/feeder/affiliate/book/affiliate_feeder_book_svc/.hoodie/hoodie.properties
> 24/05/13 20:28:28 INFO table.HoodieTableMetaClient: Finished Loading Table of
> type MERGE_ON_READ(version=1, baseFileFormat=PARQUET) from
> /data/feeder/affiliate/book/affiliate_feeder_book_svc
> 24/05/13 20:28:28 INFO table.HoodieTableMetaClient: Loading Active commit
> timeline for /data/feeder/affiliate/book/affiliate_feeder_book_svc
> 24/05/13 20:28:28 INFO timeline.HoodieActiveTimeline: Loaded instants upto :
> Option{val=[==>20240513195519782__deltacommit__REQUESTED__20240513195521160]}
> 24/05/13 20:28:28 INFO client.BaseHoodieWriteClient: Generate a new instant
> time: 20240513202827651 action: deltacommit
> 24/05/13 20:28:28 INFO heartbeat.HoodieHeartbeatClient: Received request to
> start heartbeat for instant time 20240513202827651
> 24/05/13 20:28:28 INFO timeline.HoodieActiveTimeline: Creating a new instant
> [==>20240513202827651__deltacommit__REQUESTED]
> 24/05/13 20:28:28 INFO table.HoodieTableMetaClient: Loading
> HoodieTableMetaClient from
> /data/feeder/affiliate/book/affiliate_feeder_book_svc
> 24/05/13 20:28:28 INFO table.HoodieTableConfig: Loading table properties from
> /data/feeder/affiliate/book/affiliate_feeder_book_svc/.hoodie/hoodie.properties
> 24/05/13 20:28:28 INFO table.HoodieTableMetaClient: Finished Loading Table of
> type MERGE_ON_READ(version=1, baseFileFormat=PARQUET) from
> /data/feeder/affiliate/book/affiliate_feeder_book_svc
> 24/05/13 20:28:28 INFO table.HoodieTableMetaClient: Loading Active commit
> timeline for /data/feeder/affiliate/book/affiliate_feeder_book_svc
> 24/05/13 20:28:28 INFO timeline.HoodieActiveTimeline: Loaded instants upto :
> Option{val=[==>20240513202827651__deltacommit__REQUESTED__20240513202828906]}
> 24/05/13 20:28:28 INFO transaction.TransactionManager: Transaction starting
> for Option{val=[==>20240513202827651__deltacommit__INFLIGHT]} with latest
> completed transaction instant Optional.empty
> 24/05/13 20:28:28 INFO lock.LockManager: LockProvider
> org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider
> 24/05/13 20:28:29 INFO transaction.TransactionManager: Transaction started
> for Option{val=[==>20240513202827651__deltacommit__INFLIGHT]} with latest
> completed transaction instant Optional.empty
> 24/05/13 20:28:29 INFO table.HoodieTableMetaClient: Loading
> HoodieTableMetaClient from
> /data/feeder/affiliate/book/affiliate_feeder_book_svc
> 24/05/13 20:28:29 INFO table.HoodieTableConfig: Loading table properties from
> /data/feeder/affiliate/book/affiliate_feeder_book_svc/.hoodie/hoodie.properties
> 24/05/13 20:28:29 INFO table.HoodieTableMetaClient: Finished Loading Table of
> type MERGE_ON_READ(version=1, baseFileFormat=PARQUET) from
> /data/feeder/affiliate/book/affiliate_feeder_book_svc
> 24/05/13 20:28:29 INFO table.HoodieTableMetaClient: Loading
> HoodieTableMetaClient from
> /data/feeder/affiliate/book/affiliate_feeder_book_svc/.hoodie/metadata
> 24/05/13 20:28:29 INFO table.HoodieTableConfig: Loading table properties from
> /data/feeder/affiliate/book/affiliate_feeder_book_svc/.hoodie/metadata/.hoodie/hoodie.properties
> 24/05/13 20:28:29 INFO table.HoodieTableMetaClient: Finished Loading Table of
> type MERGE_ON_READ(version=1, baseFileFormat=HFILE) from
> /data/feeder/affiliate/book/affiliate_feeder_book_svc/.hoodie/metadata
> 24/05/13 20:28:29 INFO timeline.HoodieActiveTimeline: Loaded instants upto :
> Option{val=[20240513182607590__deltacommit__COMPLETED__20240513182612582]}
> 24/05/13 20:28:29 INFO table.HoodieTableMetaClient: Loading
> HoodieTableMetaClient from
> /data/feeder/affiliate/book/affiliate_feeder_book_svc
> 24/05/13 20:28:29 INFO table.HoodieTableConfig: Loading table properties from
> /data/feeder/affiliate/book/affiliate_feeder_book_svc/.hoodie/hoodie.properties
> 24/05/13 20:28:29 INFO table.HoodieTableMetaClient: Finished Loading Table of
> type MERGE_ON_READ(version=1, baseFileFormat=PARQUET) from
> /data/feeder/affiliate/book/affiliate_feeder_book_svc
> 24/05/13 20:28:29 INFO table.HoodieTableMetaClient: Loading
> HoodieTableMetaClient from
> /data/feeder/affiliate/book/affiliate_feeder_book_svc/.hoodie/metadata
> 24/05/13 20:28:29 INFO table.HoodieTableConfig: Loading table properties from
> /data/feeder/affiliate/book/affiliate_feeder_book_svc/.hoodie/metadata/.hoodie/hoodie.properties
> 24/05/13 20:28:29 INFO table.HoodieTableMetaClient: Finished Loading Table of
> type MERGE_ON_READ(version=1, baseFileFormat=HFILE) from
> /data/feeder/affiliate/book/affiliate_feeder_book_svc/.hoodie/metadata
> 24/05/13 20:28:29 INFO timeline.HoodieActiveTimeline: Loaded instants upto :
> Option{val=[20240513182607590__deltacommit__COMPLETED__20240513182612582]}
> 24/05/13 20:28:29 INFO view.AbstractTableFileSystemView: Took 1 ms to read 0
> instants, 0 replaced file groups
> 24/05/13 20:28:29 INFO util.ClusteringUtils: Found 0 files in pending
> clustering operations
> 24/05/13 20:28:29 ERROR metrics.JmxMetricsReporter: Jmx initialize failed:
> org.apache.hudi.exception.HoodieException: Jmx service url created
> service:jmx:rmi://localhost:9889/jndi/rmi://localhost:9889/jmxrmi
> at
> org.apache.hudi.metrics.JmxReporterServer.<init>(JmxReporterServer.java:107)
> at
> org.apache.hudi.metrics.JmxReporterServer$Builder.build(JmxReporterServer.java:88)
> at
> org.apache.hudi.metrics.JmxMetricsReporter.createJmxReport(JmxMetricsReporter.java:104)
> at
> org.apache.hudi.metrics.JmxMetricsReporter.<init>(JmxMetricsReporter.java:58)
> at
> org.apache.hudi.metrics.MetricsReporterFactory.createReporter(MetricsReporterFactory.java:75)
> at org.apache.hudi.metrics.Metrics.<init>(Metrics.java:58)
> at org.apache.hudi.metrics.Metrics.getInstance(Metrics.java:82)
> at org.apache.hudi.metrics.HoodieMetrics.<init>(HoodieMetrics.java:95)
> at
> org.apache.hudi.client.BaseHoodieClient.<init>(BaseHoodieClient.java:96)
> at
> org.apache.hudi.client.BaseHoodieWriteClient.<init>(BaseHoodieWriteClient.java:163)
> at
> org.apache.hudi.client.SparkRDDWriteClient.<init>(SparkRDDWriteClient.java:86)
> at
> org.apache.hudi.client.SparkRDDWriteClient.<init>(SparkRDDWriteClient.java:75)
> at
> org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter.initializeWriteClient(SparkHoodieBackedTableMetadataWriter.java:146)
> at
> org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.getWriteClient(HoodieBackedTableMetadataWriter.java:1474)
> at
> org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.performTableServices(HoodieBackedTableMetadataWriter.java:1186)
> at
> org.apache.hudi.client.SparkRDDWriteClient.initializeMetadataTable(SparkRDDWriteClient.java:290)
> at
> org.apache.hudi.client.SparkRDDWriteClient.initMetadataTable(SparkRDDWriteClient.java:273)
> at
> org.apache.hudi.client.BaseHoodieWriteClient.doInitTable(BaseHoodieWriteClient.java:1257)
> at
> org.apache.hudi.client.BaseHoodieWriteClient.initTable(BaseHoodieWriteClient.java:1297)
> at
> org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:139)
> at
> org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:224)
> at
> org.apache.hudi.HoodieSparkSqlWriterInternal.liftedTree1$1(HoodieSparkSqlWriter.scala:504)
> at
> org.apache.hudi.HoodieSparkSqlWriterInternal.writeInternal(HoodieSparkSqlWriter.scala:502)
> at
> org.apache.hudi.HoodieSparkSqlWriterInternal.write(HoodieSparkSqlWriter.scala:204)
> at
> org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:121)
> at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:150)
> at
> org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
> at
> org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
> at
> org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
> at
> org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
> at
> org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:97)
> at
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
> at
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
> at
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
> at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
> at
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
> at
> org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:97)
> at
> org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:93)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
> at
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
> at
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
> at
> org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
> at
> org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
> at
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
> at
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
> at
> org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:93)
> at
> org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:80)
> at
> org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:78)
> at
> org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:115)
> at
> org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:848)
> at
> org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:382)
> at
> org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:355)
> at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
> at
> com.naver.csp.data.cores.common.spark.datasets.hudi.HudiWriter$DataFrameWriterImplicits.hudi(HudiWriter.scala:35)
> at
> com.naver.csp.data.cores.common.spark.datasets.kafka.KafkaHudiSink.batchAsCdc(KafkaHudiSink.scala:160)
> at
> com.naver.csp.data.cores.common.spark.datasets.kafka.KafkaHudiSink.$anonfun$startCdcStreaming$1(KafkaHudiSink.scala:78)
> at
> com.naver.csp.data.cores.common.spark.datasets.kafka.KafkaHudiSink.$anonfun$startCdcStreaming$1$adapted(KafkaHudiSink.scala:78)
> at
> org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:35)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:600)
> at
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
> at
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
> at
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
> at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
> at
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:598)
> at
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
> at
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
> at
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:598)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:228)
> at
> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
> at
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
> at
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
> at
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:193)
> at
> org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:187)
> at
> org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:303)
> at
> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
> at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
> at
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:286)
> at
> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:209)
> Caused by: java.rmi.server.ExportException: internal error: ObjID already in
> use
> at
> java.rmi/sun.rmi.transport.ObjectTable.putTarget(ObjectTable.java:184)
> at java.rmi/sun.rmi.transport.Transport.exportObject(Transport.java:106)
> at
> java.rmi/sun.rmi.transport.tcp.TCPTransport.exportObject(TCPTransport.java:254)
> at
> java.rmi/sun.rmi.transport.tcp.TCPEndpoint.exportObject(TCPEndpoint.java:412)
> at java.rmi/sun.rmi.transport.LiveRef.exportObject(LiveRef.java:147)
> at
> java.rmi/sun.rmi.server.UnicastServerRef.exportObject(UnicastServerRef.java:234)
> at java.rmi/sun.rmi.registry.RegistryImpl.setup(RegistryImpl.java:220)
> at java.rmi/sun.rmi.registry.RegistryImpl.<init>(RegistryImpl.java:205)
> at
> java.rmi/java.rmi.registry.LocateRegistry.createRegistry(LocateRegistry.java:203)
> at
> org.apache.hudi.metrics.JmxReporterServer.<init>(JmxReporterServer.java:104)
> ... 83 more
> 24/05/13 20:28:29 INFO transaction.TransactionManager: Transaction ending
> with transaction owner
> Option{val=[==>20240513202827651__deltacommit__INFLIGHT]}
> 24/05/13 20:28:29 INFO lock.LockManager: Released connection created for
> acquiring lock
> 24/05/13 20:28:29 INFO transaction.TransactionManager: Transaction ended with
> transaction owner Option{val=[==>20240513202827651__deltacommit__INFLIGHT]}
> 24/05/13 20:28:29 INFO hudi.HoodieSparkSqlWriterInternal:
> Config.inlineCompactionEnabled ? true
> 24/05/13 20:28:29 INFO hudi.HoodieSparkSqlWriterInternal:
> Config.asyncClusteringEnabled ? false
> 24/05/13 20:28:29 WARN hudi.HoodieSparkSqlWriterInternal: Closing write client
> 24/05/13 20:28:29 INFO client.BaseHoodieClient: Stopping Timeline service !!
> 24/05/13 20:28:29 INFO embedded.EmbeddedTimelineService: Closing Timeline
> server
> 24/05/13 20:28:29 INFO service.TimelineService: Closing Timeline Service
> 24/05/13 20:28:29 INFO javalin.Javalin: Stopping Javalin ...
> 24/05/13 20:28:29 INFO javalin.Javalin: Javalin has stopped
> 24/05/13 20:28:29 INFO service.TimelineService: Closed Timeline Service
> 24/05/13 20:28:29 INFO embedded.EmbeddedTimelineService: Closed Timeline
> server
> 24/05/13 20:28:29 INFO heartbeat.HoodieHeartbeatClient: Stopping heartbeat
> for instant 20240513202827651
> 24/05/13 20:28:29 INFO heartbeat.HoodieHeartbeatClient: Stopped heartbeat for
> instant 20240513202827651
> 24/05/13 20:28:29 INFO transaction.TransactionManager: Transaction manager
> closed
> 24/05/13 20:28:29 INFO transaction.TransactionManager: Transaction manager
> closed
> 24/05/13 20:28:29 ERROR streaming.MicroBatchExecution: Query [id =
> 7b545afa-8957-4e93-8c53-438e7996b501, runId =
> 98e99ae3-61b3-4e76-b80b-bbaff8af971e] terminated with error
> org.apache.hudi.exception.HoodieException: Failed to instantiate Metadata
> table
> at
> org.apache.hudi.client.SparkRDDWriteClient.initializeMetadataTable(SparkRDDWriteClient.java:293)
> at
> org.apache.hudi.client.SparkRDDWriteClient.initMetadataTable(SparkRDDWriteClient.java:273)
> at
> org.apache.hudi.client.BaseHoodieWriteClient.doInitTable(BaseHoodieWriteClient.java:1257)
> at
> org.apache.hudi.client.BaseHoodieWriteClient.initTable(BaseHoodieWriteClient.java:1297)
> at
> org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:139)
> at
> org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:224)
> at
> org.apache.hudi.HoodieSparkSqlWriterInternal.liftedTree1$1(HoodieSparkSqlWriter.scala:504)
> at
> org.apache.hudi.HoodieSparkSqlWriterInternal.writeInternal(HoodieSparkSqlWriter.scala:502)
> at
> org.apache.hudi.HoodieSparkSqlWriterInternal.write(HoodieSparkSqlWriter.scala:204)
> at
> org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:121)
> at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:150)
> at
> org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
> at
> org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
> at
> org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
> at
> org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
> at
> org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:97)
> at
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
> at
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
> at
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
> at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
> at
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
> at
> org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:97)
> at
> org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:93)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
> at
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
> at
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
> at
> org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
> at
> org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
> at
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
> at
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
> at
> org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:93)
> at
> org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:80)
> at
> org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:78)
> at
> org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:115)
> at
> org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:848)
> at
> org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:382)
> at
> org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:355)
> at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
> at
> com.naver.csp.data.cores.common.spark.datasets.hudi.HudiWriter$DataFrameWriterImplicits.hudi(HudiWriter.scala:35)
> at
> com.naver.csp.data.cores.common.spark.datasets.kafka.KafkaHudiSink.batchAsCdc(KafkaHudiSink.scala:160)
> at
> com.naver.csp.data.cores.common.spark.datasets.kafka.KafkaHudiSink.$anonfun$startCdcStreaming$1(KafkaHudiSink.scala:78)
> at
> com.naver.csp.data.cores.common.spark.datasets.kafka.KafkaHudiSink.$anonfun$startCdcStreaming$1$adapted(KafkaHudiSink.scala:78)
> at
> org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:35)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:600)
> at
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
> at
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
> at
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
> at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
> at
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:598)
> at
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
> at
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
> at
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:598)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:228)
> at
> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
> at
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
> at
> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
> at
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:193)
> at
> org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:187)
> at
> org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:303)
> at
> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
> at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
> at
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:286)
> at
> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:209)
> Caused by: org.apache.hudi.exception.HoodieException: Jmx initialize failed:
> at
> org.apache.hudi.metrics.JmxMetricsReporter.<init>(JmxMetricsReporter.java:71)
> at
> org.apache.hudi.metrics.MetricsReporterFactory.createReporter(MetricsReporterFactory.java:75)
> at org.apache.hudi.metrics.Metrics.<init>(Metrics.java:58)
> at org.apache.hudi.metrics.Metrics.getInstance(Metrics.java:82)
> at org.apache.hudi.metrics.HoodieMetrics.<init>(HoodieMetrics.java:95)
> at
> org.apache.hudi.client.BaseHoodieClient.<init>(BaseHoodieClient.java:96)
> at
> org.apache.hudi.client.BaseHoodieWriteClient.<init>(BaseHoodieWriteClient.java:163)
> at
> org.apache.hudi.client.SparkRDDWriteClient.<init>(SparkRDDWriteClient.java:86)
> at
> org.apache.hudi.client.SparkRDDWriteClient.<init>(SparkRDDWriteClient.java:75)
> at
> org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter.initializeWriteClient(SparkHoodieBackedTableMetadataWriter.java:146)
> at
> org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.getWriteClient(HoodieBackedTableMetadataWriter.java:1474)
> at
> org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.performTableServices(HoodieBackedTableMetadataWriter.java:1186)
> at
> org.apache.hudi.client.SparkRDDWriteClient.initializeMetadataTable(SparkRDDWriteClient.java:290)
> ... 68 more
> Caused by: org.apache.hudi.exception.HoodieException: Jmx service url created
> service:jmx:rmi://localhost:9889/jndi/rmi://localhost:9889/jmxrmi
> at
> org.apache.hudi.metrics.JmxReporterServer.<init>(JmxReporterServer.java:107)
> at
> org.apache.hudi.metrics.JmxReporterServer$Builder.build(JmxReporterServer.java:88)
> at
> org.apache.hudi.metrics.JmxMetricsReporter.createJmxReport(JmxMetricsReporter.java:104)
> at
> org.apache.hudi.metrics.JmxMetricsReporter.<init>(JmxMetricsReporter.java:58)
> ... 80 more
> Caused by: java.rmi.server.ExportException: internal error: ObjID already in
> use
> at
> java.rmi/sun.rmi.transport.ObjectTable.putTarget(ObjectTable.java:184)
> at java.rmi/sun.rmi.transport.Transport.exportObject(Transport.java:106)
> at
> java.rmi/sun.rmi.transport.tcp.TCPTransport.exportObject(TCPTransport.java:254)
> at
> java.rmi/sun.rmi.transport.tcp.TCPEndpoint.exportObject(TCPEndpoint.java:412)
> at java.rmi/sun.rmi.transport.LiveRef.exportObject(LiveRef.java:147)
> at
> java.rmi/sun.rmi.server.UnicastServerRef.exportObject(UnicastServerRef.java:234)
> at java.rmi/sun.rmi.registry.RegistryImpl.setup(RegistryImpl.java:220)
> at java.rmi/sun.rmi.registry.RegistryImpl.<init>(RegistryImpl.java:205)
> at
> java.rmi/java.rmi.registry.LocateRegistry.createRegistry(LocateRegistry.java:203)
> at
> org.apache.hudi.metrics.JmxReporterServer.<init>(JmxReporterServer.java:104)
> ... 83 more
> 24/05/13 20:28:29 INFO server.AbstractConnector: Stopped Spark@376498da{HTTP/1.1, (http/1.1)}{0.0.0.0:4040}
> 24/05/13 20:28:29 INFO ui.SparkUI: Stopped Spark web UI at http://feeder-affiliate-book-svc-sink-09c3c08f71b47a5d-driver-svc.csp.svc:4040
> 24/05/13 20:28:29 INFO k8s.KubernetesClusterSchedulerBackend: Shutting down all executors
> 24/05/13 20:28:29 INFO k8s.KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Asking each executor to shut down
> 24/05/13 20:28:29 WARN k8s.ExecutorPodsWatchSnapshotSource: Kubernetes client has been closed.
> 24/05/13 20:28:29 ERROR util.Utils: Uncaught exception in thread main
> io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: GET at: https://kubernetes.default.svc/api/v1/namespaces/csp/persistentvolumeclaims?labelSelector=spark-app-selector%3Dspark-52b31b8f89564fee94b07a763597d89f. Message: Forbidden!Configured service account doesn't have access. Service account may have been revoked. persistentvolumeclaims is forbidden: User "system:serviceaccount:csp:spark" cannot list resource "persistentvolumeclaims" in API group "" in the namespace "csp".
> at io.fabric8.kubernetes.client.dsl.base.OperationSupport.requestFailure(OperationSupport.java:639)
> at io.fabric8.kubernetes.client.dsl.base.OperationSupport.assertResponseCode(OperationSupport.java:576)
> at io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleResponse(OperationSupport.java:543)
> at io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleResponse(OperationSupport.java:504)
> at io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleResponse(OperationSupport.java:487)
> at io.fabric8.kubernetes.client.dsl.base.BaseOperation.listRequestHelper(BaseOperation.java:163)
> at io.fabric8.kubernetes.client.dsl.base.BaseOperation.list(BaseOperation.java:672)
> at io.fabric8.kubernetes.client.dsl.base.BaseOperation.deleteList(BaseOperation.java:786)
> at io.fabric8.kubernetes.client.dsl.base.BaseOperation.delete(BaseOperation.java:704)
> at org.apache.spark.scheduler.cluster.k8s.KubernetesClusterSchedulerBackend.$anonfun$stop$6(KubernetesClusterSchedulerBackend.scala:138)
> at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1471)
> at org.apache.spark.scheduler.cluster.k8s.KubernetesClusterSchedulerBackend.stop(KubernetesClusterSchedulerBackend.scala:139)
> at org.apache.spark.scheduler.TaskSchedulerImpl.stop(TaskSchedulerImpl.scala:927)
> at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:2567)
> at org.apache.spark.SparkContext.$anonfun$stop$12(SparkContext.scala:2086)
> at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1471)
> at org.apache.spark.SparkContext.stop(SparkContext.scala:2086)
> at org.apache.spark.deploy.SparkSubmit.$anonfun$runMain$13(SparkSubmit.scala:963)
> at org.apache.spark.deploy.SparkSubmit.$anonfun$runMain$13$adapted(SparkSubmit.scala:963)
> at scala.Option.foreach(Option.scala:407)
> at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:963)
> at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
> at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
> at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
> at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1043)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1052)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> 24/05/13 20:28:29 INFO spark.MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
> 24/05/13 20:28:29 INFO memory.MemoryStore: MemoryStore cleared
> 24/05/13 20:28:29 INFO storage.BlockManager: BlockManager stopped
> 24/05/13 20:28:29 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
> 24/05/13 20:28:29 INFO scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
> 24/05/13 20:28:29 INFO spark.SparkContext: Successfully stopped SparkContext
> Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: Failed to instantiate Metadata table
> === Streaming Query ===
> Identifier: [id = 7b545afa-8957-4e93-8c53-438e7996b501, runId = 98e99ae3-61b3-4e76-b80b-bbaff8af971e]
> Current Committed Offsets: {KafkaV2[Subscribe[CDC_CDC_GSHOPAPPO_BOOK]]: {"CDC_CDC_GSHOPAPPO_BOOK":{"23":4040666,"8":4020522,"17":4016708,"11":4011011,"2":3988721,"20":4040743,"5":3984751,"14":4013617,"4":3989875,"13":4014484,"22":4049439,"7":4009753,"16":4044345,"10":4015180,"1":4014228,"19":4081496,"9":3999227,"18":4046117,"12":3990938,"3":4016564,"21":4039580,"15":4071459,"6":3986614,"24":4051505,"0":3996595}}}
> Current Available Offsets: {KafkaV2[Subscribe[CDC_CDC_GSHOPAPPO_BOOK]]: {"CDC_CDC_GSHOPAPPO_BOOK":{"23":4040667,"8":4020522,"17":4016708,"11":4011011,"2":3988721,"20":4040743,"5":3984751,"14":4013617,"4":3989875,"13":4014484,"22":4049439,"7":4009753,"16":4044345,"10":4015180,"1":4014228,"19":4081496,"9":3999227,"18":4046117,"12":3990938,"3":4016564,"21":4039580,"15":4071459,"6":3986614,"24":4051505,"0":3996595}}}
> Current State: ACTIVE
> Thread State: RUNNABLE
> Logical Plan:
> StreamingDataSourceV2Relation [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@325fe569, KafkaV2[Subscribe[CDC_CDC_GSHOPAPPO_BOOK]]
> at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:325)
> at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:209)
> Caused by: org.apache.hudi.exception.HoodieException: Failed to instantiate Metadata table
> at org.apache.hudi.client.SparkRDDWriteClient.initializeMetadataTable(SparkRDDWriteClient.java:293)
> at org.apache.hudi.client.SparkRDDWriteClient.initMetadataTable(SparkRDDWriteClient.java:273)
> at org.apache.hudi.client.BaseHoodieWriteClient.doInitTable(BaseHoodieWriteClient.java:1257)
> at org.apache.hudi.client.BaseHoodieWriteClient.initTable(BaseHoodieWriteClient.java:1297)
> at org.apache.hudi.client.SparkRDDWriteClient.upsert(SparkRDDWriteClient.java:139)
> at org.apache.hudi.DataSourceUtils.doWriteOperation(DataSourceUtils.java:224)
> at org.apache.hudi.HoodieSparkSqlWriterInternal.liftedTree1$1(HoodieSparkSqlWriter.scala:504)
> at org.apache.hudi.HoodieSparkSqlWriterInternal.writeInternal(HoodieSparkSqlWriter.scala:502)
> at org.apache.hudi.HoodieSparkSqlWriterInternal.write(HoodieSparkSqlWriter.scala:204)
> at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:121)
> at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:150)
> at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
> at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
> at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
> at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
> at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:97)
> at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
> at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
> at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
> at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
> at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
> at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:97)
> at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:93)
> at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
> at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
> at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
> at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
> at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
> at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
> at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
> at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
> at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
> at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:93)
> at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:80)
> at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:78)
> at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:115)
> at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:848)
> at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:382)
> at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:355)
> at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
> at com.naver.csp.data.cores.common.spark.datasets.hudi.HudiWriter$DataFrameWriterImplicits.hudi(HudiWriter.scala:35)
> at com.naver.csp.data.cores.common.spark.datasets.kafka.KafkaHudiSink.batchAsCdc(KafkaHudiSink.scala:160)
> at com.naver.csp.data.cores.common.spark.datasets.kafka.KafkaHudiSink.$anonfun$startCdcStreaming$1(KafkaHudiSink.scala:78)
> at com.naver.csp.data.cores.common.spark.datasets.kafka.KafkaHudiSink.$anonfun$startCdcStreaming$1$adapted(KafkaHudiSink.scala:78)
> at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:35)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:600)
> at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
> at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
> at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
> at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
> at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:598)
> at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
> at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
> at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:598)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:228)
> at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
> at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
> at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
> at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:193)
> at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:187)
> at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:303)
> at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
> at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
> at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:286)
> ... 1 more
> Caused by: org.apache.hudi.exception.HoodieException: Jmx initialize failed:
> at org.apache.hudi.metrics.JmxMetricsReporter.<init>(JmxMetricsReporter.java:71)
> at org.apache.hudi.metrics.MetricsReporterFactory.createReporter(MetricsReporterFactory.java:75)
> at org.apache.hudi.metrics.Metrics.<init>(Metrics.java:58)
> at org.apache.hudi.metrics.Metrics.getInstance(Metrics.java:82)
> at org.apache.hudi.metrics.HoodieMetrics.<init>(HoodieMetrics.java:95)
> at org.apache.hudi.client.BaseHoodieClient.<init>(BaseHoodieClient.java:96)
> at org.apache.hudi.client.BaseHoodieWriteClient.<init>(BaseHoodieWriteClient.java:163)
> at org.apache.hudi.client.SparkRDDWriteClient.<init>(SparkRDDWriteClient.java:86)
> at org.apache.hudi.client.SparkRDDWriteClient.<init>(SparkRDDWriteClient.java:75)
> at org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter.initializeWriteClient(SparkHoodieBackedTableMetadataWriter.java:146)
> at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.getWriteClient(HoodieBackedTableMetadataWriter.java:1474)
> at org.apache.hudi.metadata.HoodieBackedTableMetadataWriter.performTableServices(HoodieBackedTableMetadataWriter.java:1186)
> at org.apache.hudi.client.SparkRDDWriteClient.initializeMetadataTable(SparkRDDWriteClient.java:290)
> ... 68 more
> Caused by: org.apache.hudi.exception.HoodieException: Jmx service url created service:jmx:rmi://localhost:9889/jndi/rmi://localhost:9889/jmxrmi
> at org.apache.hudi.metrics.JmxReporterServer.<init>(JmxReporterServer.java:107)
> at org.apache.hudi.metrics.JmxReporterServer$Builder.build(JmxReporterServer.java:88)
> at org.apache.hudi.metrics.JmxMetricsReporter.createJmxReport(JmxMetricsReporter.java:104)
> at org.apache.hudi.metrics.JmxMetricsReporter.<init>(JmxMetricsReporter.java:58)
> ... 80 more
> Caused by: java.rmi.server.ExportException: internal error: ObjID already in use
> at java.rmi/sun.rmi.transport.ObjectTable.putTarget(ObjectTable.java:184)
> at java.rmi/sun.rmi.transport.Transport.exportObject(Transport.java:106)
> at java.rmi/sun.rmi.transport.tcp.TCPTransport.exportObject(TCPTransport.java:254)
> at java.rmi/sun.rmi.transport.tcp.TCPEndpoint.exportObject(TCPEndpoint.java:412)
> at java.rmi/sun.rmi.transport.LiveRef.exportObject(LiveRef.java:147)
> at java.rmi/sun.rmi.server.UnicastServerRef.exportObject(UnicastServerRef.java:234)
> at java.rmi/sun.rmi.registry.RegistryImpl.setup(RegistryImpl.java:220)
> at java.rmi/sun.rmi.registry.RegistryImpl.<init>(RegistryImpl.java:205)
> at java.rmi/java.rmi.registry.LocateRegistry.createRegistry(LocateRegistry.java:203)
> at org.apache.hudi.metrics.JmxReporterServer.<init>(JmxReporterServer.java:104)
> ... 83 more {code}
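> The `ObjID already in use` failure above comes from the second JmxReporterServer (for the data table) trying to create an RMI registry on the same port 9889 that the metadata table's reporter already bound. The remedy suggested in the description is to let each reporter pick a free port from the configured range. A minimal sketch of that idea follows; this is not Hudi's actual implementation, and `JmxPortRangeSketch`, `firstFreePort`, and the 9889-9899 range are illustrative only.

```java
import java.io.IOException;
import java.net.ServerSocket;
import java.rmi.registry.LocateRegistry;
import java.rmi.registry.Registry;
import java.rmi.server.UnicastRemoteObject;

public class JmxPortRangeSketch {

    // Probe ports in [start, end] and return the first one that accepts a bind,
    // or -1 if the whole range is taken. Note the small race window: another
    // process could grab the port between the probe and the registry bind.
    static int firstFreePort(int start, int end) {
        for (int port = start; port <= end; port++) {
            try (ServerSocket probe = new ServerSocket(port)) {
                return port;
            } catch (IOException inUse) {
                // port is busy; try the next one in the range
            }
        }
        return -1;
    }

    public static void main(String[] args) throws Exception {
        int port = firstFreePort(9889, 9899);
        if (port < 0) {
            throw new IllegalStateException("no free port in range 9889-9899");
        }
        // Each reporter (data table, metadata table) binds its own RMI
        // registry on its own port instead of colliding on a single one.
        Registry registry = LocateRegistry.createRegistry(port);
        System.out.println("RMI registry bound on port " + port);
        // Unexport so this sketch's JVM can exit; a real reporter keeps it alive.
        UnicastRemoteObject.unexportObject(registry, true);
    }
}
```

> With two writers in one JVM, the first would bind 9889 and the second would fall through to 9890, avoiding the duplicate registry export that produces the ExportException.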
--
This message was sent by Atlassian Jira
(v8.20.10#820010)