idrismike opened a new issue, #9307:
URL: https://github.com/apache/hudi/issues/9307
**Describe the problem you faced**

I am using Hudi to write to a partitioned table that has a composite key (String, Int, String) and is partitioned by one of the key columns (the String column), with partition type `SIMPLE`. When I use `InsertOverWrite`, the Hudi writer skips the `validateTableConfig` method in `HoodieWriterUtils.scala` (since the operation is an overwrite), so there is no issue. However, when I use either `Upsert` or `InsertAppend`, Hudi validates the already-existing table config against the incoming data source config from the writer, and there it throws the exception (see the Stacktrace section below). Presumably this is because the data source writer's partition path value contains the `:SIMPLE` suffix, whereas the partition path stored in the table config holds only the column name. Looking at the `validateTableConfig` method, the comparison is a direct equality check, with no normalization applied to either value.
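A minimal sketch of the suspected mismatch, using a hypothetical column name `region` (the exact internal representation is an assumption on my part, inferred from the exception message):

```scala
// The writer-side partition path carries the key-generator type suffix,
// while the value stored in the table config is the bare column name.
object PartitionPathMismatch extends App {
  val writerValue = "region:SIMPLE" // from the data source write config
  val tableValue  = "region"        // from the existing table config

  // A direct equality check, as validateTableConfig appears to do, fails
  // and surfaces as the "Config conflict" HoodieException:
  println(writerValue == tableValue) // false

  // Stripping the key-generator type suffix before comparing would match:
  val normalized = writerValue.split(":").head
  println(normalized == tableValue) // true
}
```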
**To Reproduce**

Use Hudi version 0.13.0 or 0.13.1 with Spark 3.1.2.

Steps to reproduce the behavior:

1. Create a table whose record key is a composite key of columns of any type, e.g. (Int, String, Int).
2. Partition the table by the String column (or any other column) with partition type `SIMPLE`.
3. Write multiple records using Hudi's `InsertAppend` or `Upsert` method in separate writes, i.e. one dataframe first, followed by another write.
4. The exception is thrown on the second write, since on the first write there is nothing in the table yet to validate against.
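The steps above can be sketched roughly as follows. Table name, column names, and precombine field are illustrative placeholders, not the exact values from my setup; the `region:SIMPLE` syntax assumes the `CustomKeyGenerator` partition-field format:

```scala
import org.apache.spark.sql.SaveMode

// Illustrative options; the partition column is part of the composite key.
val hudiOptions = Map(
  "hoodie.table.name"                           -> "test_table",
  "hoodie.datasource.write.recordkey.field"     -> "id,region,version",
  "hoodie.datasource.write.partitionpath.field" -> "region:SIMPLE",
  "hoodie.datasource.write.keygenerator.class"  ->
    "org.apache.hudi.keygen.CustomKeyGenerator",
  "hoodie.datasource.write.operation"           -> "upsert",
  "hoodie.datasource.write.precombine.field"    -> "version"
)

// First write succeeds: the table does not exist yet, so there is
// no existing table config to validate against.
df1.write.format("hudi").options(hudiOptions)
  .mode(SaveMode.Append).save(basePath)

// Second write triggers validateTableConfig and, on 0.13.x,
// throws the "Config conflict" HoodieException.
df2.write.format("hudi").options(hudiOptions)
  .mode(SaveMode.Append).save(basePath)
```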
**Expected behavior**

Since the original table and the data source writer use the same configuration and the same Hudi version, the exception should not be thrown, as was the case in Hudi version 0.12.0.
**Environment Description**
* Hudi version : 0.13.1 and 0.13.0 (at least tested with these two)
* Spark version : 3.1.2
* Hive version :
* Hadoop version : 3.2.1
* Storage (HDFS/S3/GCS..) : Local and Azure Abfss (does not really matter)
* Running on Docker? (yes/no) : No, but does not matter
**Stacktrace**

```
org.apache.hudi.exception.HoodieException: Config conflict(key current value existing value):
PartitionPath: <column_name>:SIMPLE <column_name>
  at org.apache.hudi.HoodieWriterUtils$.validateTableConfig(HoodieWriterUtils.scala:171)
  at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:131)
  at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:150)
  at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
  at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:132)
  at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:131)
  at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
  at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
  at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
  at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)
  at local.package.core.DatasetDef.writeInsertAppend(DatasetDef.scala:155)
  at local.package.utils.DataCollectionContext.$anonfun$readRawAndWrite$2(DataCollectionContext.scala:78)
  at scala.util.Success.$anonfun$map$1(Try.scala:255)
  at scala.util.Success.map(Try.scala:213)
  at local.package.utils.DataCollectionContext.$anonfun$readRawAndWrite$1(DataCollectionContext.scala:68)
  at scala.collection.immutable.List.map(List.scala:290)
  at local.package.utils.DataCollectionContext.readRawAndWrite(DataCollectionContext.scala:65)
  at local.package.test.LocalClass.$anonfun$new$3(LocalClass.scala:26)
  at local.package.utils.MetricsUtils.time(MetricsUtils.scala:39)
  at local.package.utils.MetricUtils$.time(MetricsUtils.scala:53)
  at local.package.test.LocalClass.$anonfun$new$2(LocalClass.scala:22)
  at org.scalatest.wordspec.AsyncWordSpecLike.transformToOutcomeParam$1(AsyncWordSpecLike.scala:153)
  at org.scalatest.wordspec.AsyncWordSpecLike.$anonfun$registerTestToRun$1(AsyncWordSpecLike.scala:154)
  at org.scalatest.AsyncTestSuite.$anonfun$transformToOutcome$1(AsyncTestSuite.scala:240)
  at org.scalatest.wordspec.AsyncWordSpecLike$$anon$3.apply(AsyncWordSpecLike.scala:1212)
  at org.scalatest.AsyncTestSuite.withFixture(AsyncTestSuite.scala:316)
  at org.scalatest.AsyncTestSuite.withFixture$(AsyncTestSuite.scala:315)
  at org.scalatest.wordspec.AsyncWordSpec.withFixture(AsyncWordSpec.scala:2296)
  at org.scalatest.wordspec.AsyncWordSpecLike.invokeWithAsyncFixture$1(AsyncWordSpecLike.scala:1210)
  at org.scalatest.wordspec.AsyncWordSpecLike.$anonfun$runTest$1(AsyncWordSpecLike.scala:1224)
  at org.scalatest.AsyncSuperEngine.runTestImpl(AsyncEngine.scala:374)
  at org.scalatest.wordspec.AsyncWordSpecLike.runTest(AsyncWordSpecLike.scala:1224)
  at org.scalatest.wordspec.AsyncWordSpecLike.runTest$(AsyncWordSpecLike.scala:1204)
  at org.scalatest.wordspec.AsyncWordSpec.runTest(AsyncWordSpec.scala:2296)
  at org.scalatest.wordspec.AsyncWordSpecLike.$anonfun$runTests$1(AsyncWordSpecLike.scala:1283)
  at org.scalatest.AsyncSuperEngine.$anonfun$runTestsInBranch$1(AsyncEngine.scala:432)
  at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
  at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
  at scala.collection.immutable.List.foldLeft(List.scala:89)
  at org.scalatest.AsyncSuperEngine.traverseSubNodes$1(AsyncEngine.scala:406)
  at org.scalatest.AsyncSuperEngine.runTestsInBranch(AsyncEngine.scala:479)
  at org.scalatest.AsyncSuperEngine.$anonfun$runTestsInBranch$1(AsyncEngine.scala:460)
  at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
  at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
  at scala.collection.immutable.List.foldLeft(List.scala:89)
  at org.scalatest.AsyncSuperEngine.traverseSubNodes$1(AsyncEngine.scala:406)
  at org.scalatest.AsyncSuperEngine.runTestsInBranch(AsyncEngine.scala:487)
  at org.scalatest.AsyncSuperEngine.runTestsImpl(AsyncEngine.scala:555)
  at org.scalatest.wordspec.AsyncWordSpecLike.runTests(AsyncWordSpecLike.scala:1283)
  at org.scalatest.wordspec.AsyncWordSpecLike.runTests$(AsyncWordSpecLike.scala:1282)
  at org.scalatest.wordspec.AsyncWordSpec.runTests(AsyncWordSpec.scala:2296)
  at org.scalatest.Suite.run(Suite.scala:1114)
  at org.scalatest.Suite.run$(Suite.scala:1096)
```