This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 0617de4 [GOBBLIN-1057] Optimize unnecessary RPCs in distcp-ng
0617de4 is described below
commit 0617de4a902a199d1c4203e0ff44c5ac615b6c02
Author: Hung Tran <[email protected]>
AuthorDate: Mon Feb 24 15:51:21 2020 -0800
[GOBBLIN-1057] Optimize unnecessary RPCs in distcp-ng
Disable fetching of the source file checksum by default (controlled by
the `copy.skipChecksum` setting, which defaults to true).
No code currently uses the checksum, so this should not have any
functional impact.
If the DataFileVersionStrategy is ModTimeDataFileVersionStrategy,
optimize the code path by using the modification time from the
FileStatus object that has already been fetched, instead of issuing
another RPC through the version strategy.
Closes #2897 from htran1/distcp-ng-optimizations
---
.../apache/gobblin/data/management/copy/CopyableFile.java | 11 +++++++++--
.../data/management/copy/hive/HiveCopyEntityHelper.java | 13 ++++++++++---
.../management/copy/replication/ConfigBasedDataset.java | 12 ++++++++++--
3 files changed, 29 insertions(+), 7 deletions(-)
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
index 5fbae45..24c9e3a 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
@@ -22,6 +22,7 @@ import
org.apache.gobblin.data.management.copy.PreserveAttributes.Option;
import org.apache.gobblin.dataset.DatasetConstants;
import org.apache.gobblin.dataset.DatasetDescriptor;
import org.apache.gobblin.dataset.Descriptor;
+import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.PathUtils;
import org.apache.gobblin.util.guid.Guid;
@@ -54,6 +55,8 @@ import com.google.common.collect.Lists;
@NoArgsConstructor(access = AccessLevel.PROTECTED)
@EqualsAndHashCode(callSuper = true)
public class CopyableFile extends CopyEntity implements File {
+ private static final byte[] EMPTY_CHECKSUM = new byte[0];
+
/**
* The source data the file belongs to. For now, since it's only used before
copying, set it to be
* transient so that it won't be serialized, avoid unnecessary data transfer
@@ -250,8 +253,12 @@ public class CopyableFile extends CopyEntity implements
File {
this.configuration.getTargetFs(), this.destination);
}
if (this.checksum == null) {
- FileChecksum checksumTmp = this.origin.isDirectory() ? null :
this.originFs.getFileChecksum(this.origin.getPath());
- this.checksum = checksumTmp == null ? new byte[0] :
checksumTmp.getBytes();
+ if (ConfigUtils.getBoolean(this.configuration.getConfig(),
"copy.skipChecksum", true)) {
+ this.checksum = EMPTY_CHECKSUM;
+ } else {
+ FileChecksum checksumTmp = this.origin.isDirectory() ? null :
this.originFs.getFileChecksum(this.origin.getPath());
+ this.checksum = checksumTmp == null ? EMPTY_CHECKSUM :
checksumTmp.getBytes();
+ }
}
if (this.fileSet == null) {
// Default file set per dataset
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java
index 84adc27..e440460 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java
@@ -28,6 +28,7 @@ import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
+import org.apache.gobblin.util.filesystem.ModTimeDataFileVersionStrategy;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -635,12 +636,18 @@ public class HiveCopyEntityHelper {
// For each source path
Path newPath =
helper.getTargetPathHelper().getTargetPath(sourcePath.getPath(),
desiredTargetLocation.getFileSystem(), partition, true);
boolean shouldCopy = true;
+ // Can optimize by using the mod time that has already been fetched
+ boolean useDirectGetModTime = sourceLocation.versionStrategy.isPresent()
+ && sourceLocation.versionStrategy.get().getClass().getName().equals(
+ ModTimeDataFileVersionStrategy.class.getName());
+
if (desiredTargetExistingPaths.containsKey(newPath)) {
// If the file exists at the destination, check whether it should be
replaced, if not, no need to copy
FileStatus existingTargetStatus =
desiredTargetExistingPaths.get(newPath);
-
- Comparable srcVer =
sourceLocation.versionStrategy.get().getVersion(sourcePath.getPath());
- Comparable dstVer =
desiredTargetLocation.versionStrategy.get().getVersion(existingTargetStatus.getPath());
+ Comparable srcVer = useDirectGetModTime ?
sourcePath.getModificationTime() :
+
sourceLocation.versionStrategy.get().getVersion(sourcePath.getPath());
+ Comparable dstVer = useDirectGetModTime ?
existingTargetStatus.getModificationTime() :
+
desiredTargetLocation.versionStrategy.get().getVersion(existingTargetStatus.getPath());
// destination has higher version, skip the copy
if (srcVer.compareTo(dstVer) <= 0) {
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDataset.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDataset.java
index 57f952f..2513f48 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDataset.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDataset.java
@@ -23,6 +23,7 @@ import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import org.apache.gobblin.util.filesystem.ModTimeDataFileVersionStrategy;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -250,9 +251,16 @@ public class ConfigBasedDataset implements CopyableDataset
{
boolean shouldCopy = true;
+ // Can optimize by using the mod time that has already been fetched
+ boolean useDirectGetModTime = this.srcDataFileVersionStrategy.isPresent()
+ && this.srcDataFileVersionStrategy.get().getClass().getName().equals(
+ ModTimeDataFileVersionStrategy.class.getName());
+
if (copyToFileMap.containsKey(newPath)) {
- Comparable srcVer =
this.srcDataFileVersionStrategy.get().getVersion(originFileStatus.getPath());
- Comparable dstVer =
this.dstDataFileVersionStrategy.get().getVersion(copyToFileMap.get(newPath).getPath());
+ Comparable srcVer = useDirectGetModTime ?
originFileStatus.getModificationTime() :
+
this.srcDataFileVersionStrategy.get().getVersion(originFileStatus.getPath());
+ Comparable dstVer = useDirectGetModTime ?
copyToFileMap.get(newPath).getModificationTime() :
+
this.dstDataFileVersionStrategy.get().getVersion(copyToFileMap.get(newPath).getPath());
// destination has higher version, skip the copy
if (srcVer.compareTo(dstVer) <= 0) {