This is an automated email from the ASF dual-hosted git repository.

ibuenros pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 93855f6  [GOBBLIN-784] Allow setting replication factor in distcp
93855f6 is described below

commit 93855f6bf01316d8639a041c61559404a82010eb
Author: Jack Moseley <[email protected]>
AuthorDate: Tue May 28 13:41:51 2019 -0700

    [GOBBLIN-784] Allow setting replication factor in distcp
    
    Closes #2648 from jack-moseley/distcp-replication
---
 .../management/copy/writer/FileAwareInputStreamDataWriter.java     | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
index f156949..c9e3a28 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
@@ -27,8 +27,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 
-import lombok.extern.slf4j.Slf4j;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileContext;
@@ -45,6 +43,8 @@ import com.google.common.base.Predicate;
 import com.google.common.base.Strings;
 import com.google.common.collect.Iterators;
 
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.broker.EmptyKey;
 import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
 import org.apache.gobblin.broker.iface.NotConfiguredException;
@@ -206,7 +206,8 @@ public class FileAwareInputStreamDataWriter extends 
InstrumentedDataWriter<FileA
   protected void writeImpl(InputStream inputStream, Path writeAt, CopyableFile 
copyableFile,
       FileAwareInputStream record) throws IOException {
 
-    final short replication = copyableFile.getReplication(this.fs);
+    final short replication = 
this.state.getPropAsShort(ConfigurationKeys.WRITER_FILE_REPLICATION_FACTOR,
+        copyableFile.getReplication(this.fs));
     final long blockSize = copyableFile.getBlockSize(this.fs);
     final long fileSize = copyableFile.getFileStatus().getLen();
 

Reply via email to