Repository: incubator-gobblin Updated Branches: refs/heads/master e66b8a66c -> a7ca3d7bf
[GOBBLIN-627] Allow files in task output directory to be overwritten when renaming files with record count. Closes #2497 from sv2000/overwriteRename Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/a7ca3d7b Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/a7ca3d7b Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/a7ca3d7b Branch: refs/heads/master Commit: a7ca3d7bf71e892aa5fd1445da397c0d6f564101 Parents: e66b8a6 Author: suvasude <[email protected]> Authored: Tue Nov 6 10:46:31 2018 -0800 Committer: Hung Tran <[email protected]> Committed: Tue Nov 6 10:46:31 2018 -0800 ---------------------------------------------------------------------- .../gobblin/configuration/ConfigurationKeys.java | 1 + .../org/apache/gobblin/writer/FsDataWriter.java | 17 +++++++---------- .../java/org/apache/gobblin/runtime/Task.java | 10 ++++++++++ .../org/apache/gobblin/runtime/TaskState.java | 1 + .../java/org/apache/gobblin/util/HadoopUtils.java | 18 ++++++++++++++++++ 5 files changed, 37 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a7ca3d7b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java index 8383c2a..b1a525e 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java @@ -239,6 +239,7 @@ public class ConfigurationKeys { public static final String JOB_CONFIG_FILE_PATH_KEY = "job.config.path"; public static final String 
TASK_FAILURE_EXCEPTION_KEY = "task.failure.exception"; public static final String TASK_RETRIES_KEY = "task.retries"; + public static final String TASK_IGNORE_CLOSE_FAILURES = "task.ignoreCloseFailures"; public static final String JOB_FAILURES_KEY = "job.failures"; public static final String JOB_TRACKING_URL_KEY = "job.tracking.url"; public static final String FORK_STATE_KEY = "fork.state"; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a7ca3d7b/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java b/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java index aa228af..d814622 100644 --- a/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java +++ b/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java @@ -22,6 +22,7 @@ import java.io.OutputStream; import java.util.List; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -72,6 +73,7 @@ public abstract class FsDataWriter<D> implements DataWriter<D>, FinalState, Meta protected final int branchId; protected final String fileName; protected final FileSystem fs; + protected final FileContext fileContext; protected final Path stagingFile; protected final String partitionKey; private final GlobalMetadata defaultMetadata; @@ -102,6 +104,7 @@ public abstract class FsDataWriter<D> implements DataWriter<D>, FinalState, Meta // Add all job configuration properties so they are picked up by Hadoop JobConfigurationUtils.putStateIntoConfiguration(properties, conf); this.fs = WriterUtils.getWriterFS(properties, this.numBranches, this.branchId); + this.fileContext = FileContext.getFileContext(conf); // Initialize staging/output directory Path 
writerStagingDir = this.writerAttemptIdOptional.isPresent() ? WriterUtils @@ -255,15 +258,9 @@ public abstract class FsDataWriter<D> implements DataWriter<D>, FinalState, Meta this.bytesWritten = Optional.of(Long.valueOf(stagingFileStatus.getLen())); LOG.info(String.format("Moving data from %s to %s", this.stagingFile, this.outputFile)); - // For the same reason as deleting the staging file if it already exists, deleting - // the output file if it already exists prevents task retry from being blocked. - if (this.fs.exists(this.outputFile)) { - LOG.warn(String.format("Task output file %s already exists", this.outputFile)); - HadoopUtils.deletePath(this.fs, this.outputFile, false); - } - - HadoopUtils.renamePath(this.fs, this.stagingFile, this.outputFile); - + // For the same reason as deleting the staging file if it already exists, overwrite + // the output file if it already exists to prevent task retry from being blocked. + HadoopUtils.renamePath(this.fileContext, this.stagingFile, this.outputFile, true); } /** @@ -309,7 +306,7 @@ public abstract class FsDataWriter<D> implements DataWriter<D>, FinalState, Meta String filePath = getOutputFilePath(); String filePathWithRecordCount = IngestionRecordCountProvider.constructFilePath(filePath, recordsWritten()); LOG.info("Renaming " + filePath + " to " + filePathWithRecordCount); - HadoopUtils.renamePath(this.fs, new Path(filePath), new Path(filePathWithRecordCount)); + HadoopUtils.renamePath(this.fileContext, new Path(filePath), new Path(filePathWithRecordCount), true); this.outputFile = new Path(filePathWithRecordCount); return filePathWithRecordCount; } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a7ca3d7b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java index 
66f459b..a46468d 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java @@ -134,6 +134,8 @@ public class Task implements TaskIFace { private final String jobId; private final String taskId; private final String taskKey; + private final boolean isIgnoreCloseFailures; + private final TaskContext taskContext; private final TaskState taskState; private final TaskStateTracker taskStateTracker; @@ -180,6 +182,7 @@ public class Task implements TaskIFace { this.jobId = this.taskState.getJobId(); this.taskId = this.taskState.getTaskId(); this.taskKey = this.taskState.getTaskKey(); + this.isIgnoreCloseFailures = this.taskState.getJobState().getPropAsBoolean(ConfigurationKeys.TASK_IGNORE_CLOSE_FAILURES, false); this.taskStateTracker = taskStateTracker; this.taskExecutor = taskExecutor; this.countDownLatch = countDownLatch; @@ -883,6 +886,8 @@ public class Task implements TaskIFace { * 3. Check whether to publish data in task. 
*/ public void commit() { + boolean isTaskFailed = false; + try { // Check if all forks succeeded List<Integer> failedForkIds = new ArrayList<>(); @@ -916,6 +921,7 @@ public class Task implements TaskIFace { } } catch (Throwable t) { failTask(t); + isTaskFailed = true; } finally { addConstructsFinalStateToTaskState(extractor, converter, rowChecker); @@ -928,6 +934,10 @@ public class Task implements TaskIFace { closer.close(); } catch (Throwable t) { LOG.error("Failed to close all open resources", t); + if ((!isIgnoreCloseFailures) && (!isTaskFailed)) { + LOG.error("Setting the task state to failed."); + failTask(t); + } } for (Map.Entry<Optional<Fork>, Optional<Future<?>>> forkAndFuture : this.forks.entrySet()) { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a7ca3d7b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskState.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskState.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskState.java index 6eb2f5f..a9d99be 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskState.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskState.java @@ -88,6 +88,7 @@ public class TaskState extends WorkUnitState { private String taskKey; @Getter private Optional<String> taskAttemptId; + private long startTime = 0; private long endTime = 0; private long duration; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a7ca3d7b/gobblin-utility/src/main/java/org/apache/gobblin/util/HadoopUtils.java ---------------------------------------------------------------------- diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/HadoopUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/HadoopUtils.java index 4b01ab4..2513d3f 100644 --- a/gobblin-utility/src/main/java/org/apache/gobblin/util/HadoopUtils.java +++ 
b/gobblin-utility/src/main/java/org/apache/gobblin/util/HadoopUtils.java @@ -40,10 +40,12 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileAlreadyExistsException; +import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Options; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.fs.RawLocalFileSystem; @@ -236,6 +238,22 @@ public class HadoopUtils { } /** + * A wrapper around {@link FileContext#rename(Path, Path, Options.Rename...)}. + */ + public static void renamePath(FileContext fc, Path oldName, Path newName) throws IOException { + renamePath(fc, oldName, newName, false); + } + + /** + * A wrapper around {@link FileContext#rename(Path, Path, Options.Rename...)}. + */ + public static void renamePath(FileContext fc, Path oldName, Path newName, boolean overwrite) + throws IOException { + Options.Rename renameOptions = (overwrite) ? Options.Rename.OVERWRITE : Options.Rename.NONE; + fc.rename(oldName, newName, renameOptions); + } + + /** * A wrapper around {@link FileSystem#rename(Path, Path)} which throws {@link IOException} if * {@link FileSystem#rename(Path, Path)} returns False. */
