AlejandroIzuel opened a new issue, #12902:
URL: https://github.com/apache/hudi/issues/12902
**Describe the problem you faced**
When I try to run a unit test on my Scala project using Hudi with the
CloudWatch metrics option set to "true", I get the following exception:
`org.apache.hudi.exception.HoodieException: Unable to instantiate class
org.apache.hudi.metrics.cloudwatch.CloudWatchMetricsReporter`
**To Reproduce**
Steps to reproduce the behavior:
1. Use this "dummy" UnitTest
```
package xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.junit.runner.RunWith
import org.scalatest.BeforeAndAfterAll
import org.scalatest.funsuite.AnyFunSuite
import org.scalatestplus.junit.JUnitRunner
@RunWith(classOf[JUnitRunner])
class HudiWriteReadTest extends AnyFunSuite with BeforeAndAfterAll {
// Initialize Spark session
val spark: SparkSession = SparkSession
.builder()
.master("local[1]")
.appName("xxxxxxxxxxxxxxxxx.HudiWriteReadTest")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("spark.sql.catalog.spark_catalog",
"org.apache.spark.sql.hudi.catalog.HoodieCatalog")
.config("spark.sql.extensions",
"org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
.getOrCreate()
import spark.implicits._
// Define schema
val schema = StructType(
Seq(
StructField("id", IntegerType, false),
StructField("name", StringType, false),
StructField("timestamp", LongType, false)
)
)
def areDataFramesEqual(df1: DataFrame, df2: DataFrame): Boolean = {
// Define the columns to compare (excluding metadata columns)
val columnsToCompare = Seq("id", "name", "timestamp")
// Select only the columns to compare
val df1Selected = df1.select(columnsToCompare.head,
columnsToCompare.tail: _*)
val df2Selected = df2.select(columnsToCompare.head,
columnsToCompare.tail: _*)
// Check if schemas are identical
if (!df1Selected.schema.equals(df2Selected.schema)) {
println("Schemas do not match:")
println("DF1 Schema: " + df1Selected.schema.treeString)
println("DF2 Schema: " + df2Selected.schema.treeString)
return false
}
// Collect rows from both DataFrames
val rows1 = df1Selected.collect()
val rows2 = df2Selected.collect()
// Check if the number of rows is the same
if (rows1.length != rows2.length) {
println(s"Row counts differ: DF1 has ${rows1.length} rows, DF2 has ${rows2.length} rows.")
return false
}
// Sort rows by a unique identifier (e.g., "id") to ensure consistent ordering
val sortedRows1 = rows1.sortBy(_.getAs[Int]("id"))
val sortedRows2 = rows2.sortBy(_.getAs[Int]("id"))
// Compare sorted rows
sortedRows1.zip(sortedRows2).forall {
case (row1, row2) => row1 == row2
}
}
// Function to write DataFrame to Hudi
def writeToHudi(df: DataFrame, tablePath: String): Unit = {
df.write
.format("hudi")
.option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, "COPY_ON_WRITE")
.option(DataSourceWriteOptions.OPERATION_OPT_KEY, "insert")
.option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "id")
.option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "timestamp")
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "")
.option(HoodieWriteConfig.TABLE_NAME, "hudi_test_table")
.option("hoodie.metrics.on", "true")
.option("hoodie.metrics.reporter.type", "CLOUDWATCH")
.mode("overwrite")
.save(tablePath)
}
// Function to read DataFrame from Hudi
def readFromHudi(tablePath: String): DataFrame = {
spark.read
.format("hudi")
.load(tablePath)
}
// Unit test
test("Hudi write and read operations") {
val data = Seq(
(1, "Alice", 1000L),
(2, "Bob", 2000L)
)
val df = data.toDF("id", "name", "timestamp")
val tablePath = "file:///tmp/hudi_test_table"
// Write data to Hudi
writeToHudi(df, tablePath)
// Read data from Hudi
val readDf = readFromHudi(tablePath)
// Validate the data
assert(areDataFramesEqual(readDf, df))
}
// Stop Spark session after tests
override def afterAll(): Unit = {
spark.stop()
}
}
```
2. Run the ./gradlew build
3. Get the Exception
`org.apache.hudi.exception.HoodieException: Unable to instantiate class
org.apache.hudi.metrics.cloudwatch.CloudWatchMetricsReporter
`
**Expected behavior**
The unit test should run successfully. If I set the CloudWatch metrics option to "false", it works.
**Environment Description**
* Hudi version : 0.15.0
* Spark version : 3.5.2
* Hive version : spark-hive_2.12:3.5.2
* Hadoop version : 3.4.0
* Storage (HDFS/S3/GCS..) : Local on PC
* Running on Docker? (yes/no) : no
**Additional context**
This is the build.gradle I'm using
dependencies {
// Implementation dependencies
implementation "org.apache.hadoop:hadoop-aws:3.4.0"
implementation "org.apache.hadoop:hadoop-common:3.4.0"
implementation "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.2"
implementation "org.apache.spark:spark-avro_2.12:3.5.2"
implementation "org.apache.hudi:hudi-spark3.5-bundle_2.12:0.15.0"
implementation 'org.apache.hudi:hudi-aws-bundle:0.15.0'
implementation "com.amazonaws:aws-java-sdk-s3:1.12.772"
implementation "com.amazonaws:aws-java-sdk-secretsmanager:1.12.772"
implementation 'io.logz.sawmill:sawmill-core:2.0.20'
implementation "org.apache.spark:spark-core_2.12:3.5.2"
implementation "com.amazonaws:aws-java-sdk-cloudwatch:1.12.772"
// Compile-only dependencies
compileOnly "org.scala-lang:scala-library:2.12.8"
compileOnly "com.amazonaws:aws-java-sdk-secretsmanager:1.12.335"
compileOnly "org.apache.spark:spark-hive_2.12:3.5.2"
// Test dependencies
testImplementation "org.scalatest:scalatest_2.12:3.2.19"
testImplementation "junit:junit:4.13.2"
testImplementation "org.scalatestplus:junit-4-13_2.12:3.2.15.0"
}
**Stacktrace**
```
org.apache.hudi.exception.HoodieException: Unable to instantiate class
org.apache.hudi.metrics.cloudwatch.CloudWatchMetricsReporter
at
org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:75)
at
org.apache.hudi.metrics.MetricsReporterFactory.createReporter(MetricsReporterFactory.java:89)
at org.apache.hudi.metrics.Metrics.<init>(Metrics.java:61)
at org.apache.hudi.metrics.Metrics.getInstance(Metrics.java:87)
at org.apache.hudi.metrics.HoodieMetrics.<init>(HoodieMetrics.java:101)
at
org.apache.hudi.client.BaseHoodieClient.<init>(BaseHoodieClient.java:101)
at
org.apache.hudi.client.BaseHoodieWriteClient.<init>(BaseHoodieWriteClient.java:164)
at
org.apache.hudi.client.SparkRDDWriteClient.<init>(SparkRDDWriteClient.java:90)
at
org.apache.hudi.client.SparkRDDWriteClient.<init>(SparkRDDWriteClient.java:74)
at
org.apache.hudi.DataSourceUtils.createHoodieClient(DataSourceUtils.java:208)
at
org.apache.hudi.HoodieSparkSqlWriterInternal.$anonfun$writeInternal$27(HoodieSparkSqlWriter.scala:451)
at scala.Option.getOrElse(Option.scala:189)
at
org.apache.hudi.HoodieSparkSqlWriterInternal.writeInternal(HoodieSparkSqlWriter.scala:448)
at
org.apache.hudi.HoodieSparkSqlWriterInternal.write(HoodieSparkSqlWriter.scala:187)
at
org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:125)
at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:168)
at
org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
at
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
at
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
at
org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
at
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
at
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
at
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
at
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
at
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
at
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
at
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
at
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
at
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
at
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
at
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
at
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
at
org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
at
org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
at
org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
at
org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
at
org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:859)
at
org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:388)
at
org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:361)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:240)
at
com.anonymized.cde.smmdevicetype.HudiWriteReadTest.writeToHudi(HudiWriteReadTest.scala:87)
at
com.anonymized.cde.smmdevicetype.HudiWriteReadTest.$anonfun$new$1(HudiWriteReadTest.scala:107)
at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
at org.scalatest.Transformer.apply(Transformer.scala:22)
at org.scalatest.Transformer.apply(Transformer.scala:20)
at
org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226)
at org.scalatest.TestSuite.withFixture(TestSuite.scala:196)
at org.scalatest.TestSuite.withFixture$(TestSuite.scala:195)
at
org.scalatest.funsuite.AnyFunSuite.withFixture(AnyFunSuite.scala:1564)
at
org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224)
at
org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236)
at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
at
org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236)
at
org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218)
at org.scalatest.funsuite.AnyFunSuite.runTest(AnyFunSuite.scala:1564)
at org.scalatest.Suite.run(Suite.scala:1114)
at org.scalatest.Suite.run$(Suite.scala:1096)
at
org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1564)
at
org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:273)
at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:273)
at
org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:272)
at
com.anonymized.cde.smmdevicetype.HudiWriteReadTest.org$scalatest$BeforeAndAfterAll$$super$run(HudiWriteReadTest.scala:15)
at
org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
at
com.anonymized.cde.smmdevicetype.HudiWriteReadTest.run(HudiWriteReadTest.scala:15)
at org.scalatestplus.junit.JUnitRunner.run(JUnitRunner.scala:99)
at
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:112)
at
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
at
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:40)
at
org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:60)
at
org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:52)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
at
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
at
org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
at
org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
at
org.gradle.api.internal.tasks.testing.worker.TestWorker$2.run(TestWorker.java:176)
at
org.gradle.api.internal.tasks.testing.worker.TestWorker.executeAndMaintainThreadName(TestWorker.java:129)
at
org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:100)
at
org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:60)
at
org.gradle.process.internal.worker.child.ActionExecutionWorker.execute(ActionExecutionWorker.java:56)
at
org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:113)
at
org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:65)
at
worker.org.gradle.process.internal.worker.GradleWorkerMain.run(GradleWorkerMain.java:69)
at
worker.org.gradle.process.internal.worker.GradleWorkerMain.main(GradleWorkerMain.java:74)
Caused by: java.lang.NoSuchMethodException:
org.apache.hudi.metrics.cloudwatch.CloudWatchMetricsReporter.<init>(org.apache.hudi.config.metrics.HoodieMetricsConfig,
com.codahale.metrics.MetricRegistry)
at java.lang.Class.getConstructor0(Class.java:3110)
at java.lang.Class.getConstructor(Class.java:1853)
at
org.apache.hudi.common.util.ReflectionUtils.loadClass(ReflectionUtils.java:73)
... 106 more
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]