Repository: falcon
Updated Branches:
  refs/heads/master b0efc6fbf -> 3efffd4d9


FALCON-1944 Ability to provide additional DistCP options for mirroring 
extensions and feed replication

Author: Sowmya Ramesh <[email protected]>

Reviewers: "Venkat  Ranganathan <[email protected]>, Balu Vellanki 
<[email protected]>"

Closes #251 from sowmyaramesh/FALCON-1944


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/3efffd4d
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/3efffd4d
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/3efffd4d

Branch: refs/heads/master
Commit: 3efffd4d946c0a4a8e6885f86e1da3107de52e00
Parents: b0efc6f
Author: Sowmya Ramesh <[email protected]>
Authored: Wed Aug 3 15:09:29 2016 -0700
Committer: bvellanki <[email protected]>
Committed: Wed Aug 3 15:09:29 2016 -0700

----------------------------------------------------------------------
 .../main/META/hdfs-mirroring-properties.json    |  84 ++++++++++
 .../runtime/hdfs-mirroring-workflow.xml         |  28 ++++
 .../hdfs-snapshot-mirroring-properties.json     |  78 +++++++++-
 .../hdfs-snapshot-mirroring-workflow.xml        |  36 ++++-
 .../replication/HdfsSnapshotReplicator.java     |  99 +++++++++---
 .../replication/HdfsSnapshotReplicatorTest.java |   7 +-
 .../apache/falcon/util/DistCPOptionsUtil.java   | 153 +++++++++++++++++++
 .../falcon/util/ReplicationDistCpOption.java    |   8 +-
 docs/src/site/twiki/EntitySpecification.twiki   |  12 +-
 .../falcon/extensions/AbstractExtension.java    |  12 ++
 .../mirroring/hdfs/HdfsMirroringExtension.java  |  16 +-
 .../hdfs/HdfsMirroringExtensionProperties.java  |   7 +-
 .../HdfsSnapshotMirrorProperties.java           |   4 +-
 .../HdfsSnapshotMirroringExtension.java         |   6 +-
 .../apache/falcon/extensions/ExtensionTest.java |   2 +-
 .../feed/FSReplicationWorkflowBuilder.java      |   1 +
 .../feed/FeedReplicationWorkflowBuilder.java    |  20 ++-
 .../falcon/replication/FeedReplicator.java      |  92 +++++------
 .../falcon/replication/FeedReplicatorTest.java  |  20 ++-
 19 files changed, 580 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/3efffd4d/addons/extensions/hdfs-mirroring/src/main/META/hdfs-mirroring-properties.json
----------------------------------------------------------------------
diff --git 
a/addons/extensions/hdfs-mirroring/src/main/META/hdfs-mirroring-properties.json 
b/addons/extensions/hdfs-mirroring/src/main/META/hdfs-mirroring-properties.json
index f1b4775..9d4a425 100644
--- 
a/addons/extensions/hdfs-mirroring/src/main/META/hdfs-mirroring-properties.json
+++ 
b/addons/extensions/hdfs-mirroring/src/main/META/hdfs-mirroring-properties.json
@@ -122,6 +122,90 @@
             "example":"100"
         },
         {
+            "propertyName":"tdeEncryptionEnabled",
+            "required":false,
+            "description":"Set this flag to true if TDE encryption is enabled 
on source and target. Default value is false",
+            "example":"true"
+        },
+        {
+            "propertyName":"overwrite",
+            "required":false,
+            "description":"DistCP overwrites target files even if they already exist at the target or have the same contents",
+            "example":"true"
+        },
+        {
+            "propertyName":"ignoreErrors",
+            "required":false,
+            "description":"Ignore failures during DistCp",
+            "example":"true"
+        },
+        {
+            "propertyName":"skipChecksum",
+            "required":false,
+            "description":"Skip checksum verification during DistCP",
+            "example":"true"
+        },
+        {
+            "propertyName":"removeDeletedFiles",
+            "required":false,
+            "description":"DistCP deletes files that exist in the target but not in the source",
+            "example":"true"
+        },
+        {
+            "propertyName":"preserveBlockSize",
+            "required":false,
+            "description":"Preserve block size during DistCP",
+            "example":"true"
+        },
+        {
+            "propertyName":"preserveReplicationNumber",
+            "required":false,
+            "description":"Preserve replication number during DistCP",
+            "example":"false"
+        },
+        {
+            "propertyName":"preservePermission",
+            "required":false,
+            "description":"Preserve permission during DistCP",
+            "example":"true"
+        },
+        {
+            "propertyName":"preserveUser",
+            "required":false,
+            "description":"Preserve user during DistCP",
+            "example":"true"
+        },
+        {
+            "propertyName":"preserveGroup",
+            "required":false,
+            "description":"Preserve group during DistCP",
+            "example":"true"
+        },
+        {
+            "propertyName":"preserveChecksumType",
+            "required":false,
+            "description":"Preserve checksum type during DistCP",
+            "example":"true"
+        },
+        {
+            "propertyName":"preserveAcl",
+            "required":false,
+            "description":"Preserve ACL during DistCP",
+            "example":"false"
+        },
+        {
+            "propertyName":"preserveXattr",
+            "required":false,
+            "description":"Preserve Xattr during DistCP",
+            "example":"true"
+        },
+        {
+            "propertyName":"preserveTimes",
+            "required":false,
+            "description":"Preserve access and modification times during 
DistCP",
+            "example":"true"
+        },
+        {
             "propertyName":"jobNotificationType",
             "required":false,
             "description":"Email Notification for Falcon instance completion",

http://git-wip-us.apache.org/repos/asf/falcon/blob/3efffd4d/addons/extensions/hdfs-mirroring/src/main/resources/runtime/hdfs-mirroring-workflow.xml
----------------------------------------------------------------------
diff --git 
a/addons/extensions/hdfs-mirroring/src/main/resources/runtime/hdfs-mirroring-workflow.xml
 
b/addons/extensions/hdfs-mirroring/src/main/resources/runtime/hdfs-mirroring-workflow.xml
index c0504fb..7929dd7 100644
--- 
a/addons/extensions/hdfs-mirroring/src/main/resources/runtime/hdfs-mirroring-workflow.xml
+++ 
b/addons/extensions/hdfs-mirroring/src/main/resources/runtime/hdfs-mirroring-workflow.xml
@@ -63,6 +63,32 @@
             <arg>${distcpMaxMaps}</arg>
             <arg>-mapBandwidth</arg>
             <arg>${distcpMapBandwidth}</arg>
+            <arg>-overwrite</arg>
+            <arg>${overwrite}</arg>
+            <arg>-ignoreErrors</arg>
+            <arg>${ignoreErrors}</arg>
+            <arg>-skipChecksum</arg>
+            <arg>${skipChecksum}</arg>
+            <arg>-removeDeletedFiles</arg>
+            <arg>${removeDeletedFiles}</arg>
+            <arg>-preserveBlockSize</arg>
+            <arg>${preserveBlockSize}</arg>
+            <arg>-preserveReplicationNumber</arg>
+            <arg>${preserveReplicationNumber}</arg>
+            <arg>-preservePermission</arg>
+            <arg>${preservePermission}</arg>
+            <arg>-preserveUser</arg>
+            <arg>${preserveUser}</arg>
+            <arg>-preserveGroup</arg>
+            <arg>${preserveGroup}</arg>
+            <arg>-preserveChecksumType</arg>
+            <arg>${preserveChecksumType}</arg>
+            <arg>-preserveAcl</arg>
+            <arg>${preserveAcl}</arg>
+            <arg>-preserveXattr</arg>
+            <arg>${preserveXattr}</arg>
+            <arg>-preserveTimes</arg>
+            <arg>${preserveTimes}</arg>
             <arg>-sourcePaths</arg>
             <arg>${sourceDir}</arg>
             <arg>-targetPath</arg>
@@ -71,6 +97,8 @@
             <arg>FILESYSTEM</arg>
             <arg>-availabilityFlag</arg>
             <arg>${availabilityFlag == 'NA' ? "NA" : availabilityFlag}</arg>
+            <arg>-tdeEncryptionEnabled</arg>
+            <arg>${tdeEncryptionEnabled}</arg>
             <arg>-counterLogDir</arg>
             <arg>${logDir}/job-${nominalTime}/${srcClusterName == 'NA' ? '' : 
srcClusterName}</arg>
         </java>

http://git-wip-us.apache.org/repos/asf/falcon/blob/3efffd4d/addons/extensions/hdfs-snapshot-mirroring/src/main/META/hdfs-snapshot-mirroring-properties.json
----------------------------------------------------------------------
diff --git 
a/addons/extensions/hdfs-snapshot-mirroring/src/main/META/hdfs-snapshot-mirroring-properties.json
 
b/addons/extensions/hdfs-snapshot-mirroring/src/main/META/hdfs-snapshot-mirroring-properties.json
index 46554c1..430f1ec 100644
--- 
a/addons/extensions/hdfs-snapshot-mirroring/src/main/META/hdfs-snapshot-mirroring-properties.json
+++ 
b/addons/extensions/hdfs-snapshot-mirroring/src/main/META/hdfs-snapshot-mirroring-properties.json
@@ -140,13 +140,13 @@
       "example":"10"
     },
     {
-      "propertyName":"distcpMaxMaps",
+      "propertyName":"maxMaps",
       "required":false,
       "description":"Maximum number of mappers for DistCP",
       "example":"1"
     },
     {
-      "propertyName":"distcpMapBandwidth",
+      "propertyName":"mapBandwidth",
       "required":false,
       "description":"Bandwidth in MB for each mapper in DistCP",
       "example":"100"
@@ -158,6 +158,78 @@
       "example":"false"
     },
     {
+      "propertyName":"ignoreErrors",
+      "required":false,
+      "description":"Ignore failures during DistCp",
+      "example":"true"
+    },
+    {
+      "propertyName":"skipChecksum",
+      "required":false,
+      "description":"Skip checksum verification during DistCP",
+      "example":"true"
+    },
+    {
+      "propertyName":"removeDeletedFiles",
+      "required":false,
+      "description":"DistCP deletes files that exist in the target but not in the source",
+      "example":"true"
+    },
+    {
+      "propertyName":"preserveBlockSize",
+      "required":false,
+      "description":"Preserve block size during DistCP",
+      "example":"true"
+    },
+    {
+      "propertyName":"preserveReplicationNumber",
+      "required":false,
+      "description":"Preserve replication number during DistCP",
+      "example":"false"
+    },
+    {
+      "propertyName":"preservePermission",
+      "required":false,
+      "description":"Preserve permission during DistCP",
+      "example":"true"
+    },
+    {
+      "propertyName":"preserveUser",
+      "required":false,
+      "description":"Preserve user during DistCP",
+      "example":"true"
+    },
+    {
+      "propertyName":"preserveGroup",
+      "required":false,
+      "description":"Preserve group during DistCP",
+      "example":"true"
+    },
+    {
+      "propertyName":"preserveChecksumType",
+      "required":false,
+      "description":"Preserve checksum type during DistCP",
+      "example":"true"
+    },
+    {
+      "propertyName":"preserveAcl",
+      "required":false,
+      "description":"Preserve ACL during DistCP",
+      "example":"false"
+    },
+    {
+      "propertyName":"preserveXattr",
+      "required":false,
+      "description":"Preserve Xattr during DistCP",
+      "example":"true"
+    },
+    {
+      "propertyName":"preserveTimes",
+      "required":false,
+      "description":"Preserve access and modification times during DistCP",
+      "example":"true"
+    },
+    {
       "propertyName":"jobNotificationType",
       "required":false,
       "description":"Email Notification for Falcon instance completion",
@@ -170,4 +242,4 @@
       "example":"[email protected], [email protected]"
     }
   ]
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/3efffd4d/addons/extensions/hdfs-snapshot-mirroring/src/main/resources/runtime/hdfs-snapshot-mirroring-workflow.xml
----------------------------------------------------------------------
diff --git 
a/addons/extensions/hdfs-snapshot-mirroring/src/main/resources/runtime/hdfs-snapshot-mirroring-workflow.xml
 
b/addons/extensions/hdfs-snapshot-mirroring/src/main/resources/runtime/hdfs-snapshot-mirroring-workflow.xml
index c735167..899f6b0 100644
--- 
a/addons/extensions/hdfs-snapshot-mirroring/src/main/resources/runtime/hdfs-snapshot-mirroring-workflow.xml
+++ 
b/addons/extensions/hdfs-snapshot-mirroring/src/main/resources/runtime/hdfs-snapshot-mirroring-workflow.xml
@@ -59,10 +59,36 @@
             
<main-class>org.apache.falcon.snapshots.replication.HdfsSnapshotReplicator</main-class>
             <arg>-Dmapred.job.queue.name=${queueName}</arg>
             <arg>-Dmapred.job.priority=${jobPriority}</arg>
-            <arg>-distcpMaxMaps</arg>
-            <arg>${distcpMaxMaps}</arg>
-            <arg>-distcpMapBandwidth</arg>
-            <arg>${distcpMapBandwidth}</arg>
+            <arg>-maxMaps</arg>
+            <arg>${maxMaps}</arg>
+            <arg>-mapBandwidth</arg>
+            <arg>${mapBandwidth}</arg>
+            <arg>-overwrite</arg>
+            <arg>${overwrite}</arg>
+            <arg>-ignoreErrors</arg>
+            <arg>${ignoreErrors}</arg>
+            <arg>-skipChecksum</arg>
+            <arg>${skipChecksum}</arg>
+            <arg>-removeDeletedFiles</arg>
+            <arg>${removeDeletedFiles}</arg>
+            <arg>-preserveBlockSize</arg>
+            <arg>${preserveBlockSize}</arg>
+            <arg>-preserveReplicationNumber</arg>
+            <arg>${preserveReplicationNumber}</arg>
+            <arg>-preservePermission</arg>
+            <arg>${preservePermission}</arg>
+            <arg>-preserveUser</arg>
+            <arg>${preserveUser}</arg>
+            <arg>-preserveGroup</arg>
+            <arg>${preserveGroup}</arg>
+            <arg>-preserveChecksumType</arg>
+            <arg>${preserveChecksumType}</arg>
+            <arg>-preserveAcl</arg>
+            <arg>${preserveAcl}</arg>
+            <arg>-preserveXattr</arg>
+            <arg>${preserveXattr}</arg>
+            <arg>-preserveTimes</arg>
+            <arg>${preserveTimes}</arg>
             <arg>-sourceNN</arg>
             <arg>${sourceNN}</arg>
             <arg>-sourceExecUrl</arg>
@@ -169,4 +195,4 @@
         </message>
     </kill>
     <end name="end"/>
-</workflow-app>
\ No newline at end of file
+</workflow-app>

http://git-wip-us.apache.org/repos/asf/falcon/blob/3efffd4d/addons/hdfs-snapshot-mirroring/src/main/java/org/apache/falcon/snapshots/replication/HdfsSnapshotReplicator.java
----------------------------------------------------------------------
diff --git 
a/addons/hdfs-snapshot-mirroring/src/main/java/org/apache/falcon/snapshots/replication/HdfsSnapshotReplicator.java
 
b/addons/hdfs-snapshot-mirroring/src/main/java/org/apache/falcon/snapshots/replication/HdfsSnapshotReplicator.java
index 6f5defe..cb597fe 100644
--- 
a/addons/hdfs-snapshot-mirroring/src/main/java/org/apache/falcon/snapshots/replication/HdfsSnapshotReplicator.java
+++ 
b/addons/hdfs-snapshot-mirroring/src/main/java/org/apache/falcon/snapshots/replication/HdfsSnapshotReplicator.java
@@ -28,6 +28,8 @@ import org.apache.falcon.FalconException;
 import 
org.apache.falcon.extensions.mirroring.hdfsSnapshot.HdfsSnapshotMirrorProperties;
 import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.falcon.snapshots.util.HdfsSnapshotUtil;
+import org.apache.falcon.util.DistCPOptionsUtil;
+import org.apache.falcon.util.ReplicationDistCpOption;
 import org.apache.falcon.workflow.util.OozieActionConfigurationHelper;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
@@ -138,17 +140,14 @@ public class HdfsSnapshotReplicator extends Configured 
implements Tool {
     private DistCpOptions getDistCpOptions(String sourceStorageUrl, String 
targetStorageUrl,
                                            DistributedFileSystem sourceFs, 
DistributedFileSystem targetFs,
                                            String sourceDir, String targetDir,
-                                           String currentSnapshotName) throws 
FalconException {
+                                           String currentSnapshotName) throws 
FalconException, IOException {
 
-        List<Path> sourceUris=new ArrayList<Path>();
+        List<Path> sourceUris = new ArrayList<>();
         sourceUris.add(new Path(getStagingUri(sourceStorageUrl, sourceDir)));
 
-        DistCpOptions distcpOptions = new DistCpOptions(sourceUris,
-                new Path(getStagingUri(targetStorageUrl, targetDir)));
 
-        // Settings needed for Snapshot distCp.
-        distcpOptions.setSyncFolder(true);
-        distcpOptions.setDeleteMissing(true);
+        DistCpOptions distcpOptions = DistCPOptionsUtil.getDistCpOptions(cmd, 
sourceUris,
+                new Path(getStagingUri(targetStorageUrl, targetDir)), true, 
null);
 
         // Use snapshot diff if two snapshots exist. Else treat it as simple 
distCp.
         // get latest replicated snapshot.
@@ -157,24 +156,14 @@ public class HdfsSnapshotReplicator extends Configured 
implements Tool {
             distcpOptions.setUseDiff(true, replicatedSnapshotName, 
currentSnapshotName);
         }
 
-        if 
(Boolean.valueOf(cmd.getOptionValue(HdfsSnapshotMirrorProperties.TDE_ENCRYPTION_ENABLED.getName())))
 {
-            // skipCRCCheck and update enabled
-            distcpOptions.setSkipCRC(true);
-        }
-
-        distcpOptions.setBlocking(true);
-        distcpOptions.setMaxMaps(
-                
Integer.parseInt(cmd.getOptionValue(HdfsSnapshotMirrorProperties.DISTCP_MAX_MAPS.getName())));
-        distcpOptions.setMapBandwidth(
-                
Integer.parseInt(cmd.getOptionValue(HdfsSnapshotMirrorProperties.MAP_BANDWIDTH_IN_MB.getName())));
         return distcpOptions;
     }
 
     private String findLatestReplicatedSnapshot(DistributedFileSystem 
sourceFs, DistributedFileSystem targetFs,
-            String sourceDir, String targetDir) throws FalconException {
+                                                String sourceDir, String 
targetDir) throws FalconException {
         try {
             FileStatus[] sourceSnapshots = sourceFs.listStatus(new 
Path(getSnapshotDir(sourceDir)));
-            Set<String> sourceSnapshotNames = new HashSet<String>();
+            Set<String> sourceSnapshotNames = new HashSet<>();
             for (FileStatus snapshot : sourceSnapshots) {
                 sourceSnapshotNames.add(snapshot.getPath().getName());
             }
@@ -190,8 +179,8 @@ public class HdfsSnapshotReplicator extends Configured 
implements Tool {
                 });
 
                 // get most recent snapshot name that exists in source.
-                for (int i = 0; i < targetSnapshots.length; i++) {
-                    String name = targetSnapshots[i].getPath().getName();
+                for (FileStatus targetSnapshot : targetSnapshots) {
+                    String name = targetSnapshot.getPath().getName();
                     if (sourceSnapshotNames.contains(name)) {
                         return name;
                     }
@@ -219,7 +208,7 @@ public class HdfsSnapshotReplicator extends Configured 
implements Tool {
     protected CommandLine getCommand(String[] args) throws FalconException {
         Options options = new Options();
 
-        Option opt = new 
Option(HdfsSnapshotMirrorProperties.DISTCP_MAX_MAPS.getName(),
+        Option opt = new 
Option(HdfsSnapshotMirrorProperties.MAX_MAPS.getName(),
                 true, "max number of maps to use for distcp");
         opt.setRequired(true);
         options.addOption(opt);
@@ -270,11 +259,75 @@ public class HdfsSnapshotReplicator extends Configured 
implements Tool {
         opt.setRequired(true);
         options.addOption(opt);
 
+        opt = new 
Option(ReplicationDistCpOption.DISTCP_OPTION_OVERWRITE.getName(), true, "option 
to force overwrite");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new 
Option(ReplicationDistCpOption.DISTCP_OPTION_IGNORE_ERRORS.getName(), true, 
"abort on error");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new 
Option(ReplicationDistCpOption.DISTCP_OPTION_SKIP_CHECKSUM.getName(), true, 
"skip checksums");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new 
Option(ReplicationDistCpOption.DISTCP_OPTION_REMOVE_DELETED_FILES.getName(), 
true,
+                "remove deleted files - should there be files in the target 
directory that"
+                        + "were removed from the source directory"
+        );
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new 
Option(ReplicationDistCpOption.DISTCP_OPTION_PRESERVE_BLOCK_SIZE.getName(), 
true,
+                "preserve block size");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new 
Option(ReplicationDistCpOption.DISTCP_OPTION_PRESERVE_REPLICATION_NUMBER.getName(),
 true,
+                "preserve replication count");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new 
Option(ReplicationDistCpOption.DISTCP_OPTION_PRESERVE_PERMISSIONS.getName(), 
true,
+                "preserve permissions");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new 
Option(ReplicationDistCpOption.DISTCP_OPTION_PRESERVE_USER.getName(), true,
+                "preserve user");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new 
Option(ReplicationDistCpOption.DISTCP_OPTION_PRESERVE_GROUP.getName(), true,
+                "preserve group");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new 
Option(ReplicationDistCpOption.DISTCP_OPTION_PRESERVE_CHECKSUM_TYPE.getName(), 
true,
+                "preserve checksum type");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new 
Option(ReplicationDistCpOption.DISTCP_OPTION_PRESERVE_ACL.getName(), true,
+                "preserve ACL");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new 
Option(ReplicationDistCpOption.DISTCP_OPTION_PRESERVE_XATTR.getName(), true,
+                "preserve XATTR");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new 
Option(ReplicationDistCpOption.DISTCP_OPTION_PRESERVE_TIMES.getName(), true,
+                "preserve access and modification times");
+        opt.setRequired(false);
+        options.addOption(opt);
+
         try {
             return new GnuParser().parse(options, args);
         } catch (ParseException pe) {
             LOG.info("Unabel to parse commad line arguments for 
HdfsSnapshotReplicator " + pe.getMessage());
-            throw  new FalconException(pe.getMessage());
+            throw new FalconException(pe.getMessage());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/3efffd4d/addons/hdfs-snapshot-mirroring/src/test/java/org/apache/falcon/snapshots/replication/HdfsSnapshotReplicatorTest.java
----------------------------------------------------------------------
diff --git 
a/addons/hdfs-snapshot-mirroring/src/test/java/org/apache/falcon/snapshots/replication/HdfsSnapshotReplicatorTest.java
 
b/addons/hdfs-snapshot-mirroring/src/test/java/org/apache/falcon/snapshots/replication/HdfsSnapshotReplicatorTest.java
index fe7ced5..e66544d 100644
--- 
a/addons/hdfs-snapshot-mirroring/src/test/java/org/apache/falcon/snapshots/replication/HdfsSnapshotReplicatorTest.java
+++ 
b/addons/hdfs-snapshot-mirroring/src/test/java/org/apache/falcon/snapshots/replication/HdfsSnapshotReplicatorTest.java
@@ -25,6 +25,7 @@ import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import 
org.apache.falcon.extensions.mirroring.hdfsSnapshot.HdfsSnapshotMirrorProperties;
+import org.apache.falcon.util.ReplicationDistCpOption;
 import org.apache.falcon.snapshots.util.HdfsSnapshotUtil;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -57,7 +58,7 @@ public class HdfsSnapshotReplicatorTest extends 
HdfsSnapshotReplicator {
 
     private FsPermission fsPermission = new FsPermission(FsAction.ALL, 
FsAction.ALL, FsAction.ALL);
 
-    private String[] args = {"--" + 
HdfsSnapshotMirrorProperties.DISTCP_MAX_MAPS.getName(), "1",
+    private String[] args = {"--" + 
HdfsSnapshotMirrorProperties.MAX_MAPS.getName(), "1",
         "--" + HdfsSnapshotMirrorProperties.MAP_BANDWIDTH_IN_MB.getName(), 
"100",
         "--" + HdfsSnapshotMirrorProperties.SOURCE_NN.getName(), 
"hdfs://localhost:54136",
         "--" + HdfsSnapshotMirrorProperties.SOURCE_EXEC_URL.getName(), 
"localhost:8021",
@@ -67,6 +68,8 @@ public class HdfsSnapshotReplicatorTest extends 
HdfsSnapshotReplicator {
         "/apps/falcon/snapshot-replication/sourceDir/",
         "--" + HdfsSnapshotMirrorProperties.TARGET_SNAPSHOT_DIR.getName(),
         "/apps/falcon/snapshot-replication/targetDir/",
+        "--" + ReplicationDistCpOption.DISTCP_OPTION_IGNORE_ERRORS.getName(), 
"false",
+        "--" + ReplicationDistCpOption.DISTCP_OPTION_PRESERVE_ACL.getName(), 
"false",
         "--" + HdfsSnapshotMirrorProperties.TDE_ENCRYPTION_ENABLED.getName(), 
"false",
         "--" + HdfsSnapshotMirrorProperties.SNAPSHOT_JOB_NAME.getName(), 
"snapshotJobName", };
 
@@ -87,7 +90,7 @@ public class HdfsSnapshotReplicatorTest extends 
HdfsSnapshotReplicator {
         miniDfs.allowSnapshot(targetDir);
 
         cmd = getCommand(args);
-        
Assert.assertEquals(cmd.getOptionValue(HdfsSnapshotMirrorProperties.DISTCP_MAX_MAPS.getName()),
 "1");
+        
Assert.assertEquals(cmd.getOptionValue(HdfsSnapshotMirrorProperties.MAX_MAPS.getName()),
 "1");
         
Assert.assertEquals(cmd.getOptionValue(HdfsSnapshotMirrorProperties.MAP_BANDWIDTH_IN_MB.getName()),
 "100");
 
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/3efffd4d/common/src/main/java/org/apache/falcon/util/DistCPOptionsUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/util/DistCPOptionsUtil.java 
b/common/src/main/java/org/apache/falcon/util/DistCPOptionsUtil.java
new file mode 100644
index 0000000..bbeb3e9
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/util/DistCPOptionsUtil.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.util;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.tools.DistCpOptions;
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Utility to build {@link DistCpOptions} for Falcon replication jobs (feed replication and the
+ * HDFS / HDFS-snapshot mirroring extensions) from parsed command line options.
+ */
+public final class DistCPOptionsUtil {
+    private static final String TDE_ENCRYPTION_ENABLED = "tdeEncryptionEnabled";
+    private static final String MAX_MAPS = "maxMaps";
+    private static final String MAP_BANDWIDTH = "mapBandwidth";
+
+    private DistCPOptionsUtil() {}
+
+    /**
+     * Builds blocking DistCp options from the given command line.
+     *
+     * @param cmd         parsed command line; must carry integer values for maxMaps and mapBandwidth
+     * @param sourcePaths paths to copy from
+     * @param targetPath  path to copy to
+     * @param isSnapshot  true for snapshot based replication: the copy always runs in update
+     *                    (sync folder) mode and always deletes files missing from the source, so
+     *                    the overwrite and removeDeletedFiles options are ignored
+     * @param conf        configuration used to access the target file system; only read when
+     *                    isSnapshot is false and deleted files are to be removed
+     * @return the configured DistCp options
+     * @throws FalconException if maxMaps/mapBandwidth are missing or not integers, or if
+     *                         skipChecksum is requested together with overwrite
+     * @throws IOException     if the target path cannot be checked or created
+     */
+    public static DistCpOptions getDistCpOptions(CommandLine cmd,
+                                                 List<Path> sourcePaths,
+                                                 Path targetPath,
+                                                 boolean isSnapshot,
+                                                 Configuration conf) throws FalconException, IOException {
+        DistCpOptions distcpOptions = new DistCpOptions(sourcePaths, targetPath);
+        distcpOptions.setBlocking(true);
+
+        distcpOptions.setMaxMaps(getIntOption(cmd, MAX_MAPS));
+        distcpOptions.setMapBandwidth(getIntOption(cmd, MAP_BANDWIDTH));
+
+        // DistCpOptions re-validates the option combination on every setter call: skipCRC is only
+        // accepted once update (sync folder) mode is on, and deleteMissing needs update or overwrite.
+        // The copy mode is therefore fixed first, before skipCRC and deleteMissing are applied.
+        boolean tdeEncryptionEnabled = isTrue(cmd, TDE_ENCRYPTION_ENABLED);
+        boolean overwrite = !isSnapshot && !tdeEncryptionEnabled
+                && isTrue(cmd, ReplicationDistCpOption.DISTCP_OPTION_OVERWRITE.getName());
+        if (overwrite) {
+            distcpOptions.setOverwrite(true);
+        } else {
+            // Update mode: used for TDE, for snapshot replication and as the non-overwrite default.
+            distcpOptions.setSyncFolder(true);
+        }
+
+        // With TDE enabled CRC checks are always skipped and the skipChecksum option is ignored.
+        boolean skipChecksum = tdeEncryptionEnabled
+                || isTrue(cmd, ReplicationDistCpOption.DISTCP_OPTION_SKIP_CHECKSUM.getName());
+        if (skipChecksum) {
+            if (overwrite) {
+                throw new FalconException("DistCp option "
+                        + ReplicationDistCpOption.DISTCP_OPTION_SKIP_CHECKSUM.getName()
+                        + " is only valid in update mode and cannot be combined with "
+                        + ReplicationDistCpOption.DISTCP_OPTION_OVERWRITE.getName());
+            }
+            distcpOptions.setSkipCRC(true);
+        }
+
+        if (isSnapshot) {
+            // Settings needed for Snapshot distCp (update mode has already been enabled above).
+            distcpOptions.setDeleteMissing(true);
+        } else {
+            // Removing deleted files by default - FALCON-1844
+            boolean deleteMissing = Boolean.parseBoolean(cmd.getOptionValue(
+                    ReplicationDistCpOption.DISTCP_OPTION_REMOVE_DELETED_FILES.getName(), "true"));
+            distcpOptions.setDeleteMissing(deleteMissing);
+            if (deleteMissing) {
+                // DistCP will fail with InvalidInputException if deleteMissing is set to true and
+                // if targetPath does not exist. Create targetPath to avoid failures.
+                FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(targetPath.toUri(), conf);
+                if (!fs.exists(targetPath)) {
+                    fs.mkdirs(targetPath);
+                }
+            }
+        }
+
+        if (isTrue(cmd, ReplicationDistCpOption.DISTCP_OPTION_IGNORE_ERRORS.getName())) {
+            distcpOptions.setIgnoreFailures(true);
+        }
+
+        preserveIfRequested(cmd, distcpOptions, ReplicationDistCpOption.DISTCP_OPTION_PRESERVE_BLOCK_SIZE,
+                DistCpOptions.FileAttribute.BLOCKSIZE);
+        preserveIfRequested(cmd, distcpOptions, ReplicationDistCpOption.DISTCP_OPTION_PRESERVE_REPLICATION_NUMBER,
+                DistCpOptions.FileAttribute.REPLICATION);
+        preserveIfRequested(cmd, distcpOptions, ReplicationDistCpOption.DISTCP_OPTION_PRESERVE_PERMISSIONS,
+                DistCpOptions.FileAttribute.PERMISSION);
+        preserveIfRequested(cmd, distcpOptions, ReplicationDistCpOption.DISTCP_OPTION_PRESERVE_USER,
+                DistCpOptions.FileAttribute.USER);
+        preserveIfRequested(cmd, distcpOptions, ReplicationDistCpOption.DISTCP_OPTION_PRESERVE_GROUP,
+                DistCpOptions.FileAttribute.GROUP);
+        preserveIfRequested(cmd, distcpOptions, ReplicationDistCpOption.DISTCP_OPTION_PRESERVE_CHECKSUM_TYPE,
+                DistCpOptions.FileAttribute.CHECKSUMTYPE);
+        preserveIfRequested(cmd, distcpOptions, ReplicationDistCpOption.DISTCP_OPTION_PRESERVE_ACL,
+                DistCpOptions.FileAttribute.ACL);
+        preserveIfRequested(cmd, distcpOptions, ReplicationDistCpOption.DISTCP_OPTION_PRESERVE_XATTR,
+                DistCpOptions.FileAttribute.XATTR);
+        preserveIfRequested(cmd, distcpOptions, ReplicationDistCpOption.DISTCP_OPTION_PRESERVE_TIMES,
+                DistCpOptions.FileAttribute.TIMES);
+
+        return distcpOptions;
+    }
+
+    /** Returns true only when the option is present with the value "true" (case-insensitive). */
+    private static boolean isTrue(CommandLine cmd, String optionName) {
+        return Boolean.parseBoolean(cmd.getOptionValue(optionName));
+    }
+
+    /** Adds the file attribute to the preserved set when the given option is "true". */
+    private static void preserveIfRequested(CommandLine cmd, DistCpOptions distcpOptions,
+                                            ReplicationDistCpOption option,
+                                            DistCpOptions.FileAttribute attribute) {
+        if (isTrue(cmd, option.getName())) {
+            distcpOptions.preserve(attribute);
+        }
+    }
+
+    /** Parses a required integer option, failing with a FalconException that names the option. */
+    private static int getIntOption(CommandLine cmd, String optionName) throws FalconException {
+        String value = cmd.getOptionValue(optionName);
+        if (StringUtils.isBlank(value)) {
+            throw new FalconException("Missing required DistCp option: " + optionName);
+        }
+        try {
+            return Integer.parseInt(value.trim());
+        } catch (NumberFormatException e) {
+            throw new FalconException("Invalid integer value '" + value
+                    + "' for DistCp option: " + optionName, e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/3efffd4d/common/src/main/java/org/apache/falcon/util/ReplicationDistCpOption.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/falcon/util/ReplicationDistCpOption.java 
b/common/src/main/java/org/apache/falcon/util/ReplicationDistCpOption.java
index a8b99bb..65f371c 100644
--- a/common/src/main/java/org/apache/falcon/util/ReplicationDistCpOption.java
+++ b/common/src/main/java/org/apache/falcon/util/ReplicationDistCpOption.java
@@ -29,7 +29,13 @@ public enum ReplicationDistCpOption {
     DISTCP_OPTION_REMOVE_DELETED_FILES("removeDeletedFiles"),
     DISTCP_OPTION_PRESERVE_BLOCK_SIZE("preserveBlockSize"),
     DISTCP_OPTION_PRESERVE_REPLICATION_NUMBER("preserveReplicationNumber"),
-    DISTCP_OPTION_PRESERVE_PERMISSIONS("preservePermission");
+    DISTCP_OPTION_PRESERVE_PERMISSIONS("preservePermission"),
+    DISTCP_OPTION_PRESERVE_USER("preserveUser"),
+    DISTCP_OPTION_PRESERVE_GROUP("preserveGroup"),
+    DISTCP_OPTION_PRESERVE_CHECKSUM_TYPE("preserveChecksumType"),
+    DISTCP_OPTION_PRESERVE_ACL("preserveAcl"),
+    DISTCP_OPTION_PRESERVE_XATTR("preserveXattr"),
+    DISTCP_OPTION_PRESERVE_TIMES("preserveTimes");
 
     private final String name;
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/3efffd4d/docs/src/site/twiki/EntitySpecification.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/EntitySpecification.twiki 
b/docs/src/site/twiki/EntitySpecification.twiki
index 2615e5d..11d1e1b 100644
--- a/docs/src/site/twiki/EntitySpecification.twiki
+++ b/docs/src/site/twiki/EntitySpecification.twiki
@@ -399,6 +399,13 @@ permission indicates the permission.
         <property name="preserveBlockSize" value="true"/>
         <property name="preserveReplicationNumber" value="true"/>
         <property name="preservePermission" value="true"/>
+        <property name="preserveUser" value="true"/>
+        <property name="preserveGroup" value="false"/>
+        <property name="preserveChecksumType" value="true"/>
+        <property name="preserveAcl" value="true"/>
+        <property name="preserveXattr" value="true"/>
+        <property name="preserveTimes" value="true"/>
+        <property name="tdeEncryptionEnabled" value="false"/>
         <property name="order" value="LIFO"/>
     </properties>
 </verbatim>
@@ -414,7 +421,10 @@ used by each mapper during replication. "overwrite" 
represents overwrite destina
 bypassing checksum verification during replication. "removeDeletedFiles" 
represents deleting the files existing in the
 destination but not in source during replication. "preserveBlockSize" 
represents preserving block size during
 replication. "preserveReplicationNumber" represents preserving replication 
number during replication.
-"preservePermission" represents preserving permission during
+"preservePermission" represents preserving permission during replication. 
"preserveUser" represents preserving user during replication.
+"preserveGroup" represents preserving group during replication. 
"preserveChecksumType" represents preserving checksum type during replication.
+"preserveAcl" represents preserving ACLs during replication. "preserveXattr" 
represents preserving extended attributes (XAttrs) during replication.
+"preserveTimes" represents preserving access and modification times during 
replication. "tdeEncryptionEnabled" should be set to true if TDE encryption is enabled.
 
 
 ---+++ Lifecycle

http://git-wip-us.apache.org/repos/asf/falcon/blob/3efffd4d/extensions/src/main/java/org/apache/falcon/extensions/AbstractExtension.java
----------------------------------------------------------------------
diff --git 
a/extensions/src/main/java/org/apache/falcon/extensions/AbstractExtension.java 
b/extensions/src/main/java/org/apache/falcon/extensions/AbstractExtension.java
index 24bbb87..b160bb5 100644
--- 
a/extensions/src/main/java/org/apache/falcon/extensions/AbstractExtension.java
+++ 
b/extensions/src/main/java/org/apache/falcon/extensions/AbstractExtension.java
@@ -18,10 +18,12 @@
 
 package org.apache.falcon.extensions;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.extensions.mirroring.hdfs.HdfsMirroringExtension;
 import 
org.apache.falcon.extensions.mirroring.hdfsSnapshot.HdfsSnapshotMirroringExtension;
 import org.apache.falcon.extensions.mirroring.hive.HiveMirroringExtension;
+import org.apache.falcon.util.ReplicationDistCpOption;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -57,5 +59,15 @@ public abstract class AbstractExtension {
     public abstract void validate(final Properties extensionProperties) throws 
FalconException;
 
     public abstract Properties getAdditionalProperties(final Properties 
extensionProperties) throws FalconException;
+
+    public static void addAdditionalDistCPProperties(final Properties 
extensionProperties,
+                                                      final Properties 
additionalProperties) {
+        for (ReplicationDistCpOption distcpOption : 
ReplicationDistCpOption.values()) {
+            if (StringUtils.isBlank(
+                    extensionProperties.getProperty(distcpOption.getName()))) {
+                additionalProperties.put(distcpOption.getName(), "false");
+            }
+        }
+    }
 }
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/3efffd4d/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfs/HdfsMirroringExtension.java
----------------------------------------------------------------------
diff --git 
a/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfs/HdfsMirroringExtension.java
 
b/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfs/HdfsMirroringExtension.java
index f1acae2..ef26d81 100644
--- 
a/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfs/HdfsMirroringExtension.java
+++ 
b/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfs/HdfsMirroringExtension.java
@@ -53,15 +53,16 @@ public class HdfsMirroringExtension extends 
AbstractExtension {
         Properties additionalProperties = new Properties();
 
         // Add default properties if not passed
-        String distcpMaxMaps = 
extensionProperties.getProperty(HdfsMirroringExtensionProperties.MAX_MAPS.getName());
+        String distcpMaxMaps = extensionProperties.getProperty(
+                HdfsMirroringExtensionProperties.DISTCP_MAX_MAPS.getName());
         if (StringUtils.isBlank(distcpMaxMaps)) {
-            
additionalProperties.put(HdfsMirroringExtensionProperties.MAX_MAPS.getName(), 
"1");
+            
additionalProperties.put(HdfsMirroringExtensionProperties.DISTCP_MAX_MAPS.getName(),
 "1");
         }
 
         String distcpMapBandwidth = extensionProperties.getProperty(
-                
HdfsMirroringExtensionProperties.MAP_BANDWIDTH_IN_MB.getName());
+                
HdfsMirroringExtensionProperties.DISTCP_MAP_BANDWIDTH_IN_MB.getName());
         if (StringUtils.isBlank(distcpMapBandwidth)) {
-            
additionalProperties.put(HdfsMirroringExtensionProperties.MAP_BANDWIDTH_IN_MB.getName(),
 "100");
+            
additionalProperties.put(HdfsMirroringExtensionProperties.DISTCP_MAP_BANDWIDTH_IN_MB.getName(),
 "100");
         }
 
         // Construct fully qualified hdfs src path
@@ -105,6 +106,13 @@ public class HdfsMirroringExtension extends 
AbstractExtension {
         }
         
additionalProperties.put(HdfsMirroringExtensionProperties.TARGET_CLUSTER_FS_WRITE_ENDPOINT.getName(),
                 ClusterHelper.getStorageUrl(targetCluster));
+
+        if (StringUtils.isBlank(
+                
extensionProperties.getProperty(HdfsMirroringExtensionProperties.TDE_ENCRYPTION_ENABLED.getName())))
 {
+            
additionalProperties.put(HdfsMirroringExtensionProperties.TDE_ENCRYPTION_ENABLED.getName(),
 "false");
+        }
+
+        addAdditionalDistCPProperties(extensionProperties, 
additionalProperties);
         return additionalProperties;
     }
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/3efffd4d/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfs/HdfsMirroringExtensionProperties.java
----------------------------------------------------------------------
diff --git 
a/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfs/HdfsMirroringExtensionProperties.java
 
b/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfs/HdfsMirroringExtensionProperties.java
index 7d24b45..52ae0c0 100644
--- 
a/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfs/HdfsMirroringExtensionProperties.java
+++ 
b/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfs/HdfsMirroringExtensionProperties.java
@@ -28,9 +28,10 @@ public enum HdfsMirroringExtensionProperties {
     TARGET_DIR("targetDir", "Location on target cluster for replication"),
     TARGET_CLUSTER("targetCluster", "Target cluster"),
     TARGET_CLUSTER_FS_WRITE_ENDPOINT("targetClusterFS", "Target cluster end 
point", false),
-    MAX_MAPS("distcpMaxMaps", "Maximum number of maps used during 
replication", false),
-    MAP_BANDWIDTH_IN_MB("distcpMapBandwidth", "Bandwidth in MB/s used by each 
mapper during replication",
-            false);
+    DISTCP_MAX_MAPS("distcpMaxMaps", "Maximum number of maps used during 
replication", false),
+    DISTCP_MAP_BANDWIDTH_IN_MB("distcpMapBandwidth", "Bandwidth in MB/s used 
by each mapper during replication",
+            false),
+    TDE_ENCRYPTION_ENABLED("tdeEncryptionEnabled", "Set to true if TDE 
encryption is enabled", false);
 
     private final String name;
     private final String description;

http://git-wip-us.apache.org/repos/asf/falcon/blob/3efffd4d/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfsSnapshot/HdfsSnapshotMirrorProperties.java
----------------------------------------------------------------------
diff --git 
a/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfsSnapshot/HdfsSnapshotMirrorProperties.java
 
b/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfsSnapshot/HdfsSnapshotMirrorProperties.java
index f179896..ad707c8 100644
--- 
a/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfsSnapshot/HdfsSnapshotMirrorProperties.java
+++ 
b/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfsSnapshot/HdfsSnapshotMirrorProperties.java
@@ -48,8 +48,8 @@ public enum HdfsSnapshotMirrorProperties {
     TARGET_SNAPSHOT_RETENTION_NUMBER("targetSnapshotRetentionNumber",
             "Number of latest target snapshots to retain on source", true),
 
-    DISTCP_MAX_MAPS("distcpMaxMaps", "Maximum number of maps used during 
distcp", false),
-    MAP_BANDWIDTH_IN_MB("distcpMapBandwidth", "Bandwidth in MB/s used by each 
mapper during replication", false),
+    MAX_MAPS("maxMaps", "Maximum number of maps used during distcp", false),
+    MAP_BANDWIDTH_IN_MB("mapBandwidth", "Bandwidth in MB/s used by each mapper 
during replication", false),
 
     TDE_ENCRYPTION_ENABLED("tdeEncryptionEnabled", "Is TDE encryption enabled 
on source and target", false),
     SNAPSHOT_JOB_NAME("snapshotJobName", "Name of snapshot based mirror job", 
false);

http://git-wip-us.apache.org/repos/asf/falcon/blob/3efffd4d/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfsSnapshot/HdfsSnapshotMirroringExtension.java
----------------------------------------------------------------------
diff --git 
a/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfsSnapshot/HdfsSnapshotMirroringExtension.java
 
b/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfsSnapshot/HdfsSnapshotMirroringExtension.java
index 09cce3b..16b087d 100644
--- 
a/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfsSnapshot/HdfsSnapshotMirroringExtension.java
+++ 
b/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfsSnapshot/HdfsSnapshotMirroringExtension.java
@@ -133,9 +133,9 @@ public class HdfsSnapshotMirroringExtension extends 
AbstractExtension {
         Properties additionalProperties = new Properties();
 
         // Add default properties if not passed
-        String distcpMaxMaps = 
extensionProperties.getProperty(HdfsSnapshotMirrorProperties.DISTCP_MAX_MAPS.getName());
+        String distcpMaxMaps = 
extensionProperties.getProperty(HdfsSnapshotMirrorProperties.MAX_MAPS.getName());
         if (StringUtils.isBlank(distcpMaxMaps)) {
-            
additionalProperties.put(HdfsSnapshotMirrorProperties.DISTCP_MAX_MAPS.getName(),
 "1");
+            
additionalProperties.put(HdfsSnapshotMirrorProperties.MAX_MAPS.getName(), "1");
         }
 
         String distcpMapBandwidth = extensionProperties.getProperty(
@@ -223,6 +223,8 @@ public class HdfsSnapshotMirroringExtension extends 
AbstractExtension {
             throw new FalconException("Cluster entity "
                     + ExtensionProperties.CLUSTER_NAME.getName() + " not 
found");
         }
+
+        addAdditionalDistCPProperties(extensionProperties, 
additionalProperties);
         return additionalProperties;
     }
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/3efffd4d/extensions/src/test/java/org/apache/falcon/extensions/ExtensionTest.java
----------------------------------------------------------------------
diff --git 
a/extensions/src/test/java/org/apache/falcon/extensions/ExtensionTest.java 
b/extensions/src/test/java/org/apache/falcon/extensions/ExtensionTest.java
index 21e78d8..3386a31 100644
--- a/extensions/src/test/java/org/apache/falcon/extensions/ExtensionTest.java
+++ b/extensions/src/test/java/org/apache/falcon/extensions/ExtensionTest.java
@@ -136,7 +136,7 @@ public class ExtensionTest extends 
AbstractTestExtensionStore {
                 RETENTION_NUM);
         
properties.setProperty(HdfsSnapshotMirrorProperties.TARGET_NN.getName(),
                 NN_URI);
-        
properties.setProperty(HdfsSnapshotMirrorProperties.DISTCP_MAX_MAPS.getName(),
+        properties.setProperty(HdfsSnapshotMirrorProperties.MAX_MAPS.getName(),
                 "5");
         
properties.setProperty(HdfsSnapshotMirrorProperties.MAP_BANDWIDTH_IN_MB.getName(),
                 "100");

http://git-wip-us.apache.org/repos/asf/falcon/blob/3efffd4d/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java
----------------------------------------------------------------------
diff --git 
a/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java
 
b/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java
index e45dfc5..cfcc698 100644
--- 
a/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java
+++ 
b/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java
@@ -58,6 +58,7 @@ public class FSReplicationWorkflowBuilder extends 
FeedReplicationWorkflowBuilder
         addHDFSServersConfig(replication, src, target);
         addAdditionalReplicationProperties(replication);
         enableCounters(replication);
+        enableTDE(replication);
         addTransition(replication, SUCCESS_POSTPROCESS_ACTION_NAME, 
FAIL_POSTPROCESS_ACTION_NAME);
         workflow.getDecisionOrForkOrJoin().add(replication);
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/3efffd4d/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java
----------------------------------------------------------------------
diff --git 
a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java
 
b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java
index 010446b..db647aa 100644
--- 
a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java
+++ 
b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java
@@ -49,6 +49,7 @@ public abstract class FeedReplicationWorkflowBuilder extends 
OozieOrchestrationW
     private static final String MR_MAX_MAPS = "maxMaps";
     private static final String MR_MAP_BANDWIDTH = "mapBandwidth";
     private static final String REPLICATION_JOB_COUNTER = "job.counter";
+    private static final String TDE_ENCRYPTION_ENABLED = 
"tdeEncryptionEnabled";
 
     public FeedReplicationWorkflowBuilder(Feed entity) {
         super(entity, LifeCycle.REPLICATION);
@@ -58,7 +59,7 @@ public abstract class FeedReplicationWorkflowBuilder extends 
OozieOrchestrationW
         if (entity.getProperties() != null) {
             List<Property> propertyList = 
entity.getProperties().getProperties();
             for (Property prop : propertyList) {
-                if (prop.getName().equals(REPLICATION_JOB_COUNTER) && 
"true".equalsIgnoreCase(prop.getValue())) {
+                if (prop.getName().equals(REPLICATION_JOB_COUNTER) && "true" 
.equalsIgnoreCase(prop.getValue())) {
                     return true;
                 }
             }
@@ -66,7 +67,8 @@ public abstract class FeedReplicationWorkflowBuilder extends 
OozieOrchestrationW
         return false;
     }
 
-    @Override public Properties build(Cluster cluster, Path buildPath) throws 
FalconException {
+    @Override
+    public Properties build(Cluster cluster, Path buildPath) throws 
FalconException {
         Cluster srcCluster = ConfigurationStore.get().get(EntityType.CLUSTER, 
buildPath.getName());
 
         WORKFLOWAPP workflow = getWorkflow(srcCluster, cluster);
@@ -119,6 +121,15 @@ public abstract class FeedReplicationWorkflowBuilder 
extends OozieOrchestrationW
         return action;
     }
 
+    protected ACTION enableTDE(ACTION action) throws FalconException {
+        if (isTDEEnabled()) {
+            List<String> args = action.getJava().getArg();
+            args.add("-tdeEncryptionEnabled");
+            args.add("true");
+        }
+        return action;
+    }
+
     protected abstract WORKFLOWAPP getWorkflow(Cluster src, Cluster target) 
throws FalconException;
 
     @Override
@@ -133,4 +144,9 @@ public abstract class FeedReplicationWorkflowBuilder 
extends OozieOrchestrationW
     private String getDefaultMapBandwidth() {
         return 
RuntimeProperties.get().getProperty("falcon.replication.workflow.mapbandwidth", 
"100");
     }
+
+    private boolean isTDEEnabled() {
+        String tdeEncryptionEnabled = FeedHelper.getPropertyValue(entity, 
TDE_ENCRYPTION_ENABLED);
+        return "true" .equalsIgnoreCase(tdeEncryptionEnabled);
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/3efffd4d/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
----------------------------------------------------------------------
diff --git 
a/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java 
b/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
index 0906bd5..9c2c522 100644
--- 
a/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
+++ 
b/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
@@ -30,6 +30,7 @@ import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.falcon.job.JobCountersHandler;
 import org.apache.falcon.job.JobType;
 import org.apache.falcon.job.JobCounters;
+import org.apache.falcon.util.DistCPOptionsUtil;
 import org.apache.falcon.util.ReplicationDistCpOption;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
@@ -57,6 +58,7 @@ public class FeedReplicator extends Configured implements 
Tool {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(FeedReplicator.class);
     private static final String IGNORE = "IGNORE";
+    private static final String TDE_ENCRYPTION_ENABLED = 
"tdeEncryptionEnabled";
 
     public static void main(String[] args) throws Exception {
         ToolRunner.run(new Configuration(), new FeedReplicator(), args);
@@ -177,10 +179,44 @@ public class FeedReplicator extends Configured implements 
Tool {
         opt.setRequired(false);
         options.addOption(opt);
 
+        opt = new 
Option(ReplicationDistCpOption.DISTCP_OPTION_PRESERVE_USER.getName(), true,
+                "preserve user");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new 
Option(ReplicationDistCpOption.DISTCP_OPTION_PRESERVE_GROUP.getName(), true,
+                "preserve group");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new 
Option(ReplicationDistCpOption.DISTCP_OPTION_PRESERVE_CHECKSUM_TYPE.getName(), 
true,
+                "preserve checksum type");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new 
Option(ReplicationDistCpOption.DISTCP_OPTION_PRESERVE_ACL.getName(), true,
+                "preserve ACL");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new 
Option(ReplicationDistCpOption.DISTCP_OPTION_PRESERVE_XATTR.getName(), true,
+                "preserve XATTR");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new 
Option(ReplicationDistCpOption.DISTCP_OPTION_PRESERVE_TIMES.getName(), true,
+                "preserve access and modification times");
+        opt.setRequired(false);
+        options.addOption(opt);
+
         opt = new Option("counterLogDir", true, "log directory to store job 
counter file");
         opt.setRequired(false);
         options.addOption(opt);
 
+        opt = new Option(TDE_ENCRYPTION_ENABLED, true, "TDE encryption 
enabled");
+        opt.setRequired(false);
+        options.addOption(opt);
+
         return new GnuParser().parse(options, args);
     }
 
@@ -190,61 +226,7 @@ public class FeedReplicator extends Configured implements 
Tool {
         String targetPathString = cmd.getOptionValue("targetPath").trim();
         Path targetPath = new Path(targetPathString);
 
-        DistCpOptions distcpOptions = new DistCpOptions(srcPaths, targetPath);
-        distcpOptions.setBlocking(true);
-        
distcpOptions.setMaxMaps(Integer.parseInt(cmd.getOptionValue("maxMaps")));
-        
distcpOptions.setMapBandwidth(Integer.parseInt(cmd.getOptionValue("mapBandwidth")));
-
-        String overwrite = 
cmd.getOptionValue(ReplicationDistCpOption.DISTCP_OPTION_OVERWRITE.getName());
-        if (StringUtils.isNotEmpty(overwrite) && 
overwrite.equalsIgnoreCase(Boolean.TRUE.toString())) {
-            distcpOptions.setOverwrite(Boolean.parseBoolean(overwrite));
-        } else {
-            distcpOptions.setSyncFolder(true);
-        }
-
-        String ignoreErrors = 
cmd.getOptionValue(ReplicationDistCpOption.DISTCP_OPTION_IGNORE_ERRORS.getName());
-        if (StringUtils.isNotEmpty(ignoreErrors)) {
-            
distcpOptions.setIgnoreFailures(Boolean.parseBoolean(ignoreErrors));
-        }
-
-        String skipChecksum = 
cmd.getOptionValue(ReplicationDistCpOption.DISTCP_OPTION_SKIP_CHECKSUM.getName());
-        if (StringUtils.isNotEmpty(skipChecksum)) {
-            distcpOptions.setSkipCRC(Boolean.parseBoolean(skipChecksum));
-        }
-
-        // Removing deleted files by default - FALCON-1844
-        String removeDeletedFiles = cmd.getOptionValue(
-                
ReplicationDistCpOption.DISTCP_OPTION_REMOVE_DELETED_FILES.getName(), "true");
-        boolean deleteMissing = Boolean.parseBoolean(removeDeletedFiles);
-        distcpOptions.setDeleteMissing(deleteMissing);
-        if (deleteMissing) {
-            // DistCP will fail with InvalidInputException if deleteMissing is 
set to true and
-            // if targetPath does not exist. Create targetPath to avoid 
failures.
-            FileSystem fs = 
HadoopClientFactory.get().createProxiedFileSystem(targetPath.toUri(), 
getConf());
-            if (!fs.exists(targetPath)) {
-                fs.mkdirs(targetPath);
-            }
-        }
-
-        String preserveBlockSize = cmd.getOptionValue(
-                
ReplicationDistCpOption.DISTCP_OPTION_PRESERVE_BLOCK_SIZE.getName());
-        if (preserveBlockSize != null && 
Boolean.parseBoolean(preserveBlockSize)) {
-            distcpOptions.preserve(DistCpOptions.FileAttribute.BLOCKSIZE);
-        }
-
-        String preserveReplicationCount = 
cmd.getOptionValue(ReplicationDistCpOption
-                .DISTCP_OPTION_PRESERVE_REPLICATION_NUMBER.getName());
-        if (preserveReplicationCount != null && 
Boolean.parseBoolean(preserveReplicationCount)) {
-            distcpOptions.preserve(DistCpOptions.FileAttribute.REPLICATION);
-        }
-
-        String preservePermission = cmd.getOptionValue(
-                
ReplicationDistCpOption.DISTCP_OPTION_PRESERVE_PERMISSIONS.getName());
-        if (preservePermission != null && 
Boolean.parseBoolean(preservePermission)) {
-            distcpOptions.preserve(DistCpOptions.FileAttribute.PERMISSION);
-        }
-
-        return distcpOptions;
+        return DistCPOptionsUtil.getDistCpOptions(cmd, srcPaths, targetPath, 
false, getConf());
     }
 
     private List<Path> getPaths(String[] paths) {

http://git-wip-us.apache.org/repos/asf/falcon/blob/3efffd4d/replication/src/test/java/org/apache/falcon/replication/FeedReplicatorTest.java
----------------------------------------------------------------------
diff --git 
a/replication/src/test/java/org/apache/falcon/replication/FeedReplicatorTest.java
 
b/replication/src/test/java/org/apache/falcon/replication/FeedReplicatorTest.java
index 2662ade..b9b383d 100644
--- 
a/replication/src/test/java/org/apache/falcon/replication/FeedReplicatorTest.java
+++ 
b/replication/src/test/java/org/apache/falcon/replication/FeedReplicatorTest.java
@@ -84,7 +84,13 @@ public class FeedReplicatorTest {
          * <arg>-removeDeletedFiles</arg><arg>true</arg>
          * <arg>-preserveBlockSize</arg><arg>false</arg>
          * <arg>-preserveReplicationCount</arg><arg>true</arg>
-         * <arg>-preserveBlockSize</arg><arg>false</arg>
+         * <arg>-preservePermission</arg><arg>false</arg>
+         * <arg>-preserveUser</arg><arg>true</arg>
+         * <arg>-preserveGroup</arg><arg>false</arg>
+         * <arg>-preserveChecksumType</arg><arg>false</arg>
+         * <arg>-preserveAcl</arg><arg>true</arg>
+         * <arg>-preserveXattr</arg><arg>false</arg>
+         * <arg>-preserveTimes</arg><arg>false</arg>
          */
         final String[] optionalArgs = {
             "true",
@@ -100,6 +106,12 @@ public class FeedReplicatorTest {
             "-preserveBlockSize", "false",
             "-preserveReplicationNumber", "true",
             "-preservePermission", "false",
+            "-preserveUser", "true",
+            "-preserveGroup", "false",
+            "-preserveChecksumType", "false",
+            "-preserveAcl", "true",
+            "-preserveXattr", "false",
+            "-preserveTimes", "false",
         };
 
         FeedReplicator replicator = new FeedReplicator();
@@ -128,5 +140,11 @@ public class FeedReplicatorTest {
         
Assert.assertFalse(options.shouldPreserve(DistCpOptions.FileAttribute.BLOCKSIZE));
         
Assert.assertTrue(options.shouldPreserve(DistCpOptions.FileAttribute.REPLICATION));
         
Assert.assertFalse(options.shouldPreserve(DistCpOptions.FileAttribute.PERMISSION));
+        
Assert.assertTrue(options.shouldPreserve(DistCpOptions.FileAttribute.USER));
+        
Assert.assertFalse(options.shouldPreserve(DistCpOptions.FileAttribute.GROUP));
+        
Assert.assertFalse(options.shouldPreserve(DistCpOptions.FileAttribute.CHECKSUMTYPE));
+        
Assert.assertTrue(options.shouldPreserve(DistCpOptions.FileAttribute.ACL));
+        
Assert.assertFalse(options.shouldPreserve(DistCpOptions.FileAttribute.XATTR));
+        
Assert.assertFalse(options.shouldPreserve(DistCpOptions.FileAttribute.TIMES));
     }
 }

Reply via email to