This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 9053c3a [GOBBLIN-1576] skip appending record count to staging file if
present… (#3429)
9053c3a is described below
commit 9053c3a2cd6fbba94afbfeab85b941865dd95e3c
Author: vbohra <[email protected]>
AuthorDate: Tue Nov 23 09:56:03 2021 -0800
[GOBBLIN-1576] skip appending record count to staging file if present…
(#3429)
* [GOBBLIN-1576] skip appending record count to staging file if present
already
* fixed checkstyle
* fixed method
Co-authored-by: Vikram Bohra <[email protected]>
---
.../main/java/org/apache/gobblin/writer/FsDataWriter.java | 4 ++++
.../util/recordcount/IngestionRecordCountProvider.java | 14 +++++++++++++-
.../util/recordcount/IngestionRecordCountProviderTest.java | 2 ++
3 files changed, 19 insertions(+), 1 deletion(-)
diff --git
a/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java
b/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java
index 7670b5b..5b6d54a 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/writer/FsDataWriter.java
@@ -304,6 +304,10 @@ public abstract class FsDataWriter<D> implements
DataWriter<D>, FinalState, Meta
private synchronized String addRecordCountToStagingFile()
throws IOException {
String filePath = this.stagingFile.toString();
+ if(IngestionRecordCountProvider.containsRecordCount(filePath)) {
+ LOG.info(String.format("Path %s already has record count", filePath));
+ return filePath;
+ }
String filePathWithRecordCount =
IngestionRecordCountProvider.constructFilePath(filePath, recordsWritten());
LOG.info("Renaming " + filePath + " to " + filePathWithRecordCount);
HadoopUtils.renamePath(this.fs, new Path(filePath), new
Path(filePathWithRecordCount), true);
diff --git
a/gobblin-utility/src/main/java/org/apache/gobblin/util/recordcount/IngestionRecordCountProvider.java
b/gobblin-utility/src/main/java/org/apache/gobblin/util/recordcount/IngestionRecordCountProvider.java
index 35f0664..bd86a18 100644
---
a/gobblin-utility/src/main/java/org/apache/gobblin/util/recordcount/IngestionRecordCountProvider.java
+++
b/gobblin-utility/src/main/java/org/apache/gobblin/util/recordcount/IngestionRecordCountProvider.java
@@ -39,8 +39,11 @@ public class IngestionRecordCountProvider extends
RecordCountProvider {
/**
* Construct a new file path by appending record count to the filename of
the given file path, separated by SEPARATOR.
+ * The record count is appended unconditionally; check {@link #containsRecordCount(String)} first to avoid adding a second count.
* For example, given path: "/a/b/c/file.avro" and record count: 123,
* the new path returned will be: "/a/b/c/file.123.avro"
+ * given path: "/a/b/c/file.123.avro" and record count: 123,
+ * the new path returned will be: "/a/b/c/file.123.123.avro"
*/
public static String constructFilePath(String oldFilePath, long
recordCounts) {
return new Path(new Path(oldFilePath).getParent(),
Files.getNameWithoutExtension(oldFilePath).toString() + SEPARATOR
@@ -48,12 +51,21 @@ public class IngestionRecordCountProvider extends
RecordCountProvider {
}
/**
+ * @param filepath format /a/b/c/file.123.avro
+ * @return true if record count exists
+ */
+ public static boolean containsRecordCount(String filepath) {
+ String[] components = filepath.split(Pattern.quote(SEPARATOR));
+ return components.length >= 3 &&
StringUtils.isNumeric(components[components.length - 2]);
+ }
+
+ /**
* The record count should be the last component before the filename
extension.
+ *
+ * @param filepath path whose file name is expected to follow FILENAME.RECORDCOUNT.EXTENSION
+ * @return the record count parsed from the component just before the extension
+ * @throws IllegalArgumentException (via Preconditions.checkArgument) if the file name does not
+ * follow that pattern
*/
@Override
public long getRecordCount(Path filepath) {
String[] components = filepath.getName().split(Pattern.quote(SEPARATOR));
- Preconditions.checkArgument(components.length >= 2 &&
StringUtils.isNumeric(components[components.length - 2]),
+ // Only the file name (not parent directories) is validated.
+ // NOTE(review): containsRecordCount requires at least three components, stricter than the
+ // removed ">= 2" check, so a name like "123.avro" is now rejected; confirm no such files exist.
+ Preconditions.checkArgument(containsRecordCount(filepath.getName()),
String.format("Filename %s does not follow the pattern:
FILENAME.RECORDCOUNT.EXTENSION", filepath));
return Long.parseLong(components[components.length - 2]);
}
diff --git
a/gobblin-utility/src/test/java/org/apache/gobblin/util/recordcount/IngestionRecordCountProviderTest.java
b/gobblin-utility/src/test/java/org/apache/gobblin/util/recordcount/IngestionRecordCountProviderTest.java
index 9248b03..349ad4d 100644
---
a/gobblin-utility/src/test/java/org/apache/gobblin/util/recordcount/IngestionRecordCountProviderTest.java
+++
b/gobblin-utility/src/test/java/org/apache/gobblin/util/recordcount/IngestionRecordCountProviderTest.java
@@ -33,6 +33,8 @@ public class IngestionRecordCountProviderTest {
IngestionRecordCountProvider filenameRecordCountProvider = new
IngestionRecordCountProvider();
Assert.assertEquals(IngestionRecordCountProvider.constructFilePath("/a/b/c.avro",
123), "/a/b/c.123.avro");
Assert.assertEquals(filenameRecordCountProvider.getRecordCount(new
Path("/a/b/c.123.avro")), 123);
+
Assert.assertEquals(IngestionRecordCountProvider.containsRecordCount("/a/b/c.123.avro"),
true);
+
Assert.assertEquals(IngestionRecordCountProvider.containsRecordCount("/a/b/c.xyz.avro"),
false);
}
}