zhangxinjian123 commented on issue #829: Structured Streaming read kafka URL: https://github.com/apache/incubator-hudi/issues/829#issuecomment-520660528 For example, the DataFrame I started writing saved to the path is three fields, the second two fields are normal, the third writing is four fields and the error is reported The error information is as follows: Caused by: com.uber.hoodie.exception.HoodieException: java.util.concurrent.ExecutionException: java.lang.ArrayIndexOutOfBoundsException: 10 at com.uber.hoodie.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:148) at com.uber.hoodie.table.HoodieCopyOnWriteTable.handleUpdateInternal(HoodieCopyOnWriteTable.java:206) ... 33 more Caused by: java.util.concurrent.ExecutionException: java.lang.ArrayIndexOutOfBoundsException: 10 at java.util.concurrent.FutureTask.report(FutureTask.java:122) at java.util.concurrent.FutureTask.get(FutureTask.java:192) at com.uber.hoodie.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:146) My code is as follows: dataList.writeStream.foreachBatch( (batchDF: Dataset[mutable.Map[String, String]], batchId: Long) => if (!batchDF.isEmpty) { val cols: Array[String] = batchDF.take(1).flatMap(_.keys) val rddRow: RDD[Row] = batchDF.rdd.filter(_.nonEmpty).map { m: mutable.Map[String, String] => val seq = m.values.toSeq Row.fromSeq(seq) } val fields: Array[StructField] = cols.map(fieldName => StructField(fieldName, StringType, nullable = true)) val schemad: StructType = StructType(fields) val sparkSql: DataFrame = spark.createDataFrame(rddRow, schemad) val writer = sparkSql .write .format("com.uber.hoodie") .option("hoodie.insert.shuffle.parallelism", "2") .option("hoodie.upsert.shuffle.parallelism", "2") .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, tableType) .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key") .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "partition") .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "timestamp") .option(HoodieWriteConfig.TABLE_NAME, tableName) .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, if (nonPartitionedTable) { classOf[NonpartitionedKeyGenerator].getCanonicalName } else { classOf[SimpleKeyGenerator].getCanonicalName }) .mode(SaveMode.Append) writer.save(tablePath) } ).start() .awaitTermination()
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services
