SmyxBug opened a new issue, #10838:
URL: https://github.com/apache/hudi/issues/10838

   ## Hudi env
   - CentOS 7
   - Hadoop 3.1.3
   - Scala 2.12.18
   - Spark 3.3.0
   
   ## Maven Project pom.xml
   ```xml
           <dependency>
               <groupId>org.apache.hudi</groupId>
               <artifactId>hudi-spark3.3-bundle_2.12</artifactId>
               <version>0.14.0</version>
           </dependency>
           <!-- Apache Spark -->
           <dependency>
               <groupId>org.apache.spark</groupId>
               <artifactId>spark-avro_2.12</artifactId>
               <version>3.5.0</version>
           </dependency>
           <dependency>
               <groupId>org.apache.spark</groupId>
               <artifactId>spark-sql_2.12</artifactId>
               <version>3.5.0</version>
               <exclusions>
                   <exclusion>
                       <groupId>com.fasterxml.jackson.core</groupId>
                       <artifactId>jackson-core</artifactId>
                   </exclusion>
               </exclusions>
           </dependency>
           <dependency>
               <groupId>com.fasterxml.jackson.core</groupId>
               <artifactId>jackson-core</artifactId>
               <version>2.16.1</version>
           </dependency>
   ```
   ## test code
   ```java
   package org.example.hoodiedemo;
   
   import org.apache.commons.lang3.StringUtils;
   import org.apache.hadoop.conf.Configuration;
   import org.apache.hadoop.fs.FileStatus;
   import org.apache.hadoop.fs.FileSystem;
   import org.apache.hadoop.fs.Path;
   import org.apache.hudi.DataSourceWriteOptions;
   import org.apache.hudi.common.model.HoodieTableType;
   import org.apache.hudi.config.HoodieWriteConfig;
   import org.apache.spark.sql.*;
   import org.apache.spark.sql.types.DataType;
   import org.apache.spark.sql.types.DataTypes;
   import org.apache.spark.sql.types.StructField;
   import org.apache.spark.sql.types.StructType;
   import scala.collection.Iterator;
   
   import java.util.ArrayList;
   
   public class HoodieTest {
       public static void main(String[] args) {
           String hdfsPath = "hdfs://192.168.1.68:8020";
           String tablePath = "/datas/hudi-warehouse/";
           String tableName = "t_hudi_spark_jdk11";
            // Create the table with the schema fields
            createTable(hdfsPath, tablePath, tableName);
            // Insert data
            // insertData(hdfsPath, tablePath, tableName);
       }
        private static void createTable(String hdfsPath, String tablePath, String tableName) {
           try (SparkSession sparkSession = SparkSession.builder()
                   .appName("HoodieTest")
                   .master("local[*]")
                   .config("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer")
                   .getOrCreate()) {
                Configuration configuration = sparkSession.sparkContext().hadoopConfiguration();
               configuration.set("fs.defaultFS", hdfsPath);
               StructField[] structFields = new StructField[3];
               structFields[0] = DataTypes.createStructField("id", 
DataTypes.LongType, false);
               structFields[1] = DataTypes.createStructField("name", 
DataTypes.StringType, false);
               structFields[2] = DataTypes.createStructField("age", 
DataTypes.IntegerType, false);
               StructType schema = new StructType(structFields);
               Dataset<Row> rowDataset = sparkSession.createDataFrame(new 
ArrayList<>(), schema);
               rowDataset.write().format("hudi")
                       
.option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "id")
                       .option(DataSourceWriteOptions.OPERATION_OPT_KEY(), 
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL())
                       .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY(), 
HoodieTableType.MERGE_ON_READ.name())
                       .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
                       .mode(SaveMode.Overwrite)
                       .save(tablePath + tableName);
           } catch (Exception e) {
               e.printStackTrace();
           }
       }
        private static void insertData(String hdfsPath, String tablePath, String tableName) {
           try (SparkSession sparkSession = SparkSession.builder()
                   .appName("HoodieTest")
                   .master("local[*]")
                   .config("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer")
                   .getOrCreate()) {
                Configuration sConf = sparkSession.sparkContext().hadoopConfiguration();
               sConf.set("fs.defaultFS", hdfsPath);
               String fileName = "D:\\tmp\\t_hudi_spark_jdk11.json";
               Dataset<Row> dataset = sparkSession.read().format("json")
                       .load("file:///" + fileName);
               DataFrameWriter<Row> frameWriter = 
dataset.write().format("hudi");
               DataFrameWriter<Row> option = 
frameWriter.option(HoodieWriteConfig.TBL_NAME.key(), tableName);
               option.option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), 
"id");
               option.option(DataSourceWriteOptions.OPERATION_OPT_KEY(), 
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL());
               option.mode(SaveMode.Append).save(tablePath + tableName);
           } catch (Exception e) {
               e.printStackTrace();
           }
       }
   }
   ```
   ## error msg
   ```java
    12:10:36.324 [main] INFO org.apache.spark.SparkContext - Successfully stopped SparkContext
    org.apache.hudi.internal.schema.HoodieSchemaException: Failed to convert struct type to avro schema: StructType(StructField(age,LongType,true),StructField(id,LongType,true),StructField(name,StringType,true))
        at org.apache.hudi.AvroConversionUtils$.convertStructTypeToAvroSchema(AvroConversionUtils.scala:147)
        at org.apache.hudi.HoodieSparkSqlWriter$.writeInternal(HoodieSparkSqlWriter.scala:276)
        at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:132)
        at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:150)
        at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
        at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
        at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
        at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:859)
        at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:388)
        at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:361)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:240)
        at org.example.hoodiedemo.HoodieTest.insertData(HoodieTest.java:54)
        at org.example.hoodiedemo.HoodieTest.main(HoodieTest.java:34)
    Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.adapter.Spark2Adapter
        at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
        at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
        at org.apache.hudi.SparkAdapterSupport$.sparkAdapter$lzycompute(SparkAdapterSupport.scala:49)
        at org.apache.hudi.SparkAdapterSupport$.sparkAdapter(SparkAdapterSupport.scala:35)
        at org.apache.hudi.SparkAdapterSupport.sparkAdapter(SparkAdapterSupport.scala:29)
        at org.apache.hudi.SparkAdapterSupport.sparkAdapter$(SparkAdapterSupport.scala:29)
        at org.apache.hudi.HoodieSparkUtils$.sparkAdapter$lzycompute(HoodieSparkUtils.scala:66)
        at org.apache.hudi.HoodieSparkUtils$.sparkAdapter(HoodieSparkUtils.scala:66)
        at org.apache.hudi.AvroConversionUtils$.convertStructTypeToAvroSchema(AvroConversionUtils.scala:143)
        ... 34 more
   ```
   
   #### I suspect the default SparkAdapter is being used because the matching Spark version cannot be resolved. How can I set the Spark version here?
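
   If the mismatch between `hudi-spark3.3-bundle_2.12` and the Spark 3.5.0 artifacts in the pom is indeed the cause, a minimal sketch of what I think an aligned dependency section would look like (the 3.3.0 versions below are my assumption, chosen to match the Spark 3.3.0 listed in the environment):

    ```xml
            <dependency>
                <groupId>org.apache.hudi</groupId>
                <artifactId>hudi-spark3.3-bundle_2.12</artifactId>
                <version>0.14.0</version>
            </dependency>
            <!-- Keep the Spark artifacts on the same minor version the bundle was built for (3.3.x) -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-avro_2.12</artifactId>
                <version>3.3.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_2.12</artifactId>
                <version>3.3.0</version>
            </dependency>
    ```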

