This is an automated email from the ASF dual-hosted git repository.
ibuenros pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 93855f6 [GOBBLIN-784] Allow setting replication factor in distcp
93855f6 is described below
commit 93855f6bf01316d8639a041c61559404a82010eb
Author: Jack Moseley <[email protected]>
AuthorDate: Tue May 28 13:41:51 2019 -0700
[GOBBLIN-784] Allow setting replication factor in distcp
Closes #2648 from jack-moseley/distcp-replication
---
.../management/copy/writer/FileAwareInputStreamDataWriter.java | 7 ++++---
1 file changed, 4 insertions(+), 3 deletions(-)
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
index f156949..c9e3a28 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
@@ -27,8 +27,6 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
-import lombok.extern.slf4j.Slf4j;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileContext;
@@ -45,6 +43,8 @@ import com.google.common.base.Predicate;
import com.google.common.base.Strings;
import com.google.common.collect.Iterators;
+import lombok.extern.slf4j.Slf4j;
+
import org.apache.gobblin.broker.EmptyKey;
import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
import org.apache.gobblin.broker.iface.NotConfiguredException;
@@ -206,7 +206,8 @@ public class FileAwareInputStreamDataWriter extends InstrumentedDataWriter<FileA
protected void writeImpl(InputStream inputStream, Path writeAt, CopyableFile copyableFile,
FileAwareInputStream record) throws IOException {
- final short replication = copyableFile.getReplication(this.fs);
+ final short replication = this.state.getPropAsShort(ConfigurationKeys.WRITER_FILE_REPLICATION_FACTOR,
+ copyableFile.getReplication(this.fs));
final long blockSize = copyableFile.getBlockSize(this.fs);
final long fileSize = copyableFile.getFileStatus().getLen();