Repository: falcon
Updated Branches:
  refs/heads/master 75079c2e5 -> 48d0723cc


FALCON-1373 HiveDR does not work when job is run on destination cluster. Contributed by Balu Vellanki.


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/11f20f47
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/11f20f47
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/11f20f47

Branch: refs/heads/master
Commit: 11f20f4734eccc01d63516b035073237bcb79837
Parents: 75079c2
Author: Ajay Yadava <[email protected]>
Authored: Tue Sep 22 21:52:41 2015 +0530
Committer: Ajay Yadava <[email protected]>
Committed: Tue Sep 22 21:52:41 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 ++
 .../java/org/apache/falcon/hive/HiveDRArgs.java |  8 +++---
 .../org/apache/falcon/hive/HiveDROptions.java   | 12 ++++++--
 .../java/org/apache/falcon/hive/HiveDRTool.java | 29 ++++++++++++--------
 .../org/apache/falcon/hive/util/EventUtils.java | 27 ++++++++++++++----
 .../hive-disaster-recovery-secure.properties    |  4 ++-
 .../resources/hive-disaster-recovery.properties |  4 ++-
 7 files changed, 61 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
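
In essence, the fix stops reusing one FileSystem handle for every cluster and opens a handle per endpoint instead, so staging directories land on the cluster they belong to no matter where the replication job runs. A minimal sketch of that idea (hostnames and paths are hypothetical, and the patch's FileUtils.getConfiguration helper is replaced here by a plain Hadoop Configuration):

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public final class PerClusterFsSketch {
        public static void main(String[] args) throws Exception {
            // Hypothetical endpoints; in HiveDR these come from the
            // sourceNN, targetNN and clusterForJobRunWriteEP options.
            String sourceNN = "hdfs://source-nn:8020";
            String targetNN = "hdfs://target-nn:8020";

            Configuration conf = new Configuration();
            // One FileSystem handle per cluster, instead of using the
            // job cluster's FS for everything:
            FileSystem sourceFS = FileSystem.get(URI.create(sourceNN), conf);
            FileSystem targetFS = FileSystem.get(URI.create(targetNN), conf);

            // Each staging dir is created on its own cluster, regardless
            // of which cluster the replication job itself runs on.
            sourceFS.mkdirs(new Path("/apps/hive/tools/dr"));
            targetFS.mkdirs(new Path("/apps/hive/tools/dr"));
        }
    }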


http://git-wip-us.apache.org/repos/asf/falcon/blob/11f20f47/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6836690..511c4ec 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -33,6 +33,8 @@ Trunk (Unreleased)
     FALCON-1403 Revisit IT cleanup and teardown(Narayan Periwal via Pallavi Rao)
 
   BUG FIXES
+    FALCON-1373 HiveDR does not work when job is run on destination cluster(Balu Vellanki via Ajay Yadava)
+
     FALCON-1401 MetadataMappingService fails to add an edge for a process instance(Pallavi Rao)
 
     FALCON-1465 Cluster submission fails with java.lang.IllegalArgumentException in distributed mode(Ajay Yadava via Sowmya Ramesh)

http://git-wip-us.apache.org/repos/asf/falcon/blob/11f20f47/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java
index 1ad6a62..574524d 100644
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java
@@ -57,9 +57,6 @@ public enum HiveDRArgs {
             "Target hive metastore kerberos principal", false),
     TARGET_HIVE2_KERBEROS_PRINCIPAL("targetHive2KerberosPrincipal", "Target hiveserver2 kerberos principal", false),
 
-    CLUSTER_FOR_JOB_NN_KERBEROS_PRINCIPAL("clusterForJobNNKerberosPrincipal",
-            "Namenode kerberos principal of cluster on which replication job runs", false),
-
     // num events
     MAX_EVENTS("maxEvents", "number of events to process in this run"),
 
@@ -73,7 +70,10 @@ public enum HiveDRArgs {
     JOB_NAME("drJobName", "unique job name"),
 
     CLUSTER_FOR_JOB_RUN("clusterForJobRun", "cluster where job runs"),
-    CLUSTER_FOR_JOB_RUN_WRITE_EP("clusterForJobRunWriteEP", "cluster where job runs write EP"),
+    JOB_CLUSTER_NN("clusterForJobRunWriteEP", "write end point of cluster where job runs"),
+    JOB_CLUSTER_NN_KERBEROS_PRINCIPAL("clusterForJobNNKerberosPrincipal",
+            "Namenode kerberos principal of cluster on which replication job runs", false),
+
 
     FALCON_LIBPATH("falconLibPath", "Falcon Lib Path for Jar files", false),
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/11f20f47/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDROptions.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDROptions.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDROptions.java
index 026f6e3..28515e4 100644
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDROptions.java
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDROptions.java
@@ -77,6 +77,14 @@ public class HiveDROptions {
         throw new HiveReplicationException("Source StagingPath cannot be empty");
     }
 
+    public String getSourceWriteEP() {
+        return context.get(HiveDRArgs.SOURCE_NN);
+    }
+
+    public String getSourceNNKerberosPrincipal() {
+        return context.get(HiveDRArgs.SOURCE_NN_KERBEROS_PRINCIPAL);
+    }
+
     public String getTargetWriteEP() {
         return context.get(HiveDRArgs.TARGET_NN);
     }
@@ -120,11 +128,11 @@ public class HiveDROptions {
     }
 
     public String getJobClusterWriteEP() {
-        return context.get(HiveDRArgs.CLUSTER_FOR_JOB_RUN_WRITE_EP);
+        return context.get(HiveDRArgs.JOB_CLUSTER_NN);
     }
 
     public String getJobClusterNNPrincipal() {
-        return context.get(HiveDRArgs.CLUSTER_FOR_JOB_NN_KERBEROS_PRINCIPAL);
+        return context.get(HiveDRArgs.JOB_CLUSTER_NN_KERBEROS_PRINCIPAL);
     }
 
     public void setSourceStagingDir(String path) {

http://git-wip-us.apache.org/repos/asf/falcon/blob/11f20f47/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java
index 712efe8..bebdb0b 100644
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java
@@ -62,7 +62,8 @@ public class HiveDRTool extends Configured implements Tool {
     private static final String META_PATH_FILE_SUFFIX = ".metapath";
 
     private FileSystem jobFS;
-    private FileSystem targetClusterFs;
+    private FileSystem sourceClusterFS;
+    private FileSystem targetClusterFS;
 
     private HiveDROptions inputOptions;
     private DRStatusStore drStore;
@@ -117,15 +118,18 @@ public class HiveDRTool extends Configured implements Tool {
         inputOptions = parseOptions(args);
         LOG.info("Input Options: {}", inputOptions);
 
+        Configuration sourceConf = FileUtils.getConfiguration(inputOptions.getSourceWriteEP(),
+                inputOptions.getSourceNNKerberosPrincipal());
+        sourceClusterFS = FileSystem.get(sourceConf);
         Configuration targetConf = FileUtils.getConfiguration(inputOptions.getTargetWriteEP(),
                 inputOptions.getTargetNNKerberosPrincipal());
-        targetClusterFs = FileSystem.get(targetConf);
+        targetClusterFS = FileSystem.get(targetConf);
         jobConf = FileUtils.getConfiguration(inputOptions.getJobClusterWriteEP(),
                 inputOptions.getJobClusterNNPrincipal());
         jobFS = FileSystem.get(jobConf);
 
         // init DR status store
-        drStore = new HiveDRStatusStore(targetClusterFs);
+        drStore = new HiveDRStatusStore(targetClusterFS);
         eventSoucerUtil = new EventSourcerUtils(jobConf, inputOptions.shouldKeepHistory(), inputOptions.getJobName());
     }
 
@@ -219,12 +223,12 @@ public class HiveDRTool extends Configured implements Tool {
         Path sourceStagingPath = new Path(inputOptions.getSourceStagingPath());
         Path targetStagingPath = new Path(inputOptions.getTargetStagingPath());
         LOG.info("Source staging path: {}", sourceStagingPath);
-        if (!FileSystem.mkdirs(jobFS, sourceStagingPath, STAGING_DIR_PERMISSION)) {
+        if (!FileSystem.mkdirs(sourceClusterFS, sourceStagingPath, STAGING_DIR_PERMISSION)) {
             throw new IOException("mkdir failed for " + sourceStagingPath);
         }
 
         LOG.info("Target staging path: {}", targetStagingPath);
-        if (!FileSystem.mkdirs(targetClusterFs, targetStagingPath, STAGING_DIR_PERMISSION)) {
+        if (!FileSystem.mkdirs(targetClusterFS, targetStagingPath, STAGING_DIR_PERMISSION)) {
             throw new IOException("mkdir failed for " + targetStagingPath);
         }
     }
@@ -234,12 +238,12 @@ public class HiveDRTool extends Configured implements Tool {
         Path sourceStagingPath = new Path(inputOptions.getSourceStagingPath());
         Path targetStagingPath = new Path(inputOptions.getTargetStagingPath());
         try {
-            if (jobFS.exists(sourceStagingPath)) {
-                jobFS.delete(sourceStagingPath, true);
+            if (sourceClusterFS.exists(sourceStagingPath)) {
+                sourceClusterFS.delete(sourceStagingPath, true);
             }
 
-            if (targetClusterFs.exists(targetStagingPath)) {
-                targetClusterFs.delete(targetStagingPath, true);
+            if (targetClusterFS.exists(targetStagingPath)) {
+                targetClusterFS.delete(targetStagingPath, true);
             }
         } catch (IOException e) {
             LOG.error("Unable to cleanup staging dir:", e);
@@ -320,8 +324,11 @@ public class HiveDRTool extends Configured implements Tool {
             if (jobFS != null) {
                 jobFS.close();
             }
-            if (targetClusterFs != null) {
-                targetClusterFs.close();
+            if (targetClusterFS != null) {
+                targetClusterFS.close();
+            }
+            if (sourceClusterFS != null) {
+                sourceClusterFS.close();
             }
         } catch (IOException e) {
             LOG.error("Closing FS failed", e);

http://git-wip-us.apache.org/repos/asf/falcon/blob/11f20f47/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java
index 0b4200c..f8397ff 100644
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java
@@ -61,16 +61,19 @@ public class EventUtils {
     private String sourceDatabase = null;
     private String sourceNN = null;
     private String sourceNNKerberosPrincipal = null;
+    private String jobNN = null;
+    private String jobNNKerberosPrincipal = null;
     private String targetHiveServer2Uri = null;
     private String targetStagingPath = null;
     private String targetNN = null;
     private String targetNNKerberosPrincipal = null;
-    private String targetStagingUri = null;
+    private String fullyQualifiedTargetStagingPath = null;
     private List<Path> sourceCleanUpList = null;
     private List<Path> targetCleanUpList = null;
     private static final Logger LOG = LoggerFactory.getLogger(EventUtils.class);
 
     private FileSystem sourceFileSystem = null;
+    private FileSystem jobFileSystem = null;
     private FileSystem targetFileSystem = null;
     private Connection sourceConnection = null;
     private Connection targetConnection = null;
@@ -85,6 +88,8 @@ public class EventUtils {
         sourceDatabase = conf.get(HiveDRArgs.SOURCE_DATABASE.getName());
         sourceNN = conf.get(HiveDRArgs.SOURCE_NN.getName());
         sourceNNKerberosPrincipal = conf.get(HiveDRArgs.SOURCE_NN_KERBEROS_PRINCIPAL.getName());
+        jobNN = conf.get(HiveDRArgs.JOB_CLUSTER_NN.getName());
+        jobNNKerberosPrincipal = conf.get(HiveDRArgs.JOB_CLUSTER_NN_KERBEROS_PRINCIPAL.getName());
         targetHiveServer2Uri = conf.get(HiveDRArgs.TARGET_HS2_URI.getName());
         targetStagingPath = conf.get(HiveDRArgs.TARGET_STAGING_PATH.getName())
                 + File.separator + conf.get(HiveDRArgs.JOB_NAME.getName());
@@ -128,14 +133,15 @@ public class EventUtils {
 
     public void initializeFS() throws IOException {
         LOG.info("Initializing staging directory");
-        targetStagingUri = new Path(targetNN, targetStagingPath).toString();
+        fullyQualifiedTargetStagingPath = new Path(targetNN, targetStagingPath).toString();
         sourceFileSystem = FileSystem.get(FileUtils.getConfiguration(sourceNN, sourceNNKerberosPrincipal));
+        jobFileSystem = FileSystem.get(FileUtils.getConfiguration(jobNN, jobNNKerberosPrincipal));
         targetFileSystem = FileSystem.get(FileUtils.getConfiguration(targetNN, targetNNKerberosPrincipal));
     }
 
     private String readEvents(Path eventFileName) throws IOException {
         StringBuilder eventString = new StringBuilder();
-        BufferedReader in = new BufferedReader(new InputStreamReader(sourceFileSystem.open(eventFileName)));
+        BufferedReader in = new BufferedReader(new InputStreamReader(jobFileSystem.open(eventFileName)));
         try {
             String line;
             while ((line=in.readLine())!=null) {
@@ -302,16 +308,25 @@ public class EventUtils {
         DistCpOptions options = getDistCpOptions(srcStagingPaths);
         DistCp distCp = new DistCp(conf, options);
         LOG.info("Started DistCp with source Path: {} \ttarget path: {}", StringUtils.join(srcStagingPaths.toArray()),
-                targetStagingUri);
+                fullyQualifiedTargetStagingPath);
         Job distcpJob = distCp.execute();
         LOG.info("Distp Hadoop job: {}", distcpJob.getJobID().toString());
         LOG.info("Completed DistCp");
     }
 
     public DistCpOptions getDistCpOptions(List<Path> srcStagingPaths) {
-        srcStagingPaths.toArray(new Path[srcStagingPaths.size()]);
+        /*
+         * Add the fully qualified sourceNameNode to srcStagingPath uris. This will
+         * ensure DistCp will succeed when the job is run on target cluster.
+         */
+        List<Path> fullyQualifiedSrcStagingPaths = new ArrayList<Path>();
+        for (Path srcPath : srcStagingPaths) {
+            fullyQualifiedSrcStagingPaths.add(new Path(sourceNN, srcPath.toString()));
+        }
+        fullyQualifiedSrcStagingPaths.toArray(new Path[fullyQualifiedSrcStagingPaths.size()]);
 
-        DistCpOptions distcpOptions = new DistCpOptions(srcStagingPaths, new Path(targetStagingUri));
+        DistCpOptions distcpOptions = new DistCpOptions(fullyQualifiedSrcStagingPaths,
+                new Path(fullyQualifiedTargetStagingPath));
         /* setSyncFolder to false to retain dir structure as in source at the target. If set to true all files will be
         copied to the same staging dir at target resulting in DuplicateFileException in DistCp.
         */
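
The qualification step above is the crux of the fix: a DistCp driven from the target (or any job) cluster would otherwise resolve a bare path like /apps/hive/tools/dr against that cluster's own default FileSystem. A small sketch of the same composition, with made-up hostnames, using the same new Path(parent, child) Hadoop API call the patch uses in getDistCpOptions():

    import org.apache.hadoop.fs.Path;

    public final class QualifyPathSketch {
        public static void main(String[] args) {
            String sourceNN = "hdfs://source-nn:8020";        // hypothetical endpoint
            Path srcStagingPath = new Path("/apps/hive/tools/dr/drJobName");

            // Prefix the unqualified staging path with the source namenode:
            Path qualified = new Path(sourceNN, srcStagingPath.toString());
            System.out.println(qualified);
            // prints: hdfs://source-nn:8020/apps/hive/tools/dr/drJobName
        }
    }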

http://git-wip-us.apache.org/repos/asf/falcon/blob/11f20f47/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure.properties
----------------------------------------------------------------------
diff --git a/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure.properties b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure.properties
index b2d670a..cc70ac4 100644
--- a/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure.properties
+++ b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure.properties
@@ -72,6 +72,7 @@ sourceDatabase=default
 # For DB level replication specify * for sourceTable.
 # For table level replication to replicate multiple tables specify comma separated list of tables
 sourceTable=testtable_dr
+## Please specify staging dir in the source without fully qualified domain name.
 sourceStagingPath=/apps/hive/tools/dr
 sourceNN=hdfs://localhost:8020
 # Specify kerberos principal required to access source namenode and hive servers, optional on non-secure cluster.
@@ -83,6 +84,7 @@ sourceHive2KerberosPrincipal=hive/[email protected]
 targetCluster=backupCluster
 targetMetastoreUri=thrift://localhost:9083
 targetHiveServer2Uri=hive2://localhost:10000
+## Please specify staging dir in the target without fully qualified domain name.
 targetStagingPath=/apps/hive/tools/dr
 targetNN=hdfs://localhost:8020
 # Specify kerberos principal required to access target namenode and hive servers, optional on non-secure cluster.
@@ -101,4 +103,4 @@ distcpMaxMaps=1
 distcpMapBandwidth=100
 
 ##### Email on failure
-drNotificationReceivers=NA
\ No newline at end of file
+drNotificationReceivers=NA
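
The new comments in both recipe properties files state the contract the code relies on: staging paths stay unqualified, and HiveDR joins them with the matching namenode endpoint itself. A hedged illustration of the intended shape (endpoints are placeholders):

    # path only; the tool builds hdfs://source-nn.example.com:8020/apps/hive/tools/dr
    sourceStagingPath=/apps/hive/tools/dr
    sourceNN=hdfs://source-nn.example.com:8020

    # likewise on the target side
    targetStagingPath=/apps/hive/tools/dr
    targetNN=hdfs://target-nn.example.com:8020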

http://git-wip-us.apache.org/repos/asf/falcon/blob/11f20f47/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery.properties
----------------------------------------------------------------------
diff --git a/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery.properties b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery.properties
index 42ae30b..c2cbf5d 100644
--- a/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery.properties
+++ b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery.properties
@@ -70,6 +70,7 @@ sourceDatabase=default
 # For DB level replication specify * for sourceTable.
 # For table level replication to replicate multiple tables specify comma separated list of tables
 sourceTable=testtable_dr
+## Please specify staging dir in the source without fully qualified domain name.
 sourceStagingPath=/apps/hive/tools/dr
 sourceNN=hdfs://localhost:8020
 
@@ -77,6 +78,7 @@ sourceNN=hdfs://localhost:8020
 targetCluster=backupCluster
 targetMetastoreUri=thrift://localhost:9083
 targetHiveServer2Uri=hive2://localhost:10000
+## Please specify staging dir in the target without fully qualified domain name.
 targetStagingPath=/apps/hive/tools/dr
 targetNN=hdfs://localhost:8020
 
@@ -91,4 +93,4 @@ distcpMaxMaps=1
 distcpMapBandwidth=100
 
 ##### Email on failure
-drNotificationReceivers=NA
\ No newline at end of file
+drNotificationReceivers=NA
