willforevercn opened a new issue, #8617:
URL: https://github.com/apache/hudi/issues/8617
**Describe the problem you faced**
I would like to know whether Hudi tables support `MapType` columns or not.
**To Reproduce**
Steps to reproduce the behavior:
1. Launch Spark on a local machine
2. Create a DataFrame with a `MapType` column
3. Try to save it as a Hudi table on local disk
**Expected behavior**
A Hudi table is created with the map column.
**Environment Description**
* Hudi version : 0.13.0
* Spark version : 3.3
* Hive version : N/A
* Hadoop version : N/A
* Storage (HDFS/S3/GCS..) : local
* Running on Docker? (yes/no) : no
**Additional context**
Sample code:
```python
import random

from pyspark.sql.functions import rand, udf, current_timestamp
from pyspark.sql.types import IntegerType, MapType, FloatType

tag_ids_all = list(range(10000000, 10000999))

def get_tags():
    count = random.randint(0, 5)
    tags = {}
    for i in range(count):
        key = tag_ids_all[random.randint(0, len(tag_ids_all) - 1)]
        value = 1.0
        tags[key] = value
    return tags

tags_udf = udf(get_tags, MapType(IntegerType(), FloatType()))

df = spark.range(100) \
    .withColumn("id", (rand(seed=67) * 1000000).cast("integer")) \
    .withColumn("tags", tags_udf()) \
    .withColumn("updated_dt", current_timestamp())
df.show(truncate=False)

table_name = "map_test"
hudi_options = {
    'hoodie.table.name': table_name,
    'hoodie.datasource.write.table.type': 'COPY_ON_WRITE',
    'hoodie.datasource.write.recordkey.field': 'id',
    'hoodie.datasource.write.table.name': table_name,
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.datasource.write.precombine.field': 'updated_dt',
    'hoodie.datasource.write.partitionpath.field': '',
    'hoodie.datasource.hive_sync.enable': False,
    'hoodie.datasource.hive_sync.mode': 'hms',
    'hoodie.datasource.hive_sync.sync_as_datasource': 'false',
    'hoodie.datasource.hive_sync.database': 'usertagging',
    'hoodie.datasource.hive_sync.table': table_name,
    'hoodie.datasource.hive_sync.partition_fields': 'institution_id',
    'hoodie.datasource.hive_sync.use_jdbc': 'false',
    'hoodie.datasource.write.hive_style_partitioning': 'true',
}

df.coalesce(1).write.format("hudi").options(**hudi_options).mode("append").save("/path/to/map_test")
```
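For context on the failure: the exception is raised while converting the Spark schema to Avro, and Avro maps only support string keys, so a `MapType(IntegerType(), FloatType())` column cannot be converted. A possible workaround (a sketch only, not verified against Hudi 0.13.0) is to generate the map with string keys so the schema becomes `MapType(StringType(), FloatType())`:

```python
import random

tag_ids_all = list(range(10000000, 10000999))

def get_tags_str_keys():
    """Same tag generation as in the snippet above, but keys are
    emitted as strings, since Avro map keys must be strings."""
    tags = {}
    for _ in range(random.randint(0, 5)):
        # str(...) is the only change relative to get_tags()
        tags[str(random.choice(tag_ids_all))] = 1.0
    return tags

# With Spark available, this would replace the original UDF
# (names mirror the snippet above and are otherwise hypothetical):
#   from pyspark.sql.functions import udf
#   from pyspark.sql.types import FloatType, MapType, StringType
#   tags_udf = udf(get_tags_str_keys, MapType(StringType(), FloatType()))
```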
**Stacktrace**
```
23/05/01 13:33:31 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
23/05/01 13:33:31 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist
23/05/01 13:33:33 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 2.3.0
23/05/01 13:33:33 WARN ObjectStore: setMetaStoreSchemaVersion called but recording version is disabled: version = 2.3.0, comment = Set by MetaStore [email protected]
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/lib/python3.9/site-packages/pyspark/sql/readwriter.py", line 968, in save
    self._jwrite.save(path)
  File "/usr/local/lib/python3.9/site-packages/pyspark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
  File "/usr/local/lib/python3.9/site-packages/pyspark/sql/utils.py", line 190, in deco
    return f(*a, **kw)
  File "/usr/local/lib/python3.9/site-packages/pyspark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o104.save.
: org.apache.hudi.org.apache.spark.sql.avro.IncompatibleSchemaException: Unexpected type MapType(IntegerType,FloatType,true).
	at org.apache.hudi.org.apache.spark.sql.avro.SchemaConverters$.toAvroType(SchemaConverters.scala:211)
	at org.apache.hudi.org.apache.spark.sql.avro.SchemaConverters$.$anonfun$toAvroType$2(SchemaConverters.scala:204)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at org.apache.spark.sql.types.StructType.foreach(StructType.scala:102)
	at org.apache.hudi.org.apache.spark.sql.avro.SchemaConverters$.toAvroType(SchemaConverters.scala:202)
	at org.apache.hudi.org.apache.spark.sql.avro.HoodieSparkAvroSchemaConverters$.toAvroType(HoodieSparkAvroSchemaConverters.scala:37)
	at org.apache.hudi.AvroConversionUtils$.convertStructTypeToAvroSchema(AvroConversionUtils.scala:164)
	at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:230)
	at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:150)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:47)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
```
Please let me know if anything else is needed.
Thanks,
Li
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]