Repository: falcon
Updated Branches:
  refs/heads/master 952a9e6c3 -> cc80a1754


FALCON-668 FeedReplicator improvement to include more DistCP options. 
Contributed by Sowmya Ramesh.


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/cc80a175
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/cc80a175
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/cc80a175

Branch: refs/heads/master
Commit: cc80a17546db614098709d71dfbb2a270bfbf4ac
Parents: 952a9e6
Author: Ajay Yadava <[email protected]>
Authored: Fri Jul 24 05:23:02 2015 +0530
Committer: Ajay Yadava <[email protected]>
Committed: Fri Jul 24 05:23:59 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../falcon/util/ReplicationDistCpOption.java    | 43 +++++++++++
 docs/src/site/twiki/EntitySpecification.twiki   | 19 ++++-
 .../OozieOrchestrationWorkflowBuilder.java      | 15 ++++
 .../feed/FSReplicationWorkflowBuilder.java      |  1 +
 .../feed/HCatReplicationWorkflowBuilder.java    |  1 +
 .../falcon/replication/FeedReplicator.java      | 77 +++++++++++++++++++-
 .../falcon/replication/FeedReplicatorTest.java  | 58 +++++++++++++++
 8 files changed, 212 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/cc80a175/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0c8e31a..45d01b0 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,8 @@ Trunk (Unreleased)
     FALCON-796 Enable users to triage data processing issues through falcon 
(Ajay Yadava)
     
   IMPROVEMENTS
+    FALCON-668 FeedReplicator improvement to include more DistCP 
options(Sowmya Ramesh via Ajay Yadava)
+
     FALCON-1320 Adding equals() and hashCode() method in 
LineageGraphResult.Edge(Pragya Mittal via Ajay Yadava)
 
     FALCON-1139 Validation issues in Falcon UI(Pallavi Rao via Ajay Yadava)

http://git-wip-us.apache.org/repos/asf/falcon/blob/cc80a175/common/src/main/java/org/apache/falcon/util/ReplicationDistCpOption.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/falcon/util/ReplicationDistCpOption.java 
b/common/src/main/java/org/apache/falcon/util/ReplicationDistCpOption.java
new file mode 100644
index 0000000..a8b99bb
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/util/ReplicationDistCpOption.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.util;
+
+/**
+ * DistCp options that can be supplied as feed custom properties for replication.
+ * Each constant carries the property name returned by {@link #getName()}, which is also
+ * used as the FeedReplicator command-line option name.
+ */
+public enum ReplicationDistCpOption {
+
+    DISTCP_OPTION_OVERWRITE("overwrite"),
+    DISTCP_OPTION_IGNORE_ERRORS("ignoreErrors"),
+    DISTCP_OPTION_SKIP_CHECKSUM("skipChecksum"),
+    DISTCP_OPTION_REMOVE_DELETED_FILES("removeDeletedFiles"),
+    DISTCP_OPTION_PRESERVE_BLOCK_SIZE("preserveBlockSize"),
+    DISTCP_OPTION_PRESERVE_REPLICATION_NUMBER("preserveReplicationNumber"),
+    DISTCP_OPTION_PRESERVE_PERMISSIONS("preservePermission");
+
+    /** Property / option name, e.g. {@code "overwrite"}; distinct from {@link Enum#name()}. */
+    private final String optionName;
+
+    ReplicationDistCpOption(String optionName) {
+        this.optionName = optionName;
+    }
+
+    /**
+     * Returns the feed property name for this option.
+     *
+     * @return the property name, e.g. {@code "overwrite"} for {@link #DISTCP_OPTION_OVERWRITE}
+     */
+    public String getName() {
+        return optionName;
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/cc80a175/docs/src/site/twiki/EntitySpecification.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/EntitySpecification.twiki 
b/docs/src/site/twiki/EntitySpecification.twiki
index 0c1fae2..98d6153 100644
--- a/docs/src/site/twiki/EntitySpecification.twiki
+++ b/docs/src/site/twiki/EntitySpecification.twiki
@@ -280,6 +280,13 @@ permission indicates the permission.
         <property name="parallel" value="3"/>
         <property name="maxMaps" value="8"/>
         <property name="mapBandwidth" value="1"/>
+        <property name="overwrite" value="true"/>
+        <property name="ignoreErrors" value="false"/>
+        <property name="skipChecksum" value="false"/>
+        <property name="removeDeletedFiles" value="true"/>
+        <property name="preserveBlockSize" value="true"/>
+        <property name="preserveReplicationNumber" value="true"/>
+        <property name="preservePermission" value="true"/>
         <property name="order" value="LIFO"/>
     </properties>
 </verbatim>
@@ -288,9 +295,15 @@ available to user to specify the Hadoop job queue and 
priority, the same values
 "timeout", "parallel" and "order" are other special properties which decides 
replication instance's timeout value while
 waiting for the feed instance, parallel decides the concurrent replication 
instances that can run at any given time and
 order decides the execution order for replication instances like FIFO, LIFO 
and LAST_ONLY.
-"maxMaps" represents the maximum number of maps used during replication. 
"mapBandwidth" represents the bandwidth in MB/s
-used by each mapper during replication.
- 
+DistCp options can be passed as custom properties, which will be propagated to the DistCp tool. "maxMaps" represents
+the maximum number of maps used during replication. "mapBandwidth" represents the bandwidth in MB/s
+used by each mapper during replication. "overwrite" represents overwriting the destination during replication.
+"ignoreErrors" represents ignoring failures during replication so that they do not cause the job to fail.
+"skipChecksum" represents bypassing checksum verification during replication. "removeDeletedFiles" represents
+deleting the files existing in the destination but not in the source during replication. "preserveBlockSize"
+represents preserving block size during replication. "preserveReplicationNumber" represents preserving replication
+number during replication. "preservePermission" represents preserving permission during replication.
+
 ---++ Process Specification
 A process defines configuration for a workflow. A workflow is a directed 
acyclic graph(DAG) which defines the job for the workflow engine. A process 
definition defines  the configurations required to run the workflow job. For 
example, process defines the frequency at which the workflow should run, the 
clusters on which the workflow should run, the inputs and outputs for the 
workflow, how the workflow failures should be handled, how the late inputs 
should be handled and so on.  
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/cc80a175/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
----------------------------------------------------------------------
diff --git 
a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
 
b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
index f8220ec..f7193a3 100644
--- 
a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
+++ 
b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
@@ -19,6 +19,7 @@
 package org.apache.falcon.oozie;
 
 import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.LifeCycle;
 import org.apache.falcon.Tag;
@@ -46,6 +47,7 @@ import org.apache.falcon.oozie.workflow.START;
 import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
 import org.apache.falcon.security.SecurityUtil;
 import org.apache.falcon.util.OozieUtils;
+import org.apache.falcon.util.ReplicationDistCpOption;
 import org.apache.falcon.util.RuntimeProperties;
 import org.apache.falcon.util.StartupProperties;
 import org.apache.falcon.workflow.WorkflowExecutionArgs;
@@ -160,6 +162,19 @@ public abstract class OozieOrchestrationWorkflowBuilder<T 
extends Entity> extend
         action.getError().setTo(fail);
     }
 
+    /**
+     * Appends the DistCp options configured as feed properties to the replication action's
+     * java arguments. For each {@link ReplicationDistCpOption}, in declaration order, whose
+     * property value is non-empty, the pair {@code -<name> <value>} is added.
+     *
+     * @param replicationAction the replication action whose java argument list is extended
+     */
+    protected void addAdditionalReplicationProperties(ACTION replicationAction) {
+        List<String> javaArgs = replicationAction.getJava().getArg();
+        Properties entityProps = getEntityProperties(entity);
+
+        for (ReplicationDistCpOption option : ReplicationDistCpOption.values()) {
+            String optionName = option.getName();
+            String optionValue = entityProps.getProperty(optionName);
+            if (StringUtils.isEmpty(optionValue)) {
+                continue; // option not configured on the feed
+            }
+            javaArgs.add("-" + optionName);
+            javaArgs.add(optionValue);
+        }
+    }
+
     protected void decorateWorkflow(WORKFLOWAPP wf, String name, String 
startAction) {
         wf.setName(name);
         wf.setStart(new START());

http://git-wip-us.apache.org/repos/asf/falcon/blob/cc80a175/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java
----------------------------------------------------------------------
diff --git 
a/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java
 
b/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java
index 0381e59..b82f4e0 100644
--- 
a/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java
+++ 
b/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java
@@ -55,6 +55,7 @@ public class FSReplicationWorkflowBuilder extends 
FeedReplicationWorkflowBuilder
         //Add replication
         ACTION replication = unmarshalAction(REPLICATION_ACTION_TEMPLATE);
         addHDFSServersConfig(replication, src, target);
+        addAdditionalReplicationProperties(replication);
         addTransition(replication, SUCCESS_POSTPROCESS_ACTION_NAME, 
FAIL_POSTPROCESS_ACTION_NAME);
         workflow.getDecisionOrForkOrJoin().add(replication);
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/cc80a175/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java
----------------------------------------------------------------------
diff --git 
a/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java
 
b/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java
index 347ddaf..ed86b32 100644
--- 
a/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java
+++ 
b/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java
@@ -69,6 +69,7 @@ public class HCatReplicationWorkflowBuilder extends 
FeedReplicationWorkflowBuild
 
         //Add replication
         ACTION replication = unmarshalAction(REPLICATION_ACTION_TEMPLATE);
+        addAdditionalReplicationProperties(replication);
         addTransition(replication, IMPORT_ACTION_NAME, 
FAIL_POSTPROCESS_ACTION_NAME);
         workflow.getDecisionOrForkOrJoin().add(replication);
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/cc80a175/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
----------------------------------------------------------------------
diff --git 
a/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java 
b/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
index b2175b2..a226058 100644
--- 
a/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
+++ 
b/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
@@ -27,6 +27,7 @@ import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.Storage;
 import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.falcon.util.ReplicationDistCpOption;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileStatus;
@@ -127,6 +128,39 @@ public class FeedReplicator extends Configured implements 
Tool {
         opt.setRequired(false);
         options.addOption(opt);
 
+        opt = new 
Option(ReplicationDistCpOption.DISTCP_OPTION_OVERWRITE.getName(), true, "option 
to force overwrite");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new 
Option(ReplicationDistCpOption.DISTCP_OPTION_IGNORE_ERRORS.getName(), true, 
"abort on error");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new 
Option(ReplicationDistCpOption.DISTCP_OPTION_SKIP_CHECKSUM.getName(), true, 
"skip checksums");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new 
Option(ReplicationDistCpOption.DISTCP_OPTION_REMOVE_DELETED_FILES.getName(), 
true,
+                "remove deleted files - should there be files in the target 
directory that"
+                        + "were removed from the source directory");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new 
Option(ReplicationDistCpOption.DISTCP_OPTION_PRESERVE_BLOCK_SIZE.getName(), 
true,
+                "preserve block size");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new 
Option(ReplicationDistCpOption.DISTCP_OPTION_PRESERVE_REPLICATION_NUMBER.getName(),
 true,
+                "preserve replication count");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new 
Option(ReplicationDistCpOption.DISTCP_OPTION_PRESERVE_PERMISSIONS.getName(), 
true,
+                "preserve permissions");
+        opt.setRequired(false);
+        options.addOption(opt);
+
         return new GnuParser().parse(options, args);
     }
 
@@ -136,10 +170,51 @@ public class FeedReplicator extends Configured implements 
Tool {
         String trgPath = cmd.getOptionValue("targetPath").trim();
 
         DistCpOptions distcpOptions = new DistCpOptions(srcPaths, new 
Path(trgPath));
-        distcpOptions.setSyncFolder(true);
         distcpOptions.setBlocking(true);
         
distcpOptions.setMaxMaps(Integer.valueOf(cmd.getOptionValue("maxMaps")));
         
distcpOptions.setMapBandwidth(Integer.valueOf(cmd.getOptionValue("mapBandwidth")));
+
+        String overwrite = 
cmd.getOptionValue(ReplicationDistCpOption.DISTCP_OPTION_OVERWRITE.getName());
+        if (StringUtils.isNotEmpty(overwrite) && 
overwrite.equalsIgnoreCase(Boolean.TRUE.toString())) {
+            distcpOptions.setOverwrite(Boolean.parseBoolean(overwrite));
+        } else {
+            distcpOptions.setSyncFolder(true);
+        }
+
+        String ignoreErrors = 
cmd.getOptionValue(ReplicationDistCpOption.DISTCP_OPTION_IGNORE_ERRORS.getName());
+        if (StringUtils.isNotEmpty(ignoreErrors)) {
+            
distcpOptions.setIgnoreFailures(Boolean.parseBoolean(ignoreErrors));
+        }
+
+        String skipChecksum = 
cmd.getOptionValue(ReplicationDistCpOption.DISTCP_OPTION_SKIP_CHECKSUM.getName());
+        if (StringUtils.isNotEmpty(skipChecksum)) {
+            distcpOptions.setSkipCRC(Boolean.parseBoolean(skipChecksum));
+        }
+
+        String removeDeletedFiles = cmd.getOptionValue(
+                
ReplicationDistCpOption.DISTCP_OPTION_REMOVE_DELETED_FILES.getName());
+        if (StringUtils.isNotEmpty(removeDeletedFiles)) {
+            
distcpOptions.setDeleteMissing(Boolean.parseBoolean(removeDeletedFiles));
+        }
+
+        String preserveBlockSize = cmd.getOptionValue(
+                
ReplicationDistCpOption.DISTCP_OPTION_PRESERVE_BLOCK_SIZE.getName());
+        if (preserveBlockSize != null && 
Boolean.parseBoolean(preserveBlockSize)) {
+            distcpOptions.preserve(DistCpOptions.FileAttribute.BLOCKSIZE);
+        }
+
+        String preserveReplicationCount = 
cmd.getOptionValue(ReplicationDistCpOption
+                .DISTCP_OPTION_PRESERVE_REPLICATION_NUMBER.getName());
+        if (preserveReplicationCount != null && 
Boolean.parseBoolean(preserveReplicationCount)) {
+            distcpOptions.preserve(DistCpOptions.FileAttribute.REPLICATION);
+        }
+
+        String preservePermission = cmd.getOptionValue(
+                
ReplicationDistCpOption.DISTCP_OPTION_PRESERVE_PERMISSIONS.getName());
+        if (preservePermission != null && 
Boolean.parseBoolean(preservePermission)) {
+            distcpOptions.preserve(DistCpOptions.FileAttribute.PERMISSION);
+        }
+
         return distcpOptions;
     }
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/cc80a175/replication/src/test/java/org/apache/falcon/replication/FeedReplicatorTest.java
----------------------------------------------------------------------
diff --git 
a/replication/src/test/java/org/apache/falcon/replication/FeedReplicatorTest.java
 
b/replication/src/test/java/org/apache/falcon/replication/FeedReplicatorTest.java
index 539d00d..9cfeb30 100644
--- 
a/replication/src/test/java/org/apache/falcon/replication/FeedReplicatorTest.java
+++ 
b/replication/src/test/java/org/apache/falcon/replication/FeedReplicatorTest.java
@@ -57,9 +57,67 @@ public class FeedReplicatorTest {
 
         List<Path> srcPaths = new ArrayList<Path>();
         srcPaths.add(new Path("hdfs://localhost:8020/tmp/"));
+        validateMandatoryArguments(options, srcPaths, true);
+    }
+
+    @Test
+    public void testOptionalArguments() throws Exception {
+        /*
+         * Replicator command line with every optional DistCp argument supplied, i.e. the
+         * arguments appended for the feed properties overwrite, ignoreErrors, skipChecksum,
+         * removeDeletedFiles, preserveBlockSize, preserveReplicationNumber and preservePermission:
+         * <arg>-maxMaps</arg><arg>3</arg>
+         * <arg>-mapBandwidth</arg><arg>4</arg>
+         * <arg>-sourcePaths</arg><arg>${distcpSourcePaths}</arg>
+         * <arg>-targetPath</arg><arg>${distcpTargetPaths}</arg>
+         * <arg>-falconFeedStorageType</arg><arg>FILESYSTEM</arg>
+         * <arg>-overwrite</arg><arg>true</arg>
+         * <arg>-ignoreErrors</arg><arg>false</arg>
+         * <arg>-skipChecksum</arg><arg>false</arg>
+         * <arg>-removeDeletedFiles</arg><arg>true</arg>
+         * <arg>-preserveBlockSize</arg><arg>false</arg>
+         * <arg>-preserveReplicationNumber</arg><arg>true</arg>
+         * <arg>-preservePermission</arg><arg>false</arg>
+         */
+        final String[] optionalArgs = {
+            "true",
+            "-maxMaps", "3",
+            "-mapBandwidth", "4",
+            "-sourcePaths", "hdfs://localhost:8020/tmp/",
+            "-targetPath", "hdfs://localhost1:8020/tmp/",
+            "-falconFeedStorageType", Storage.TYPE.FILESYSTEM.name(),
+            "-overwrite", "true",
+            "-ignoreErrors", "false",
+            "-skipChecksum", "false",
+            "-removeDeletedFiles", "true",
+            "-preserveBlockSize", "false",
+            "-preserveReplicationNumber", "true",
+            "-preservePermission", "false",
+        };
+
+        FeedReplicator replicator = new FeedReplicator();
+        CommandLine cmd = replicator.getCommand(optionalArgs);
+        DistCpOptions options = replicator.getDistCpOptions(cmd);
+
+        List<Path> srcPaths = new ArrayList<Path>();
+        srcPaths.add(new Path("hdfs://localhost:8020/tmp/"));
+        // "-overwrite true" is used instead of sync-folder mode, so shouldSyncFolder must be false.
+        validateMandatoryArguments(options, srcPaths, false);
+        validateOptionalArguments(options);
+    }
+
+    /**
+     * Asserts the DistCp options produced from the mandatory replicator arguments used by
+     * these tests: maxMaps 3, mapBandwidth 4, the given source paths and the fixed target path.
+     *
+     * @param options          DistCp options built by {@code FeedReplicator#getDistCpOptions}
+     * @param srcPaths         expected source paths
+     * @param shouldSyncFolder expected sync-folder flag; false when {@code -overwrite true} was passed
+     */
+    private void validateMandatoryArguments(DistCpOptions options, List<Path> 
srcPaths, boolean shouldSyncFolder) {
         Assert.assertEquals(options.getMaxMaps(), 3);
         Assert.assertEquals(options.getMapBandwidth(), 4);
         Assert.assertEquals(options.getSourcePaths(), srcPaths);
         Assert.assertEquals(options.getTargetPath(), new 
Path("hdfs://localhost1:8020/tmp/"));
+        Assert.assertEquals(options.shouldSyncFolder(), shouldSyncFolder);
+    }
+
+    /**
+     * Asserts the DistCp flags derived from the optional arguments passed in
+     * {@code testOptionalArguments}.
+     *
+     * @param options DistCp options built from the optional-argument command line
+     */
+    private void validateOptionalArguments(DistCpOptions options) {
+        // -overwrite true, -ignoreErrors false, -skipChecksum false, -removeDeletedFiles true
+        Assert.assertTrue(options.shouldOverwrite());
+        Assert.assertFalse(options.shouldIgnoreFailures());
+        Assert.assertFalse(options.shouldSkipCRC());
+        Assert.assertTrue(options.shouldDeleteMissing());
+
+        // -preserveBlockSize false, -preserveReplicationNumber true, -preservePermission false
+        boolean keepsBlockSize = options.shouldPreserve(DistCpOptions.FileAttribute.BLOCKSIZE);
+        boolean keepsReplication = options.shouldPreserve(DistCpOptions.FileAttribute.REPLICATION);
+        boolean keepsPermission = options.shouldPreserve(DistCpOptions.FileAttribute.PERMISSION);
+        Assert.assertFalse(keepsBlockSize);
+        Assert.assertTrue(keepsReplication);
+        Assert.assertFalse(keepsPermission);
     }
 }

Reply via email to