This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 9053c3a  [GOBBLIN-1576] skip appending record count to staging file if present… (#3429)
9053c3a is described below

commit 9053c3a2cd6fbba94afbfeab85b941865dd95e3c
Author: vbohra <[email protected]>
AuthorDate: Tue Nov 23 09:56:03 2021 -0800

    [GOBBLIN-1576] skip appending record count to staging file if present… (#3429)
    
    * [GOBBLIN-1576] skip appending record count to staging file if present already
    
    * fixed checkstyle
    
    * fixed method
    
    Co-authored-by: Vikram Bohra <[email protected]>
---
 .../main/java/org/apache/gobblin/writer/FsDataWriter.java  |  4 ++++
 .../util/recordcount/IngestionRecordCountProvider.java     | 14 +++++++++++++-
 .../util/recordcount/IngestionRecordCountProviderTest.java |  2 ++
 3 files changed, 19 insertions(+), 1 deletion(-)

diff --git a/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java b/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java
index 7670b5b..5b6d54a 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java
@@ -304,6 +304,10 @@ public abstract class FsDataWriter<D> implements DataWriter<D>, FinalState, Meta
   private synchronized String addRecordCountToStagingFile()
       throws IOException {
     String filePath = this.stagingFile.toString();
+    if(IngestionRecordCountProvider.containsRecordCount(filePath)) {
+      LOG.info(String.format("Path %s already has record count", filePath));
+      return filePath;
+    }
     String filePathWithRecordCount = 
IngestionRecordCountProvider.constructFilePath(filePath, recordsWritten());
     LOG.info("Renaming " + filePath + " to " + filePathWithRecordCount);
     HadoopUtils.renamePath(this.fs, new Path(filePath), new 
Path(filePathWithRecordCount), true);
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/recordcount/IngestionRecordCountProvider.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/recordcount/IngestionRecordCountProvider.java
index 35f0664..bd86a18 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/recordcount/IngestionRecordCountProvider.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/recordcount/IngestionRecordCountProvider.java
@@ -39,8 +39,11 @@ public class IngestionRecordCountProvider extends RecordCountProvider {
 
   /**
    * Construct a new file path by appending record count to the filename of 
the given file path, separated by SEPARATOR.
+   * return original path if record count already exists
    * For example, given path: "/a/b/c/file.avro" and record count: 123,
    * the new path returned will be: "/a/b/c/file.123.avro"
+   * given path: "/a/b/c/file.123.avro" and record count: 123,
+   * returned "/a/b/c/file.123.avro"
    */
   public static String constructFilePath(String oldFilePath, long 
recordCounts) {
     return new Path(new Path(oldFilePath).getParent(), 
Files.getNameWithoutExtension(oldFilePath).toString() + SEPARATOR
@@ -48,12 +51,21 @@ public class IngestionRecordCountProvider extends RecordCountProvider {
   }
 
   /**
+   * @param filepath format /a/b/c/file.123.avro
+   * @return true if record count exists
+   */
+  public static boolean containsRecordCount(String filepath) {
+    String[] components = filepath.split(Pattern.quote(SEPARATOR));
+    return components.length >= 3 && 
StringUtils.isNumeric(components[components.length - 2]);
+  }
+
+  /**
    * The record count should be the last component before the filename 
extension.
    */
   @Override
   public long getRecordCount(Path filepath) {
     String[] components = filepath.getName().split(Pattern.quote(SEPARATOR));
-    Preconditions.checkArgument(components.length >= 2 && 
StringUtils.isNumeric(components[components.length - 2]),
+    Preconditions.checkArgument(containsRecordCount(filepath.getName()),
         String.format("Filename %s does not follow the pattern: 
FILENAME.RECORDCOUNT.EXTENSION", filepath));
     return Long.parseLong(components[components.length - 2]);
   }
diff --git a/gobblin-utility/src/test/java/org/apache/gobblin/util/recordcount/IngestionRecordCountProviderTest.java b/gobblin-utility/src/test/java/org/apache/gobblin/util/recordcount/IngestionRecordCountProviderTest.java
index 9248b03..349ad4d 100644
--- a/gobblin-utility/src/test/java/org/apache/gobblin/util/recordcount/IngestionRecordCountProviderTest.java
+++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/recordcount/IngestionRecordCountProviderTest.java
@@ -33,6 +33,8 @@ public class IngestionRecordCountProviderTest {
     IngestionRecordCountProvider filenameRecordCountProvider = new 
IngestionRecordCountProvider();
     
Assert.assertEquals(IngestionRecordCountProvider.constructFilePath("/a/b/c.avro",
 123), "/a/b/c.123.avro");
     Assert.assertEquals(filenameRecordCountProvider.getRecordCount(new 
Path("/a/b/c.123.avro")), 123);
+    
Assert.assertEquals(IngestionRecordCountProvider.containsRecordCount("/a/b/c.123.avro"),
 true);
+    
Assert.assertEquals(IngestionRecordCountProvider.containsRecordCount("/a/b/c.xyz.avro"),
 false);
   }
 
 }

Reply via email to