bvaradar commented on a change in pull request #1004: [HUDI-328] Adding delete 
api to HoodieWriteClient
URL: https://github.com/apache/incubator-hudi/pull/1004#discussion_r348501608
 
 

 ##########
 File path: hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
 ##########
 @@ -72,131 +73,212 @@ private[hudi] object HoodieSparkSqlWriter {
         parameters(OPERATION_OPT_KEY)
       }
 
-    // register classes & schemas
-    val structName = s"${tblName.get}_record"
-    val nameSpace = s"hoodie.${tblName.get}"
-    sparkContext.getConf.registerKryoClasses(
-      Array(classOf[org.apache.avro.generic.GenericData],
-        classOf[org.apache.avro.Schema]))
-    val schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, 
structName, nameSpace)
-    sparkContext.getConf.registerAvroSchemas(schema)
-    log.info(s"Registered avro schema : ${schema.toString(true)}")
-
-    // Convert to RDD[HoodieRecord]
-    val keyGenerator = 
DataSourceUtils.createKeyGenerator(toProperties(parameters))
-    val genericRecords: RDD[GenericRecord] = AvroConversionUtils.createRdd(df, 
structName, nameSpace)
-    val hoodieAllIncomingRecords = genericRecords.map(gr => {
-      val orderingVal = DataSourceUtils.getNestedFieldValAsString(
-        gr, parameters(PRECOMBINE_FIELD_OPT_KEY)).asInstanceOf[Comparable[_]]
-      DataSourceUtils.createHoodieRecord(gr,
-        orderingVal, keyGenerator.getKey(gr), 
parameters(PAYLOAD_CLASS_OPT_KEY))
-    }).toJavaRDD()
+    var writeSuccessful: Boolean = false
+    var commitTime: String = null
+    var writeStatuses: JavaRDD[WriteStatus] = null
 
     val jsc = new JavaSparkContext(sparkContext)
-
     val basePath = new Path(parameters("path"))
     val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
     var exists = fs.exists(new Path(basePath, 
HoodieTableMetaClient.METAFOLDER_NAME))
 
-    // Handle various save modes
-    if (mode == SaveMode.ErrorIfExists && exists) {
-      throw new HoodieException(s"hoodie dataset at $basePath already exists.")
-    }
-    if (mode == SaveMode.Ignore && exists) {
-      log.warn(s"hoodie dataset at $basePath already exists. Ignoring & not 
performing actual writes.")
-      return (true, common.util.Option.empty())
-    }
-    if (mode == SaveMode.Overwrite && exists) {
-      log.warn(s"hoodie dataset at $basePath already exists. Deleting existing 
data & overwriting with new data.")
-      fs.delete(basePath, true)
-      exists = false
-    }
+    if (!operation.equalsIgnoreCase(DELETE_OPERATION_OPT_VAL)) {
+      // register classes & schemas
+      val structName = s"${tblName.get}_record"
+      val nameSpace = s"hoodie.${tblName.get}"
+      sparkContext.getConf.registerKryoClasses(
+        Array(classOf[org.apache.avro.generic.GenericData],
+          classOf[org.apache.avro.Schema]))
+      val schema = 
AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, 
nameSpace)
+      sparkContext.getConf.registerAvroSchemas(schema)
+      log.info(s"Registered avro schema : ${schema.toString(true)}")
 
-    // Create the dataset if not present
-    if (!exists) {
-      HoodieTableMetaClient.initTableType(sparkContext.hadoopConfiguration, 
path.get, storageType,
-        tblName.get, "archived")
-    }
+      // Convert to RDD[HoodieRecord]
+      val keyGenerator = 
DataSourceUtils.createKeyGenerator(toProperties(parameters))
+      val genericRecords: RDD[GenericRecord] = 
AvroConversionUtils.createRdd(df, structName, nameSpace)
+      val hoodieAllIncomingRecords = genericRecords.map(gr => {
+        val orderingVal = DataSourceUtils.getNestedFieldValAsString(
+          gr, parameters(PRECOMBINE_FIELD_OPT_KEY)).asInstanceOf[Comparable[_]]
+        DataSourceUtils.createHoodieRecord(gr,
+          orderingVal, keyGenerator.getKey(gr), 
parameters(PAYLOAD_CLASS_OPT_KEY))
+      }).toJavaRDD()
 
-    // Create a HoodieWriteClient & issue the write.
-    val client = DataSourceUtils.createHoodieClient(jsc, schema.toString, 
path.get, tblName.get,
-      mapAsJavaMap(parameters)
-    )
-
-    val hoodieRecords =
-      if (parameters(INSERT_DROP_DUPS_OPT_KEY).toBoolean) {
-        DataSourceUtils.dropDuplicates(
-          jsc,
-          hoodieAllIncomingRecords,
-          mapAsJavaMap(parameters), client.getTimelineServer)
-      } else {
-        hoodieAllIncomingRecords
+      // Handle various save modes
+      if (mode == SaveMode.ErrorIfExists && exists) {
 
 Review comment:
   Why not perform these save-mode checks immediately after delete handling begins?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to