This is an automated email from the ASF dual-hosted git repository.
lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new c092c8f [GOBBLIN-1411] Rename staging file name with record count before moving to task output
c092c8f is described below
commit c092c8f9cfc364de19af323c16cc92c36f19ce2a
Author: vbohra <[email protected]>
AuthorDate: Mon Mar 22 10:07:34 2021 -0700
[GOBBLIN-1411] Rename staging file name with record count before moving to task output
Closes #3251 from vikrambohra/GOBBLIN-1411
---
.../org/apache/gobblin/writer/FsDataWriter.java | 23 +++++++++++-----------
1 file changed, 11 insertions(+), 12 deletions(-)
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java b/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java
index ad67041..d8060ef 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java
@@ -70,7 +70,7 @@ public abstract class FsDataWriter<D> implements DataWriter<D>, FinalState, Meta
protected final String fileName;
protected final FileSystem fs;
protected final FileContext fileContext;
- protected final Path stagingFile;
+ protected Path stagingFile;
protected final String partitionKey;
private final GlobalMetadata defaultMetadata;
protected Path outputFile;
@@ -256,18 +256,18 @@ public abstract class FsDataWriter<D> implements DataWriter<D>, FinalState, Meta
this.bytesWritten = Optional.of(Long.valueOf(stagingFileStatus.getLen()));
+ // Rename staging file to add record count before copying to output file
+ if (this.shouldIncludeRecordCountInFileName) {
+ String filePathWithRecordCount = addRecordCountToStagingFile();
+ this.stagingFile = new Path(filePathWithRecordCount);
+ this.outputFile = new Path(this.outputFile.getParent().toString(), new
Path(filePathWithRecordCount).getName());
+ }
+
LOG.info(String.format("Moving data from %s to %s", this.stagingFile,
this.outputFile));
// For the same reason as deleting the staging file if it already exists, overwrite
// the output file if it already exists to prevent task retry from being blocked.
HadoopUtils.renamePath(this.fs, this.stagingFile, this.outputFile, true);
-
- // The staging file is moved to the output path in commit, so rename to add record count after that
- if (this.shouldIncludeRecordCountInFileName) {
- String filePathWithRecordCount = addRecordCountToFileName();
- this.properties.appendToSetProp(this.allOutputFilesPropName,
filePathWithRecordCount);
- } else {
- this.properties.appendToSetProp(this.allOutputFilesPropName,
getOutputFilePath());
- }
+ this.properties.appendToSetProp(this.allOutputFilesPropName,
this.outputFile.toString());
FsWriterMetrics metrics = new FsWriterMetrics(
this.id,
@@ -301,13 +301,12 @@ public abstract class FsDataWriter<D> implements DataWriter<D>, FinalState, Meta
this.closer.close();
}
- private synchronized String addRecordCountToFileName()
+ private synchronized String addRecordCountToStagingFile()
throws IOException {
- String filePath = getOutputFilePath();
+ String filePath = this.stagingFile.toString();
String filePathWithRecordCount =
IngestionRecordCountProvider.constructFilePath(filePath, recordsWritten());
LOG.info("Renaming " + filePath + " to " + filePathWithRecordCount);
HadoopUtils.renamePath(this.fs, new Path(filePath), new
Path(filePathWithRecordCount), true);
- this.outputFile = new Path(filePathWithRecordCount);
return filePathWithRecordCount;
}