FALCON-1188 Falcon support for Hive Replication. Contributed by Venkat Ranganathan.


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/cc1d3840
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/cc1d3840
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/cc1d3840

Branch: refs/heads/0.7
Commit: cc1d3840dd2184f5a2ac5452115d370aab80997e
Parents: 7f6cc6d
Author: Ajay Yadava <[email protected]>
Authored: Tue Aug 11 13:49:02 2015 +0530
Committer: Ajay Yadava <[email protected]>
Committed: Thu Aug 13 14:37:02 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 Installation-steps.txt                          |   4 +
 addons/hivedr/README                            |  80 ++++
 addons/hivedr/pom.xml                           | 207 ++++++++++
 .../apache/falcon/hive/DefaultPartitioner.java  | 317 +++++++++++++++
 .../org/apache/falcon/hive/EventSourcer.java    |  31 ++
 .../java/org/apache/falcon/hive/HiveDRArgs.java | 121 ++++++
 .../org/apache/falcon/hive/HiveDROptions.java   | 175 ++++++++
 .../java/org/apache/falcon/hive/HiveDRTool.java | 378 +++++++++++++++++
 .../falcon/hive/LastReplicatedEvents.java       | 189 +++++++++
 .../falcon/hive/MetaStoreEventSourcer.java      | 202 ++++++++++
 .../org/apache/falcon/hive/Partitioner.java     |  42 ++
 .../falcon/hive/ReplicationEventMetadata.java   |  34 ++
 .../exception/HiveReplicationException.java     |  49 +++
 .../falcon/hive/mapreduce/CopyCommitter.java    |  65 +++
 .../falcon/hive/mapreduce/CopyMapper.java       |  93 +++++
 .../falcon/hive/mapreduce/CopyReducer.java      |  85 ++++
 .../falcon/hive/util/DBReplicationStatus.java   | 213 ++++++++++
 .../apache/falcon/hive/util/DRStatusStore.java  | 104 +++++
 .../apache/falcon/hive/util/DelimiterUtils.java |  30 ++
 .../falcon/hive/util/EventSourcerUtils.java     | 189 +++++++++
 .../org/apache/falcon/hive/util/EventUtils.java | 361 +++++++++++++++++
 .../org/apache/falcon/hive/util/FileUtils.java  |  68 ++++
 .../falcon/hive/util/HiveDRStatusStore.java     | 315 +++++++++++++++
 .../apache/falcon/hive/util/HiveDRUtils.java    |  86 ++++
 .../falcon/hive/util/HiveMetastoreUtils.java    |  92 +++++
 .../falcon/hive/util/ReplicationStatus.java     | 221 ++++++++++
 addons/hivedr/src/main/resources/log4j.xml      |  54 +++
 .../falcon/hive/DBReplicationStatusTest.java    | 230 +++++++++++
 .../java/org/apache/falcon/hive/DRTest.java     |  45 +++
 .../falcon/hive/HiveDRStatusStoreTest.java      | 346 ++++++++++++++++
 .../falcon/hive/ReplicationStatusTest.java      | 137 +++++++
 .../resources/hdfs-replication-template.xml     |  23 +-
 .../resources/hdfs-replication-workflow.xml     |  60 ++-
 .../main/resources/hdfs-replication.properties  |  68 ++--
 .../recipes/hive-disaster-recovery/README.txt   |  58 +++
 addons/recipes/hive-disaster-recovery/pom.xml   |  32 ++
 .../hive-disaster-recovery-secure-template.xml  |  44 ++
 .../hive-disaster-recovery-secure-workflow.xml  | 401 +++++++++++++++++++
 .../hive-disaster-recovery-secure.properties    | 104 +++++
 .../hive-disaster-recovery-template.xml         |  44 ++
 .../hive-disaster-recovery-workflow.xml         | 293 ++++++++++++++
 .../resources/hive-disaster-recovery.properties |  94 +++++
 client/pom.xml                                  |   6 +
 .../java/org/apache/falcon/cli/FalconCLI.java   |  28 +-
 .../org/apache/falcon/client/FalconClient.java  |   4 +-
 .../recipe/HdfsReplicationRecipeTool.java       |  70 ++++
 .../HdfsReplicationRecipeToolOptions.java       |  62 +++
 .../recipe/HiveReplicationRecipeTool.java       | 196 +++++++++
 .../HiveReplicationRecipeToolOptions.java       |  89 ++++
 .../java/org/apache/falcon/recipe/Recipe.java   |  29 ++
 .../org/apache/falcon/recipe/RecipeFactory.java |  44 ++
 .../org/apache/falcon/recipe/RecipeTool.java    | 144 +++----
 .../apache/falcon/recipe/RecipeToolArgs.java    |   5 +-
 .../apache/falcon/recipe/RecipeToolOptions.java |  30 +-
 .../recipe/util/RecipeProcessBuilderUtils.java  | 272 +++++++++++++
 docs/src/site/twiki/InstallationSteps.twiki     |   2 +
 .../service/SharedLibraryHostingService.java    |  16 +-
 pom.xml                                         | 120 ++++--
 replication/pom.xml                             |  10 +-
 src/main/assemblies/distributed-package.xml     |  15 +
 src/main/assemblies/standalone-package.xml      |  16 +
 test-tools/hadoop-webapp/pom.xml                |  23 ++
 webapp/pom.xml                                  |  14 +-
 .../java/org/apache/falcon/cli/FalconCLIIT.java | 195 +++++----
 .../resources/hdfs-replication-template.xml     |  43 ++
 .../test/resources/hdfs-replication.properties  |  45 +++
 webapp/src/test/resources/process.properties    |  25 --
 68 files changed, 6986 insertions(+), 303 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a6dc95b..fc7ee03 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -4,6 +4,8 @@ Trunk (Unreleased)
   INCOMPATIBLE CHANGES
 
   NEW FEATURES
+    FALCON-1188 Falcon support for Hive Replication(Venkat Ranganathan via Ajay Yadava)
+
     FALCON-1297 Falcon Unit which supports Submit and Schedule of jobs(Pavan Kumar Kolamuri via Ajay Yadava)
 
     FALCON-1039 Add instance dependency API in falcon (Ajay Yadava)

http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/Installation-steps.txt
----------------------------------------------------------------------
diff --git a/Installation-steps.txt b/Installation-steps.txt
index bb92a85..b86d6a1 100644
--- a/Installation-steps.txt
+++ b/Installation-steps.txt
@@ -43,6 +43,8 @@ a. Building falcon from the source release
 [optionally -Dhadoop.version=<<hadoop.version>> can be appended to build for a specific version of hadoop]
 *Note:* Falcon drops support for Hadoop-1 and only supports Hadoop-2 from Falcon 0.6 onwards
         Falcon build with JDK 1.7 using -noverify option
+        To compile Falcon with Hive Replication, optionally "-P hadoop-2,hivedr" can be appended. For this
+        Hive >= 1.2.0 and Oozie >= 4.2.0 should be available.
 
 
 b. Building falcon from the source repository
@@ -55,6 +57,8 @@ b. Building falcon from the source repository
 [optionally -Dhadoop.version=<<hadoop.version>> can be appended to build for a specific version of hadoop]
 *Note:* Falcon drops support for Hadoop-1 and only supports Hadoop-2 from Falcon 0.6 onwards
         Falcon build with JDK 1.7 using -noverify option
+        To compile Falcon with Hive Replication, optionally "-P hadoop-2,hivedr" can be appended. For this
+        Hive >= 1.2.0 and Oozie >= 4.2.0 should be available.
 
 
 2. Deploying Falcon
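
As a rough sketch, a source build that enables the new profile could look like the following, run from the Falcon source root; everything other than the "-P hadoop-2,hivedr" profile list is illustrative and follows the standard Maven conventions used by the existing build instructions:

    # requires Hive >= 1.2.0 and Oozie >= 4.2.0 on the clusters
    mvn clean install -DskipTests -P hadoop-2,hivedr
    # optionally append -Dhadoop.version=<<hadoop.version>> to build for a specific Hadoop version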

http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/addons/hivedr/README
----------------------------------------------------------------------
diff --git a/addons/hivedr/README b/addons/hivedr/README
new file mode 100644
index 0000000..0b448d3
--- /dev/null
+++ b/addons/hivedr/README
@@ -0,0 +1,80 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+Hive Disaster Recovery
+=======================
+
+Overview
+---------
+
+Falcon provides a feature to replicate Hive metadata and data events from one Hadoop cluster
+to another. This is supported for both secure and unsecure clusters through Falcon Recipes.
+
+
+Prerequisites
+-------------
+
+Following are the prerequisites to use Hive DR:
+
+* Hive 1.2.0+
+* Oozie 4.2.0+
+
+*Note:* Set the following properties in hive-site.xml for replicating Hive events:
+    <property>
+        <name>hive.metastore.event.listeners</name>
+        <value>org.apache.hive.hcatalog.listener.DbNotificationListener</value>
+        <description>event listeners that are notified of any metastore changes</description>
+    </property>
+
+    <property>
+        <name>hive.metastore.dml.events</name>
+        <value>true</value>
+    </property>
+
+
+Usage
+------
+a. Perform initial bootstrap of Table and Database from one Hadoop cluster to another Hadoop cluster
+
+    Table Bootstrap
+    ----------------
+    For bootstrapping table replication, essentially after having turned on the DbNotificationListener
+    on the source db, we should do an EXPORT of the table, distcp the export over to the destination
+    warehouse, and do an IMPORT over there. Refer to the Hive Export/Import documentation for syntax details
+    and examples.
+
+    This will set up the destination table so that the events on the source cluster that modify the table
+    will then be replicated over.
+
+    Database Bootstrap
+    ------------------
+    For bootstrapping DB replication, the destination DB should first be created. This step is expected,
+    since DB replication definitions can be set up by users only on pre-existing DBs. Second, we need
+    to export all tables in the source db and import them in the destination db, as described above.
+
+
+b. Setup cluster definition
+   $FALCON_HOME/bin/falcon entity -submit -type cluster -file /cluster/definition.xml
+
+c. Submit Hive DR recipe
+   $FALCON_HOME/bin/falcon recipe -name hive-disaster-recovery -operation HIVE_DISASTER_RECOVERY
+
+
+Recipe templates for Hive DR are available in addons/recipes/hive-disaster-recovery. Copy them to the
+recipe path specified in client.properties.
+
+*Note:* If Kerberos security is enabled on the cluster, use the secure templates for Hive DR from
+        addons/recipes/hive-disaster-recovery
\ No newline at end of file
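
To make the bootstrap step described above concrete, here is a hedged sketch of bootstrapping a single table; the database, table, NameNode endpoints and staging paths are illustrative, while the EXPORT/IMPORT statements are standard Hive syntax:

    # on the source cluster (DbNotificationListener already configured)
    hive -e "USE salesdb; EXPORT TABLE transactions TO '/apps/falcon/bootstrap/transactions';"

    # copy the exported metadata and data to the destination cluster
    hadoop distcp hdfs://source-nn:8020/apps/falcon/bootstrap/transactions \
                  hdfs://target-nn:8020/apps/falcon/bootstrap/transactions

    # on the destination cluster (the destination database must already exist)
    hive -e "USE salesdb; IMPORT TABLE transactions FROM '/apps/falcon/bootstrap/transactions';"

For database bootstrap, the same EXPORT/distcp/IMPORT cycle is repeated for every table in the source database once the destination database has been created.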

http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/addons/hivedr/pom.xml
----------------------------------------------------------------------
diff --git a/addons/hivedr/pom.xml b/addons/hivedr/pom.xml
new file mode 100644
index 0000000..29780ad
--- /dev/null
+++ b/addons/hivedr/pom.xml
@@ -0,0 +1,207 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.falcon</groupId>
+        <artifactId>falcon-main</artifactId>
+        <version>0.7-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+    <artifactId>falcon-hive-replication</artifactId>
+    <description>Apache Falcon Hive Replication Module</description>
+    <name>Apache Falcon Hive Replication</name>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <!-- dependencies are always listed in sorted order by groupId, artifactId -->
+        <!-- intra-project -->
+        <dependency>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-common</artifactId>
+            <version>${hive.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-exec</artifactId>
+            <version>${hive.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-metastore</artifactId>
+            <version>${hive.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-shims</artifactId>
+            <version>${hive.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hive.hcatalog</groupId>
+            <artifactId>hive-webhcat-java-client</artifactId>
+            <version>${hive.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-jdbc</artifactId>
+            <version>${hive.version}</version>
+            <classifier>standalone</classifier>
+        </dependency>
+        <dependency>
+            <groupId>org.testng</groupId>
+            <artifactId>testng</artifactId>
+        </dependency>
+        <!-- inter-project -->
+        <dependency>
+            <groupId>javax.jms</groupId>
+            <artifactId>jms</artifactId>
+        </dependency>
+        <!-- test intra-project -->
+        <dependency>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-cli</artifactId>
+            <version>${hive.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <!-- test inter-project -->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>3.8.1</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.activemq</groupId>
+            <artifactId>activemq-core</artifactId>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.springframework</groupId>
+                    <artifactId>spring-context</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.activemq</groupId>
+            <artifactId>kahadb</artifactId>
+            <version>${activemq.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.falcon</groupId>
+            <artifactId>falcon-test-util</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.falcon</groupId>
+            <artifactId>falcon-hadoop-dependencies</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+    </dependencies>
+
+    <profiles>
+        <profile>
+            <id>hadoop-2</id>
+            <activation>
+                <activeByDefault>true</activeByDefault>
+            </activation>
+            <dependencies>
+
+                <dependency>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-common</artifactId>
+                    <scope>compile</scope>
+                </dependency>
+
+                <dependency>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-hdfs</artifactId>
+                    <scope>compile</scope>
+                </dependency>
+
+                <dependency>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-mapreduce-client-core</artifactId>
+                    <scope>compile</scope>
+                    <version>${hadoop.version}</version>
+                </dependency>
+
+                <dependency>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-client</artifactId>
+                    <scope>compile</scope>
+                </dependency>
+
+                <dependency>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-mapreduce-client-common</artifactId>
+                    <scope>compile</scope>
+                </dependency>
+
+                <dependency>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
+                    <scope>compile</scope>
+                </dependency>
+
+                <dependency>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-yarn-server-nodemanager</artifactId>
+                    <scope>compile</scope>
+                </dependency>
+
+                <dependency>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-auth</artifactId>
+                    <scope>compile</scope>
+                </dependency>
+
+                <dependency>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-distcp</artifactId>
+                    <scope>compile</scope>
+                </dependency>
+            </dependencies>
+        </profile>
+    </profiles>
+
+    <build>
+        <sourceDirectory>${basedir}/src/main/java</sourceDirectory>
+        <testSourceDirectory>${basedir}/src/test/java</testSourceDirectory>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
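
Assuming the root pom.xml (modified later in this patch) adds addons/hivedr as a module under the hivedr profile, a minimal sketch of building only this module from the source root could be:

    mvn -pl addons/hivedr -am clean install -DskipTests -P hadoop-2,hivedr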

http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/addons/hivedr/src/main/java/org/apache/falcon/hive/DefaultPartitioner.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/DefaultPartitioner.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/DefaultPartitioner.java
new file mode 100644
index 0000000..ce4bfab
--- /dev/null
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/DefaultPartitioner.java
@@ -0,0 +1,317 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.hive;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.falcon.hive.util.DRStatusStore;
+import org.apache.falcon.hive.util.EventSourcerUtils;
+import org.apache.falcon.hive.util.HiveDRUtils;
+import org.apache.falcon.hive.util.ReplicationStatus;
+import org.apache.hive.hcatalog.api.repl.Command;
+import org.apache.hive.hcatalog.api.repl.ReplicationTask;
+import org.apache.hive.hcatalog.api.repl.StagingDirectoryProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hive.hcatalog.api.HCatNotificationEvent.Scope;
+
+/**
+ * Partitioner for partitioning events for a given DB.
+ */
+public class DefaultPartitioner implements Partitioner {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(DefaultPartitioner.class);
+    private EventFilter eventFilter;
+    private final DRStatusStore drStore;
+    private final EventSourcerUtils eventSourcerUtils;
+
+    private enum CMDTYPE {
+        SRC_CMD_TYPE,
+        TGT_CMD_TYPE
+    }
+
+    public DefaultPartitioner(DRStatusStore drStore, EventSourcerUtils 
eventSourcerUtils) {
+        this.drStore = drStore;
+        this.eventSourcerUtils = eventSourcerUtils;
+    }
+
+    private class EventFilter {
+        private final Map<String, Long> eventFilterMap;
+
+        public EventFilter(String sourceMetastoreUri, String 
targetMetastoreUri, String jobName,
+                           String database) throws Exception {
+            eventFilterMap = new HashMap<>();
+            Iterator<ReplicationStatus> replStatusIter = 
drStore.getTableReplicationStatusesInDb(sourceMetastoreUri,
+                    targetMetastoreUri, jobName, database);
+            while (replStatusIter.hasNext()) {
+                ReplicationStatus replStatus = replStatusIter.next();
+                eventFilterMap.put(replStatus.getTable(), 
replStatus.getEventId());
+            }
+        }
+    }
+
+    public ReplicationEventMetadata partition(final HiveDROptions drOptions, 
final String databaseName,
+                                              final Iterator<ReplicationTask> 
taskIter) throws Exception {
+        long lastCounter = 0;
+        String dbName = databaseName.toLowerCase();
+        // init filtering before partitioning
+        this.eventFilter = new EventFilter(drOptions.getSourceMetastoreUri(), 
drOptions.getTargetMetastoreUri(),
+                drOptions.getJobName(), dbName);
+        String srcStagingDirProvider = drOptions.getSourceStagingPath();
+        String dstStagingDirProvider = drOptions.getTargetStagingPath();
+
+        List<Command> dbSrcEventList = Lists.newArrayList();
+        List<Command> dbTgtEventList = Lists.newArrayList();
+
+        Map<String, List<String>> eventMetaFileMap =
+                new HashMap<>();
+        Map<String, List<OutputStream>> outputStreamMap =
+                new HashMap<>();
+
+
+        String srcFilename = null;
+        String tgtFilename = null;
+        OutputStream srcOutputStream = null;
+        OutputStream tgtOutputStream = null;
+
+        while (taskIter.hasNext()) {
+            ReplicationTask task = taskIter.next();
+            if (task.needsStagingDirs()) {
+                task.withSrcStagingDirProvider(new 
StagingDirectoryProvider.TrivialImpl(srcStagingDirProvider,
+                        HiveDRUtils.SEPARATOR));
+                task.withDstStagingDirProvider(new 
StagingDirectoryProvider.TrivialImpl(dstStagingDirProvider,
+                        HiveDRUtils.SEPARATOR));
+            }
+
+            if (task.isActionable()) {
+                Scope eventScope = task.getEvent().getEventScope();
+                String tableName = task.getEvent().getTableName();
+                if (StringUtils.isNotEmpty(tableName)) {
+                    tableName = tableName.toLowerCase();
+                }
+
+                boolean firstEventForTable = (eventScope == Scope.TABLE)
+                        && isFirstEventForTable(eventMetaFileMap, tableName);
+                if (firstEventForTable && (task.getSrcWhCommands() != null || 
task.getDstWhCommands() != null)) {
+                    ++lastCounter;
+                }
+                Iterable<? extends org.apache.hive.hcatalog.api.repl.Command> 
srcCmds = task.getSrcWhCommands();
+                if (srcCmds != null) {
+                    if (eventScope == Scope.DB) {
+                        processDBScopeCommands(dbSrcEventList, srcCmds, 
outputStreamMap, CMDTYPE.SRC_CMD_TYPE);
+                    } else if (eventScope == Scope.TABLE) {
+                        OutputStream srcOut;
+                        if (firstEventForTable) {
+                            srcFilename = 
eventSourcerUtils.getSrcFileName(String.valueOf(lastCounter)).toString();
+                            srcOutputStream = 
eventSourcerUtils.getFileOutputStream(srcFilename);
+                            srcOut = srcOutputStream;
+                        } else {
+                            srcOut = outputStreamMap.get(tableName).get(0);
+                        }
+                        processTableScopeCommands(srcCmds, eventMetaFileMap, 
tableName, dbSrcEventList, srcOut);
+                    } else {
+                        throw new Exception("Event scope is not DB or Table");
+                    }
+                }
+
+
+                Iterable<? extends org.apache.hive.hcatalog.api.repl.Command> 
dstCmds = task.getDstWhCommands();
+                if (dstCmds != null) {
+                    if (eventScope == Scope.DB) {
+                        processDBScopeCommands(dbTgtEventList, dstCmds, 
outputStreamMap, CMDTYPE.TGT_CMD_TYPE);
+                    } else if (eventScope == Scope.TABLE) {
+                        OutputStream tgtOut;
+                        if (firstEventForTable) {
+                            tgtFilename = 
eventSourcerUtils.getTargetFileName(String.valueOf(lastCounter)).toString();
+                            tgtOutputStream = 
eventSourcerUtils.getFileOutputStream(tgtFilename);
+                            tgtOut = tgtOutputStream;
+                        } else {
+                            tgtOut = outputStreamMap.get(tableName).get(1);
+                        }
+                        processTableScopeCommands(dstCmds, eventMetaFileMap, 
tableName, dbTgtEventList, tgtOut);
+                    } else {
+                        throw new Exception("Event scope is not DB or Table");
+                    }
+                }
+
+                // If first table event, update the state data at the end
+                if (firstEventForTable) {
+                    updateStateDataIfFirstTableEvent(tableName, srcFilename, 
tgtFilename, srcOutputStream,
+                            tgtOutputStream, eventMetaFileMap, 
outputStreamMap);
+                }
+            } else {
+                LOG.error("Task is not actionable with event Id : {}", 
task.getEvent().getEventId());
+            }
+        }
+
+        ReplicationEventMetadata eventMetadata = new 
ReplicationEventMetadata();
+        // If there were only DB events for this run
+        if (eventMetaFileMap.isEmpty()) {
+            ++lastCounter;
+            if (!dbSrcEventList.isEmpty()) {
+                srcFilename = 
eventSourcerUtils.getSrcFileName(String.valueOf(lastCounter)).toString();
+                srcOutputStream = 
eventSourcerUtils.getFileOutputStream(srcFilename);
+                eventSourcerUtils.persistReplicationEvents(srcOutputStream, 
dbSrcEventList);
+            }
+
+            if (!dbTgtEventList.isEmpty()) {
+                tgtFilename = 
eventSourcerUtils.getTargetFileName(String.valueOf(lastCounter)).toString();
+                tgtOutputStream = 
eventSourcerUtils.getFileOutputStream(tgtFilename);
+                eventSourcerUtils.persistReplicationEvents(tgtOutputStream, 
dbTgtEventList);
+            }
+
+            // Close the stream
+            eventSourcerUtils.closeOutputStream(srcOutputStream);
+            eventSourcerUtils.closeOutputStream(tgtOutputStream);
+            EventSourcerUtils.updateEventMetadata(eventMetadata, dbName, null, 
srcFilename, tgtFilename);
+        } else {
+            closeAllStreams(outputStreamMap);
+            for (Map.Entry<String, List<String>> entry : 
eventMetaFileMap.entrySet()) {
+                String srcFile = null;
+                String tgtFile = null;
+                if (entry.getValue() != null) {
+                    srcFile = entry.getValue().get(0);
+                    tgtFile = entry.getValue().get(1);
+                }
+                EventSourcerUtils.updateEventMetadata(eventMetadata, dbName, 
entry.getKey(), srcFile, tgtFile);
+            }
+        }
+
+        return eventMetadata;
+    }
+
+    private void updateStateDataIfFirstTableEvent(final String tableName, 
final String srcFilename,
+                                                  final String tgtFilename,
+                                                  final OutputStream 
srcOutputStream,
+                                                  final OutputStream 
tgtOutputStream,
+                                                  Map<String, List<String>> 
eventMetaFileMap,
+                                                  Map<String, 
List<OutputStream>> outputStreamMap) {
+        List<String> files = Arrays.asList(srcFilename, tgtFilename);
+        eventMetaFileMap.put(tableName, files);
+
+        List<OutputStream> streams = Arrays.asList(srcOutputStream, 
tgtOutputStream);
+        outputStreamMap.put(tableName, streams);
+    }
+
+    private void closeAllStreams(final Map<String, List<OutputStream>> 
outputStreamMap) throws Exception {
+        if (outputStreamMap == null || outputStreamMap.isEmpty()) {
+            return;
+        }
+
+        for (Map.Entry<String, List<OutputStream>> entry : 
outputStreamMap.entrySet()) {
+            List<OutputStream> streams = entry.getValue();
+
+            for (OutputStream out : streams) {
+                if (out != null) {
+                    eventSourcerUtils.closeOutputStream(out);
+                }
+            }
+        }
+    }
+
+    private void processDBScopeCommands(final List<Command> dbEventList, final 
Iterable<? extends org.apache.hive
+            .hcatalog.api.repl.Command> cmds, final Map<String, 
List<OutputStream>> outputStreamMap, CMDTYPE cmdType
+    ) throws Exception {
+        addCmdsToDBEventList(dbEventList, cmds);
+
+        /* add DB event to all tables */
+        if (!outputStreamMap.isEmpty()) {
+            addDbEventToAllTablesEventFile(cmds, outputStreamMap, cmdType);
+        }
+    }
+
+    private void processTableScopeCommands(final Iterable<? extends 
org.apache.hive.hcatalog.api.repl.Command> cmds,
+                                           final Map<String,
+                                                   List<String>> 
eventMetaFileMap, String tableName,
+                                           final List<Command> dbEventList, 
final OutputStream out) throws Exception {
+        // First event for this table
+        // Before adding this event, add all the DB events
+        if (isFirstEventForTable(eventMetaFileMap, tableName)) {
+            addDbEventsToTableEventFile(out, dbEventList, tableName);
+        }
+        addTableEventToFile(out, cmds, tableName);
+    }
+
+    private boolean isFirstEventForTable(final Map<String,
+            List<String>> eventMetaFileMap, final String tableName) {
+        List<String> files = eventMetaFileMap.get(tableName);
+        return (files == null || files.isEmpty());
+    }
+
+    private void addCmdsToDBEventList(List<Command> dbEventList, final 
java.lang.Iterable
+            <? extends org.apache.hive.hcatalog.api.repl.Command> cmds) {
+        for (Command cmd : cmds) {
+            dbEventList.add(cmd);
+        }
+    }
+
+    private void addDbEventToAllTablesEventFile(
+            final java.lang.Iterable<? extends 
org.apache.hive.hcatalog.api.repl.Command> cmds,
+            final Map<String, List<OutputStream>> outputStreamMap, final 
CMDTYPE cmdType) throws Exception {
+        for (Map.Entry<String, List<OutputStream>> entry : 
outputStreamMap.entrySet()) {
+            String tableName = entry.getKey();
+            List<OutputStream> streams = entry.getValue();
+            OutputStream out;
+            if (CMDTYPE.SRC_CMD_TYPE == cmdType) {
+                out = streams.get(0);
+            } else {
+                out = streams.get(1);
+            }
+            addTableEventToFile(out, cmds, tableName);
+        }
+    }
+
+    private void addDbEventsToTableEventFile(final OutputStream out, final 
List<Command> dbEventList,
+                                             final String tableName) throws 
Exception {
+        /* First event for the table, add db events before adding this event */
+        addTableEventToFile(out, dbEventList, tableName);
+    }
+
+    private void addTableEventToFile(final OutputStream out,
+                                     final java.lang.Iterable<? extends 
org.apache.hive.hcatalog.api.repl.Command> cmds,
+                                     final String tableName) throws Exception {
+        Long eventId = eventFilter.eventFilterMap.get(tableName);
+        /* If not already processed, add it */
+        for (Command cmd : cmds) {
+            persistEvent(out, eventId, cmd);
+        }
+    }
+
+    private void persistEvent(final OutputStream out, final Long eventId, 
final Command cmd) throws Exception {
+        if (out == null) {
+            LOG.debug("persistEvent : out is null");
+            return;
+        }
+        if (eventId == null || cmd.getEventId() > eventId) {
+            eventSourcerUtils.persistReplicationEvents(out, cmd);
+        }
+    }
+
+    public boolean isPartitioningRequired(final HiveDROptions options) {
+        return (HiveDRUtils.getReplicationType(options.getSourceTables()) == 
HiveDRUtils.ReplicationType.DB);
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/addons/hivedr/src/main/java/org/apache/falcon/hive/EventSourcer.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/EventSourcer.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/EventSourcer.java
new file mode 100644
index 0000000..5f3312c
--- /dev/null
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/EventSourcer.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.hive;
+
+/**
+ * Source events for each table into a file.
+ */
+public interface EventSourcer {
+    /**
+     * @param inputOptions
+     * @return input filename to mapper
+     */
+    /* Source events for each <db, table> into a file */
+    String sourceEvents(HiveDROptions inputOptions) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java
new file mode 100644
index 0000000..1ad6a62
--- /dev/null
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.hive;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+
+/**
+ * Arguments for workflow execution.
+ */
+public enum HiveDRArgs {
+
+    // source meta store details
+    SOURCE_CLUSTER("sourceCluster", "source cluster"),
+    SOURCE_METASTORE_URI("sourceMetastoreUri", "source meta store uri"),
+    SOURCE_HS2_URI("sourceHiveServer2Uri", "source HS2 uri"),
+    SOURCE_DATABASE("sourceDatabase", "comma separated source databases"),
+    SOURCE_TABLE("sourceTable", "comma separated source tables"),
+    SOURCE_STAGING_PATH("sourceStagingPath", "source staging path for data"),
+
+    // source hadoop endpoints
+    SOURCE_NN("sourceNN", "source name node"),
+    // source security kerberos principals
+    SOURCE_NN_KERBEROS_PRINCIPAL("sourceNNKerberosPrincipal", "Source name 
node kerberos principal", false),
+    
SOURCE_HIVE_METASTORE_KERBEROS_PRINCIPAL("sourceHiveMetastoreKerberosPrincipal",
+            "Source hive metastore kerberos principal", false),
+    SOURCE_HIVE2_KERBEROS_PRINCIPAL("sourceHive2KerberosPrincipal", "Source 
hiveserver2 kerberos principal", false),
+
+    TARGET_CLUSTER("targetCluster", "target cluster"),
+    // target meta store details
+    TARGET_METASTORE_URI("targetMetastoreUri", "target meta store uri"),
+    TARGET_HS2_URI("targetHiveServer2Uri", "target HS2 uri"),
+
+    TARGET_STAGING_PATH("targetStagingPath", "target staging path for data"),
+
+    // target hadoop endpoints
+    TARGET_NN("targetNN", "target name node"),
+    // target security kerberos principals
+    TARGET_NN_KERBEROS_PRINCIPAL("targetNNKerberosPrincipal", "Target name 
node kerberos principal", false),
+    
TARGET_HIVE_METASTORE_KERBEROS_PRINCIPAL("targetHiveMetastoreKerberosPrincipal",
+            "Target hive metastore kerberos principal", false),
+    TARGET_HIVE2_KERBEROS_PRINCIPAL("targetHive2KerberosPrincipal", "Target 
hiveserver2 kerberos principal", false),
+
+    CLUSTER_FOR_JOB_NN_KERBEROS_PRINCIPAL("clusterForJobNNKerberosPrincipal",
+            "Namenode kerberos principal of cluster on which replication job 
runs", false),
+
+    // num events
+    MAX_EVENTS("maxEvents", "number of events to process in this run"),
+
+    // tuning params
+    REPLICATION_MAX_MAPS("replicationMaxMaps", "number of maps", false),
+    DISTCP_MAX_MAPS("distcpMaxMaps", "number of maps", false),
+
+    // Map Bandwidth
+    DISTCP_MAP_BANDWIDTH("distcpMapBandwidth", "map bandwidth in mb", false),
+
+    JOB_NAME("drJobName", "unique job name"),
+
+    CLUSTER_FOR_JOB_RUN("clusterForJobRun", "cluster where job runs"),
+    CLUSTER_FOR_JOB_RUN_WRITE_EP("clusterForJobRunWriteEP", "cluster where job 
runs write EP"),
+
+    FALCON_LIBPATH("falconLibPath", "Falcon Lib Path for Jar files", false),
+
+    KEEP_HISTORY("keepHistory", "Keep history of events file generated", 
false),
+    EXECUTION_STAGE("executionStage", "Flag for workflow stage execution", 
false);
+
+    private final String name;
+    private final String description;
+    private final boolean isRequired;
+
+    HiveDRArgs(String name, String description) {
+        this(name, description, true);
+    }
+
+    HiveDRArgs(String name, String description, boolean isRequired) {
+        this.name = name;
+        this.description = description;
+        this.isRequired = isRequired;
+    }
+
+    public Option getOption() {
+        return new Option(this.name, true, this.description);
+    }
+
+    public String getName() {
+        return this.name;
+    }
+
+    public String getDescription() {
+        return description;
+    }
+
+    public boolean isRequired() {
+        return isRequired;
+    }
+
+    public String getOptionValue(CommandLine cmd) {
+        return cmd.getOptionValue(this.name);
+    }
+
+    @Override
+    public String toString() {
+        return getName();
+    }
+}
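
The option names above are the command-line flags parsed by HiveDROptions and consumed by HiveDRTool, both added later in this patch. In the recipe workflow the tool is driven by Oozie, but a hedged sketch of a manual invocation, with the jar name and all endpoint values being illustrative, could look like:

    hadoop jar falcon-hive-replication-0.7-SNAPSHOT.jar org.apache.falcon.hive.HiveDRTool \
        -sourceCluster primaryCluster -sourceMetastoreUri thrift://source-ms:9083 \
        -sourceHiveServer2Uri hive2://source-hs2:10000 -sourceDatabase salesdb -sourceTable '*' \
        -sourceStagingPath /apps/falcon/staging -sourceNN hdfs://source-nn:8020 \
        -targetCluster backupCluster -targetMetastoreUri thrift://target-ms:9083 \
        -targetHiveServer2Uri hive2://target-hs2:10000 -targetStagingPath /apps/falcon/staging \
        -targetNN hdfs://target-nn:8020 -maxEvents 1000 -drJobName salesdb-dr \
        -clusterForJobRun backupCluster -clusterForJobRunWriteEP hdfs://target-nn:8020 \
        -executionStage lastevents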

http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDROptions.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDROptions.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDROptions.java
new file mode 100644
index 0000000..026f6e3
--- /dev/null
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDROptions.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.hive;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.falcon.hive.exception.HiveReplicationException;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Tool Options.
+ */
+public class HiveDROptions {
+    private final Map<HiveDRArgs, String> context;
+
+    protected HiveDROptions(Map<HiveDRArgs, String> context) {
+        this.context = context;
+    }
+
+    public String getValue(HiveDRArgs arg) {
+        return context.get(arg);
+    }
+
+    public Map<HiveDRArgs, String> getContext() {
+        return context;
+    }
+
+    public String getSourceMetastoreUri() {
+        return context.get(HiveDRArgs.SOURCE_METASTORE_URI);
+    }
+
+    public String getSourceMetastoreKerberosPrincipal() {
+        return 
context.get(HiveDRArgs.SOURCE_HIVE_METASTORE_KERBEROS_PRINCIPAL);
+    }
+
+    public String getSourceHive2KerberosPrincipal() {
+        return context.get(HiveDRArgs.SOURCE_HIVE2_KERBEROS_PRINCIPAL);
+    }
+
+    public List<String> getSourceDatabases() {
+        return 
Arrays.asList(context.get(HiveDRArgs.SOURCE_DATABASE).trim().split(","));
+    }
+
+    public List<String> getSourceTables() {
+        return 
Arrays.asList(context.get(HiveDRArgs.SOURCE_TABLE).trim().split(","));
+    }
+
+    public String getSourceStagingPath() throws HiveReplicationException {
+        if 
(StringUtils.isNotEmpty(context.get(HiveDRArgs.SOURCE_STAGING_PATH))) {
+            return context.get(HiveDRArgs.SOURCE_STAGING_PATH) + 
File.separator + getJobName();
+        }
+        throw new HiveReplicationException("Source StagingPath cannot be 
empty");
+    }
+
+    public String getTargetWriteEP() {
+        return context.get(HiveDRArgs.TARGET_NN);
+    }
+
+    public String getTargetMetastoreUri() {
+        return context.get(HiveDRArgs.TARGET_METASTORE_URI);
+    }
+
+    public String getTargetNNKerberosPrincipal() {
+        return context.get(HiveDRArgs.TARGET_NN_KERBEROS_PRINCIPAL);
+    }
+
+    public String getTargetMetastoreKerberosPrincipal() {
+        return 
context.get(HiveDRArgs.TARGET_HIVE_METASTORE_KERBEROS_PRINCIPAL);
+    }
+    public String getTargetHive2KerberosPrincipal() {
+        return context.get(HiveDRArgs.TARGET_HIVE2_KERBEROS_PRINCIPAL);
+    }
+
+    public String getTargetStagingPath() throws HiveReplicationException {
+        if 
(StringUtils.isNotEmpty(context.get(HiveDRArgs.TARGET_STAGING_PATH))) {
+            return context.get(HiveDRArgs.TARGET_STAGING_PATH) + 
File.separator + getJobName();
+        }
+        throw new HiveReplicationException("Target StagingPath cannot be 
empty");
+    }
+
+    public String getReplicationMaxMaps() {
+        return context.get(HiveDRArgs.REPLICATION_MAX_MAPS);
+    }
+
+    public String getJobName() {
+        return context.get(HiveDRArgs.JOB_NAME);
+    }
+
+    public int getMaxEvents() {
+        return Integer.valueOf(context.get(HiveDRArgs.MAX_EVENTS));
+    }
+
+    public boolean shouldKeepHistory() {
+        return Boolean.valueOf(context.get(HiveDRArgs.KEEP_HISTORY));
+    }
+
+    public String getJobClusterWriteEP() {
+        return context.get(HiveDRArgs.CLUSTER_FOR_JOB_RUN_WRITE_EP);
+    }
+
+    public String getJobClusterNNPrincipal() {
+        return context.get(HiveDRArgs.CLUSTER_FOR_JOB_NN_KERBEROS_PRINCIPAL);
+    }
+
+    public void setSourceStagingDir(String path) {
+        context.put(HiveDRArgs.SOURCE_STAGING_PATH, path);
+    }
+
+    public void setTargetStagingDir(String path) {
+        context.put(HiveDRArgs.TARGET_STAGING_PATH, path);
+    }
+
+    public String getExecutionStage() {
+        return context.get(HiveDRArgs.EXECUTION_STAGE);
+    }
+
+    public boolean shouldBlock() {
+        return true;
+    }
+
+    public static HiveDROptions create(String[] args) throws ParseException {
+        Map<HiveDRArgs, String> options = new HashMap<HiveDRArgs, String>();
+
+        CommandLine cmd = getCommand(args);
+        for (HiveDRArgs arg : HiveDRArgs.values()) {
+            String optionValue = arg.getOptionValue(cmd);
+            if (StringUtils.isNotEmpty(optionValue)) {
+                options.put(arg, optionValue);
+            }
+        }
+
+        return new HiveDROptions(options);
+    }
+
+    private static CommandLine getCommand(String[] arguments) throws 
ParseException {
+        Options options = new Options();
+
+        for (HiveDRArgs arg : HiveDRArgs.values()) {
+            addOption(options, arg, arg.isRequired());
+        }
+
+        return new GnuParser().parse(options, arguments, false);
+    }
+
+    private static void addOption(Options options, HiveDRArgs arg, boolean 
isRequired) {
+        Option option = arg.getOption();
+        option.setRequired(isRequired);
+        options.addOption(option);
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java
new file mode 100644
index 0000000..712efe8
--- /dev/null
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java
@@ -0,0 +1,378 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.hive;
+
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.falcon.hive.exception.HiveReplicationException;
+import org.apache.falcon.hive.mapreduce.CopyMapper;
+import org.apache.falcon.hive.mapreduce.CopyReducer;
+import org.apache.falcon.hive.util.DRStatusStore;
+import org.apache.falcon.hive.util.DelimiterUtils;
+import org.apache.falcon.hive.util.EventSourcerUtils;
+import org.apache.falcon.hive.util.FileUtils;
+import org.apache.falcon.hive.util.HiveDRStatusStore;
+import org.apache.falcon.hive.util.HiveDRUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * DR Tool Driver.
+ */
+public class HiveDRTool extends Configured implements Tool {
+
+    private static final String META_PATH_FILE_SUFFIX = ".metapath";
+
+    private FileSystem jobFS;
+    private FileSystem targetClusterFs;
+
+    private HiveDROptions inputOptions;
+    private DRStatusStore drStore;
+    private String eventsMetaFile;
+    private EventSourcerUtils eventSoucerUtil;
+    private Configuration jobConf;
+    private String executionStage;
+
+    public static final FsPermission STAGING_DIR_PERMISSION =
+            new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL);
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(HiveDRTool.class);
+
+    public HiveDRTool() {
+    }
+
+    @Override
+    public int run(String[] args) throws Exception {
+        if (args.length < 1) {
+            usage();
+            return -1;
+        }
+
+        try {
+            init(args);
+        } catch (Throwable e) {
+            LOG.error("Invalid arguments: ", e);
+            System.err.println("Invalid arguments: " + e.getMessage());
+            usage();
+            return -1;
+        }
+
+        try {
+            execute();
+        } catch (Exception e) {
+            System.err.println("Exception encountered " + e.getMessage());
+            e.printStackTrace();
+            LOG.error("Exception encountered, cleaning up staging dirs", e);
+            cleanup();
+            return -1;
+        }
+
+        if 
(inputOptions.getExecutionStage().equalsIgnoreCase(HiveDRUtils.ExecutionStage.IMPORT.name()))
 {
+            cleanup();
+        }
+
+        return 0;
+    }
+
+    private void init(String[] args) throws Exception {
+        LOG.info("Initializing HiveDR");
+        inputOptions = parseOptions(args);
+        LOG.info("Input Options: {}", inputOptions);
+
+        Configuration targetConf = 
FileUtils.getConfiguration(inputOptions.getTargetWriteEP(),
+                inputOptions.getTargetNNKerberosPrincipal());
+        targetClusterFs = FileSystem.get(targetConf);
+        jobConf = 
FileUtils.getConfiguration(inputOptions.getJobClusterWriteEP(),
+                inputOptions.getJobClusterNNPrincipal());
+        jobFS = FileSystem.get(jobConf);
+
+        // init DR status store
+        drStore = new HiveDRStatusStore(targetClusterFs);
+        eventSoucerUtil = new EventSourcerUtils(jobConf, 
inputOptions.shouldKeepHistory(), inputOptions.getJobName());
+    }
+
+    private HiveDROptions parseOptions(String[] args) throws ParseException {
+        return HiveDROptions.create(args);
+    }
+
+    public Job execute() throws Exception {
+        assert inputOptions != null;
+        assert getConf() != null;
+        executionStage = inputOptions.getExecutionStage();
+        LOG.info("Executing Workflow stage : {}", executionStage);
+        if 
(executionStage.equalsIgnoreCase(HiveDRUtils.ExecutionStage.LASTEVENTS.name())) 
{
+            String lastEventsIdFile = getLastEvents(jobConf);
+            LOG.info("Last successfully replicated Event file : {}", 
lastEventsIdFile);
+            return null;
+        } else if 
(executionStage.equalsIgnoreCase(HiveDRUtils.ExecutionStage.EXPORT.name())) {
+            createStagingDirectory();
+            eventsMetaFile = sourceEvents();
+            LOG.info("Sourced Events meta file : {}", eventsMetaFile);
+            if (StringUtils.isEmpty(eventsMetaFile)) {
+                LOG.info("No events to process");
+                return null;
+            } else {
+                /*
+                 * eventsMetaFile contains the events to be processed by 
HiveDr. This file should be available
+                 * for the import action as well. Persist the file at a 
location common to both export and import.
+                 */
+                persistEventsMetafileLocation(eventsMetaFile);
+            }
+        } else if 
(executionStage.equalsIgnoreCase(HiveDRUtils.ExecutionStage.IMPORT.name())) {
+            // read the location of eventsMetaFile from hdfs
+            eventsMetaFile = getEventsMetaFileLocation();
+            if (StringUtils.isEmpty(eventsMetaFile)) {
+                LOG.info("No events to process");
+                return null;
+            }
+        } else {
+            throw new HiveReplicationException("Invalid Execution stage : " + 
inputOptions.getExecutionStage());
+        }
+
+        Job job = createJob();
+        job.submit();
+
+        String jobID = job.getJobID().toString();
+        job.getConfiguration().set("HIVEDR_JOB_ID", jobID);
+
+        LOG.info("HiveDR job-id: {}", jobID);
+        if (inputOptions.shouldBlock() && !job.waitForCompletion(true)) {
+            throw new IOException("HiveDR failure: Job " + jobID + " has 
failed: "
+                    + job.getStatus().getFailureInfo());
+        }
+
+        return job;
+    }
+
+    private Job createJob() throws Exception {
+        String jobName = "hive-dr" + executionStage;
+        String userChosenName = getConf().get(JobContext.JOB_NAME);
+        if (userChosenName != null) {
+            jobName += ": " + userChosenName;
+        }
+        Job job = Job.getInstance(getConf());
+        job.setJobName(jobName);
+        job.setJarByClass(CopyMapper.class);
+        job.setMapperClass(CopyMapper.class);
+        job.setReducerClass(CopyReducer.class);
+        
job.setInputFormatClass(org.apache.hadoop.mapreduce.lib.input.NLineInputFormat.class);
+
+        job.setOutputFormatClass(NullOutputFormat.class);
+        job.setMapOutputKeyClass(Text.class);
+        job.setMapOutputValueClass(Text.class);
+
+        job.getConfiguration().set(JobContext.MAP_SPECULATIVE, "false");
+        job.getConfiguration().set(JobContext.NUM_MAPS,
+                String.valueOf(inputOptions.getReplicationMaxMaps()));
+
+        for (HiveDRArgs args : HiveDRArgs.values()) {
+            if (inputOptions.getValue(args) != null) {
+                job.getConfiguration().set(args.getName(), 
inputOptions.getValue(args));
+            } else {
+                job.getConfiguration().set(args.getName(), "null");
+            }
+        }
+        job.getConfiguration().set(FileInputFormat.INPUT_DIR, eventsMetaFile);
+
+        return job;
+    }
+
+    private void createStagingDirectory() throws IOException, HiveReplicationException {
+        Path sourceStagingPath = new Path(inputOptions.getSourceStagingPath());
+        Path targetStagingPath = new Path(inputOptions.getTargetStagingPath());
+        LOG.info("Source staging path: {}", sourceStagingPath);
+        if (!FileSystem.mkdirs(jobFS, sourceStagingPath, STAGING_DIR_PERMISSION)) {
+            throw new IOException("mkdir failed for " + sourceStagingPath);
+        }
+
+        LOG.info("Target staging path: {}", targetStagingPath);
+        if (!FileSystem.mkdirs(targetClusterFs, targetStagingPath, STAGING_DIR_PERMISSION)) {
+            throw new IOException("mkdir failed for " + targetStagingPath);
+        }
+    }
+
+    private void cleanStagingDirectory() throws HiveReplicationException {
+        LOG.info("Cleaning staging directories");
+        Path sourceStagingPath = new Path(inputOptions.getSourceStagingPath());
+        Path targetStagingPath = new Path(inputOptions.getTargetStagingPath());
+        try {
+            if (jobFS.exists(sourceStagingPath)) {
+                jobFS.delete(sourceStagingPath, true);
+            }
+
+            if (targetClusterFs.exists(targetStagingPath)) {
+                targetClusterFs.delete(targetStagingPath, true);
+            }
+        } catch (IOException e) {
+            LOG.error("Unable to cleanup staging dir:", e);
+        }
+    }
+
+    private String sourceEvents() throws Exception {
+        MetaStoreEventSourcer defaultSourcer = null;
+        String inputFilename = null;
+        String lastEventsIdFile = FileUtils.DEFAULT_EVENT_STORE_PATH + File.separator + inputOptions.getJobName() + "/"
+                + inputOptions.getJobName() + ".id";
+        Map<String, Long> lastEventsIdMap = getLastDBTableEvents(new Path(lastEventsIdFile));
+        try {
+            defaultSourcer = new MetaStoreEventSourcer(inputOptions.getSourceMetastoreUri(),
+                    inputOptions.getSourceMetastoreKerberosPrincipal(), inputOptions.getSourceHive2KerberosPrincipal(),
+                    new DefaultPartitioner(drStore, eventSoucerUtil), eventSoucerUtil, lastEventsIdMap);
+            inputFilename = defaultSourcer.sourceEvents(inputOptions);
+        } finally {
+            if (defaultSourcer != null) {
+                defaultSourcer.cleanUp();
+            }
+        }
+
+        return inputFilename;
+    }
+
+    private String getLastEvents(Configuration conf) throws Exception {
+        LastReplicatedEvents lastEvents = new LastReplicatedEvents(conf,
+                inputOptions.getTargetMetastoreUri(),
+                inputOptions.getTargetMetastoreKerberosPrincipal(),
+                inputOptions.getTargetHive2KerberosPrincipal(),
+                drStore, inputOptions);
+        String eventIdFile = lastEvents.getLastEvents(inputOptions);
+        lastEvents.cleanUp();
+        return eventIdFile;
+    }
+
+    private Map<String, Long> getLastDBTableEvents(Path lastEventIdFile) throws Exception {
+        Map<String, Long> lastEventsIdMap = new HashMap<String, Long>();
+        BufferedReader in = new BufferedReader(new InputStreamReader(jobFS.open(lastEventIdFile)));
+        try {
+            String line;
+            while ((line = in.readLine()) != null) {
+                String[] field = line.trim().split(DelimiterUtils.TAB_DELIM, -1);
+                lastEventsIdMap.put(field[0], Long.parseLong(field[1]));
+            }
+        } catch (Exception e) {
+            throw new IOException(e);
+        } finally {
+            IOUtils.closeQuietly(in);
+        }
+
+        return lastEventsIdMap;
+    }
+
+    public static void main(String[] args) {
+        int exitCode;
+        try {
+            HiveDRTool hiveDRTool = new HiveDRTool();
+            exitCode = ToolRunner.run(HiveDRUtils.getDefaultConf(), hiveDRTool, args);
+        } catch (Exception e) {
+            LOG.error("Couldn't complete HiveDR operation: ", e);
+            exitCode = -1;
+        }
+
+        System.exit(exitCode);
+    }
+
+    private void cleanInputDir() {
+        eventSoucerUtil.cleanUpEventInputDir();
+    }
+
+    private synchronized void cleanup() throws HiveReplicationException {
+        cleanStagingDirectory();
+        cleanInputDir();
+        cleanTempFiles();
+        try {
+            if (jobFS != null) {
+                jobFS.close();
+            }
+            if (targetClusterFs != null) {
+                targetClusterFs.close();
+            }
+        } catch (IOException e) {
+            LOG.error("Closing FS failed", e);
+        }
+    }
+
+    private void cleanTempFiles() {
+        Path eventsDirPath = new Path(FileUtils.DEFAULT_EVENT_STORE_PATH, inputOptions.getJobName());
+        Path metaFilePath = new Path(eventsDirPath.toString(), inputOptions.getJobName() + META_PATH_FILE_SUFFIX);
+        Path eventsFilePath = new Path(eventsDirPath.toString(), inputOptions.getJobName() + ".id");
+
+        try {
+            if (jobFS.exists(metaFilePath)) {
+                jobFS.delete(metaFilePath, true);
+            }
+            if (jobFS.exists(eventsFilePath)) {
+                jobFS.delete(eventsFilePath, true);
+            }
+        } catch (IOException e) {
+            LOG.error("Deleting Temp files failed", e);
+        }
+    }
+
+    public void persistEventsMetafileLocation(final String eventMetaFilePath) throws IOException {
+        Path eventsDirPath = new Path(FileUtils.DEFAULT_EVENT_STORE_PATH, inputOptions.getJobName());
+        Path metaFilePath = new Path(eventsDirPath.toString(), inputOptions.getJobName() + META_PATH_FILE_SUFFIX);
+
+        OutputStream out = null;
+        try {
+            out = FileSystem.create(jobFS, metaFilePath, FileUtils.FS_PERMISSION_700);
+            out.write(eventMetaFilePath.getBytes());
+            out.flush();
+        } finally {
+            IOUtils.closeQuietly(out);
+        }
+    }
+
+    private String getEventsMetaFileLocation() throws IOException {
+        Path eventsDirPath = new Path(FileUtils.DEFAULT_EVENT_STORE_PATH, inputOptions.getJobName());
+        Path metaFilePath = new Path(eventsDirPath.toString(), inputOptions.getJobName() + META_PATH_FILE_SUFFIX);
+        String line = null;
+        if (jobFS.exists(metaFilePath)) {
+            BufferedReader in = new BufferedReader(new InputStreamReader(jobFS.open(metaFilePath)));
+            line = in.readLine();
+            in.close();
+        }
+        return line;
+    }
+
+
+    public static void usage() {
+        System.out.println("Usage: hivedrtool -option value ....");
+    }
+}
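
HiveDRTool is driven through Hadoop's ToolRunner once per execution stage: the EXPORT run sources metastore events and persists the events meta-file location, and a later IMPORT run reads that location back before submitting the copy job. A minimal driver sketch under those assumptions, using only calls that appear in this patch; the option arrays below are placeholders, not real flag spellings:

    import org.apache.falcon.hive.HiveDRTool;
    import org.apache.falcon.hive.util.HiveDRUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.ToolRunner;

    public class HiveDRDriverSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = HiveDRUtils.getDefaultConf();

            // Hypothetical option lists; in practice these are built from HiveDRArgs,
            // differing only in the execution stage (EXPORT vs. IMPORT).
            String[] exportArgs = args;
            String[] importArgs = args;

            // EXPORT: source events and persist the events meta-file location.
            int rc = ToolRunner.run(conf, new HiveDRTool(), exportArgs);
            if (rc == 0) {
                // IMPORT: pick up the persisted meta-file location and replay the events.
                rc = ToolRunner.run(conf, new HiveDRTool(), importArgs);
            }
            System.exit(rc);
        }
    }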

http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/addons/hivedr/src/main/java/org/apache/falcon/hive/LastReplicatedEvents.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/LastReplicatedEvents.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/LastReplicatedEvents.java
new file mode 100644
index 0000000..94f936c
--- /dev/null
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/LastReplicatedEvents.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.hive;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.falcon.hive.exception.HiveReplicationException;
+import org.apache.falcon.hive.util.DRStatusStore;
+import org.apache.falcon.hive.util.DelimiterUtils;
+import org.apache.falcon.hive.util.FileUtils;
+import org.apache.falcon.hive.util.HiveDRUtils;
+import org.apache.falcon.hive.util.HiveMetastoreUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hive.hcatalog.api.HCatClient;
+import org.apache.hive.hcatalog.api.HCatTable;
+import org.apache.hive.hcatalog.api.repl.ReplicationUtils;
+import org.apache.hive.hcatalog.common.HCatException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Fetches the last replicated event ids from the target Hive metastore.
+ */
+public class LastReplicatedEvents {
+    private static final Logger LOG = LoggerFactory.getLogger(LastReplicatedEvents.class);
+    private final HCatClient targetMetastoreClient;
+    private final DRStatusStore drStore;
+    private final FileSystem jobFS;
+    private Path eventsInputDirPath;
+
+    /* TODO handle cases when there are no events: files and lists will be empty. */
+    public LastReplicatedEvents(Configuration conf, String targetMetastoreUri,
+                                String targetMetastoreKerberosPrincipal,
+                                String targetHive2KerberosPrincipal,
+                                DRStatusStore drStore, HiveDROptions inputOptions) throws Exception {
+        targetMetastoreClient = HiveMetastoreUtils.initializeHiveMetaStoreClient(targetMetastoreUri,
+                targetMetastoreKerberosPrincipal, targetHive2KerberosPrincipal);
+        jobFS = FileSystem.get(conf);
+        this.drStore = drStore;
+        init(inputOptions.getJobName());
+    }
+
+    private void init(final String jobName) throws Exception {
+        // Create base dir to store events on cluster where job is running
+        Path dir = new Path(FileUtils.DEFAULT_EVENT_STORE_PATH);
+        // Validate base path
+        FileUtils.validatePath(jobFS, new Path(DRStatusStore.BASE_DEFAULT_STORE_PATH));
+
+        if (!jobFS.exists(dir)) {
+            if (!jobFS.mkdirs(dir)) {
+                throw new Exception("Creating directory failed: " + dir);
+            }
+        }
+
+        eventsInputDirPath = new Path(FileUtils.DEFAULT_EVENT_STORE_PATH, jobName);
+
+        if (!jobFS.exists(eventsInputDirPath)) {
+            if (!jobFS.mkdirs(eventsInputDirPath)) {
+                throw new Exception("Creating directory failed: " + eventsInputDirPath);
+            }
+        }
+    }
+
+    public String getLastEvents(HiveDROptions inputOptions) throws Exception {
+        HiveDRUtils.ReplicationType replicationType = HiveDRUtils.getReplicationType(inputOptions.getSourceTables());
+        LOG.info("Obtaining last events for replicationType : {}", replicationType);
+        HashMap<String, Long> lastEvents = new HashMap<String, Long>();
+        if (replicationType == HiveDRUtils.ReplicationType.DB) {
+            List<String> dbNames = inputOptions.getSourceDatabases();
+            for (String db : dbNames) {
+                lastEvents.put(db, getLastSavedEventId(inputOptions, db, null));
+            }
+        } else {
+            List<String> tableNames = inputOptions.getSourceTables();
+            String db = inputOptions.getSourceDatabases().get(0);
+            for (String tableName : tableNames) {
+                lastEvents.put(db + "." + tableName, getLastSavedEventId(inputOptions, db, tableName));
+            }
+        }
+
+        return persistLastEventsToFile(lastEvents, inputOptions.getJobName());
+    }
+
+    private long getLastSavedEventId(HiveDROptions inputOptions, final String dbName,
+                                     final String tableName) throws Exception {
+        HiveDRUtils.ReplicationType replicationType = HiveDRUtils.getReplicationType(inputOptions.getSourceTables());
+        String jobName = inputOptions.getJobName();
+        String sourceMetastoreUri = inputOptions.getSourceMetastoreUri();
+        String targetMetastoreUri = inputOptions.getTargetMetastoreUri();
+
+        long eventId = 0;
+        if (HiveDRUtils.ReplicationType.DB == replicationType) {
+            eventId = drStore.getReplicationStatus(sourceMetastoreUri, targetMetastoreUri,
+                    jobName, dbName).getEventId();
+        } else if (HiveDRUtils.ReplicationType.TABLE == replicationType) {
+            eventId = drStore.getReplicationStatus(sourceMetastoreUri, targetMetastoreUri,
+                    jobName, dbName, tableName).getEventId();
+        }
+
+        if (eventId == -1) {
+            if (HiveDRUtils.ReplicationType.DB == replicationType) {
+                /*
+                 * The API to get the last repl ID for a DB is very expensive, so Hive does not make it public.
+                 * HiveDRTool finds the last repl id for a DB by taking the minimum last repl id across all its tables.
+                 */
+                eventId = getLastReplicationIdForDatabase(dbName);
+            } else {
+                HCatTable table = targetMetastoreClient.getTable(dbName, tableName);
+                eventId = ReplicationUtils.getLastReplicationId(table);
+            }
+        }
+        LOG.info("Last saved eventId : {}", eventId);
+        return eventId;
+    }
+
+    private long getLastReplicationIdForDatabase(String databaseName) throws HiveReplicationException {
+        /*
+         * This is a very expensive method and should only be called during the first DB replication instance.
+         */
+         */
+        long eventId = Long.MAX_VALUE;
+        try {
+            List<String> tableList = targetMetastoreClient.listTableNamesByPattern(databaseName, "*");
+            for (String tableName : tableList) {
+                long temp = ReplicationUtils.getLastReplicationId(
+                        targetMetastoreClient.getTable(databaseName, tableName));
+                if (temp < eventId) {
+                    eventId = temp;
+                }
+            }
+            return (eventId == Long.MAX_VALUE) ? 0 : eventId;
+        } catch (HCatException e) {
+            throw new HiveReplicationException("Unable to find last replication id for database "
+                    + databaseName, e);
+        }
+    }
+
+    public String persistLastEventsToFile(final HashMap<String, Long> lastEvents,
+                                          final String identifier) throws IOException {
+        if (lastEvents.size() != 0) {
+            Path eventsFile = new Path(eventsInputDirPath.toString(), identifier + ".id");
+            OutputStream out = null;
+
+            try {
+                out = FileSystem.create(jobFS, eventsFile, FileUtils.FS_PERMISSION_700);
+                for (Map.Entry<String, Long> entry : lastEvents.entrySet()) {
+                    out.write(entry.getKey().getBytes());
+                    out.write(DelimiterUtils.TAB_DELIM.getBytes());
+                    out.write(String.valueOf(entry.getValue()).getBytes());
+                    out.write(DelimiterUtils.NEWLINE_DELIM.getBytes());
+                }
+                out.flush();
+            } finally {
+                IOUtils.closeQuietly(out);
+            }
+            return jobFS.makeQualified(eventsFile).toString();
+        } else {
+            return null;
+        }
+    }
+
+    public void cleanUp() throws Exception {
+        if (targetMetastoreClient != null) {
+            targetMetastoreClient.close();
+        }
+    }
+}
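
The "<jobName>.id" file that persistLastEventsToFile writes above is what HiveDRTool.getLastDBTableEvents parses later: one "<db or db.table><TAB><last event id>" entry per line. A small self-contained sketch of that format (the database, table, and ids below are invented for illustration):

    import java.util.HashMap;
    import java.util.Map;

    public class EventsIdFileFormatSketch {
        public static void main(String[] args) {
            // Illustrative contents of a "<jobName>.id" file.
            String idFileContents = "salesdb\t1200\nsalesdb.orders\t1187\n";

            // Same parsing logic as getLastDBTableEvents: split each line on TAB.
            Map<String, Long> lastEvents = new HashMap<>();
            for (String line : idFileContents.split("\n")) {
                String[] fields = line.trim().split("\t", -1);
                lastEvents.put(fields[0], Long.parseLong(fields[1]));
            }
            System.out.println(lastEvents);
        }
    }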

http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/addons/hivedr/src/main/java/org/apache/falcon/hive/MetaStoreEventSourcer.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/MetaStoreEventSourcer.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/MetaStoreEventSourcer.java
new file mode 100644
index 0000000..6f3fe8f
--- /dev/null
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/MetaStoreEventSourcer.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.hive;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.falcon.hive.util.EventSourcerUtils;
+import org.apache.falcon.hive.util.HiveDRUtils;
+import org.apache.falcon.hive.util.HiveMetastoreUtils;
+import org.apache.hive.hcatalog.api.HCatClient;
+import org.apache.hive.hcatalog.api.repl.ReplicationTask;
+import org.apache.hive.hcatalog.api.repl.StagingDirectoryProvider;
+import org.apache.hive.hcatalog.common.HCatException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.OutputStream;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Sources meta store change events from Hive.
+ */
+public class MetaStoreEventSourcer implements EventSourcer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MetaStoreEventSourcer.class);
+
+    private final HCatClient sourceMetastoreClient;
+    private final Partitioner partitioner;
+    private final EventSourcerUtils eventSourcerUtils;
+    private final ReplicationEventMetadata eventMetadata;
+    private Map<String, Long> lastEventsIdMap = null;
+    private long lastCounter;
+
+    /* TODO handle cases when there are no events: files and lists will be empty. */
+    public MetaStoreEventSourcer(String sourceMetastoreUri, String sourceMetastoreKerberosPrincipal,
+                                 String sourceHive2KerberosPrincipal, Partitioner partitioner,
+                                 EventSourcerUtils eventSourcerUtils, Map<String, Long> lastEventsIdMap)
+        throws Exception {
+
+        sourceMetastoreClient = HiveMetastoreUtils.initializeHiveMetaStoreClient(sourceMetastoreUri,
+                sourceMetastoreKerberosPrincipal, sourceHive2KerberosPrincipal);
+        eventMetadata = new ReplicationEventMetadata();
+        this.partitioner = partitioner;
+        this.eventSourcerUtils = eventSourcerUtils;
+        this.lastEventsIdMap = lastEventsIdMap;
+    }
+
+    public String sourceEvents(HiveDROptions inputOptions) throws Exception {
+        HiveDRUtils.ReplicationType replicationType = HiveDRUtils.getReplicationType(inputOptions.getSourceTables());
+        LOG.info("Sourcing replication events for type : {}", replicationType);
+        if (replicationType == HiveDRUtils.ReplicationType.DB) {
+            List<String> dbNames = inputOptions.getSourceDatabases();
+            for (String db : dbNames) {
+                ++lastCounter;
+                sourceEventsForDb(inputOptions, db);
+            }
+        } else {
+            List<String> tableNames = inputOptions.getSourceTables();
+            String db = inputOptions.getSourceDatabases().get(0);
+            for (String tableName : tableNames) {
+                ++lastCounter;
+                sourceEventsForTable(inputOptions, db, tableName);
+            }
+        }
+
+        if (eventMetadata.getEventFileMetadata() == null || eventMetadata.getEventFileMetadata().isEmpty()) {
+            LOG.info("No events for the requested db: {}, tables: {}", inputOptions.getSourceDatabases(),
+                    inputOptions.getSourceTables());
+            eventSourcerUtils.cleanUpEventInputDir();
+            return null;
+        } else {
+            return eventSourcerUtils.persistToMetaFile(eventMetadata, inputOptions.getJobName());
+        }
+    }
+
+    private void sourceEventsForDb(HiveDROptions inputOptions, String dbName) throws Exception {
+        Iterator<ReplicationTask> replicationTaskIter = sourceReplicationEvents(getLastSavedEventId(dbName, null),
+                inputOptions.getMaxEvents(), dbName, null);
+        if (replicationTaskIter == null || !replicationTaskIter.hasNext()) {
+            LOG.info("No events for db: {}", dbName);
+        }
+        processEvents(dbName, null, inputOptions, replicationTaskIter);
+    }
+
+    private void sourceEventsForTable(HiveDROptions inputOptions, String dbName, String tableName)
+        throws Exception {
+        Iterator<ReplicationTask> replicationTaskIter = sourceReplicationEvents(getLastSavedEventId(dbName, tableName),
+                inputOptions.getMaxEvents(), dbName, tableName
+        );
+        if (replicationTaskIter == null || !replicationTaskIter.hasNext()) {
+            LOG.info("No events for db.table: {}.{}", dbName, tableName);
+        }
+        processEvents(dbName, tableName, inputOptions, replicationTaskIter);
+    }
+
+    private void processEvents(String dbName, String tableName, HiveDROptions inputOptions,
+                               Iterator<ReplicationTask> replicationTaskIter) throws Exception {
+        if (partitioner.isPartitioningRequired(inputOptions)) {
+            ReplicationEventMetadata dbEventMetadata = partitioner.partition(inputOptions, dbName, replicationTaskIter);
+
+            if (dbEventMetadata == null || dbEventMetadata.getEventFileMetadata() == null
+                    || dbEventMetadata.getEventFileMetadata().isEmpty()) {
+                LOG.info("No events for db: {}, table: {}", dbName, tableName);
+            } else {
+                EventSourcerUtils.updateEventMetadata(eventMetadata, dbEventMetadata);
+            }
+        } else {
+            processTableReplicationEvents(replicationTaskIter, dbName, tableName,
+                    inputOptions.getSourceStagingPath(), inputOptions.getTargetStagingPath());
+        }
+    }
+
+    private long getLastSavedEventId(final String dbName, final String tableName) throws Exception {
+        String key = dbName;
+        if (StringUtils.isNotEmpty(tableName)) {
+            key = dbName + "." + tableName;
+        }
+        long eventId = lastEventsIdMap.get(key);
+        LOG.info("LastSavedEventId eventId for {} : {}", key, eventId);
+        return eventId;
+    }
+
+    private Iterator<ReplicationTask> sourceReplicationEvents(long lastEventId, int maxEvents, String dbName,
+                                                              String tableName) throws Exception {
+        try {
+            return sourceMetastoreClient.getReplicationTasks(lastEventId, maxEvents, dbName, tableName);
+        } catch (HCatException e) {
+            throw new Exception("Exception getting replication events " + e.getMessage(), e);
+        }
+    }
+
+
+    private void processTableReplicationEvents(Iterator<ReplicationTask> taskIter, String dbName,
+                                               String tableName, String srcStagingDirProvider,
+                                               String dstStagingDirProvider) throws Exception {
+        String srcFilename = null;
+        String tgtFilename = null;
+        OutputStream srcOutputStream = null;
+        OutputStream tgtOutputStream = null;
+
+        while (taskIter.hasNext()) {
+            ReplicationTask task = taskIter.next();
+            if (task.needsStagingDirs()) {
+                task.withSrcStagingDirProvider(new StagingDirectoryProvider.TrivialImpl(srcStagingDirProvider,
+                        HiveDRUtils.SEPARATOR));
+                task.withDstStagingDirProvider(new StagingDirectoryProvider.TrivialImpl(dstStagingDirProvider,
+                        HiveDRUtils.SEPARATOR));
+            }
+
+            if (task.isActionable()) {
+                Iterable<? extends org.apache.hive.hcatalog.api.repl.Command> srcCmds = task.getSrcWhCommands();
+                if (srcCmds != null) {
+                    if (StringUtils.isEmpty(srcFilename)) {
+                        srcFilename = eventSourcerUtils.getSrcFileName(String.valueOf(lastCounter)).toString();
+                        srcOutputStream = eventSourcerUtils.getFileOutputStream(srcFilename);
+                    }
+                    eventSourcerUtils.persistReplicationEvents(srcOutputStream, srcCmds);
+                }
+
+
+                Iterable<? extends org.apache.hive.hcatalog.api.repl.Command> dstCmds = task.getDstWhCommands();
+                if (dstCmds != null) {
+                    if (StringUtils.isEmpty(tgtFilename)) {
+                        tgtFilename = eventSourcerUtils.getTargetFileName(String.valueOf(lastCounter)).toString();
+                        tgtOutputStream = eventSourcerUtils.getFileOutputStream(tgtFilename);
+                    }
+                    eventSourcerUtils.persistReplicationEvents(tgtOutputStream, dstCmds);
+                }
+
+            } else {
+                LOG.error("Task is not actionable with event Id : {}", 
task.getEvent().getEventId());
+            }
+        }
+        // Close the stream
+        eventSourcerUtils.closeOutputStream(srcOutputStream);
+        eventSourcerUtils.closeOutputStream(tgtOutputStream);
+        EventSourcerUtils.updateEventMetadata(eventMetadata, dbName, tableName, srcFilename, tgtFilename);
+    }
+
+    public void cleanUp() throws Exception {
+        if (sourceMetastoreClient != null) {
+            sourceMetastoreClient.close();
+        }
+    }
+}
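
For readers new to the HCatalog replication-task API used above, here is a hedged, standalone sketch of the consumption loop. It reuses only calls shown in this patch; the metastore URI, principals, staging paths, db/table names, and event id are placeholders:

    import org.apache.falcon.hive.util.HiveMetastoreUtils;
    import org.apache.hive.hcatalog.api.HCatClient;
    import org.apache.hive.hcatalog.api.repl.ReplicationTask;
    import org.apache.hive.hcatalog.api.repl.StagingDirectoryProvider;

    import java.util.Iterator;

    public class ReplicationTaskLoopSketch {
        public static void main(String[] args) throws Exception {
            // Placeholder connection details; a real run takes these from HiveDROptions
            // (Kerberos principals may be required on a secure cluster).
            HCatClient client = HiveMetastoreUtils.initializeHiveMetaStoreClient(
                    "thrift://source-metastore:9083", null, null);
            try {
                // Pull up to 50 tasks for one table, starting after the last replicated event id.
                Iterator<ReplicationTask> tasks = client.getReplicationTasks(1187L, 50, "salesdb", "orders");
                while (tasks.hasNext()) {
                    ReplicationTask task = tasks.next();
                    if (task.needsStagingDirs()) {
                        task.withSrcStagingDirProvider(new StagingDirectoryProvider.TrivialImpl("/staging/src", "/"));
                        task.withDstStagingDirProvider(new StagingDirectoryProvider.TrivialImpl("/staging/dst", "/"));
                    }
                    if (task.isActionable()) {
                        // MetaStoreEventSourcer serializes the task's source/destination warehouse
                        // commands into per-table event files consumed by the distributed copy job.
                        System.out.println("actionable event: " + task.getEvent().getEventId());
                    }
                }
            } finally {
                client.close();
            }
        }
    }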

http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/addons/hivedr/src/main/java/org/apache/falcon/hive/Partitioner.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/Partitioner.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/Partitioner.java
new file mode 100644
index 0000000..25b8bd6
--- /dev/null
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/Partitioner.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.hive;
+
+import org.apache.hive.hcatalog.api.repl.ReplicationTask;
+
+import java.util.Iterator;
+
+/**
+ * Partition hive events.
+ */
+public interface Partitioner {
+    /**
+     * Partition events.
+     *
+     * @param options                 Hive dr options.
+     * @param dbName                  Database name.
+     * @param replicationTaskIterator Repl task iterator.
+     * @return ReplicationEventMetadata
+     */
+    ReplicationEventMetadata partition(final HiveDROptions options,
+                                       final String dbName,
+                                       final Iterator<ReplicationTask> replicationTaskIterator) throws Exception;
+
+    boolean isPartitioningRequired(final HiveDROptions options);
+}
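
A minimal sketch of an implementation of this interface, purely for orientation; the real partitioning logic lives in DefaultPartitioner, which is also added by this patch, so the no-op class below is hypothetical:

    import org.apache.falcon.hive.HiveDROptions;
    import org.apache.falcon.hive.Partitioner;
    import org.apache.falcon.hive.ReplicationEventMetadata;
    import org.apache.hive.hcatalog.api.repl.ReplicationTask;

    import java.util.Iterator;

    /** Illustrative no-op Partitioner: never partitions and always returns empty metadata. */
    public class NoOpPartitioner implements Partitioner {

        @Override
        public ReplicationEventMetadata partition(final HiveDROptions options, final String dbName,
                                                  final Iterator<ReplicationTask> replicationTaskIterator) {
            return new ReplicationEventMetadata();
        }

        @Override
        public boolean isPartitioningRequired(final HiveDROptions options) {
            return false;
        }
    }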

http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/addons/hivedr/src/main/java/org/apache/falcon/hive/ReplicationEventMetadata.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/ReplicationEventMetadata.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/ReplicationEventMetadata.java
new file mode 100644
index 0000000..79e0ded
--- /dev/null
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/ReplicationEventMetadata.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.hive;
+
+import java.util.Map;
+import java.util.HashMap;
+
+/**
+ * Replication event meta data class.
+ */
+public class ReplicationEventMetadata {
+
+    private Map<String, String> eventFileMetadata = new HashMap<>();
+
+    public Map<String, String> getEventFileMetadata() {
+        return eventFileMetadata;
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/addons/hivedr/src/main/java/org/apache/falcon/hive/exception/HiveReplicationException.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/exception/HiveReplicationException.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/exception/HiveReplicationException.java
new file mode 100644
index 0000000..0baf6d8
--- /dev/null
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/exception/HiveReplicationException.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.hive.exception;
+
+/**
+ * Wrapper class for HiveReplication exceptions.
+ */
+public class HiveReplicationException extends Exception {
+
+    /**
+     * @param e Exception
+     */
+    public HiveReplicationException(Throwable e) {
+        super(e);
+    }
+
+    public HiveReplicationException(String message, Throwable e) {
+        super(message, e);
+    }
+
+    /**
+     * @param message - custom exception message
+     */
+    public HiveReplicationException(String message) {
+        super(message);
+    }
+
+    /**
+     *
+     */
+    private static final long serialVersionUID = -1475818869309247014L;
+
+}
