zhangxinjian123 commented on issue #829: Structured Streaming read kafka
URL: https://github.com/apache/incubator-hudi/issues/829#issuecomment-520660528
 
 
   For example, the first DataFrame I wrote to the path had three fields; 
the second write, with two fields, worked normally; the third write had four 
fields, and then the error was reported.
   
   The error information is as follows:
   Caused by: com.uber.hoodie.exception.HoodieException: 
java.util.concurrent.ExecutionException: 
java.lang.ArrayIndexOutOfBoundsException: 10
        at 
com.uber.hoodie.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:148)
        at 
com.uber.hoodie.table.HoodieCopyOnWriteTable.handleUpdateInternal(HoodieCopyOnWriteTable.java:206)
        ... 33 more
   Caused by: java.util.concurrent.ExecutionException: 
java.lang.ArrayIndexOutOfBoundsException: 10
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:192)
        at 
com.uber.hoodie.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:146)
   
   My code is as follows:
    // Writes every micro-batch of key/value records to a Hudi table.
    //
    // Each record is a Map[String, String] and records in the same batch may
    // carry different key sets. The schema is therefore built from the union
    // of all keys seen in the batch, and every record is projected onto that
    // column list by key lookup, so the length and order of each Row always
    // match the StructType handed to createDataFrame.
    dataList.writeStream.foreachBatch(

         (batchDF: Dataset[mutable.Map[String, String]], batchId: Long) => {

           // Empty maps carry no data and are dropped, as before.
           val records: RDD[mutable.Map[String, String]] = batchDF.rdd.filter(_.nonEmpty)

           // Union of the keys of all records in this batch, sorted so the column
           // order is deterministic instead of depending on map iteration order.
           val cols: Array[String] = records.flatMap(_.keys).distinct().collect().sorted

           // No columns means there were no non-empty records: nothing to write.
           if (cols.nonEmpty) {

             // Look every column up by name; a key missing from a record becomes
             // null (all fields are declared nullable below).
             val rddRow: RDD[Row] = records.map { m: mutable.Map[String, String] =>
               Row.fromSeq(cols.map(c => m.getOrElse(c, null)).toSeq)
             }

             val fields: Array[StructField] =
               cols.map(fieldName => StructField(fieldName, StringType, nullable = true))
             val schemad: StructType = StructType(fields)
             val sparkSql: DataFrame = spark.createDataFrame(rddRow, schemad)

             // NOTE(review): when the key set differs between batches, the schema
             // written to the table differs between commits as well - confirm that
             // the Hudi version in use accepts that for an existing table.
             val keyGeneratorClass: String =
               if (nonPartitionedTable) {
                 classOf[NonpartitionedKeyGenerator].getCanonicalName
               }
               else {
                 classOf[SimpleKeyGenerator].getCanonicalName
               }

             val writer = sparkSql
               .write
               .format("com.uber.hoodie")
               .option("hoodie.insert.shuffle.parallelism", "2")
               .option("hoodie.upsert.shuffle.parallelism", "2")
               .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, tableType)
               .option(DataSourceWriteOptions.OPERATION_OPT_KEY,
                 DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
               .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY,
                 "_row_key")
               .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY,
                 "partition")
               .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY,
                 "timestamp")
               .option(HoodieWriteConfig.TABLE_NAME, tableName)
               .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, keyGeneratorClass)
               .mode(SaveMode.Append)
             writer.save(tablePath)

           }

         }

       ).start()
         .awaitTermination()
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to