bvaradar commented on a change in pull request #1004: [HUDI-328] Adding delete
api to HoodieWriteClient
URL: https://github.com/apache/incubator-hudi/pull/1004#discussion_r348501608
##########
File path: hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##########
@@ -72,131 +73,212 @@ private[hudi] object HoodieSparkSqlWriter {
parameters(OPERATION_OPT_KEY)
}
- // register classes & schemas
- val structName = s"${tblName.get}_record"
- val nameSpace = s"hoodie.${tblName.get}"
- sparkContext.getConf.registerKryoClasses(
- Array(classOf[org.apache.avro.generic.GenericData],
- classOf[org.apache.avro.Schema]))
- val schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema,
structName, nameSpace)
- sparkContext.getConf.registerAvroSchemas(schema)
- log.info(s"Registered avro schema : ${schema.toString(true)}")
-
- // Convert to RDD[HoodieRecord]
- val keyGenerator =
DataSourceUtils.createKeyGenerator(toProperties(parameters))
- val genericRecords: RDD[GenericRecord] = AvroConversionUtils.createRdd(df,
structName, nameSpace)
- val hoodieAllIncomingRecords = genericRecords.map(gr => {
- val orderingVal = DataSourceUtils.getNestedFieldValAsString(
- gr, parameters(PRECOMBINE_FIELD_OPT_KEY)).asInstanceOf[Comparable[_]]
- DataSourceUtils.createHoodieRecord(gr,
- orderingVal, keyGenerator.getKey(gr),
parameters(PAYLOAD_CLASS_OPT_KEY))
- }).toJavaRDD()
+ var writeSuccessful: Boolean = false
+ var commitTime: String = null
+ var writeStatuses: JavaRDD[WriteStatus] = null
val jsc = new JavaSparkContext(sparkContext)
-
val basePath = new Path(parameters("path"))
val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
var exists = fs.exists(new Path(basePath,
HoodieTableMetaClient.METAFOLDER_NAME))
- // Handle various save modes
- if (mode == SaveMode.ErrorIfExists && exists) {
- throw new HoodieException(s"hoodie dataset at $basePath already exists.")
- }
- if (mode == SaveMode.Ignore && exists) {
- log.warn(s"hoodie dataset at $basePath already exists. Ignoring & not
performing actual writes.")
- return (true, common.util.Option.empty())
- }
- if (mode == SaveMode.Overwrite && exists) {
- log.warn(s"hoodie dataset at $basePath already exists. Deleting existing
data & overwriting with new data.")
- fs.delete(basePath, true)
- exists = false
- }
+ if (!operation.equalsIgnoreCase(DELETE_OPERATION_OPT_VAL)) {
+ // register classes & schemas
+ val structName = s"${tblName.get}_record"
+ val nameSpace = s"hoodie.${tblName.get}"
+ sparkContext.getConf.registerKryoClasses(
+ Array(classOf[org.apache.avro.generic.GenericData],
+ classOf[org.apache.avro.Schema]))
+ val schema =
AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName,
nameSpace)
+ sparkContext.getConf.registerAvroSchemas(schema)
+ log.info(s"Registered avro schema : ${schema.toString(true)}")
- // Create the dataset if not present
- if (!exists) {
- HoodieTableMetaClient.initTableType(sparkContext.hadoopConfiguration,
path.get, storageType,
- tblName.get, "archived")
- }
+ // Convert to RDD[HoodieRecord]
+ val keyGenerator =
DataSourceUtils.createKeyGenerator(toProperties(parameters))
+ val genericRecords: RDD[GenericRecord] =
AvroConversionUtils.createRdd(df, structName, nameSpace)
+ val hoodieAllIncomingRecords = genericRecords.map(gr => {
+ val orderingVal = DataSourceUtils.getNestedFieldValAsString(
+ gr, parameters(PRECOMBINE_FIELD_OPT_KEY)).asInstanceOf[Comparable[_]]
+ DataSourceUtils.createHoodieRecord(gr,
+ orderingVal, keyGenerator.getKey(gr),
parameters(PAYLOAD_CLASS_OPT_KEY))
+ }).toJavaRDD()
- // Create a HoodieWriteClient & issue the write.
- val client = DataSourceUtils.createHoodieClient(jsc, schema.toString,
path.get, tblName.get,
- mapAsJavaMap(parameters)
- )
-
- val hoodieRecords =
- if (parameters(INSERT_DROP_DUPS_OPT_KEY).toBoolean) {
- DataSourceUtils.dropDuplicates(
- jsc,
- hoodieAllIncomingRecords,
- mapAsJavaMap(parameters), client.getTimelineServer)
- } else {
- hoodieAllIncomingRecords
+ // Handle various save modes
+ if (mode == SaveMode.ErrorIfExists && exists) {
Review comment:
   Why not perform these save-mode checks immediately after the delete handling begins?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services