willforevercn opened a new issue, #8617:
URL: https://github.com/apache/hudi/issues/8617

   **Describe the problem you faced**
   I would like to know whether Hudi tables support columns of MapType or not.
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   1. Launch Spark on a local machine
   2. Create a DataFrame with a MapType column
   3. Try to save it as a Hudi table on local disk
   
   **Expected behavior**
   
   A Hudi table is created with the column stored as a map.
   
   **Environment Description**
   * Hudi version : 0.13.0
   * Spark version : 3.3
   * Hive version : N/A
   * Hadoop version : N/A
   * Storage (HDFS/S3/GCS..) : local
   * Running on Docker? (yes/no) : no
   
   **Additional context**
   
   Sample code:
   ```
   import random

   from pyspark.sql.functions import rand, udf, current_timestamp
   from pyspark.sql.types import IntegerType, FloatType, MapType
   
   # Pool of candidate tag ids to sample from.
   tag_ids_all = list(range(10000000, 10000999))

   def get_tags():
       """Return a map of 0-5 random tag ids, each with a constant weight of 1.0."""
       count = random.randint(0, 5)
       tags = {}
       for _ in range(count):
           key = tag_ids_all[random.randint(0, len(tag_ids_all) - 1)]
           tags[key] = 1.0
       return tags

   # UDF producing a map<int, float> column, the type that triggers the error.
   tags_udf = udf(get_tags, MapType(IntegerType(), FloatType()))
   
   # 100 rows: a random integer record key, the map-typed "tags" column, and a timestamp.
   df = spark.range(100) \
       .withColumn("id", (rand(seed=67) * 1000000).cast("integer")) \
       .withColumn("tags", tags_udf()) \
       .withColumn("updated_dt", current_timestamp())
   
   df.show(truncate=False)
   
   table_name = "map_test"
   hudi_options = {
       'hoodie.table.name': table_name,
       'hoodie.datasource.write.table.type': 'COPY_ON_WRITE',
       'hoodie.datasource.write.recordkey.field': 'id',
       'hoodie.datasource.write.table.name': table_name,
       'hoodie.datasource.write.operation': 'upsert',
       'hoodie.datasource.write.precombine.field': 'updated_dt',
       'hoodie.datasource.write.partitionpath.field': '',

       # Hive sync is disabled for this local repro, so the options below are inert.
       'hoodie.datasource.hive_sync.enable': 'false',
       'hoodie.datasource.hive_sync.mode': 'hms',
       'hoodie.datasource.hive_sync.sync_as_datasource': 'false',
       'hoodie.datasource.hive_sync.database': 'usertagging',
       'hoodie.datasource.hive_sync.table': table_name,
       'hoodie.datasource.hive_sync.partition_fields': 'institution_id',
       'hoodie.datasource.hive_sync.use_jdbc': 'false',
       'hoodie.datasource.write.hive_style_partitioning': 'true',
   }

   df.coalesce(1).write.format("hudi") \
       .options(**hudi_options) \
       .mode("append") \
       .save("/path/to/map_test")
   ```
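   
   One note on isolating the cause: the exception in the stack trace below comes from Hudi's shaded copy of Spark's Avro `SchemaConverters`, and the Avro spec only allows string keys in maps. If that is the root cause, plain spark-avro should reject the same schema without Hudi involved. A minimal, untested sketch of that check (assuming `to_avro` derives its Avro schema from the column's Catalyst type):
   ```
   # Hypothetical isolation test: if Spark's own Avro converter also rejects
   # map<int, float>, the limitation is Avro's string-only map keys rather
   # than anything Hudi-specific.
   from pyspark.sql.avro.functions import to_avro

   # Expectation (my assumption, not verified): this raises an
   # IncompatibleSchemaException for MapType(IntegerType,FloatType,true).
   df.select(to_avro(df["tags"])).collect()
   ```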
   
   **Stacktrace**
   
   ```
   23/05/01 13:33:31 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
   23/05/01 13:33:31 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist

   23/05/01 13:33:33 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 2.3.0
   23/05/01 13:33:33 WARN ObjectStore: setMetaStoreSchemaVersion called but recording version is disabled: version = 2.3.0, comment = Set by MetaStore [email protected]
   Traceback (most recent call last):
     File "<stdin>", line 1, in <module>
     File "/usr/local/lib/python3.9/site-packages/pyspark/sql/readwriter.py", line 968, in save
       self._jwrite.save(path)
     File "/usr/local/lib/python3.9/site-packages/pyspark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
     File "/usr/local/lib/python3.9/site-packages/pyspark/sql/utils.py", line 190, in deco
       return f(*a, **kw)
     File "/usr/local/lib/python3.9/site-packages/pyspark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py", line 326, in get_return_value
   py4j.protocol.Py4JJavaError: An error occurred while calling o104.save.
   : org.apache.hudi.org.apache.spark.sql.avro.IncompatibleSchemaException: Unexpected type MapType(IntegerType,FloatType,true).
        at org.apache.hudi.org.apache.spark.sql.avro.SchemaConverters$.toAvroType(SchemaConverters.scala:211)
        at org.apache.hudi.org.apache.spark.sql.avro.SchemaConverters$.$anonfun$toAvroType$2(SchemaConverters.scala:204)
        at scala.collection.Iterator.foreach(Iterator.scala:943)
        at scala.collection.Iterator.foreach$(Iterator.scala:943)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
        at scala.collection.IterableLike.foreach(IterableLike.scala:74)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
        at org.apache.spark.sql.types.StructType.foreach(StructType.scala:102)
        at org.apache.hudi.org.apache.spark.sql.avro.SchemaConverters$.toAvroType(SchemaConverters.scala:202)
        at org.apache.hudi.org.apache.spark.sql.avro.HoodieSparkAvroSchemaConverters$.toAvroType(HoodieSparkAvroSchemaConverters.scala:37)
        at org.apache.hudi.AvroConversionUtils$.convertStructTypeToAvroSchema(AvroConversionUtils.scala:164)
        at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:230)
        at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:150)
        at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:47)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
        at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
        at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
        at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
        at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
        at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
        at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
        at java.base/java.lang.Thread.run(Thread.java:829)
   ```
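   Reading the trace, the write fails in `SchemaConverters.toAvroType` while converting the DataFrame schema to Avro, and Avro maps only permit string keys, so the IntegerType keys look like the trigger. Below is a minimal sketch of the workaround I plan to try; it assumes `transform_keys` (available since Spark 3.1) and that Hudi accepts string-keyed maps, which I have not yet verified:
   ```
   # Hedged workaround sketch (assumption, not verified): cast the IntegerType
   # map keys to StringType so the Avro schema conversion can succeed.
   from pyspark.sql.functions import transform_keys

   df_string_keys = df.withColumn(
       "tags",
       transform_keys("tags", lambda k, v: k.cast("string")),
   )

   df_string_keys.write.format("hudi") \
       .options(**hudi_options) \
       .mode("append") \
       .save("/path/to/map_test")
   ```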
   Please let me know if anything else is needed.
   
   Thanks,
   Li
   

