smdahmed commented on issue #859: Hudi upsert after a delete in partition will cause valid records inserted to disappear.
URL: https://github.com/apache/incubator-hudi/issues/859#issuecomment-527492285

Grateful for all your help, Vinoth. I shall prepare the setup, run the program you provided, and let you know the results as requested.

There is a difference in the code between your approach and mine. My code snippet is below. The code in this routine is common to all upsert cases, i.e. upsert after an insert and upsert after a delete, but only upsert after a delete seems to cause a problem. How easy would it be for you to change your example along the lines below? (Hope I am not asking too much.)

```
// Convert to RDD[HoodieRecord]
val keyGenerator = DataSourceUtils.createKeyGenerator(toProperties(parameters))
val genericRecords: RDD[GenericRecord] = AvroConversionUtils.createRdd(df, structName, nameSpace)
val hoodieAllIncomingRecords = genericRecords.map(gr => {
  val orderingVal = DataSourceUtils.getNestedFieldValAsString(
    gr, parameters(PRECOMBINE_FIELD_OPT_KEY)).asInstanceOf[Comparable[_]]
  DataSourceUtils.createHoodieRecord(gr,
    orderingVal, keyGenerator.getKey(gr), parameters(PAYLOAD_CLASS_OPT_KEY))
}).toJavaRDD()

// Create the dataset if not present
if (!exists) {
  HoodieTableMetaClient.initTableType(sparkContext.hadoopConfiguration, path.get, storageType,
    tblName.get, "archived")
}

// Create a HoodieWriteClient & issue the write.
val client = DataSourceUtils.createHoodieClient(jsc, schema.toString, path.get, tblName.get,
  mapAsJavaMap(parameters)
)

val hoodieRecords =
  if (parameters(INSERT_DROP_DUPS_OPT_KEY).toBoolean) {
    DataSourceUtils.dropDuplicates(
      jsc,
      hoodieAllIncomingRecords,
      mapAsJavaMap(parameters), client.getTimelineServer)
  } else {
    hoodieAllIncomingRecords
  }

val commitTime = client.startCommit()
val writeStatuses = DataSourceUtils.doWriteOperation(client, hoodieRecords, commitTime, operation)

// Check for errors and commit the write.
val errorCount = writeStatuses.rdd.filter(ws => ws.hasErrors).count()
val writeSuccessful =
  if (errorCount == 0) {
    log.info("No errors. Proceeding to commit the write.")
    val metaMap = parameters.filter(kv =>
      kv._1.startsWith(parameters(COMMIT_METADATA_KEYPREFIX_OPT_KEY)))
    val commitSuccess = if (metaMap.isEmpty) {
      client.commit(commitTime, writeStatuses)
    } else {
      client.commit(commitTime, writeStatuses,
        common.util.Option.of(new util.HashMap[String, String](mapAsJavaMap(metaMap))))
    }

    if (commitSuccess) {
      log.info("Commit " + commitTime + " successful!")
    } else {
      log.info("Commit " + commitTime + " failed!")
    }
```
