nsivabalan commented on a change in pull request #1004: [HUDI-328] Adding
delete api to HoodieWriteClient
URL: https://github.com/apache/incubator-hudi/pull/1004#discussion_r347111334
##########
File path: hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##########
@@ -72,131 +73,200 @@ private[hudi] object HoodieSparkSqlWriter {
parameters(OPERATION_OPT_KEY)
}
- // register classes & schemas
- val structName = s"${tblName.get}_record"
- val nameSpace = s"hoodie.${tblName.get}"
- sparkContext.getConf.registerKryoClasses(
- Array(classOf[org.apache.avro.generic.GenericData],
- classOf[org.apache.avro.Schema]))
- val schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema,
structName, nameSpace)
- sparkContext.getConf.registerAvroSchemas(schema)
- log.info(s"Registered avro schema : ${schema.toString(true)}")
-
- // Convert to RDD[HoodieRecord]
- val keyGenerator =
DataSourceUtils.createKeyGenerator(toProperties(parameters))
- val genericRecords: RDD[GenericRecord] = AvroConversionUtils.createRdd(df,
structName, nameSpace)
- val hoodieAllIncomingRecords = genericRecords.map(gr => {
- val orderingVal = DataSourceUtils.getNestedFieldValAsString(
- gr, parameters(PRECOMBINE_FIELD_OPT_KEY)).asInstanceOf[Comparable[_]]
- DataSourceUtils.createHoodieRecord(gr,
- orderingVal, keyGenerator.getKey(gr),
parameters(PAYLOAD_CLASS_OPT_KEY))
- }).toJavaRDD()
+ var writeSuccessful: Boolean = false
+ var commitTime: String = null
+ var writeStatuses: JavaRDD[WriteStatus] = null
val jsc = new JavaSparkContext(sparkContext)
-
val basePath = new Path(parameters("path"))
val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
var exists = fs.exists(new Path(basePath,
HoodieTableMetaClient.METAFOLDER_NAME))
- // Handle various save modes
- if (mode == SaveMode.ErrorIfExists && exists) {
- throw new HoodieException(s"hoodie dataset at $basePath already exists.")
- }
- if (mode == SaveMode.Ignore && exists) {
- log.warn(s"hoodie dataset at $basePath already exists. Ignoring & not
performing actual writes.")
- return (true, common.util.Option.empty())
- }
- if (mode == SaveMode.Overwrite && exists) {
- log.warn(s"hoodie dataset at $basePath already exists. Deleting existing
data & overwriting with new data.")
- fs.delete(basePath, true)
- exists = false
- }
+ if (!operation.equalsIgnoreCase(DELETE_OPERATION_OPT_VAL)) {
Review comment:
For now, I have a big if/else for delete and non-delete operations. I am
having issues converting generic types from Java to Scala. While I try to
figure that out (so that the common code blocks can actually be shared), I
thought I would put it up for review.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services