Repository: incubator-gobblin
Updated Branches:
  refs/heads/master e66b8a66c -> a7ca3d7bf


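[GOBBLIN-627] Allow files in task output directory to be overwritten when 
renaming files with record count. Also fail the task when closing its open
resources fails, unless task.ignoreCloseFailures is set to true.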

Closes #2497 from sv2000/overwriteRename


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/a7ca3d7b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/a7ca3d7b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/a7ca3d7b

Branch: refs/heads/master
Commit: a7ca3d7bf71e892aa5fd1445da397c0d6f564101
Parents: e66b8a6
Author: suvasude <[email protected]>
Authored: Tue Nov 6 10:46:31 2018 -0800
Committer: Hung Tran <[email protected]>
Committed: Tue Nov 6 10:46:31 2018 -0800

----------------------------------------------------------------------
 .../gobblin/configuration/ConfigurationKeys.java  |  1 +
 .../org/apache/gobblin/writer/FsDataWriter.java   | 17 +++++++----------
 .../java/org/apache/gobblin/runtime/Task.java     | 10 ++++++++++
 .../org/apache/gobblin/runtime/TaskState.java     |  1 +
 .../java/org/apache/gobblin/util/HadoopUtils.java | 18 ++++++++++++++++++
 5 files changed, 37 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a7ca3d7b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
----------------------------------------------------------------------
diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 8383c2a..b1a525e 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -239,6 +239,7 @@ public class ConfigurationKeys {
   public static final String JOB_CONFIG_FILE_PATH_KEY = "job.config.path";
   public static final String TASK_FAILURE_EXCEPTION_KEY = 
"task.failure.exception";
   public static final String TASK_RETRIES_KEY = "task.retries";
+  public static final String TASK_IGNORE_CLOSE_FAILURES = 
"task.ignoreCloseFailures";
   public static final String JOB_FAILURES_KEY = "job.failures";
   public static final String JOB_TRACKING_URL_KEY = "job.tracking.url";
   public static final String FORK_STATE_KEY = "fork.state";

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a7ca3d7b/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java
----------------------------------------------------------------------
diff --git 
a/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java 
b/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java
index aa228af..d814622 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java
@@ -22,6 +22,7 @@ import java.io.OutputStream;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -72,6 +73,7 @@ public abstract class FsDataWriter<D> implements 
DataWriter<D>, FinalState, Meta
   protected final int branchId;
   protected final String fileName;
   protected final FileSystem fs;
+  protected final FileContext fileContext;
   protected final Path stagingFile;
   protected final String partitionKey;
   private final GlobalMetadata defaultMetadata;
@@ -102,6 +104,7 @@ public abstract class FsDataWriter<D> implements 
DataWriter<D>, FinalState, Meta
     // Add all job configuration properties so they are picked up by Hadoop
     JobConfigurationUtils.putStateIntoConfiguration(properties, conf);
     this.fs = WriterUtils.getWriterFS(properties, this.numBranches, 
this.branchId);
+    this.fileContext = FileContext.getFileContext(conf);
 
     // Initialize staging/output directory
     Path writerStagingDir = this.writerAttemptIdOptional.isPresent() ? 
WriterUtils
@@ -255,15 +258,9 @@ public abstract class FsDataWriter<D> implements 
DataWriter<D>, FinalState, Meta
     this.bytesWritten = Optional.of(Long.valueOf(stagingFileStatus.getLen()));
 
     LOG.info(String.format("Moving data from %s to %s", this.stagingFile, 
this.outputFile));
-    // For the same reason as deleting the staging file if it already exists, 
deleting
-    // the output file if it already exists prevents task retry from being 
blocked.
-    if (this.fs.exists(this.outputFile)) {
-      LOG.warn(String.format("Task output file %s already exists", 
this.outputFile));
-      HadoopUtils.deletePath(this.fs, this.outputFile, false);
-    }
-
-    HadoopUtils.renamePath(this.fs, this.stagingFile, this.outputFile);
-
+    // For the same reason as deleting the staging file if it already exists, 
overwrite
+    // the output file if it already exists to prevent task retry from being 
blocked.
+    HadoopUtils.renamePath(this.fileContext, this.stagingFile, 
this.outputFile, true);
  }
 
   /**
@@ -309,7 +306,7 @@ public abstract class FsDataWriter<D> implements 
DataWriter<D>, FinalState, Meta
     String filePath = getOutputFilePath();
     String filePathWithRecordCount = 
IngestionRecordCountProvider.constructFilePath(filePath, recordsWritten());
     LOG.info("Renaming " + filePath + " to " + filePathWithRecordCount);
-    HadoopUtils.renamePath(this.fs, new Path(filePath), new 
Path(filePathWithRecordCount));
+    HadoopUtils.renamePath(this.fileContext, new Path(filePath), new 
Path(filePathWithRecordCount), true);
     this.outputFile = new Path(filePathWithRecordCount);
     return filePathWithRecordCount;
   }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a7ca3d7b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
index 66f459b..a46468d 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
@@ -134,6 +134,8 @@ public class Task implements TaskIFace {
   private final String jobId;
   private final String taskId;
   private final String taskKey;
+  private final boolean isIgnoreCloseFailures;
+
   private final TaskContext taskContext;
   private final TaskState taskState;
   private final TaskStateTracker taskStateTracker;
@@ -180,6 +182,7 @@ public class Task implements TaskIFace {
     this.jobId = this.taskState.getJobId();
     this.taskId = this.taskState.getTaskId();
     this.taskKey = this.taskState.getTaskKey();
+    this.isIgnoreCloseFailures = 
this.taskState.getJobState().getPropAsBoolean(ConfigurationKeys.TASK_IGNORE_CLOSE_FAILURES,
 false);
     this.taskStateTracker = taskStateTracker;
     this.taskExecutor = taskExecutor;
     this.countDownLatch = countDownLatch;
@@ -883,6 +886,8 @@ public class Task implements TaskIFace {
    * 3. Check whether to publish data in task.
    */
   public void commit() {
+    boolean isTaskFailed = false;
+
     try {
       // Check if all forks succeeded
       List<Integer> failedForkIds = new ArrayList<>();
@@ -916,6 +921,7 @@ public class Task implements TaskIFace {
       }
     } catch (Throwable t) {
       failTask(t);
+      isTaskFailed = true;
     } finally {
       addConstructsFinalStateToTaskState(extractor, converter, rowChecker);
 
@@ -928,6 +934,10 @@ public class Task implements TaskIFace {
         closer.close();
       } catch (Throwable t) {
         LOG.error("Failed to close all open resources", t);
+        if ((!isIgnoreCloseFailures) && (!isTaskFailed)) {
+          LOG.error("Setting the task state to failed.");
+          failTask(t);
+        }
       }
 
       for (Map.Entry<Optional<Fork>, Optional<Future<?>>> forkAndFuture : 
this.forks.entrySet()) {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a7ca3d7b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskState.java
----------------------------------------------------------------------
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskState.java 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskState.java
index 6eb2f5f..a9d99be 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskState.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskState.java
@@ -88,6 +88,7 @@ public class TaskState extends WorkUnitState {
   private String taskKey;
   @Getter
   private Optional<String> taskAttemptId;
+
   private long startTime = 0;
   private long endTime = 0;
   private long duration;

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a7ca3d7b/gobblin-utility/src/main/java/org/apache/gobblin/util/HadoopUtils.java
----------------------------------------------------------------------
diff --git 
a/gobblin-utility/src/main/java/org/apache/gobblin/util/HadoopUtils.java 
b/gobblin-utility/src/main/java/org/apache/gobblin/util/HadoopUtils.java
index 4b01ab4..2513d3f 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/HadoopUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/HadoopUtils.java
@@ -40,10 +40,12 @@ import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.RawLocalFileSystem;
@@ -236,6 +238,22 @@ public class HadoopUtils {
   }
 
   /**
+   * A wrapper around {@link FileContext#rename(Path, Path, 
Options.Rename...)} using {@link Options.Rename#NONE}, so an existing destination is not overwritten.
+   */
+  public static void renamePath(FileContext fc, Path oldName, Path newName) 
throws IOException {
+    renamePath(fc, oldName, newName, false);
+  }
+
+  /**
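+   * A wrapper around {@link FileContext#rename(Path, Path, 
Options.Rename...)} that passes {@link Options.Rename#OVERWRITE} when {@code overwrite} is true and {@link Options.Rename#NONE} otherwise.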
+   */
+  public static void renamePath(FileContext fc, Path oldName, Path newName, 
boolean overwrite)
+      throws IOException {
+    Options.Rename renameOptions = (overwrite) ? Options.Rename.OVERWRITE : 
Options.Rename.NONE;
+    fc.rename(oldName, newName, renameOptions);
+  }
+
+  /**
    * A wrapper around {@link FileSystem#rename(Path, Path)} which throws 
{@link IOException} if
    * {@link FileSystem#rename(Path, Path)} returns False.
    */

Reply via email to