n3nash commented on a change in pull request #651: Spark Stage retry handling
URL: https://github.com/apache/incubator-hudi/pull/651#discussion_r279572515
##########
File path:
hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java
##########
@@ -392,96 +386,25 @@ protected HoodieMergeHandle getUpdateHandle(String
commitTime, String fileId,
* @param activeTimeline Hoodie active timeline
* @param instantToBeDeleted Instant to be deleted
*/
- protected static void deleteInflightInstant(boolean deleteInstant,
HoodieActiveTimeline activeTimeline,
+ protected void deleteInflightInstant(boolean deleteInstant,
HoodieActiveTimeline activeTimeline,
HoodieInstant instantToBeDeleted) {
// Remove the rolled back inflight commits
if (deleteInstant) {
- activeTimeline.deleteInflight(instantToBeDeleted);
- logger.info("Deleted inflight commit " + instantToBeDeleted);
- } else {
- logger.warn("Rollback finished without deleting inflight instant file.
Instant=" + instantToBeDeleted);
- }
- }
-
- /**
- * Finalize the written data files
- *
- * @param stats List of HoodieWriteStats
- * @return number of files finalized
- */
- @Override
- @SuppressWarnings("unchecked")
- public void finalizeWrite(JavaSparkContext jsc, List<HoodieWriteStat> stats)
- throws HoodieIOException {
-
- super.finalizeWrite(jsc, stats);
-
- if (config.shouldUseTempFolderForCopyOnWrite()) {
- // This is to rename each data file from temporary path to its final
location
- jsc.parallelize(stats, config.getFinalizeWriteParallelism())
- .foreach(writeStat -> {
- final FileSystem fs = getMetaClient().getFs();
- final Path finalPath = new Path(config.getBasePath(),
writeStat.getPath());
-
- if (writeStat.getTempPath() != null) {
- final Path tempPath = new Path(config.getBasePath(),
writeStat.getTempPath());
- boolean success;
- try {
- logger.info("Renaming temporary file: " + tempPath + " to " +
finalPath);
- success = fs.rename(tempPath, finalPath);
- } catch (IOException e) {
- throw new HoodieIOException(
- "Failed to rename file: " + tempPath + " to " + finalPath);
- }
-
- if (!success) {
- throw new HoodieIOException(
- "Failed to rename file: " + tempPath + " to " + finalPath);
- }
- }
- });
-
- // clean temporary data files
- cleanTemporaryDataFiles(jsc);
- }
- }
-
- /**
- * Clean temporary data files that are produced from previous failed commit
or retried spark
- * stages.
- */
- private void cleanTemporaryDataFiles(JavaSparkContext jsc) {
- if (!config.shouldUseTempFolderForCopyOnWrite()) {
- return;
- }
-
- final FileSystem fs = getMetaClient().getFs();
- final Path temporaryFolder = new Path(config.getBasePath(),
- HoodieTableMetaClient.TEMPFOLDER_NAME);
- try {
- if (!fs.exists(temporaryFolder)) {
- logger.info("Temporary folder does not exist: " + temporaryFolder);
- return;
- }
- List<FileStatus> fileStatusesList =
Arrays.asList(fs.listStatus(temporaryFolder));
- List<Tuple2<String, Boolean>> results = jsc
- .parallelize(fileStatusesList,
config.getFinalizeWriteParallelism()).map(fileStatus -> {
- FileSystem fs1 = getMetaClient().getFs();
- boolean success = fs1.delete(fileStatus.getPath(), false);
- logger
- .info("Deleting file in temporary folder" +
fileStatus.getPath() + "\t" + success);
- return new Tuple2<>(fileStatus.getPath().toString(), success);
- }).collect();
-
- for (Tuple2<String, Boolean> result : results) {
- if (!result._2()) {
- logger.info("Failed to delete file: " + result._1());
- throw new HoodieIOException("Failed to delete file in temporary
folder: " + result._1());
+ try {
+ //TODO: Cleanup Hoodie 1.0 rollback to simply call
super.rollbackFailedWrites with consistency check disabled
+ // and empty WriteStat list.
+ Path markerDir = new
Path(metaClient.getMarkerFolderPath(instantToBeDeleted.getTimestamp()));
+ logger.info("Removing marker directory=" + markerDir);
+ if (metaClient.getFs().exists(markerDir)) {
Review comment:
These two operations are not performed atomically; does that mean a marker file
can get deleted without the commit file actually being deleted?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services