This is an automated email from the ASF dual-hosted git repository.

lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new c092c8f  [GOBBLIN-1411] Rename staging file name with record count before moving to task output
c092c8f is described below

commit c092c8f9cfc364de19af323c16cc92c36f19ce2a
Author: vbohra <[email protected]>
AuthorDate: Mon Mar 22 10:07:34 2021 -0700

    [GOBBLIN-1411] Rename staging file name with record count before moving to task output
    
    Closes #3251 from vikrambohra/GOBBLIN-1411
---
 .../org/apache/gobblin/writer/FsDataWriter.java    | 23 +++++++++++-----------
 1 file changed, 11 insertions(+), 12 deletions(-)

diff --git 
a/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java 
b/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java
index ad67041..d8060ef 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java
@@ -70,7 +70,7 @@ public abstract class FsDataWriter<D> implements 
DataWriter<D>, FinalState, Meta
   protected final String fileName;
   protected final FileSystem fs;
   protected final FileContext fileContext;
-  protected final Path stagingFile;
+  protected Path stagingFile;
   protected final String partitionKey;
   private final GlobalMetadata defaultMetadata;
   protected Path outputFile;
@@ -256,18 +256,18 @@ public abstract class FsDataWriter<D> implements 
DataWriter<D>, FinalState, Meta
 
     this.bytesWritten = Optional.of(Long.valueOf(stagingFileStatus.getLen()));
 
+    // Rename staging file to add record count before copying to output file
+    if (this.shouldIncludeRecordCountInFileName) {
+      String filePathWithRecordCount = addRecordCountToStagingFile();
+      this.stagingFile = new Path(filePathWithRecordCount);
+      this.outputFile = new Path(this.outputFile.getParent().toString(), new 
Path(filePathWithRecordCount).getName());
+    }
+
     LOG.info(String.format("Moving data from %s to %s", this.stagingFile, 
this.outputFile));
     // For the same reason as deleting the staging file if it already exists, 
overwrite
     // the output file if it already exists to prevent task retry from being 
blocked.
     HadoopUtils.renamePath(this.fs, this.stagingFile, this.outputFile, true);
-
-    // The staging file is moved to the output path in commit, so rename to 
add record count after that
-    if (this.shouldIncludeRecordCountInFileName) {
-      String filePathWithRecordCount = addRecordCountToFileName();
-      this.properties.appendToSetProp(this.allOutputFilesPropName, 
filePathWithRecordCount);
-    } else {
-      this.properties.appendToSetProp(this.allOutputFilesPropName, 
getOutputFilePath());
-    }
+    this.properties.appendToSetProp(this.allOutputFilesPropName, 
this.outputFile.toString());
 
     FsWriterMetrics metrics = new FsWriterMetrics(
         this.id,
@@ -301,13 +301,12 @@ public abstract class FsDataWriter<D> implements 
DataWriter<D>, FinalState, Meta
     this.closer.close();
   }
 
-  private synchronized String addRecordCountToFileName()
+  private synchronized String addRecordCountToStagingFile()
       throws IOException {
-    String filePath = getOutputFilePath();
+    String filePath = this.stagingFile.toString();
     String filePathWithRecordCount = 
IngestionRecordCountProvider.constructFilePath(filePath, recordsWritten());
     LOG.info("Renaming " + filePath + " to " + filePathWithRecordCount);
     HadoopUtils.renamePath(this.fs, new Path(filePath), new 
Path(filePathWithRecordCount), true);
-    this.outputFile = new Path(filePathWithRecordCount);
     return filePathWithRecordCount;
   }
 

Reply via email to