Repository: falcon Updated Branches: refs/heads/master b0efc6fbf -> 3efffd4d9
FALCON-1944 Ability to provide additional DistCP options for mirroring extensions and feed replication Author: Sowmya Ramesh <[email protected]> Reviewers: "Venkat Ranganathan <[email protected]>, Balu Vellanki <[email protected]>" Closes #251 from sowmyaramesh/FALCON-1944 Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/3efffd4d Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/3efffd4d Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/3efffd4d Branch: refs/heads/master Commit: 3efffd4d946c0a4a8e6885f86e1da3107de52e00 Parents: b0efc6f Author: Sowmya Ramesh <[email protected]> Authored: Wed Aug 3 15:09:29 2016 -0700 Committer: bvellanki <[email protected]> Committed: Wed Aug 3 15:09:29 2016 -0700 ---------------------------------------------------------------------- .../main/META/hdfs-mirroring-properties.json | 84 ++++++++++ .../runtime/hdfs-mirroring-workflow.xml | 28 ++++ .../hdfs-snapshot-mirroring-properties.json | 78 +++++++++- .../hdfs-snapshot-mirroring-workflow.xml | 36 ++++- .../replication/HdfsSnapshotReplicator.java | 99 +++++++++--- .../replication/HdfsSnapshotReplicatorTest.java | 7 +- .../apache/falcon/util/DistCPOptionsUtil.java | 153 +++++++++++++++++++ .../falcon/util/ReplicationDistCpOption.java | 8 +- docs/src/site/twiki/EntitySpecification.twiki | 12 +- .../falcon/extensions/AbstractExtension.java | 12 ++ .../mirroring/hdfs/HdfsMirroringExtension.java | 16 +- .../hdfs/HdfsMirroringExtensionProperties.java | 7 +- .../HdfsSnapshotMirrorProperties.java | 4 +- .../HdfsSnapshotMirroringExtension.java | 6 +- .../apache/falcon/extensions/ExtensionTest.java | 2 +- .../feed/FSReplicationWorkflowBuilder.java | 1 + .../feed/FeedReplicationWorkflowBuilder.java | 20 ++- .../falcon/replication/FeedReplicator.java | 92 +++++------ .../falcon/replication/FeedReplicatorTest.java | 20 ++- 19 files changed, 580 insertions(+), 105 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/3efffd4d/addons/extensions/hdfs-mirroring/src/main/META/hdfs-mirroring-properties.json ---------------------------------------------------------------------- diff --git a/addons/extensions/hdfs-mirroring/src/main/META/hdfs-mirroring-properties.json b/addons/extensions/hdfs-mirroring/src/main/META/hdfs-mirroring-properties.json index f1b4775..9d4a425 100644 --- a/addons/extensions/hdfs-mirroring/src/main/META/hdfs-mirroring-properties.json +++ b/addons/extensions/hdfs-mirroring/src/main/META/hdfs-mirroring-properties.json @@ -122,6 +122,90 @@ "example":"100" }, { + "propertyName":"tdeEncryptionEnabled", + "required":false, + "description":"Set this flag to true if TDE encryption is enabled on source and target. Default value is false", + "example":"true" + }, + { + "propertyName":"overwrite", + "required":false, + "description":"DistCP overwrites target files even if they already exist at the target or have the same contents as the source", + "example":"true" + }, + { + "propertyName":"ignoreErrors", + "required":false, + "description":"Ignore failures during DistCp", + "example":"true" + }, + { + "propertyName":"skipChecksum", + "required":false, + "description":"Skip checksum errors during DistCP", + "example":"true" + }, + { + "propertyName":"removeDeletedFiles", + "required":false, + "description":"DistCP deletes the files existing in the target but not in the source", + "example":"true" + }, + { + "propertyName":"preserveBlockSize", + "required":false, + "description":"Preserve block size during DistCP", + "example":"true" + }, + { + "propertyName":"preserveReplicationNumber", + "required":false, + "description":"Preserve replication number during DistCP", + "example":"false" + }, + { + "propertyName":"preservePermission", + "required":false, + "description":"Preserve permission during DistCP", + "example":"true" + }, + { + "propertyName":"preserveUser", +
"required":false, + "description":"Preserve user during DistCP", + "example":"true" + }, + { + "propertyName":"preserveGroup", + "required":false, + "description":"Preserve group during DistCP", + "example":"true" + }, + { + "propertyName":"preserveChecksumType", + "required":false, + "description":"Preserve checksum type during DistCP", + "example":"true" + }, + { + "propertyName":"preserveAcl", + "required":false, + "description":"Preserve ACL during DistCP", + "example":"false" + }, + { + "propertyName":"preserveXattr", + "required":false, + "description":"Preserve Xattr during DistCP", + "example":"true" + }, + { + "propertyName":"preserveTimes", + "required":false, + "description":"Preserve access and modification times during DistCP", + "example":"true" + }, + { "propertyName":"jobNotificationType", "required":false, "description":"Email Notification for Falcon instance completion", http://git-wip-us.apache.org/repos/asf/falcon/blob/3efffd4d/addons/extensions/hdfs-mirroring/src/main/resources/runtime/hdfs-mirroring-workflow.xml ---------------------------------------------------------------------- diff --git a/addons/extensions/hdfs-mirroring/src/main/resources/runtime/hdfs-mirroring-workflow.xml b/addons/extensions/hdfs-mirroring/src/main/resources/runtime/hdfs-mirroring-workflow.xml index c0504fb..7929dd7 100644 --- a/addons/extensions/hdfs-mirroring/src/main/resources/runtime/hdfs-mirroring-workflow.xml +++ b/addons/extensions/hdfs-mirroring/src/main/resources/runtime/hdfs-mirroring-workflow.xml @@ -63,6 +63,32 @@ <arg>${distcpMaxMaps}</arg> <arg>-mapBandwidth</arg> <arg>${distcpMapBandwidth}</arg> + <arg>-overwrite</arg> + <arg>${overwrite}</arg> + <arg>-ignoreErrors</arg> + <arg>${ignoreErrors}</arg> + <arg>-skipChecksum</arg> + <arg>${skipChecksum}</arg> + <arg>-removeDeletedFiles</arg> + <arg>${removeDeletedFiles}</arg> + <arg>-preserveBlockSize</arg> + <arg>${preserveBlockSize}</arg> + <arg>-preserveReplicationNumber</arg> + 
<arg>${preserveReplicationNumber}</arg> + <arg>-preservePermission</arg> + <arg>${preservePermission}</arg> + <arg>-preserveUser</arg> + <arg>${preserveUser}</arg> + <arg>-preserveGroup</arg> + <arg>${preserveGroup}</arg> + <arg>-preserveChecksumType</arg> + <arg>${preserveChecksumType}</arg> + <arg>-preserveAcl</arg> + <arg>${preserveAcl}</arg> + <arg>-preserveXattr</arg> + <arg>${preserveXattr}</arg> + <arg>-preserveTimes</arg> + <arg>${preserveTimes}</arg> <arg>-sourcePaths</arg> <arg>${sourceDir}</arg> <arg>-targetPath</arg> @@ -71,6 +97,8 @@ <arg>FILESYSTEM</arg> <arg>-availabilityFlag</arg> <arg>${availabilityFlag == 'NA' ? "NA" : availabilityFlag}</arg> + <arg>-tdeEncryptionEnabled</arg> + <arg>${tdeEncryptionEnabled}</arg> <arg>-counterLogDir</arg> <arg>${logDir}/job-${nominalTime}/${srcClusterName == 'NA' ? '' : srcClusterName}</arg> </java> http://git-wip-us.apache.org/repos/asf/falcon/blob/3efffd4d/addons/extensions/hdfs-snapshot-mirroring/src/main/META/hdfs-snapshot-mirroring-properties.json ---------------------------------------------------------------------- diff --git a/addons/extensions/hdfs-snapshot-mirroring/src/main/META/hdfs-snapshot-mirroring-properties.json b/addons/extensions/hdfs-snapshot-mirroring/src/main/META/hdfs-snapshot-mirroring-properties.json index 46554c1..430f1ec 100644 --- a/addons/extensions/hdfs-snapshot-mirroring/src/main/META/hdfs-snapshot-mirroring-properties.json +++ b/addons/extensions/hdfs-snapshot-mirroring/src/main/META/hdfs-snapshot-mirroring-properties.json @@ -140,13 +140,13 @@ "example":"10" }, { - "propertyName":"distcpMaxMaps", + "propertyName":"maxMaps", "required":false, "description":"Maximum number of mappers for DistCP", "example":"1" }, { - "propertyName":"distcpMapBandwidth", + "propertyName":"mapBandwidth", "required":false, "description":"Bandwidth in MB for each mapper in DistCP", "example":"100" @@ -158,6 +158,78 @@ "example":"false" }, { + "propertyName":"ignoreErrors", + "required":false, + 
"description":"Ignore failures during DistCp", + "example":"true" + }, + { + "propertyName":"skipChecksum", + "required":false, + "description":"Skip checksum errors during DistCP", + "example":"true" + }, + { + "propertyName":"removeDeletedFiles", + "required":false, + "description":"DistCP deletes the files existing in the dst but not in src", + "example":"true" + }, + { + "propertyName":"preserveBlockSize", + "required":false, + "description":"Preserve block size during DistCP", + "example":"true" + }, + { + "propertyName":"preserveReplicationNumber", + "required":false, + "description":"Preserve replication number during DistCP", + "example":"false" + }, + { + "propertyName":"preservePermission", + "required":false, + "description":"Preserve permission during DistCP", + "example":"true" + }, + { + "propertyName":"preserveUser", + "required":false, + "description":"Preserve user during DistCP", + "example":"true" + }, + { + "propertyName":"preserveGroup", + "required":false, + "description":"Preserve group during DistCP", + "example":"true" + }, + { + "propertyName":"preserveChecksumType", + "required":false, + "description":"Preserve checksum type during DistCP", + "example":"true" + }, + { + "propertyName":"preserveAcl", + "required":false, + "description":"Preserve ACL during DistCP", + "example":"false" + }, + { + "propertyName":"preserveXattr", + "required":false, + "description":"Preserve Xattr during DistCP", + "example":"true" + }, + { + "propertyName":"preserveTimes", + "required":false, + "description":"Preserve access and modification times during DistCP", + "example":"true" + }, + { "propertyName":"jobNotificationType", "required":false, "description":"Email Notification for Falcon instance completion", @@ -170,4 +242,4 @@ "example":"[email protected], [email protected]" } ] -} \ No newline at end of file +} 
http://git-wip-us.apache.org/repos/asf/falcon/blob/3efffd4d/addons/extensions/hdfs-snapshot-mirroring/src/main/resources/runtime/hdfs-snapshot-mirroring-workflow.xml ---------------------------------------------------------------------- diff --git a/addons/extensions/hdfs-snapshot-mirroring/src/main/resources/runtime/hdfs-snapshot-mirroring-workflow.xml b/addons/extensions/hdfs-snapshot-mirroring/src/main/resources/runtime/hdfs-snapshot-mirroring-workflow.xml index c735167..899f6b0 100644 --- a/addons/extensions/hdfs-snapshot-mirroring/src/main/resources/runtime/hdfs-snapshot-mirroring-workflow.xml +++ b/addons/extensions/hdfs-snapshot-mirroring/src/main/resources/runtime/hdfs-snapshot-mirroring-workflow.xml @@ -59,10 +59,36 @@ <main-class>org.apache.falcon.snapshots.replication.HdfsSnapshotReplicator</main-class> <arg>-Dmapred.job.queue.name=${queueName}</arg> <arg>-Dmapred.job.priority=${jobPriority}</arg> - <arg>-distcpMaxMaps</arg> - <arg>${distcpMaxMaps}</arg> - <arg>-distcpMapBandwidth</arg> - <arg>${distcpMapBandwidth}</arg> + <arg>-maxMaps</arg> + <arg>${maxMaps}</arg> + <arg>-mapBandwidth</arg> + <arg>${mapBandwidth}</arg> + <arg>-overwrite</arg> + <arg>${overwrite}</arg> + <arg>-ignoreErrors</arg> + <arg>${ignoreErrors}</arg> + <arg>-skipChecksum</arg> + <arg>${skipChecksum}</arg> + <arg>-removeDeletedFiles</arg> + <arg>${removeDeletedFiles}</arg> + <arg>-preserveBlockSize</arg> + <arg>${preserveBlockSize}</arg> + <arg>-preserveReplicationNumber</arg> + <arg>${preserveReplicationNumber}</arg> + <arg>-preservePermission</arg> + <arg>${preservePermission}</arg> + <arg>-preserveUser</arg> + <arg>${preserveUser}</arg> + <arg>-preserveGroup</arg> + <arg>${preserveGroup}</arg> + <arg>-preserveChecksumType</arg> + <arg>${preserveChecksumType}</arg> + <arg>-preserveAcl</arg> + <arg>${preserveAcl}</arg> + <arg>-preserveXattr</arg> + <arg>${preserveXattr}</arg> + <arg>-preserveTimes</arg> + <arg>${preserveTimes}</arg> <arg>-sourceNN</arg> <arg>${sourceNN}</arg> 
<arg>-sourceExecUrl</arg> @@ -169,4 +195,4 @@ </message> </kill> <end name="end"/> -</workflow-app> \ No newline at end of file +</workflow-app> http://git-wip-us.apache.org/repos/asf/falcon/blob/3efffd4d/addons/hdfs-snapshot-mirroring/src/main/java/org/apache/falcon/snapshots/replication/HdfsSnapshotReplicator.java ---------------------------------------------------------------------- diff --git a/addons/hdfs-snapshot-mirroring/src/main/java/org/apache/falcon/snapshots/replication/HdfsSnapshotReplicator.java b/addons/hdfs-snapshot-mirroring/src/main/java/org/apache/falcon/snapshots/replication/HdfsSnapshotReplicator.java index 6f5defe..cb597fe 100644 --- a/addons/hdfs-snapshot-mirroring/src/main/java/org/apache/falcon/snapshots/replication/HdfsSnapshotReplicator.java +++ b/addons/hdfs-snapshot-mirroring/src/main/java/org/apache/falcon/snapshots/replication/HdfsSnapshotReplicator.java @@ -28,6 +28,8 @@ import org.apache.falcon.FalconException; import org.apache.falcon.extensions.mirroring.hdfsSnapshot.HdfsSnapshotMirrorProperties; import org.apache.falcon.hadoop.HadoopClientFactory; import org.apache.falcon.snapshots.util.HdfsSnapshotUtil; +import org.apache.falcon.util.DistCPOptionsUtil; +import org.apache.falcon.util.ReplicationDistCpOption; import org.apache.falcon.workflow.util.OozieActionConfigurationHelper; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; @@ -138,17 +140,14 @@ public class HdfsSnapshotReplicator extends Configured implements Tool { private DistCpOptions getDistCpOptions(String sourceStorageUrl, String targetStorageUrl, DistributedFileSystem sourceFs, DistributedFileSystem targetFs, String sourceDir, String targetDir, - String currentSnapshotName) throws FalconException { + String currentSnapshotName) throws FalconException, IOException { - List<Path> sourceUris=new ArrayList<Path>(); + List<Path> sourceUris = new ArrayList<>(); sourceUris.add(new Path(getStagingUri(sourceStorageUrl, sourceDir))); - 
DistCpOptions distcpOptions = new DistCpOptions(sourceUris, - new Path(getStagingUri(targetStorageUrl, targetDir))); - // Settings needed for Snapshot distCp. - distcpOptions.setSyncFolder(true); - distcpOptions.setDeleteMissing(true); + DistCpOptions distcpOptions = DistCPOptionsUtil.getDistCpOptions(cmd, sourceUris, + new Path(getStagingUri(targetStorageUrl, targetDir)), true, null); // Use snapshot diff if two snapshots exist. Else treat it as simple distCp. // get latest replicated snapshot. @@ -157,24 +156,14 @@ public class HdfsSnapshotReplicator extends Configured implements Tool { distcpOptions.setUseDiff(true, replicatedSnapshotName, currentSnapshotName); } - if (Boolean.valueOf(cmd.getOptionValue(HdfsSnapshotMirrorProperties.TDE_ENCRYPTION_ENABLED.getName()))) { - // skipCRCCheck and update enabled - distcpOptions.setSkipCRC(true); - } - - distcpOptions.setBlocking(true); - distcpOptions.setMaxMaps( - Integer.parseInt(cmd.getOptionValue(HdfsSnapshotMirrorProperties.DISTCP_MAX_MAPS.getName()))); - distcpOptions.setMapBandwidth( - Integer.parseInt(cmd.getOptionValue(HdfsSnapshotMirrorProperties.MAP_BANDWIDTH_IN_MB.getName()))); return distcpOptions; } private String findLatestReplicatedSnapshot(DistributedFileSystem sourceFs, DistributedFileSystem targetFs, - String sourceDir, String targetDir) throws FalconException { + String sourceDir, String targetDir) throws FalconException { try { FileStatus[] sourceSnapshots = sourceFs.listStatus(new Path(getSnapshotDir(sourceDir))); - Set<String> sourceSnapshotNames = new HashSet<String>(); + Set<String> sourceSnapshotNames = new HashSet<>(); for (FileStatus snapshot : sourceSnapshots) { sourceSnapshotNames.add(snapshot.getPath().getName()); } @@ -190,8 +179,8 @@ public class HdfsSnapshotReplicator extends Configured implements Tool { }); // get most recent snapshot name that exists in source. 
- for (int i = 0; i < targetSnapshots.length; i++) { - String name = targetSnapshots[i].getPath().getName(); + for (FileStatus targetSnapshot : targetSnapshots) { + String name = targetSnapshot.getPath().getName(); if (sourceSnapshotNames.contains(name)) { return name; } @@ -219,7 +208,7 @@ public class HdfsSnapshotReplicator extends Configured implements Tool { protected CommandLine getCommand(String[] args) throws FalconException { Options options = new Options(); - Option opt = new Option(HdfsSnapshotMirrorProperties.DISTCP_MAX_MAPS.getName(), + Option opt = new Option(HdfsSnapshotMirrorProperties.MAX_MAPS.getName(), true, "max number of maps to use for distcp"); opt.setRequired(true); options.addOption(opt); @@ -270,11 +259,75 @@ public class HdfsSnapshotReplicator extends Configured implements Tool { opt.setRequired(true); options.addOption(opt); + opt = new Option(ReplicationDistCpOption.DISTCP_OPTION_OVERWRITE.getName(), true, "option to force overwrite"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option(ReplicationDistCpOption.DISTCP_OPTION_IGNORE_ERRORS.getName(), true, "abort on error"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option(ReplicationDistCpOption.DISTCP_OPTION_SKIP_CHECKSUM.getName(), true, "skip checksums"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option(ReplicationDistCpOption.DISTCP_OPTION_REMOVE_DELETED_FILES.getName(), true, + "remove deleted files - should there be files in the target directory that" + + "were removed from the source directory" + ); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option(ReplicationDistCpOption.DISTCP_OPTION_PRESERVE_BLOCK_SIZE.getName(), true, + "preserve block size"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option(ReplicationDistCpOption.DISTCP_OPTION_PRESERVE_REPLICATION_NUMBER.getName(), true, + "preserve replication count"); + opt.setRequired(false); + options.addOption(opt); + + opt 
= new Option(ReplicationDistCpOption.DISTCP_OPTION_PRESERVE_PERMISSIONS.getName(), true, + "preserve permissions"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option(ReplicationDistCpOption.DISTCP_OPTION_PRESERVE_USER.getName(), true, + "preserve user"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option(ReplicationDistCpOption.DISTCP_OPTION_PRESERVE_GROUP.getName(), true, + "preserve group"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option(ReplicationDistCpOption.DISTCP_OPTION_PRESERVE_CHECKSUM_TYPE.getName(), true, + "preserve checksum type"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option(ReplicationDistCpOption.DISTCP_OPTION_PRESERVE_ACL.getName(), true, + "preserve ACL"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option(ReplicationDistCpOption.DISTCP_OPTION_PRESERVE_XATTR.getName(), true, + "preserve XATTR"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option(ReplicationDistCpOption.DISTCP_OPTION_PRESERVE_TIMES.getName(), true, + "preserve access and modification times"); + opt.setRequired(false); + options.addOption(opt); + try { return new GnuParser().parse(options, args); } catch (ParseException pe) { LOG.info("Unabel to parse commad line arguments for HdfsSnapshotReplicator " + pe.getMessage()); - throw new FalconException(pe.getMessage()); + throw new FalconException(pe.getMessage()); } } http://git-wip-us.apache.org/repos/asf/falcon/blob/3efffd4d/addons/hdfs-snapshot-mirroring/src/test/java/org/apache/falcon/snapshots/replication/HdfsSnapshotReplicatorTest.java ---------------------------------------------------------------------- diff --git a/addons/hdfs-snapshot-mirroring/src/test/java/org/apache/falcon/snapshots/replication/HdfsSnapshotReplicatorTest.java b/addons/hdfs-snapshot-mirroring/src/test/java/org/apache/falcon/snapshots/replication/HdfsSnapshotReplicatorTest.java index fe7ced5..e66544d 100644 --- 
a/addons/hdfs-snapshot-mirroring/src/test/java/org/apache/falcon/snapshots/replication/HdfsSnapshotReplicatorTest.java +++ b/addons/hdfs-snapshot-mirroring/src/test/java/org/apache/falcon/snapshots/replication/HdfsSnapshotReplicatorTest.java @@ -25,6 +25,7 @@ import org.apache.falcon.entity.store.ConfigurationStore; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.cluster.Cluster; import org.apache.falcon.extensions.mirroring.hdfsSnapshot.HdfsSnapshotMirrorProperties; +import org.apache.falcon.util.ReplicationDistCpOption; import org.apache.falcon.snapshots.util.HdfsSnapshotUtil; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -57,7 +58,7 @@ public class HdfsSnapshotReplicatorTest extends HdfsSnapshotReplicator { private FsPermission fsPermission = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL); - private String[] args = {"--" + HdfsSnapshotMirrorProperties.DISTCP_MAX_MAPS.getName(), "1", + private String[] args = {"--" + HdfsSnapshotMirrorProperties.MAX_MAPS.getName(), "1", "--" + HdfsSnapshotMirrorProperties.MAP_BANDWIDTH_IN_MB.getName(), "100", "--" + HdfsSnapshotMirrorProperties.SOURCE_NN.getName(), "hdfs://localhost:54136", "--" + HdfsSnapshotMirrorProperties.SOURCE_EXEC_URL.getName(), "localhost:8021", @@ -67,6 +68,8 @@ public class HdfsSnapshotReplicatorTest extends HdfsSnapshotReplicator { "/apps/falcon/snapshot-replication/sourceDir/", "--" + HdfsSnapshotMirrorProperties.TARGET_SNAPSHOT_DIR.getName(), "/apps/falcon/snapshot-replication/targetDir/", + "--" + ReplicationDistCpOption.DISTCP_OPTION_IGNORE_ERRORS.getName(), "false", + "--" + ReplicationDistCpOption.DISTCP_OPTION_PRESERVE_ACL.getName(), "false", "--" + HdfsSnapshotMirrorProperties.TDE_ENCRYPTION_ENABLED.getName(), "false", "--" + HdfsSnapshotMirrorProperties.SNAPSHOT_JOB_NAME.getName(), "snapshotJobName", }; @@ -87,7 +90,7 @@ public class HdfsSnapshotReplicatorTest extends HdfsSnapshotReplicator { 
miniDfs.allowSnapshot(targetDir); cmd = getCommand(args); - Assert.assertEquals(cmd.getOptionValue(HdfsSnapshotMirrorProperties.DISTCP_MAX_MAPS.getName()), "1"); + Assert.assertEquals(cmd.getOptionValue(HdfsSnapshotMirrorProperties.MAX_MAPS.getName()), "1"); Assert.assertEquals(cmd.getOptionValue(HdfsSnapshotMirrorProperties.MAP_BANDWIDTH_IN_MB.getName()), "100"); } http://git-wip-us.apache.org/repos/asf/falcon/blob/3efffd4d/common/src/main/java/org/apache/falcon/util/DistCPOptionsUtil.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/util/DistCPOptionsUtil.java b/common/src/main/java/org/apache/falcon/util/DistCPOptionsUtil.java new file mode 100644 index 0000000..bbeb3e9 --- /dev/null +++ b/common/src/main/java/org/apache/falcon/util/DistCPOptionsUtil.java @@ -0,0 +1,153 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.falcon.util; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.lang3.StringUtils; +import org.apache.falcon.FalconException; +import org.apache.falcon.hadoop.HadoopClientFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.tools.DistCpOptions; +import org.apache.hadoop.conf.Configuration; + +import java.io.IOException; +import java.util.List; + +/** + * Utility to set DistCp options. + */ +public final class DistCPOptionsUtil { + private static final String TDE_ENCRYPTION_ENABLED = "tdeEncryptionEnabled"; + + private DistCPOptionsUtil() {} + + public static DistCpOptions getDistCpOptions(CommandLine cmd, + List<Path> sourcePaths, + Path targetPath, + boolean isSnapshot, + Configuration conf) throws FalconException, IOException { + DistCpOptions distcpOptions = new DistCpOptions(sourcePaths, targetPath); + distcpOptions.setBlocking(true); + + distcpOptions.setMaxMaps(Integer.parseInt(cmd.getOptionValue("maxMaps"))); + distcpOptions.setMapBandwidth(Integer.parseInt(cmd.getOptionValue("mapBandwidth"))); + + String tdeEncryptionEnabled = cmd.getOptionValue(TDE_ENCRYPTION_ENABLED); + if (StringUtils.isNotBlank(tdeEncryptionEnabled) && tdeEncryptionEnabled.equalsIgnoreCase(Boolean.TRUE.toString())) { + distcpOptions.setSyncFolder(true); + distcpOptions.setSkipCRC(true); + } else { + if (!isSnapshot) { + String overwrite = cmd.getOptionValue(ReplicationDistCpOption.DISTCP_OPTION_OVERWRITE.getName()); + if (StringUtils.isNotEmpty(overwrite) && overwrite.equalsIgnoreCase(Boolean.TRUE.toString())) { + distcpOptions.setOverwrite(Boolean.parseBoolean(overwrite)); + } else { + distcpOptions.setSyncFolder(true); + } + } + + String skipChecksum = cmd.getOptionValue(ReplicationDistCpOption.DISTCP_OPTION_SKIP_CHECKSUM.getName()); + if (StringUtils.isNotEmpty(skipChecksum)) { + distcpOptions.setSkipCRC(Boolean.parseBoolean(skipChecksum)); + } + } + + if (isSnapshot) { + // 
Settings needed for Snapshot distCp. + distcpOptions.setSyncFolder(true); + distcpOptions.setDeleteMissing(true); + } else { + // Removing deleted files by default - FALCON-1844 + String removeDeletedFiles = cmd.getOptionValue( + ReplicationDistCpOption.DISTCP_OPTION_REMOVE_DELETED_FILES.getName(), "true"); + boolean deleteMissing = Boolean.parseBoolean(removeDeletedFiles); + distcpOptions.setDeleteMissing(deleteMissing); + if (deleteMissing) { + // DistCP will fail with InvalidInputException if deleteMissing is set to true and + // if targetPath does not exist. Create targetPath to avoid failures. + FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(targetPath.toUri(), conf); + if (!fs.exists(targetPath)) { + fs.mkdirs(targetPath); + } + } + } + + String ignoreErrors = cmd.getOptionValue(ReplicationDistCpOption.DISTCP_OPTION_IGNORE_ERRORS.getName()); + if (StringUtils.isNotBlank(ignoreErrors)) { + distcpOptions.setIgnoreFailures(Boolean.parseBoolean(ignoreErrors)); + } + + String preserveBlockSize = cmd.getOptionValue( + ReplicationDistCpOption.DISTCP_OPTION_PRESERVE_BLOCK_SIZE.getName()); + if (StringUtils.isNotBlank(preserveBlockSize) && Boolean.parseBoolean(preserveBlockSize)) { + distcpOptions.preserve(DistCpOptions.FileAttribute.BLOCKSIZE); + } + + String preserveReplicationCount = cmd.getOptionValue(ReplicationDistCpOption + .DISTCP_OPTION_PRESERVE_REPLICATION_NUMBER.getName()); + if (StringUtils.isNotBlank(preserveReplicationCount) && Boolean.parseBoolean(preserveReplicationCount)) { + distcpOptions.preserve(DistCpOptions.FileAttribute.REPLICATION); + } + + String preservePermission = cmd.getOptionValue( + ReplicationDistCpOption.DISTCP_OPTION_PRESERVE_PERMISSIONS.getName()); + if (StringUtils.isNotBlank(preservePermission) && Boolean.parseBoolean(preservePermission)) { + distcpOptions.preserve(DistCpOptions.FileAttribute.PERMISSION); + } + + String preserveUser = cmd.getOptionValue( + 
ReplicationDistCpOption.DISTCP_OPTION_PRESERVE_USER.getName()); + if (StringUtils.isNotBlank(preserveUser) && Boolean.parseBoolean(preserveUser)) { + distcpOptions.preserve(DistCpOptions.FileAttribute.USER); + } + + String preserveGroup = cmd.getOptionValue( + ReplicationDistCpOption.DISTCP_OPTION_PRESERVE_GROUP.getName()); + if (StringUtils.isNotBlank(preserveGroup) && Boolean.parseBoolean(preserveGroup)) { + distcpOptions.preserve(DistCpOptions.FileAttribute.GROUP); + } + + String preserveChecksumType = cmd.getOptionValue( + ReplicationDistCpOption.DISTCP_OPTION_PRESERVE_CHECKSUM_TYPE.getName()); + if (StringUtils.isNotBlank(preserveChecksumType) && Boolean.parseBoolean(preserveChecksumType)) { + distcpOptions.preserve(DistCpOptions.FileAttribute.CHECKSUMTYPE); + } + + String preserveAcl = cmd.getOptionValue( + ReplicationDistCpOption.DISTCP_OPTION_PRESERVE_ACL.getName()); + if (StringUtils.isNotBlank(preserveAcl) && Boolean.parseBoolean(preserveAcl)) { + distcpOptions.preserve(DistCpOptions.FileAttribute.ACL); + } + + String preserveXattr = cmd.getOptionValue( + ReplicationDistCpOption.DISTCP_OPTION_PRESERVE_XATTR.getName()); + if (StringUtils.isNotBlank(preserveXattr) && Boolean.parseBoolean(preserveXattr)) { + distcpOptions.preserve(DistCpOptions.FileAttribute.XATTR); + } + + String preserveTimes = cmd.getOptionValue( + ReplicationDistCpOption.DISTCP_OPTION_PRESERVE_TIMES.getName()); + if (StringUtils.isNotBlank(preserveTimes) && Boolean.parseBoolean(preserveTimes)) { + distcpOptions.preserve(DistCpOptions.FileAttribute.TIMES); + } + + return distcpOptions; + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/3efffd4d/common/src/main/java/org/apache/falcon/util/ReplicationDistCpOption.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/util/ReplicationDistCpOption.java b/common/src/main/java/org/apache/falcon/util/ReplicationDistCpOption.java index a8b99bb..65f371c 100644 --- 
a/common/src/main/java/org/apache/falcon/util/ReplicationDistCpOption.java +++ b/common/src/main/java/org/apache/falcon/util/ReplicationDistCpOption.java @@ -29,7 +29,13 @@ public enum ReplicationDistCpOption { DISTCP_OPTION_REMOVE_DELETED_FILES("removeDeletedFiles"), DISTCP_OPTION_PRESERVE_BLOCK_SIZE("preserveBlockSize"), DISTCP_OPTION_PRESERVE_REPLICATION_NUMBER("preserveReplicationNumber"), - DISTCP_OPTION_PRESERVE_PERMISSIONS("preservePermission"); + DISTCP_OPTION_PRESERVE_PERMISSIONS("preservePermission"), + DISTCP_OPTION_PRESERVE_USER("preserveUser"), + DISTCP_OPTION_PRESERVE_GROUP("preserveGroup"), + DISTCP_OPTION_PRESERVE_CHECKSUM_TYPE("preserveChecksumType"), + DISTCP_OPTION_PRESERVE_ACL("preserveAcl"), + DISTCP_OPTION_PRESERVE_XATTR("preserveXattr"), + DISTCP_OPTION_PRESERVE_TIMES("preserveTimes"); private final String name; http://git-wip-us.apache.org/repos/asf/falcon/blob/3efffd4d/docs/src/site/twiki/EntitySpecification.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/EntitySpecification.twiki b/docs/src/site/twiki/EntitySpecification.twiki index 2615e5d..11d1e1b 100644 --- a/docs/src/site/twiki/EntitySpecification.twiki +++ b/docs/src/site/twiki/EntitySpecification.twiki @@ -399,6 +399,13 @@ permission indicates the permission. <property name="preserveBlockSize" value="true"/> <property name="preserveReplicationNumber" value="true"/> <property name="preservePermission" value="true"/> + <property name="preserveUser" value="true"/> + <property name="preserveGroup" value="false"/> + <property name="preserveChecksumType" value="true"/> + <property name="preserveAcl" value="true"/> + <property name="preserveXattr" value="true"/> + <property name="preserveTimes" value="true"/> + <property name="tdeEncryptionEnabled" value="false"/> <property name="order" value="LIFO"/> </properties> </verbatim> @@ -414,7 +421,10 @@ used by each mapper during replication. 
"overwrite" represents overwrite destina bypassing checksum verification during replication. "removeDeletedFiles" represents deleting the files existing in the destination but not in source during replication. "preserveBlockSize" represents preserving block size during replication. "preserveReplicationNumber" represents preserving replication number during replication. -"preservePermission" represents preserving permission during +"preservePermission" represents preserving permission during replication. "preserveUser" represents preserving user during replication. +"preserveGroup" represents preserving group during replication. "preserveChecksumType" represents preserving checksum type during replication. +"preserveAcl" represents preserving ACL during replication. "preserveXattr" represents preserving Xattr during replication. +"preserveTimes" represents preserving access and modification times during replication. "tdeEncryptionEnabled" if TDE is enabled. ---+++ Lifecycle http://git-wip-us.apache.org/repos/asf/falcon/blob/3efffd4d/extensions/src/main/java/org/apache/falcon/extensions/AbstractExtension.java ---------------------------------------------------------------------- diff --git a/extensions/src/main/java/org/apache/falcon/extensions/AbstractExtension.java b/extensions/src/main/java/org/apache/falcon/extensions/AbstractExtension.java index 24bbb87..b160bb5 100644 --- a/extensions/src/main/java/org/apache/falcon/extensions/AbstractExtension.java +++ b/extensions/src/main/java/org/apache/falcon/extensions/AbstractExtension.java @@ -18,10 +18,12 @@ package org.apache.falcon.extensions; +import org.apache.commons.lang3.StringUtils; import org.apache.falcon.FalconException; import org.apache.falcon.extensions.mirroring.hdfs.HdfsMirroringExtension; import org.apache.falcon.extensions.mirroring.hdfsSnapshot.HdfsSnapshotMirroringExtension; import org.apache.falcon.extensions.mirroring.hive.HiveMirroringExtension; +import 
org.apache.falcon.util.ReplicationDistCpOption; import java.util.ArrayList; import java.util.Arrays; @@ -57,5 +59,15 @@ public abstract class AbstractExtension { public abstract void validate(final Properties extensionProperties) throws FalconException; public abstract Properties getAdditionalProperties(final Properties extensionProperties) throws FalconException; + + public static void addAdditionalDistCPProperties(final Properties extensionProperties, + final Properties additionalProperties) { + for (ReplicationDistCpOption distcpOption : ReplicationDistCpOption.values()) { + if (StringUtils.isBlank( + extensionProperties.getProperty(distcpOption.getName()))) { + additionalProperties.put(distcpOption.getName(), "false"); + } + } + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/3efffd4d/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfs/HdfsMirroringExtension.java ---------------------------------------------------------------------- diff --git a/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfs/HdfsMirroringExtension.java b/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfs/HdfsMirroringExtension.java index f1acae2..ef26d81 100644 --- a/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfs/HdfsMirroringExtension.java +++ b/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfs/HdfsMirroringExtension.java @@ -53,15 +53,16 @@ public class HdfsMirroringExtension extends AbstractExtension { Properties additionalProperties = new Properties(); // Add default properties if not passed - String distcpMaxMaps = extensionProperties.getProperty(HdfsMirroringExtensionProperties.MAX_MAPS.getName()); + String distcpMaxMaps = extensionProperties.getProperty( + HdfsMirroringExtensionProperties.DISTCP_MAX_MAPS.getName()); if (StringUtils.isBlank(distcpMaxMaps)) { - additionalProperties.put(HdfsMirroringExtensionProperties.MAX_MAPS.getName(), "1"); + 
additionalProperties.put(HdfsMirroringExtensionProperties.DISTCP_MAX_MAPS.getName(), "1"); } String distcpMapBandwidth = extensionProperties.getProperty( - HdfsMirroringExtensionProperties.MAP_BANDWIDTH_IN_MB.getName()); + HdfsMirroringExtensionProperties.DISTCP_MAP_BANDWIDTH_IN_MB.getName()); if (StringUtils.isBlank(distcpMapBandwidth)) { - additionalProperties.put(HdfsMirroringExtensionProperties.MAP_BANDWIDTH_IN_MB.getName(), "100"); + additionalProperties.put(HdfsMirroringExtensionProperties.DISTCP_MAP_BANDWIDTH_IN_MB.getName(), "100"); } // Construct fully qualified hdfs src path @@ -105,6 +106,13 @@ public class HdfsMirroringExtension extends AbstractExtension { } additionalProperties.put(HdfsMirroringExtensionProperties.TARGET_CLUSTER_FS_WRITE_ENDPOINT.getName(), ClusterHelper.getStorageUrl(targetCluster)); + + if (StringUtils.isBlank( + extensionProperties.getProperty(HdfsMirroringExtensionProperties.TDE_ENCRYPTION_ENABLED.getName()))) { + additionalProperties.put(HdfsMirroringExtensionProperties.TDE_ENCRYPTION_ENABLED.getName(), "false"); + } + + addAdditionalDistCPProperties(extensionProperties, additionalProperties); return additionalProperties; } http://git-wip-us.apache.org/repos/asf/falcon/blob/3efffd4d/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfs/HdfsMirroringExtensionProperties.java ---------------------------------------------------------------------- diff --git a/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfs/HdfsMirroringExtensionProperties.java b/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfs/HdfsMirroringExtensionProperties.java index 7d24b45..52ae0c0 100644 --- a/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfs/HdfsMirroringExtensionProperties.java +++ b/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfs/HdfsMirroringExtensionProperties.java @@ -28,9 +28,10 @@ public enum HdfsMirroringExtensionProperties { TARGET_DIR("targetDir", "Location 
on target cluster for replication"), TARGET_CLUSTER("targetCluster", "Target cluster"), TARGET_CLUSTER_FS_WRITE_ENDPOINT("targetClusterFS", "Target cluster end point", false), - MAX_MAPS("distcpMaxMaps", "Maximum number of maps used during replication", false), - MAP_BANDWIDTH_IN_MB("distcpMapBandwidth", "Bandwidth in MB/s used by each mapper during replication", - false); + DISTCP_MAX_MAPS("distcpMaxMaps", "Maximum number of maps used during replication", false), + DISTCP_MAP_BANDWIDTH_IN_MB("distcpMapBandwidth", "Bandwidth in MB/s used by each mapper during replication", + false), + TDE_ENCRYPTION_ENABLED("tdeEncryptionEnabled", "Set to true if TDE encryption is enabled", false); private final String name; private final String description; http://git-wip-us.apache.org/repos/asf/falcon/blob/3efffd4d/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfsSnapshot/HdfsSnapshotMirrorProperties.java ---------------------------------------------------------------------- diff --git a/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfsSnapshot/HdfsSnapshotMirrorProperties.java b/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfsSnapshot/HdfsSnapshotMirrorProperties.java index f179896..ad707c8 100644 --- a/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfsSnapshot/HdfsSnapshotMirrorProperties.java +++ b/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfsSnapshot/HdfsSnapshotMirrorProperties.java @@ -48,8 +48,8 @@ public enum HdfsSnapshotMirrorProperties { TARGET_SNAPSHOT_RETENTION_NUMBER("targetSnapshotRetentionNumber", "Number of latest target snapshots to retain on source", true), - DISTCP_MAX_MAPS("distcpMaxMaps", "Maximum number of maps used during distcp", false), - MAP_BANDWIDTH_IN_MB("distcpMapBandwidth", "Bandwidth in MB/s used by each mapper during replication", false), + MAX_MAPS("maxMaps", "Maximum number of maps used during distcp", false), + MAP_BANDWIDTH_IN_MB("mapBandwidth", 
"Bandwidth in MB/s used by each mapper during replication", false), TDE_ENCRYPTION_ENABLED("tdeEncryptionEnabled", "Is TDE encryption enabled on source and target", false), SNAPSHOT_JOB_NAME("snapshotJobName", "Name of snapshot based mirror job", false); http://git-wip-us.apache.org/repos/asf/falcon/blob/3efffd4d/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfsSnapshot/HdfsSnapshotMirroringExtension.java ---------------------------------------------------------------------- diff --git a/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfsSnapshot/HdfsSnapshotMirroringExtension.java b/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfsSnapshot/HdfsSnapshotMirroringExtension.java index 09cce3b..16b087d 100644 --- a/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfsSnapshot/HdfsSnapshotMirroringExtension.java +++ b/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfsSnapshot/HdfsSnapshotMirroringExtension.java @@ -133,9 +133,9 @@ public class HdfsSnapshotMirroringExtension extends AbstractExtension { Properties additionalProperties = new Properties(); // Add default properties if not passed - String distcpMaxMaps = extensionProperties.getProperty(HdfsSnapshotMirrorProperties.DISTCP_MAX_MAPS.getName()); + String distcpMaxMaps = extensionProperties.getProperty(HdfsSnapshotMirrorProperties.MAX_MAPS.getName()); if (StringUtils.isBlank(distcpMaxMaps)) { - additionalProperties.put(HdfsSnapshotMirrorProperties.DISTCP_MAX_MAPS.getName(), "1"); + additionalProperties.put(HdfsSnapshotMirrorProperties.MAX_MAPS.getName(), "1"); } String distcpMapBandwidth = extensionProperties.getProperty( @@ -223,6 +223,8 @@ public class HdfsSnapshotMirroringExtension extends AbstractExtension { throw new FalconException("Cluster entity " + ExtensionProperties.CLUSTER_NAME.getName() + " not found"); } + + addAdditionalDistCPProperties(extensionProperties, additionalProperties); return additionalProperties; } 
http://git-wip-us.apache.org/repos/asf/falcon/blob/3efffd4d/extensions/src/test/java/org/apache/falcon/extensions/ExtensionTest.java ---------------------------------------------------------------------- diff --git a/extensions/src/test/java/org/apache/falcon/extensions/ExtensionTest.java b/extensions/src/test/java/org/apache/falcon/extensions/ExtensionTest.java index 21e78d8..3386a31 100644 --- a/extensions/src/test/java/org/apache/falcon/extensions/ExtensionTest.java +++ b/extensions/src/test/java/org/apache/falcon/extensions/ExtensionTest.java @@ -136,7 +136,7 @@ public class ExtensionTest extends AbstractTestExtensionStore { RETENTION_NUM); properties.setProperty(HdfsSnapshotMirrorProperties.TARGET_NN.getName(), NN_URI); - properties.setProperty(HdfsSnapshotMirrorProperties.DISTCP_MAX_MAPS.getName(), + properties.setProperty(HdfsSnapshotMirrorProperties.MAX_MAPS.getName(), "5"); properties.setProperty(HdfsSnapshotMirrorProperties.MAP_BANDWIDTH_IN_MB.getName(), "100"); http://git-wip-us.apache.org/repos/asf/falcon/blob/3efffd4d/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java index e45dfc5..cfcc698 100644 --- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java +++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java @@ -58,6 +58,7 @@ public class FSReplicationWorkflowBuilder extends FeedReplicationWorkflowBuilder addHDFSServersConfig(replication, src, target); addAdditionalReplicationProperties(replication); enableCounters(replication); + enableTDE(replication); addTransition(replication, SUCCESS_POSTPROCESS_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME); workflow.getDecisionOrForkOrJoin().add(replication); 
http://git-wip-us.apache.org/repos/asf/falcon/blob/3efffd4d/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java index 010446b..db647aa 100644 --- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java +++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java @@ -49,6 +49,7 @@ public abstract class FeedReplicationWorkflowBuilder extends OozieOrchestrationW private static final String MR_MAX_MAPS = "maxMaps"; private static final String MR_MAP_BANDWIDTH = "mapBandwidth"; private static final String REPLICATION_JOB_COUNTER = "job.counter"; + private static final String TDE_ENCRYPTION_ENABLED = "tdeEncryptionEnabled"; public FeedReplicationWorkflowBuilder(Feed entity) { super(entity, LifeCycle.REPLICATION); @@ -58,7 +59,7 @@ public abstract class FeedReplicationWorkflowBuilder extends OozieOrchestrationW if (entity.getProperties() != null) { List<Property> propertyList = entity.getProperties().getProperties(); for (Property prop : propertyList) { - if (prop.getName().equals(REPLICATION_JOB_COUNTER) && "true".equalsIgnoreCase(prop.getValue())) { + if (prop.getName().equals(REPLICATION_JOB_COUNTER) && "true" .equalsIgnoreCase(prop.getValue())) { return true; } } @@ -66,7 +67,8 @@ public abstract class FeedReplicationWorkflowBuilder extends OozieOrchestrationW return false; } - @Override public Properties build(Cluster cluster, Path buildPath) throws FalconException { + @Override + public Properties build(Cluster cluster, Path buildPath) throws FalconException { Cluster srcCluster = ConfigurationStore.get().get(EntityType.CLUSTER, buildPath.getName()); WORKFLOWAPP workflow = getWorkflow(srcCluster, cluster); @@ -119,6 
+121,15 @@ public abstract class FeedReplicationWorkflowBuilder extends OozieOrchestrationW return action; } + protected ACTION enableTDE(ACTION action) throws FalconException { + if (isTDEEnabled()) { + List<String> args = action.getJava().getArg(); + args.add("-tdeEncryptionEnabled"); + args.add("true"); + } + return action; + } + protected abstract WORKFLOWAPP getWorkflow(Cluster src, Cluster target) throws FalconException; @Override @@ -133,4 +144,9 @@ public abstract class FeedReplicationWorkflowBuilder extends OozieOrchestrationW private String getDefaultMapBandwidth() { return RuntimeProperties.get().getProperty("falcon.replication.workflow.mapbandwidth", "100"); } + + private boolean isTDEEnabled() { + String tdeEncryptionEnabled = FeedHelper.getPropertyValue(entity, TDE_ENCRYPTION_ENABLED); + return "true" .equalsIgnoreCase(tdeEncryptionEnabled); + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/3efffd4d/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java ---------------------------------------------------------------------- diff --git a/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java b/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java index 0906bd5..9c2c522 100644 --- a/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java +++ b/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java @@ -30,6 +30,7 @@ import org.apache.falcon.hadoop.HadoopClientFactory; import org.apache.falcon.job.JobCountersHandler; import org.apache.falcon.job.JobType; import org.apache.falcon.job.JobCounters; +import org.apache.falcon.util.DistCPOptionsUtil; import org.apache.falcon.util.ReplicationDistCpOption; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; @@ -57,6 +58,7 @@ public class FeedReplicator extends Configured implements Tool { private static final Logger LOG = LoggerFactory.getLogger(FeedReplicator.class); 
private static final String IGNORE = "IGNORE"; + private static final String TDE_ENCRYPTION_ENABLED = "tdeEncryptionEnabled"; public static void main(String[] args) throws Exception { ToolRunner.run(new Configuration(), new FeedReplicator(), args); @@ -177,10 +179,44 @@ public class FeedReplicator extends Configured implements Tool { opt.setRequired(false); options.addOption(opt); + opt = new Option(ReplicationDistCpOption.DISTCP_OPTION_PRESERVE_USER.getName(), true, + "preserve user"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option(ReplicationDistCpOption.DISTCP_OPTION_PRESERVE_GROUP.getName(), true, + "preserve group"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option(ReplicationDistCpOption.DISTCP_OPTION_PRESERVE_CHECKSUM_TYPE.getName(), true, + "preserve checksum type"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option(ReplicationDistCpOption.DISTCP_OPTION_PRESERVE_ACL.getName(), true, + "preserve ACL"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option(ReplicationDistCpOption.DISTCP_OPTION_PRESERVE_XATTR.getName(), true, + "preserve XATTR"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option(ReplicationDistCpOption.DISTCP_OPTION_PRESERVE_TIMES.getName(), true, + "preserve access and modification times"); + opt.setRequired(false); + options.addOption(opt); + opt = new Option("counterLogDir", true, "log directory to store job counter file"); opt.setRequired(false); options.addOption(opt); + opt = new Option(TDE_ENCRYPTION_ENABLED, true, "TDE encryption enabled"); + opt.setRequired(false); + options.addOption(opt); + return new GnuParser().parse(options, args); } @@ -190,61 +226,7 @@ public class FeedReplicator extends Configured implements Tool { String targetPathString = cmd.getOptionValue("targetPath").trim(); Path targetPath = new Path(targetPathString); - DistCpOptions distcpOptions = new DistCpOptions(srcPaths, targetPath); - 
distcpOptions.setBlocking(true); - distcpOptions.setMaxMaps(Integer.parseInt(cmd.getOptionValue("maxMaps"))); - distcpOptions.setMapBandwidth(Integer.parseInt(cmd.getOptionValue("mapBandwidth"))); - - String overwrite = cmd.getOptionValue(ReplicationDistCpOption.DISTCP_OPTION_OVERWRITE.getName()); - if (StringUtils.isNotEmpty(overwrite) && overwrite.equalsIgnoreCase(Boolean.TRUE.toString())) { - distcpOptions.setOverwrite(Boolean.parseBoolean(overwrite)); - } else { - distcpOptions.setSyncFolder(true); - } - - String ignoreErrors = cmd.getOptionValue(ReplicationDistCpOption.DISTCP_OPTION_IGNORE_ERRORS.getName()); - if (StringUtils.isNotEmpty(ignoreErrors)) { - distcpOptions.setIgnoreFailures(Boolean.parseBoolean(ignoreErrors)); - } - - String skipChecksum = cmd.getOptionValue(ReplicationDistCpOption.DISTCP_OPTION_SKIP_CHECKSUM.getName()); - if (StringUtils.isNotEmpty(skipChecksum)) { - distcpOptions.setSkipCRC(Boolean.parseBoolean(skipChecksum)); - } - - // Removing deleted files by default - FALCON-1844 - String removeDeletedFiles = cmd.getOptionValue( - ReplicationDistCpOption.DISTCP_OPTION_REMOVE_DELETED_FILES.getName(), "true"); - boolean deleteMissing = Boolean.parseBoolean(removeDeletedFiles); - distcpOptions.setDeleteMissing(deleteMissing); - if (deleteMissing) { - // DistCP will fail with InvalidInputException if deleteMissing is set to true and - // if targetPath does not exist. Create targetPath to avoid failures. 
- FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(targetPath.toUri(), getConf()); - if (!fs.exists(targetPath)) { - fs.mkdirs(targetPath); - } - } - - String preserveBlockSize = cmd.getOptionValue( - ReplicationDistCpOption.DISTCP_OPTION_PRESERVE_BLOCK_SIZE.getName()); - if (preserveBlockSize != null && Boolean.parseBoolean(preserveBlockSize)) { - distcpOptions.preserve(DistCpOptions.FileAttribute.BLOCKSIZE); - } - - String preserveReplicationCount = cmd.getOptionValue(ReplicationDistCpOption - .DISTCP_OPTION_PRESERVE_REPLICATION_NUMBER.getName()); - if (preserveReplicationCount != null && Boolean.parseBoolean(preserveReplicationCount)) { - distcpOptions.preserve(DistCpOptions.FileAttribute.REPLICATION); - } - - String preservePermission = cmd.getOptionValue( - ReplicationDistCpOption.DISTCP_OPTION_PRESERVE_PERMISSIONS.getName()); - if (preservePermission != null && Boolean.parseBoolean(preservePermission)) { - distcpOptions.preserve(DistCpOptions.FileAttribute.PERMISSION); - } - - return distcpOptions; + return DistCPOptionsUtil.getDistCpOptions(cmd, srcPaths, targetPath, false, getConf()); } private List<Path> getPaths(String[] paths) { http://git-wip-us.apache.org/repos/asf/falcon/blob/3efffd4d/replication/src/test/java/org/apache/falcon/replication/FeedReplicatorTest.java ---------------------------------------------------------------------- diff --git a/replication/src/test/java/org/apache/falcon/replication/FeedReplicatorTest.java b/replication/src/test/java/org/apache/falcon/replication/FeedReplicatorTest.java index 2662ade..b9b383d 100644 --- a/replication/src/test/java/org/apache/falcon/replication/FeedReplicatorTest.java +++ b/replication/src/test/java/org/apache/falcon/replication/FeedReplicatorTest.java @@ -84,7 +84,13 @@ public class FeedReplicatorTest { * <arg>-removeDeletedFiles</arg><arg>true</arg> * <arg>-preserveBlockSize</arg><arg>false</arg> * <arg>-preserveReplicationCount</arg><arg>true</arg> - * 
<arg>-preserveBlockSize</arg><arg>false</arg> + * <arg>-preservePermission</arg><arg>false</arg> + * <arg>-preserveUser</arg><arg>true</arg> + * <arg>-preserveGroup</arg><arg>false</arg> + * <arg>-preserveChecksumType</arg><arg>false</arg> + * <arg>-preserveAcl</arg><arg>true</arg> + * <arg>-preserveXattr</arg><arg>false</arg> + * <arg>-preserveTimes</arg><arg>false</arg> */ final String[] optionalArgs = { "true", @@ -100,6 +106,12 @@ public class FeedReplicatorTest { "-preserveBlockSize", "false", "-preserveReplicationNumber", "true", "-preservePermission", "false", + "-preserveUser", "true", + "-preserveGroup", "false", + "-preserveChecksumType", "false", + "-preserveAcl", "true", + "-preserveXattr", "false", + "-preserveTimes", "false", }; FeedReplicator replicator = new FeedReplicator(); @@ -128,5 +140,11 @@ public class FeedReplicatorTest { Assert.assertFalse(options.shouldPreserve(DistCpOptions.FileAttribute.BLOCKSIZE)); Assert.assertTrue(options.shouldPreserve(DistCpOptions.FileAttribute.REPLICATION)); Assert.assertFalse(options.shouldPreserve(DistCpOptions.FileAttribute.PERMISSION)); + Assert.assertTrue(options.shouldPreserve(DistCpOptions.FileAttribute.USER)); + Assert.assertFalse(options.shouldPreserve(DistCpOptions.FileAttribute.GROUP)); + Assert.assertFalse(options.shouldPreserve(DistCpOptions.FileAttribute.CHECKSUMTYPE)); + Assert.assertTrue(options.shouldPreserve(DistCpOptions.FileAttribute.ACL)); + Assert.assertFalse(options.shouldPreserve(DistCpOptions.FileAttribute.XATTR)); + Assert.assertFalse(options.shouldPreserve(DistCpOptions.FileAttribute.TIMES)); } }
