http://git-wip-us.apache.org/repos/asf/falcon/blob/6f5b476c/addons/hivedr/src/main/java/org/apache/falcon/hive/DefaultPartitioner.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/DefaultPartitioner.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/DefaultPartitioner.java
deleted file mode 100644
index ce4bfab..0000000
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/DefaultPartitioner.java
+++ /dev/null
@@ -1,317 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.hive;
-
-import com.google.common.collect.Lists;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.falcon.hive.util.DRStatusStore;
-import org.apache.falcon.hive.util.EventSourcerUtils;
-import org.apache.falcon.hive.util.HiveDRUtils;
-import org.apache.falcon.hive.util.ReplicationStatus;
-import org.apache.hive.hcatalog.api.repl.Command;
-import org.apache.hive.hcatalog.api.repl.ReplicationTask;
-import org.apache.hive.hcatalog.api.repl.StagingDirectoryProvider;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.OutputStream;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.hive.hcatalog.api.HCatNotificationEvent.Scope;
-
-/**
- * Partitioner for partitioning events for a given DB.
- */
-public class DefaultPartitioner implements Partitioner {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(DefaultPartitioner.class);
-    private EventFilter eventFilter;
-    private final DRStatusStore drStore;
-    private final EventSourcerUtils eventSourcerUtils;
-
-    private enum CMDTYPE {
-        SRC_CMD_TYPE,
-        TGT_CMD_TYPE
-    }
-
-    public DefaultPartitioner(DRStatusStore drStore, EventSourcerUtils 
eventSourcerUtils) {
-        this.drStore = drStore;
-        this.eventSourcerUtils = eventSourcerUtils;
-    }
-
-    private class EventFilter {
-        private final Map<String, Long> eventFilterMap;
-
-        public EventFilter(String sourceMetastoreUri, String 
targetMetastoreUri, String jobName,
-                           String database) throws Exception {
-            eventFilterMap = new HashMap<>();
-            Iterator<ReplicationStatus> replStatusIter = 
drStore.getTableReplicationStatusesInDb(sourceMetastoreUri,
-                    targetMetastoreUri, jobName, database);
-            while (replStatusIter.hasNext()) {
-                ReplicationStatus replStatus = replStatusIter.next();
-                eventFilterMap.put(replStatus.getTable(), 
replStatus.getEventId());
-            }
-        }
-    }
-
-    public ReplicationEventMetadata partition(final HiveDROptions drOptions, 
final String databaseName,
-                                              final Iterator<ReplicationTask> 
taskIter) throws Exception {
-        long lastCounter = 0;
-        String dbName = databaseName.toLowerCase();
-        // init filtering before partitioning
-        this.eventFilter = new EventFilter(drOptions.getSourceMetastoreUri(), 
drOptions.getTargetMetastoreUri(),
-                drOptions.getJobName(), dbName);
-        String srcStagingDirProvider = drOptions.getSourceStagingPath();
-        String dstStagingDirProvider = drOptions.getTargetStagingPath();
-
-        List<Command> dbSrcEventList = Lists.newArrayList();
-        List<Command> dbTgtEventList = Lists.newArrayList();
-
-        Map<String, List<String>> eventMetaFileMap =
-                new HashMap<>();
-        Map<String, List<OutputStream>> outputStreamMap =
-                new HashMap<>();
-
-
-        String srcFilename = null;
-        String tgtFilename = null;
-        OutputStream srcOutputStream = null;
-        OutputStream tgtOutputStream = null;
-
-        while (taskIter.hasNext()) {
-            ReplicationTask task = taskIter.next();
-            if (task.needsStagingDirs()) {
-                task.withSrcStagingDirProvider(new 
StagingDirectoryProvider.TrivialImpl(srcStagingDirProvider,
-                        HiveDRUtils.SEPARATOR));
-                task.withDstStagingDirProvider(new 
StagingDirectoryProvider.TrivialImpl(dstStagingDirProvider,
-                        HiveDRUtils.SEPARATOR));
-            }
-
-            if (task.isActionable()) {
-                Scope eventScope = task.getEvent().getEventScope();
-                String tableName = task.getEvent().getTableName();
-                if (StringUtils.isNotEmpty(tableName)) {
-                    tableName = tableName.toLowerCase();
-                }
-
-                boolean firstEventForTable = (eventScope == Scope.TABLE)
-                        && isFirstEventForTable(eventMetaFileMap, tableName);
-                if (firstEventForTable && (task.getSrcWhCommands() != null || 
task.getDstWhCommands() != null)) {
-                    ++lastCounter;
-                }
-                Iterable<? extends org.apache.hive.hcatalog.api.repl.Command> 
srcCmds = task.getSrcWhCommands();
-                if (srcCmds != null) {
-                    if (eventScope == Scope.DB) {
-                        processDBScopeCommands(dbSrcEventList, srcCmds, 
outputStreamMap, CMDTYPE.SRC_CMD_TYPE);
-                    } else if (eventScope == Scope.TABLE) {
-                        OutputStream srcOut;
-                        if (firstEventForTable) {
-                            srcFilename = 
eventSourcerUtils.getSrcFileName(String.valueOf(lastCounter)).toString();
-                            srcOutputStream = 
eventSourcerUtils.getFileOutputStream(srcFilename);
-                            srcOut = srcOutputStream;
-                        } else {
-                            srcOut = outputStreamMap.get(tableName).get(0);
-                        }
-                        processTableScopeCommands(srcCmds, eventMetaFileMap, 
tableName, dbSrcEventList, srcOut);
-                    } else {
-                        throw new Exception("Event scope is not DB or Table");
-                    }
-                }
-
-
-                Iterable<? extends org.apache.hive.hcatalog.api.repl.Command> 
dstCmds = task.getDstWhCommands();
-                if (dstCmds != null) {
-                    if (eventScope == Scope.DB) {
-                        processDBScopeCommands(dbTgtEventList, dstCmds, 
outputStreamMap, CMDTYPE.TGT_CMD_TYPE);
-                    } else if (eventScope == Scope.TABLE) {
-                        OutputStream tgtOut;
-                        if (firstEventForTable) {
-                            tgtFilename = 
eventSourcerUtils.getTargetFileName(String.valueOf(lastCounter)).toString();
-                            tgtOutputStream = 
eventSourcerUtils.getFileOutputStream(tgtFilename);
-                            tgtOut = tgtOutputStream;
-                        } else {
-                            tgtOut = outputStreamMap.get(tableName).get(1);
-                        }
-                        processTableScopeCommands(dstCmds, eventMetaFileMap, 
tableName, dbTgtEventList, tgtOut);
-                    } else {
-                        throw new Exception("Event scope is not DB or Table");
-                    }
-                }
-
-                // If first table event, update the state data at the end
-                if (firstEventForTable) {
-                    updateStateDataIfFirstTableEvent(tableName, srcFilename, 
tgtFilename, srcOutputStream,
-                            tgtOutputStream, eventMetaFileMap, 
outputStreamMap);
-                }
-            } else {
-                LOG.error("Task is not actionable with event Id : {}", 
task.getEvent().getEventId());
-            }
-        }
-
-        ReplicationEventMetadata eventMetadata = new 
ReplicationEventMetadata();
-        // If there were only DB events for this run
-        if (eventMetaFileMap.isEmpty()) {
-            ++lastCounter;
-            if (!dbSrcEventList.isEmpty()) {
-                srcFilename = 
eventSourcerUtils.getSrcFileName(String.valueOf(lastCounter)).toString();
-                srcOutputStream = 
eventSourcerUtils.getFileOutputStream(srcFilename);
-                eventSourcerUtils.persistReplicationEvents(srcOutputStream, 
dbSrcEventList);
-            }
-
-            if (!dbTgtEventList.isEmpty()) {
-                tgtFilename = 
eventSourcerUtils.getTargetFileName(String.valueOf(lastCounter)).toString();
-                tgtOutputStream = 
eventSourcerUtils.getFileOutputStream(tgtFilename);
-                eventSourcerUtils.persistReplicationEvents(tgtOutputStream, 
dbTgtEventList);
-            }
-
-            // Close the stream
-            eventSourcerUtils.closeOutputStream(srcOutputStream);
-            eventSourcerUtils.closeOutputStream(tgtOutputStream);
-            EventSourcerUtils.updateEventMetadata(eventMetadata, dbName, null, 
srcFilename, tgtFilename);
-        } else {
-            closeAllStreams(outputStreamMap);
-            for (Map.Entry<String, List<String>> entry : 
eventMetaFileMap.entrySet()) {
-                String srcFile = null;
-                String tgtFile = null;
-                if (entry.getValue() != null) {
-                    srcFile = entry.getValue().get(0);
-                    tgtFile = entry.getValue().get(1);
-                }
-                EventSourcerUtils.updateEventMetadata(eventMetadata, dbName, 
entry.getKey(), srcFile, tgtFile);
-            }
-        }
-
-        return eventMetadata;
-    }
-
-    private void updateStateDataIfFirstTableEvent(final String tableName, 
final String srcFilename,
-                                                  final String tgtFilename,
-                                                  final OutputStream 
srcOutputStream,
-                                                  final OutputStream 
tgtOutputStream,
-                                                  Map<String, List<String>> 
eventMetaFileMap,
-                                                  Map<String, 
List<OutputStream>> outputStreamMap) {
-        List<String> files = Arrays.asList(srcFilename, tgtFilename);
-        eventMetaFileMap.put(tableName, files);
-
-        List<OutputStream> streams = Arrays.asList(srcOutputStream, 
tgtOutputStream);
-        outputStreamMap.put(tableName, streams);
-    }
-
-    private void closeAllStreams(final Map<String, List<OutputStream>> 
outputStreamMap) throws Exception {
-        if (outputStreamMap == null || outputStreamMap.isEmpty()) {
-            return;
-        }
-
-        for (Map.Entry<String, List<OutputStream>> entry : 
outputStreamMap.entrySet()) {
-            List<OutputStream> streams = entry.getValue();
-
-            for (OutputStream out : streams) {
-                if (out != null) {
-                    eventSourcerUtils.closeOutputStream(out);
-                }
-            }
-        }
-    }
-
-    private void processDBScopeCommands(final List<Command> dbEventList, final 
Iterable<? extends org.apache.hive
-            .hcatalog.api.repl.Command> cmds, final Map<String, 
List<OutputStream>> outputStreamMap, CMDTYPE cmdType
-    ) throws Exception {
-        addCmdsToDBEventList(dbEventList, cmds);
-
-        /* add DB event to all tables */
-        if (!outputStreamMap.isEmpty()) {
-            addDbEventToAllTablesEventFile(cmds, outputStreamMap, cmdType);
-        }
-    }
-
-    private void processTableScopeCommands(final Iterable<? extends 
org.apache.hive.hcatalog.api.repl.Command> cmds,
-                                           final Map<String,
-                                                   List<String>> 
eventMetaFileMap, String tableName,
-                                           final List<Command> dbEventList, 
final OutputStream out) throws Exception {
-        // First event for this table
-        // Before adding this event, add all the DB events
-        if (isFirstEventForTable(eventMetaFileMap, tableName)) {
-            addDbEventsToTableEventFile(out, dbEventList, tableName);
-        }
-        addTableEventToFile(out, cmds, tableName);
-    }
-
-    private boolean isFirstEventForTable(final Map<String,
-            List<String>> eventMetaFileMap, final String tableName) {
-        List<String> files = eventMetaFileMap.get(tableName);
-        return (files == null || files.isEmpty());
-    }
-
-    private void addCmdsToDBEventList(List<Command> dbEventList, final 
java.lang.Iterable
-            <? extends org.apache.hive.hcatalog.api.repl.Command> cmds) {
-        for (Command cmd : cmds) {
-            dbEventList.add(cmd);
-        }
-    }
-
-    private void addDbEventToAllTablesEventFile(
-            final java.lang.Iterable<? extends 
org.apache.hive.hcatalog.api.repl.Command> cmds,
-            final Map<String, List<OutputStream>> outputStreamMap, final 
CMDTYPE cmdType) throws Exception {
-        for (Map.Entry<String, List<OutputStream>> entry : 
outputStreamMap.entrySet()) {
-            String tableName = entry.getKey();
-            List<OutputStream> streams = entry.getValue();
-            OutputStream out;
-            if (CMDTYPE.SRC_CMD_TYPE == cmdType) {
-                out = streams.get(0);
-            } else {
-                out = streams.get(1);
-            }
-            addTableEventToFile(out, cmds, tableName);
-        }
-    }
-
-    private void addDbEventsToTableEventFile(final OutputStream out, final 
List<Command> dbEventList,
-                                             final String tableName) throws 
Exception {
-        /* First event for the table, add db events before adding this event */
-        addTableEventToFile(out, dbEventList, tableName);
-    }
-
-    private void addTableEventToFile(final OutputStream out,
-                                     final java.lang.Iterable<? extends 
org.apache.hive.hcatalog.api.repl.Command> cmds,
-                                     final String tableName) throws Exception {
-        Long eventId = eventFilter.eventFilterMap.get(tableName);
-        /* If not already processed, add it */
-        for (Command cmd : cmds) {
-            persistEvent(out, eventId, cmd);
-        }
-    }
-
-    private void persistEvent(final OutputStream out, final Long eventId, 
final Command cmd) throws Exception {
-        if (out == null) {
-            LOG.debug("persistEvent : out is null");
-            return;
-        }
-        if (eventId == null || cmd.getEventId() > eventId) {
-            eventSourcerUtils.persistReplicationEvents(out, cmd);
-        }
-    }
-
-    public boolean isPartitioningRequired(final HiveDROptions options) {
-        return (HiveDRUtils.getReplicationType(options.getSourceTables()) == 
HiveDRUtils.ReplicationType.DB);
-    }
-}
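
For context on the class removed above: DefaultPartitioner groups replication commands per table and, in persistEvent(), skips any command whose event id is not newer than the last id already recorded for that table in the DR status store. Below is a minimal, self-contained sketch of just that filtering rule; the Command stand-in and helper names are hypothetical and are not the HCatalog or Falcon API.

    import java.util.HashMap;
    import java.util.Map;

    /** Sketch of the "skip already-replicated events" rule used by persistEvent(). */
    public final class EventIdFilterSketch {

        /** Stand-in for org.apache.hive.hcatalog.api.repl.Command; only the event id matters here. */
        interface Command {
            long getEventId();
        }

        /** table name -> last event id already replicated for that table (from the DR status store). */
        private final Map<String, Long> lastReplicatedIdByTable = new HashMap<>();

        void recordLastReplicated(String table, long eventId) {
            lastReplicatedIdByTable.put(table, eventId);
        }

        /** Persist only if the table has no replication history yet or the command is newer. */
        boolean shouldPersist(String table, Command cmd) {
            Long lastId = lastReplicatedIdByTable.get(table);
            return lastId == null || cmd.getEventId() > lastId;
        }

        public static void main(String[] args) {
            EventIdFilterSketch filter = new EventIdFilterSketch();
            filter.recordLastReplicated("mydb.clicks", 120L);
            Command older = () -> 100L;
            Command newer = () -> 150L;
            System.out.println(filter.shouldPersist("mydb.clicks", older)); // false, already replicated
            System.out.println(filter.shouldPersist("mydb.clicks", newer)); // true
            System.out.println(filter.shouldPersist("mydb.orders", older)); // true, no history yet
        }
    }

The real class applies the same comparison while streaming DB-scope and table-scope commands into per-table event files.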

http://git-wip-us.apache.org/repos/asf/falcon/blob/6f5b476c/addons/hivedr/src/main/java/org/apache/falcon/hive/EventSourcer.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/EventSourcer.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/EventSourcer.java
deleted file mode 100644
index 5f3312c..0000000
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/EventSourcer.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.hive;
-
-/**
- * Source events for each table into a file.
- */
-public interface EventSourcer {
-    /**
-     * @param inputOptions
-     * @return input filename to mapper
-     */
-    /* Source events for each <db, table> into a file */
-    String sourceEvents(HiveDROptions inputOptions) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/6f5b476c/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java
deleted file mode 100644
index 5490232..0000000
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.hive;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Option;
-
-/**
- * Arguments for workflow execution.
- */
-public enum HiveDRArgs {
-
-    // source meta store details
-    SOURCE_CLUSTER("sourceCluster", "source cluster"),
-    SOURCE_METASTORE_URI("sourceMetastoreUri", "source meta store uri"),
-    SOURCE_HS2_URI("sourceHiveServer2Uri", "source HS2 uri"),
-    SOURCE_DATABASE("sourceDatabase", "comma source databases"),
-    SOURCE_TABLE("sourceTable", "comma source tables"),
-    SOURCE_STAGING_PATH("sourceStagingPath", "source staging path for data"),
-
-    // source hadoop endpoints
-    SOURCE_NN("sourceNN", "source name node"),
-    // source security kerberos principals
-    SOURCE_NN_KERBEROS_PRINCIPAL("sourceNNKerberosPrincipal", "Source name 
node kerberos principal", false),
-    
SOURCE_HIVE_METASTORE_KERBEROS_PRINCIPAL("sourceHiveMetastoreKerberosPrincipal",
-            "Source hive metastore kerberos principal", false),
-    SOURCE_HIVE2_KERBEROS_PRINCIPAL("sourceHive2KerberosPrincipal", "Source 
hiveserver2 kerberos principal", false),
-
-    TARGET_CLUSTER("targetCluster", "target cluster"),
-    // target meta store details
-    TARGET_METASTORE_URI("targetMetastoreUri", "source meta store uri"),
-    TARGET_HS2_URI("targetHiveServer2Uri", "source meta store uri"),
-
-    TARGET_STAGING_PATH("targetStagingPath", "source staging path for data"),
-
-    // target hadoop endpoints
-    TARGET_NN("targetNN", "target name node"),
-    // target security kerberos principals
-    TARGET_NN_KERBEROS_PRINCIPAL("targetNNKerberosPrincipal", "Target name 
node kerberos principal", false),
-    
TARGET_HIVE_METASTORE_KERBEROS_PRINCIPAL("targetHiveMetastoreKerberosPrincipal",
-            "Target hive metastore kerberos principal", false),
-    TARGET_HIVE2_KERBEROS_PRINCIPAL("targetHive2KerberosPrincipal", "Target 
hiveserver2 kerberos principal", false),
-
-    // num events
-    MAX_EVENTS("maxEvents", "number of events to process in this run"),
-
-    // tuning params
-    REPLICATION_MAX_MAPS("replicationMaxMaps", "number of maps", false),
-    DISTCP_MAX_MAPS("distcpMaxMaps", "number of maps", false),
-
-    // Map Bandwidth
-    DISTCP_MAP_BANDWIDTH("distcpMapBandwidth", "map bandwidth in mb", false),
-
-    JOB_NAME("drJobName", "unique job name"),
-
-    CLUSTER_FOR_JOB_RUN("clusterForJobRun", "cluster where job runs"),
-    JOB_CLUSTER_NN("clusterForJobRunWriteEP", "write end point of cluster 
where job runs"),
-    JOB_CLUSTER_NN_KERBEROS_PRINCIPAL("clusterForJobNNKerberosPrincipal",
-            "Namenode kerberos principal of cluster on which replication job 
runs", false),
-
-
-    FALCON_LIBPATH("falconLibPath", "Falcon Lib Path for Jar files", false),
-
-    KEEP_HISTORY("keepHistory", "Keep history of events file generated", 
false),
-    EXECUTION_STAGE("executionStage", "Flag for workflow stage execution", 
false),
-    COUNTER_LOGDIR("counterLogDir", "Log directory to store counter file", 
false);
-
-    private final String name;
-    private final String description;
-    private final boolean isRequired;
-
-    HiveDRArgs(String name, String description) {
-        this(name, description, true);
-    }
-
-    HiveDRArgs(String name, String description, boolean isRequired) {
-        this.name = name;
-        this.description = description;
-        this.isRequired = isRequired;
-    }
-
-    public Option getOption() {
-        return new Option(this.name, true, this.description);
-    }
-
-    public String getName() {
-        return this.name;
-    }
-
-    public String getDescription() {
-        return description;
-    }
-
-    public boolean isRequired() {
-        return isRequired;
-    }
-
-    public String getOptionValue(CommandLine cmd) {
-        return cmd.getOptionValue(this.name);
-    }
-
-    @Override
-    public String toString() {
-        return getName();
-    }
-}
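
The deleted HiveDRArgs enum pairs each workflow argument with a description and a required flag, and getOption() turns each constant into a value-carrying commons-cli Option. The sketch below shows how such an enum is assembled into an Options set and parsed, mirroring what HiveDROptions.getCommand() does in the next file; the enum subset and argument values are made up for illustration.

    import org.apache.commons.cli.CommandLine;
    import org.apache.commons.cli.GnuParser;
    import org.apache.commons.cli.Option;
    import org.apache.commons.cli.Options;
    import org.apache.commons.cli.ParseException;

    /** Sketch: turning a (name, required) enum into commons-cli options and parsing them. */
    public final class HiveDRArgsParseSketch {

        enum Arg {
            SOURCE_METASTORE_URI("sourceMetastoreUri", true),
            TARGET_METASTORE_URI("targetMetastoreUri", true),
            REPLICATION_MAX_MAPS("replicationMaxMaps", false);

            final String name;
            final boolean required;

            Arg(String name, boolean required) {
                this.name = name;
                this.required = required;
            }

            Option toOption() {
                Option option = new Option(name, true, name); // every option carries a value
                option.setRequired(required);
                return option;
            }
        }

        public static void main(String[] args) throws ParseException {
            // Example arguments only; real invocations are driven by the Oozie workflow.
            String[] argv = {
                "-sourceMetastoreUri", "thrift://source:9083",
                "-targetMetastoreUri", "thrift://target:9083",
            };

            Options options = new Options();
            for (Arg arg : Arg.values()) {
                options.addOption(arg.toOption());
            }

            // GnuParser is deprecated in newer commons-cli releases but is what the deleted code used.
            CommandLine cmd = new GnuParser().parse(options, argv, false);
            System.out.println(cmd.getOptionValue(Arg.SOURCE_METASTORE_URI.name));
            System.out.println(cmd.hasOption(Arg.REPLICATION_MAX_MAPS.name)); // false: optional, not supplied
        }
    }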

http://git-wip-us.apache.org/repos/asf/falcon/blob/6f5b476c/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDROptions.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDROptions.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDROptions.java
deleted file mode 100644
index 28515e4..0000000
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDROptions.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.hive;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.GnuParser;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.falcon.hive.exception.HiveReplicationException;
-
-import java.io.File;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Tool Options.
- */
-public class HiveDROptions {
-    private final Map<HiveDRArgs, String> context;
-
-    protected HiveDROptions(Map<HiveDRArgs, String> context) {
-        this.context = context;
-    }
-
-    public String getValue(HiveDRArgs arg) {
-        return context.get(arg);
-    }
-
-    public Map<HiveDRArgs, String> getContext() {
-        return context;
-    }
-
-    public String getSourceMetastoreUri() {
-        return context.get(HiveDRArgs.SOURCE_METASTORE_URI);
-    }
-
-    public String getSourceMetastoreKerberosPrincipal() {
-        return 
context.get(HiveDRArgs.SOURCE_HIVE_METASTORE_KERBEROS_PRINCIPAL);
-    }
-
-    public String getSourceHive2KerberosPrincipal() {
-        return context.get(HiveDRArgs.SOURCE_HIVE2_KERBEROS_PRINCIPAL);
-    }
-
-    public List<String> getSourceDatabases() {
-        return 
Arrays.asList(context.get(HiveDRArgs.SOURCE_DATABASE).trim().split(","));
-    }
-
-    public List<String> getSourceTables() {
-        return 
Arrays.asList(context.get(HiveDRArgs.SOURCE_TABLE).trim().split(","));
-    }
-
-    public String getSourceStagingPath() throws HiveReplicationException {
-        if 
(StringUtils.isNotEmpty(context.get(HiveDRArgs.SOURCE_STAGING_PATH))) {
-            return context.get(HiveDRArgs.SOURCE_STAGING_PATH) + 
File.separator + getJobName();
-        }
-        throw new HiveReplicationException("Source StagingPath cannot be 
empty");
-    }
-
-    public String getSourceWriteEP() {
-        return context.get(HiveDRArgs.SOURCE_NN);
-    }
-
-    public String getSourceNNKerberosPrincipal() {
-        return context.get(HiveDRArgs.SOURCE_NN_KERBEROS_PRINCIPAL);
-    }
-
-    public String getTargetWriteEP() {
-        return context.get(HiveDRArgs.TARGET_NN);
-    }
-
-    public String getTargetMetastoreUri() {
-        return context.get(HiveDRArgs.TARGET_METASTORE_URI);
-    }
-
-    public String getTargetNNKerberosPrincipal() {
-        return context.get(HiveDRArgs.TARGET_NN_KERBEROS_PRINCIPAL);
-    }
-
-    public String getTargetMetastoreKerberosPrincipal() {
-        return 
context.get(HiveDRArgs.TARGET_HIVE_METASTORE_KERBEROS_PRINCIPAL);
-    }
-    public String getTargetHive2KerberosPrincipal() {
-        return context.get(HiveDRArgs.TARGET_HIVE2_KERBEROS_PRINCIPAL);
-    }
-
-    public String getTargetStagingPath() throws HiveReplicationException {
-        if 
(StringUtils.isNotEmpty(context.get(HiveDRArgs.TARGET_STAGING_PATH))) {
-            return context.get(HiveDRArgs.TARGET_STAGING_PATH) + 
File.separator + getJobName();
-        }
-        throw new HiveReplicationException("Target StagingPath cannot be 
empty");
-    }
-
-    public String getReplicationMaxMaps() {
-        return context.get(HiveDRArgs.REPLICATION_MAX_MAPS);
-    }
-
-    public String getJobName() {
-        return context.get(HiveDRArgs.JOB_NAME);
-    }
-
-    public int getMaxEvents() {
-        return Integer.valueOf(context.get(HiveDRArgs.MAX_EVENTS));
-    }
-
-    public boolean shouldKeepHistory() {
-        return Boolean.valueOf(context.get(HiveDRArgs.KEEP_HISTORY));
-    }
-
-    public String getJobClusterWriteEP() {
-        return context.get(HiveDRArgs.JOB_CLUSTER_NN);
-    }
-
-    public String getJobClusterNNPrincipal() {
-        return context.get(HiveDRArgs.JOB_CLUSTER_NN_KERBEROS_PRINCIPAL);
-    }
-
-    public void setSourceStagingDir(String path) {
-        context.put(HiveDRArgs.SOURCE_STAGING_PATH, path);
-    }
-
-    public void setTargetStagingDir(String path) {
-        context.put(HiveDRArgs.TARGET_STAGING_PATH, path);
-    }
-
-    public String getExecutionStage() {
-        return context.get(HiveDRArgs.EXECUTION_STAGE);
-    }
-
-    public boolean shouldBlock() {
-        return true;
-    }
-
-    public static HiveDROptions create(String[] args) throws ParseException {
-        Map<HiveDRArgs, String> options = new HashMap<HiveDRArgs, String>();
-
-        CommandLine cmd = getCommand(args);
-        for (HiveDRArgs arg : HiveDRArgs.values()) {
-            String optionValue = arg.getOptionValue(cmd);
-            if (StringUtils.isNotEmpty(optionValue)) {
-                options.put(arg, optionValue);
-            }
-        }
-
-        return new HiveDROptions(options);
-    }
-
-    private static CommandLine getCommand(String[] arguments) throws 
ParseException {
-        Options options = new Options();
-
-        for (HiveDRArgs arg : HiveDRArgs.values()) {
-            addOption(options, arg, arg.isRequired());
-        }
-
-        return new GnuParser().parse(options, arguments, false);
-    }
-
-    private static void addOption(Options options, HiveDRArgs arg, boolean 
isRequired) {
-        Option option = arg.getOption();
-        option.setRequired(isRequired);
-        options.addOption(option);
-    }
-}
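
HiveDROptions itself is a thin wrapper over a Map<HiveDRArgs, String>: comma-separated database and table lists are split verbatim, and staging paths must be non-empty, with the job name appended as a per-job subdirectory. A small sketch of those two rules in plain Java follows; note the real code throws HiveReplicationException rather than IllegalArgumentException.

    import java.io.File;
    import java.util.Arrays;
    import java.util.List;

    /** Sketch of two HiveDROptions rules: list splitting and staging-path construction. */
    public final class HiveDROptionsRulesSketch {

        /** Mirrors getSourceDatabases()/getSourceTables(): trim the raw value, then split on commas. */
        static List<String> splitList(String rawValue) {
            return Arrays.asList(rawValue.trim().split(","));
        }

        /** Mirrors getSourceStagingPath()/getTargetStagingPath(): reject empty paths, append the job name. */
        static String stagingPath(String configuredPath, String jobName) {
            if (configuredPath == null || configuredPath.isEmpty()) {
                throw new IllegalArgumentException("StagingPath cannot be empty");
            }
            return configuredPath + File.separator + jobName;
        }

        public static void main(String[] args) {
            System.out.println(splitList(" salesdb "));          // [salesdb]
            System.out.println(splitList("clicks,orders"));      // [clicks, orders]
            System.out.println(stagingPath("/apps/falcon/staging", "drJob1"));
            // -> /apps/falcon/staging/drJob1
        }
    }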

http://git-wip-us.apache.org/repos/asf/falcon/blob/6f5b476c/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java
deleted file mode 100644
index e141800..0000000
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java
+++ /dev/null
@@ -1,393 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.hive;
-
-import org.apache.commons.cli.ParseException;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.falcon.hive.exception.HiveReplicationException;
-import org.apache.falcon.hive.mapreduce.CopyMapper;
-import org.apache.falcon.hive.mapreduce.CopyReducer;
-import org.apache.falcon.hive.util.DRStatusStore;
-import org.apache.falcon.hive.util.DelimiterUtils;
-import org.apache.falcon.hive.util.EventSourcerUtils;
-import org.apache.falcon.hive.util.FileUtils;
-import org.apache.falcon.hive.util.HiveDRStatusStore;
-import org.apache.falcon.hive.util.HiveDRUtils;
-import org.apache.falcon.hive.util.HiveMetastoreUtils;
-import org.apache.falcon.job.JobCounters;
-import org.apache.falcon.job.JobCountersHandler;
-import org.apache.falcon.job.JobType;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobStatus;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.hive.hcatalog.api.HCatClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * DR Tool Driver.
- */
-public class HiveDRTool extends Configured implements Tool {
-
-    private static final String META_PATH_FILE_SUFFIX = ".metapath";
-
-    private FileSystem jobFS;
-    private FileSystem sourceClusterFS;
-    private FileSystem targetClusterFS;
-
-    private HiveDROptions inputOptions;
-    private DRStatusStore drStore;
-    private String eventsMetaFile;
-    private EventSourcerUtils eventSoucerUtil;
-    private Configuration jobConf;
-    private String executionStage;
-
-    public static final FsPermission STAGING_DIR_PERMISSION =
-            new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL);
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(HiveDRTool.class);
-
-    public HiveDRTool() {
-    }
-
-    @Override
-    public int run(String[] args) throws Exception {
-        if (args.length < 1) {
-            usage();
-            return -1;
-        }
-
-        try {
-            init(args);
-        } catch (Throwable e) {
-            LOG.error("Invalid arguments: ", e);
-            System.err.println("Invalid arguments: " + e.getMessage());
-            usage();
-            return -1;
-        }
-
-        try {
-            Job job = execute();
-            if ((job != null) && 
(inputOptions.getExecutionStage().equalsIgnoreCase(
-                    HiveDRUtils.ExecutionStage.EXPORT.name()))) {
-                if ((job.getStatus().getState() == JobStatus.State.SUCCEEDED)
-                        && (job.getConfiguration().get("counterLogDir") != 
null)) {
-                    LOG.info("Obtaining job replication counters for Hive DR 
job");
-                    Path counterFile = new 
Path(job.getConfiguration().get("counterLogDir"), "counter.txt");
-                    JobCounters hiveReplicationCounters = 
JobCountersHandler.getCountersType(
-                            JobType.HIVEREPLICATION.name());
-                    
hiveReplicationCounters.obtainJobCounters(job.getConfiguration(), job, false);
-                    
hiveReplicationCounters.storeJobCounters(job.getConfiguration(), counterFile);
-                }
-            }
-        } catch (Exception e) {
-            System.err.println("Exception encountered " + e.getMessage());
-            e.printStackTrace();
-            LOG.error("Exception encountered, cleaning up staging dirs", e);
-            cleanup();
-            return -1;
-        }
-
-        if (inputOptions.getExecutionStage().equalsIgnoreCase(HiveDRUtils.ExecutionStage.IMPORT.name())) {
-            cleanup();
-        }
-
-        return 0;
-    }
-
-    private void init(String[] args) throws Exception {
-        LOG.info("Initializing HiveDR");
-        inputOptions = parseOptions(args);
-        LOG.info("Input Options: {}", inputOptions);
-
-        Configuration sourceConf = 
FileUtils.getConfiguration(inputOptions.getSourceWriteEP(),
-                inputOptions.getSourceNNKerberosPrincipal());
-        sourceClusterFS = FileSystem.get(sourceConf);
-        Configuration targetConf = 
FileUtils.getConfiguration(inputOptions.getTargetWriteEP(),
-                inputOptions.getTargetNNKerberosPrincipal());
-        targetClusterFS = FileSystem.get(targetConf);
-        jobConf = 
FileUtils.getConfiguration(inputOptions.getJobClusterWriteEP(),
-                inputOptions.getJobClusterNNPrincipal());
-        jobFS = FileSystem.get(jobConf);
-
-        // init DR status store
-        drStore = new HiveDRStatusStore(targetClusterFS);
-        eventSoucerUtil = new EventSourcerUtils(jobConf, 
inputOptions.shouldKeepHistory(), inputOptions.getJobName());
-    }
-
-    private HiveDROptions parseOptions(String[] args) throws ParseException {
-        return HiveDROptions.create(args);
-    }
-
-    public Job execute() throws Exception {
-        assert inputOptions != null;
-        assert getConf() != null;
-        executionStage = inputOptions.getExecutionStage();
-        LOG.info("Executing Workflow stage : {}", executionStage);
-        if (executionStage.equalsIgnoreCase(HiveDRUtils.ExecutionStage.LASTEVENTS.name())) {
-            String lastEventsIdFile = getLastEvents(jobConf);
-            LOG.info("Last successfully replicated Event file : {}", 
lastEventsIdFile);
-            return null;
-        } else if 
(executionStage.equalsIgnoreCase(HiveDRUtils.ExecutionStage.EXPORT.name())) {
-            createStagingDirectory();
-            eventsMetaFile = sourceEvents();
-            LOG.info("Sourced Events meta file : {}", eventsMetaFile);
-            if (StringUtils.isEmpty(eventsMetaFile)) {
-                LOG.info("No events to process");
-                return null;
-            } else {
-                /*
-                 * eventsMetaFile contains the events to be processed by 
HiveDr. This file should be available
-                 * for the import action as well. Persist the file at a 
location common to both export and import.
-                 */
-                persistEventsMetafileLocation(eventsMetaFile);
-            }
-        } else if 
(executionStage.equalsIgnoreCase(HiveDRUtils.ExecutionStage.IMPORT.name())) {
-            // read the location of eventsMetaFile from hdfs
-            eventsMetaFile = getEventsMetaFileLocation();
-            if (StringUtils.isEmpty(eventsMetaFile)) {
-                LOG.info("No events to process");
-                return null;
-            }
-        } else {
-            throw new HiveReplicationException("Invalid Execution stage : " + 
inputOptions.getExecutionStage());
-        }
-
-        Job job = createJob();
-        job.submit();
-
-        String jobID = job.getJobID().toString();
-        job.getConfiguration().set("HIVEDR_JOB_ID", jobID);
-
-        LOG.info("HiveDR job-id: {}", jobID);
-        if (inputOptions.shouldBlock() && !job.waitForCompletion(true)) {
-            throw new IOException("HiveDR failure: Job " + jobID + " has 
failed: "
-                    + job.getStatus().getFailureInfo());
-        }
-
-        return job;
-    }
-
-    private Job createJob() throws Exception {
-        String jobName = "hive-dr" + executionStage;
-        String userChosenName = getConf().get(JobContext.JOB_NAME);
-        if (userChosenName != null) {
-            jobName += ": " + userChosenName;
-        }
-        Job job = Job.getInstance(getConf());
-        job.setJobName(jobName);
-        job.setJarByClass(CopyMapper.class);
-        job.setMapperClass(CopyMapper.class);
-        job.setReducerClass(CopyReducer.class);
-        
job.setInputFormatClass(org.apache.hadoop.mapreduce.lib.input.NLineInputFormat.class);
-
-        job.setOutputFormatClass(NullOutputFormat.class);
-        job.setMapOutputKeyClass(Text.class);
-        job.setMapOutputValueClass(Text.class);
-
-        job.getConfiguration().set(JobContext.MAP_SPECULATIVE, "false");
-        job.getConfiguration().set(JobContext.NUM_MAPS,
-                String.valueOf(inputOptions.getReplicationMaxMaps()));
-
-        for (HiveDRArgs args : HiveDRArgs.values()) {
-            if (inputOptions.getValue(args) != null) {
-                job.getConfiguration().set(args.getName(), 
inputOptions.getValue(args));
-            } else {
-                job.getConfiguration().set(args.getName(), "null");
-            }
-        }
-        job.getConfiguration().set(FileInputFormat.INPUT_DIR, eventsMetaFile);
-
-        return job;
-    }
-
-    private void createStagingDirectory() throws IOException, 
HiveReplicationException {
-        Path sourceStagingPath = new Path(inputOptions.getSourceStagingPath());
-        Path targetStagingPath = new Path(inputOptions.getTargetStagingPath());
-        LOG.info("Source staging path: {}", sourceStagingPath);
-        if (!FileSystem.mkdirs(sourceClusterFS, sourceStagingPath, 
STAGING_DIR_PERMISSION)) {
-            throw new IOException("mkdir failed for " + sourceStagingPath);
-        }
-
-        LOG.info("Target staging path: {}", targetStagingPath);
-        if (!FileSystem.mkdirs(targetClusterFS, targetStagingPath, 
STAGING_DIR_PERMISSION)) {
-            throw new IOException("mkdir failed for " + targetStagingPath);
-        }
-    }
-
-    private void cleanStagingDirectory() throws HiveReplicationException {
-        LOG.info("Cleaning staging directories");
-        Path sourceStagingPath = new Path(inputOptions.getSourceStagingPath());
-        Path targetStagingPath = new Path(inputOptions.getTargetStagingPath());
-        try {
-            if (sourceClusterFS.exists(sourceStagingPath)) {
-                sourceClusterFS.delete(sourceStagingPath, true);
-            }
-
-            if (targetClusterFS.exists(targetStagingPath)) {
-                targetClusterFS.delete(targetStagingPath, true);
-            }
-        } catch (IOException e) {
-            LOG.error("Unable to cleanup staging dir:", e);
-        }
-    }
-
-    private String sourceEvents() throws Exception {
-        MetaStoreEventSourcer defaultSourcer = null;
-        String inputFilename = null;
-        String lastEventsIdFile = FileUtils.DEFAULT_EVENT_STORE_PATH 
+File.separator+inputOptions.getJobName()+"/"
-                +inputOptions.getJobName()+".id";
-        Map<String, Long> lastEventsIdMap = getLastDBTableEvents(new 
Path(lastEventsIdFile));
-        try {
-            HCatClient sourceMetastoreClient = 
HiveMetastoreUtils.initializeHiveMetaStoreClient(
-                    inputOptions.getSourceMetastoreUri(),
-                    inputOptions.getSourceMetastoreKerberosPrincipal(),
-                    inputOptions.getSourceHive2KerberosPrincipal());
-            defaultSourcer = new MetaStoreEventSourcer(sourceMetastoreClient,
-                    new DefaultPartitioner(drStore, eventSoucerUtil), 
eventSoucerUtil, lastEventsIdMap);
-            inputFilename = defaultSourcer.sourceEvents(inputOptions);
-        } finally {
-            if (defaultSourcer != null) {
-                defaultSourcer.cleanUp();
-            }
-        }
-
-        return inputFilename;
-    }
-
-    private String getLastEvents(Configuration conf) throws Exception {
-        LastReplicatedEvents lastEvents = new LastReplicatedEvents(conf,
-                inputOptions.getTargetMetastoreUri(),
-                inputOptions.getTargetMetastoreKerberosPrincipal(),
-                inputOptions.getTargetHive2KerberosPrincipal(),
-                drStore, inputOptions);
-        String eventIdFile = lastEvents.getLastEvents(inputOptions);
-        lastEvents.cleanUp();
-        return eventIdFile;
-    }
-
-    private Map<String, Long> getLastDBTableEvents(Path lastEventIdFile) 
throws Exception {
-        Map<String, Long> lastEventsIdMap = new HashMap<String, Long>();
-        BufferedReader in = new BufferedReader(new 
InputStreamReader(jobFS.open(lastEventIdFile)));
-        try {
-            String line;
-            while ((line=in.readLine())!=null) {
-                String[] field = line.trim().split(DelimiterUtils.TAB_DELIM, -1);
-                lastEventsIdMap.put(field[0], Long.parseLong(field[1]));
-            }
-        } catch (Exception e) {
-            throw new IOException(e);
-        } finally {
-            IOUtils.closeQuietly(in);
-        }
-
-        return lastEventsIdMap;
-    }
-
-    public static void main(String[] args) {
-        int exitCode;
-        try {
-            HiveDRTool hiveDRTool = new HiveDRTool();
-            exitCode = ToolRunner.run(HiveDRUtils.getDefaultConf(), 
hiveDRTool, args);
-        } catch (Exception e) {
-            LOG.error("Couldn't complete HiveDR operation: ", e);
-            exitCode = -1;
-        }
-
-        System.exit(exitCode);
-    }
-
-    private void cleanInputDir() {
-        eventSoucerUtil.cleanUpEventInputDir();
-    }
-
-    private synchronized void cleanup() throws HiveReplicationException {
-        cleanStagingDirectory();
-        cleanInputDir();
-        cleanTempFiles();
-    }
-
-    private void cleanTempFiles() {
-        Path eventsDirPath = new Path(FileUtils.DEFAULT_EVENT_STORE_PATH, 
inputOptions.getJobName());
-        Path metaFilePath = new Path(eventsDirPath.toString(), 
inputOptions.getJobName() + META_PATH_FILE_SUFFIX);
-        Path eventsFilePath = new Path(eventsDirPath.toString(), 
inputOptions.getJobName() + ".id");
-
-        try {
-            if (jobFS.exists(metaFilePath)) {
-                jobFS.delete(metaFilePath, true);
-            }
-            if (jobFS.exists(eventsFilePath)) {
-                jobFS.delete(eventsFilePath, true);
-            }
-        } catch (IOException e) {
-            LOG.error("Deleting Temp files failed", e);
-        }
-    }
-
-    public void persistEventsMetafileLocation(final String eventMetaFilePath) 
throws IOException {
-        Path eventsDirPath = new Path(FileUtils.DEFAULT_EVENT_STORE_PATH, 
inputOptions.getJobName());
-        Path metaFilePath = new Path(eventsDirPath.toString(), 
inputOptions.getJobName() + META_PATH_FILE_SUFFIX);
-
-        OutputStream out = null;
-        try {
-            out = FileSystem.create(jobFS, metaFilePath, 
FileUtils.FS_PERMISSION_700);
-            out.write(eventMetaFilePath.getBytes());
-            out.flush();
-        } finally {
-            IOUtils.closeQuietly(out);
-        }
-    }
-
-    private String getEventsMetaFileLocation() throws IOException {
-        Path eventsDirPath = new Path(FileUtils.DEFAULT_EVENT_STORE_PATH, 
inputOptions.getJobName());
-        Path metaFilePath = new Path(eventsDirPath.toString(), 
inputOptions.getJobName() + META_PATH_FILE_SUFFIX);
-        String line = null;
-        if (jobFS.exists(metaFilePath)) {
-            BufferedReader in = new BufferedReader(new 
InputStreamReader(jobFS.open(metaFilePath)));
-            line = in.readLine();
-            in.close();
-        }
-        return line;
-    }
-
-
-    public static void usage() {
-        System.out.println("Usage: hivedrtool -option value ....");
-    }
-}
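
One detail of the removed HiveDRTool worth spelling out: the EXPORT stage records the location of the sourced events meta file in a small <jobName>.metapath file on the job cluster's file system, and the IMPORT stage reads it back (persistEventsMetafileLocation / getEventsMetaFileLocation). Below is a sketch of that handoff using the stock Hadoop FileSystem API; the paths and the permission constant are illustrative stand-ins, not the Falcon FileUtils values.

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.io.OutputStream;
    import java.nio.charset.StandardCharsets;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.permission.FsAction;
    import org.apache.hadoop.fs.permission.FsPermission;

    /** Sketch of the export -> import handoff file used by HiveDRTool (illustrative paths). */
    public final class MetaPathHandoffSketch {

        private static final FsPermission PERMISSION_700 =
                new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);

        /** EXPORT stage: record where the sourced events meta file lives. */
        static void persistMetafileLocation(FileSystem fs, Path metaFile, String eventsMetaFile)
                throws IOException {
            OutputStream out = FileSystem.create(fs, metaFile, PERMISSION_700);
            try {
                out.write(eventsMetaFile.getBytes(StandardCharsets.UTF_8));
                out.flush();
            } finally {
                out.close();
            }
        }

        /** IMPORT stage: read the location back; null means the export stage found no events. */
        static String readMetafileLocation(FileSystem fs, Path metaFile) throws IOException {
            if (!fs.exists(metaFile)) {
                return null;
            }
            BufferedReader in = new BufferedReader(
                    new InputStreamReader(fs.open(metaFile), StandardCharsets.UTF_8));
            try {
                return in.readLine();
            } finally {
                in.close();
            }
        }

        public static void main(String[] args) throws IOException {
            FileSystem fs = FileSystem.get(new Configuration());
            Path metaFile = new Path("/tmp/hivedr-demo/drJob1.metapath"); // illustrative path
            persistMetafileLocation(fs, metaFile, "/tmp/hivedr-demo/events/drJob1.meta");
            System.out.println(readMetafileLocation(fs, metaFile));
        }
    }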

http://git-wip-us.apache.org/repos/asf/falcon/blob/6f5b476c/addons/hivedr/src/main/java/org/apache/falcon/hive/LastReplicatedEvents.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/LastReplicatedEvents.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/LastReplicatedEvents.java
deleted file mode 100644
index bae6c9e..0000000
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/LastReplicatedEvents.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.hive;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.falcon.hive.exception.HiveReplicationException;
-import org.apache.falcon.hive.util.DRStatusStore;
-import org.apache.falcon.hive.util.DelimiterUtils;
-import org.apache.falcon.hive.util.FileUtils;
-import org.apache.falcon.hive.util.HiveDRUtils;
-import org.apache.falcon.hive.util.HiveMetastoreUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hive.hcatalog.api.HCatClient;
-import org.apache.hive.hcatalog.api.HCatTable;
-import org.apache.hive.hcatalog.api.repl.ReplicationUtils;
-import org.apache.hive.hcatalog.common.HCatException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Sources meta store change events from Hive.
- */
-public class LastReplicatedEvents {
-    private static final Logger LOG = 
LoggerFactory.getLogger(LastReplicatedEvents.class);
-    private final HCatClient targetMetastoreClient;
-    private final DRStatusStore drStore;
-    private final FileSystem jobFS;
-    private Path eventsInputDirPath;
-
-    /* TODO handle cases when no events. files will be empty and lists will be 
empty */
-    public LastReplicatedEvents(Configuration conf, String targetMetastoreUri,
-                                String targetMetastoreKerberosPrincipal,
-                                String targetHive2KerberosPrincipal,
-                                DRStatusStore drStore, HiveDROptions 
inputOptions) throws Exception {
-        targetMetastoreClient = 
HiveMetastoreUtils.initializeHiveMetaStoreClient(targetMetastoreUri,
-                targetMetastoreKerberosPrincipal, 
targetHive2KerberosPrincipal);
-        jobFS = FileSystem.get(conf);
-        this.drStore = drStore;
-        init(inputOptions.getJobName());
-    }
-
-    private void init(final String jobName) throws Exception {
-        // Create base dir to store events on cluster where job is running
-        Path dir = new Path(FileUtils.DEFAULT_EVENT_STORE_PATH);
-        // Validate base path
-        FileUtils.validatePath(jobFS, new 
Path(DRStatusStore.BASE_DEFAULT_STORE_PATH));
-
-        if (!jobFS.exists(dir)) {
-            if (!jobFS.mkdirs(dir)) {
-                throw new Exception("Creating directory failed: " + dir);
-            }
-        }
-
-        eventsInputDirPath = new Path(FileUtils.DEFAULT_EVENT_STORE_PATH, 
jobName);
-
-        if (!jobFS.exists(eventsInputDirPath)) {
-            if (!jobFS.mkdirs(eventsInputDirPath)) {
-                throw new Exception("Creating directory failed: " + 
eventsInputDirPath);
-            }
-        }
-    }
-
-    public String getLastEvents(HiveDROptions inputOptions) throws Exception {
-        HiveDRUtils.ReplicationType replicationType = 
HiveDRUtils.getReplicationType(inputOptions.getSourceTables());
-        LOG.info("Obtaining last events for replicationType : {}", 
replicationType);
-        HashMap<String, Long> lastEvents = new HashMap<String, Long>();
-        if (replicationType == HiveDRUtils.ReplicationType.DB) {
-            List<String> dbNames = inputOptions.getSourceDatabases();
-            for (String db : dbNames) {
-                lastEvents.put(db, getLastSavedEventId(inputOptions, db, 
null));
-            }
-        } else {
-            List<String> tableNames = inputOptions.getSourceTables();
-            String db = inputOptions.getSourceDatabases().get(0);
-            for (String tableName : tableNames) {
-                lastEvents.put(db + "." + tableName, 
getLastSavedEventId(inputOptions, db, tableName));
-            }
-        }
-
-        return persistLastEventsToFile(lastEvents, inputOptions.getJobName());
-    }
-
-    private long getLastSavedEventId(HiveDROptions inputOptions, final String 
dbName,
-                                     final String tableName) throws Exception {
-        HiveDRUtils.ReplicationType replicationType = 
HiveDRUtils.getReplicationType(inputOptions.getSourceTables());
-        String jobName = inputOptions.getJobName();
-        String sourceMetastoreUri = inputOptions.getSourceMetastoreUri();
-        String targetMetastoreUri = inputOptions.getTargetMetastoreUri();
-
-        long eventId = 0;
-        if (HiveDRUtils.ReplicationType.DB == replicationType) {
-            eventId = drStore.getReplicationStatus(sourceMetastoreUri, 
targetMetastoreUri,
-                    jobName, dbName).getEventId();
-        } else if (HiveDRUtils.ReplicationType.TABLE == replicationType) {
-            eventId = drStore.getReplicationStatus(sourceMetastoreUri, 
targetMetastoreUri,
-                    jobName, dbName, tableName).getEventId();
-        }
-
-        if (eventId == -1) {
-            if (HiveDRUtils.ReplicationType.DB == replicationType) {
-                /*
-                 * API to get last repl ID for a DB is very expensive, so Hive 
does not want to make it public.
-                 * HiveDrTool finds last repl id for DB by finding min last 
repl id of all tables.
-                 */
-                eventId = getLastReplicationIdForDatabase(dbName);
-            } else {
-                HCatTable table = targetMetastoreClient.getTable(dbName, 
tableName);
-                eventId = ReplicationUtils.getLastReplicationId(table);
-            }
-        }
-
-        if ((StringUtils.isEmpty(tableName))) {
-            LOG.info("Last replicated eventId for DB : {} is {}", dbName, 
eventId);
-        } else {
-            LOG.info("Last replicated eventId for DB : {} Table : {} is {}", 
dbName, tableName, eventId);
-        }
-
-        return eventId;
-    }
-
-    private long getLastReplicationIdForDatabase(String databaseName) throws 
HiveReplicationException {
-        /*
-         * This is a very expensive method and should only be called during 
first dbReplication instance.
-         */
-        long eventId = Long.MAX_VALUE;
-        try {
-            List<String> tableList = 
targetMetastoreClient.listTableNamesByPattern(databaseName, "*");
-            for (String tableName : tableList) {
-                long temp = ReplicationUtils.getLastReplicationId(
-                        targetMetastoreClient.getTable(databaseName, 
tableName));
-                if (temp < eventId) {
-                    eventId = temp;
-                }
-            }
-            return (eventId == Long.MAX_VALUE) ? 0 : eventId;
-        } catch (HCatException e) {
-            throw new HiveReplicationException("Unable to find last 
replication id for database "
-                    + databaseName, e);
-        }
-    }
-
-    public String persistLastEventsToFile(final HashMap<String, Long> 
lastEvents,
-                                          final String identifier) throws 
IOException {
-        if (lastEvents.size()!=0) {
-            Path eventsFile = new Path(eventsInputDirPath.toString(), 
identifier+".id");
-            OutputStream out = null;
-
-            try {
-                out = FileSystem.create(jobFS, eventsFile, 
FileUtils.FS_PERMISSION_700);
-                for (Map.Entry<String, Long> entry : lastEvents.entrySet()) {
-                    out.write(entry.getKey().getBytes());
-                    out.write(DelimiterUtils.TAB_DELIM.getBytes());
-                    out.write(String.valueOf(entry.getValue()).getBytes());
-                    out.write(DelimiterUtils.NEWLINE_DELIM.getBytes());
-                }
-                out.flush();
-            } finally {
-                IOUtils.closeQuietly(out);
-            }
-            return jobFS.makeQualified(eventsFile).toString();
-        } else {
-            return null;
-        }
-    }
-
-    public void cleanUp() throws Exception {
-        if (targetMetastoreClient != null) {
-            targetMetastoreClient.close();
-        }
-    }
-}
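
LastReplicatedEvents persists one "db-or-db.table <TAB> eventId" line per replicated entity, and HiveDRTool.getLastDBTableEvents() parses the same file back into a map. The sketch below shows that round trip with ordinary java.nio file IO instead of HDFS, assuming DelimiterUtils.TAB_DELIM is a tab character.

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.HashMap;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    /** Sketch of the tab-delimited last-event-id file written by LastReplicatedEvents. */
    public final class LastEventsFileSketch {

        static void write(Path file, Map<String, Long> lastEvents) throws IOException {
            StringBuilder sb = new StringBuilder();
            for (Map.Entry<String, Long> entry : lastEvents.entrySet()) {
                sb.append(entry.getKey()).append('\t').append(entry.getValue()).append('\n');
            }
            Files.write(file, sb.toString().getBytes(StandardCharsets.UTF_8));
        }

        static Map<String, Long> read(Path file) throws IOException {
            Map<String, Long> lastEvents = new HashMap<>();
            List<String> lines = Files.readAllLines(file, StandardCharsets.UTF_8);
            for (String line : lines) {
                String[] field = line.trim().split("\t", -1); // same split rule as getLastDBTableEvents()
                lastEvents.put(field[0], Long.parseLong(field[1]));
            }
            return lastEvents;
        }

        public static void main(String[] args) throws IOException {
            Map<String, Long> lastEvents = new LinkedHashMap<>();
            lastEvents.put("salesdb", 1042L);        // DB-level replication entry
            lastEvents.put("salesdb.clicks", 1100L); // table-level replication entry
            Path file = Files.createTempFile("drJob1", ".id");
            write(file, lastEvents);
            System.out.println(read(file));          // {salesdb=1042, salesdb.clicks=1100} (order may vary)
        }
    }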

http://git-wip-us.apache.org/repos/asf/falcon/blob/6f5b476c/addons/hivedr/src/main/java/org/apache/falcon/hive/MetaStoreEventSourcer.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/MetaStoreEventSourcer.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/MetaStoreEventSourcer.java
deleted file mode 100644
index f008883..0000000
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/MetaStoreEventSourcer.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.hive;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.falcon.hive.util.EventSourcerUtils;
-import org.apache.falcon.hive.util.HiveDRUtils;
-import org.apache.hive.hcatalog.api.HCatClient;
-import org.apache.hive.hcatalog.api.repl.ReplicationTask;
-import org.apache.hive.hcatalog.api.repl.StagingDirectoryProvider;
-import org.apache.hive.hcatalog.common.HCatException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.OutputStream;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Sources meta store change events from Hive.
- */
-public class MetaStoreEventSourcer implements EventSourcer {
-
-    private static final Logger LOG = LoggerFactory.getLogger(MetaStoreEventSourcer.class);
-
-    private final HCatClient sourceMetastoreClient;
-    private final Partitioner partitioner;
-    private final EventSourcerUtils eventSourcerUtils;
-    private final ReplicationEventMetadata eventMetadata;
-    private Map<String, Long> lastEventsIdMap = null;
-    private long lastCounter;
-
-    /* TODO handle cases when no events. files will be empty and lists will be empty */
-    public MetaStoreEventSourcer(HCatClient sourceMetastoreClient, Partitioner partitioner,
-                                 EventSourcerUtils eventSourcerUtils,
-                                 Map<String, Long> lastEventsIdMap) throws Exception {
-        this.sourceMetastoreClient = sourceMetastoreClient;
-        this.eventMetadata = new ReplicationEventMetadata();
-        this.partitioner = partitioner;
-        this.eventSourcerUtils = eventSourcerUtils;
-        this.lastEventsIdMap = lastEventsIdMap;
-    }
-
-    public String sourceEvents(HiveDROptions inputOptions) throws Exception {
-        HiveDRUtils.ReplicationType replicationType = HiveDRUtils.getReplicationType(inputOptions.getSourceTables());
-        LOG.info("Sourcing replication events for type : {}", replicationType);
-        if (replicationType == HiveDRUtils.ReplicationType.DB) {
-            List<String> dbNames = inputOptions.getSourceDatabases();
-            for (String db : dbNames) {
-                ++lastCounter;
-                sourceEventsForDb(inputOptions, db);
-            }
-        } else {
-            List<String> tableNames = inputOptions.getSourceTables();
-            String db = inputOptions.getSourceDatabases().get(0);
-            for (String tableName : tableNames) {
-                ++lastCounter;
-                sourceEventsForTable(inputOptions, db, tableName);
-            }
-        }
-
-        if (eventMetadata.getEventFileMetadata() == null || eventMetadata.getEventFileMetadata().isEmpty()) {
-            LOG.info("No events for tables for the request db: {} , Tables : {}", inputOptions.getSourceDatabases(),
-                    inputOptions.getSourceTables());
-            eventSourcerUtils.cleanUpEventInputDir();
-            return null;
-        } else {
-            return eventSourcerUtils.persistToMetaFile(eventMetadata, inputOptions.getJobName());
-        }
-    }
-
-    private void sourceEventsForDb(HiveDROptions inputOptions, String dbName) throws Exception {
-        Iterator<ReplicationTask> replicationTaskIter = sourceReplicationEvents(getLastSavedEventId(dbName, null),
-                inputOptions.getMaxEvents(), dbName, null);
-        if (replicationTaskIter == null || !replicationTaskIter.hasNext()) {
-            LOG.info("No events for db: {}", dbName);
-            return;
-        }
-        processEvents(dbName, null, inputOptions, replicationTaskIter);
-    }
-
-    private void sourceEventsForTable(HiveDROptions inputOptions, String dbName, String tableName)
-        throws Exception {
-        Iterator<ReplicationTask> replicationTaskIter = sourceReplicationEvents(getLastSavedEventId(dbName, tableName),
-                inputOptions.getMaxEvents(), dbName, tableName
-        );
-        if (replicationTaskIter == null || !replicationTaskIter.hasNext()) {
-            LOG.info("No events for db.table: {}.{}", dbName, tableName);
-            return;
-        }
-        processEvents(dbName, tableName, inputOptions, replicationTaskIter);
-    }
-
-    private void processEvents(String dbName, String tableName, HiveDROptions inputOptions,
-                               Iterator<ReplicationTask> replicationTaskIter) throws Exception {
-        if (partitioner.isPartitioningRequired(inputOptions)) {
-            ReplicationEventMetadata dbEventMetadata = partitioner.partition(inputOptions, dbName, replicationTaskIter);
-
-            if (dbEventMetadata == null || dbEventMetadata.getEventFileMetadata() == null
-                    || dbEventMetadata.getEventFileMetadata().isEmpty()) {
-                LOG.info("No events for db: {} , Table : {}", dbName, tableName);
-            } else {
-                EventSourcerUtils.updateEventMetadata(eventMetadata, dbEventMetadata);
-            }
-        } else {
-            processTableReplicationEvents(replicationTaskIter, dbName, tableName,
-                    inputOptions.getSourceStagingPath(), inputOptions.getTargetStagingPath());
-        }
-    }
-
-    private long getLastSavedEventId(final String dbName, final String tableName) throws Exception {
-        String key = dbName;
-        if (StringUtils.isNotEmpty(tableName)) {
-            key = dbName + "." + tableName;
-        }
-        long eventId = lastEventsIdMap.get(key);
-        LOG.info("LastSavedEventId eventId for {} : {}", key, eventId);
-        return eventId;
-    }
-
-    private Iterator<ReplicationTask> sourceReplicationEvents(long lastEventId, int maxEvents, String dbName,
-                                                              String tableName) throws Exception {
-        try {
-            return sourceMetastoreClient.getReplicationTasks(lastEventId, maxEvents, dbName, tableName);
-        } catch (HCatException e) {
-            throw new Exception("Exception getting replication events " + e.getMessage(), e);
-        }
-    }
-
-
-    protected void processTableReplicationEvents(Iterator<ReplicationTask> taskIter, String dbName,
-                                               String tableName, String srcStagingDirProvider,
-                                               String dstStagingDirProvider) throws Exception {
-        String srcFilename = null;
-        String tgtFilename = null;
-        OutputStream srcOutputStream = null;
-        OutputStream tgtOutputStream = null;
-
-        while (taskIter.hasNext()) {
-            ReplicationTask task = taskIter.next();
-            if (task.needsStagingDirs()) {
-                task.withSrcStagingDirProvider(new StagingDirectoryProvider.TrivialImpl(srcStagingDirProvider,
-                        HiveDRUtils.SEPARATOR));
-                task.withDstStagingDirProvider(new StagingDirectoryProvider.TrivialImpl(dstStagingDirProvider,
-                        HiveDRUtils.SEPARATOR));
-            }
-
-            if (task.isActionable()) {
-                Iterable<? extends org.apache.hive.hcatalog.api.repl.Command> srcCmds = task.getSrcWhCommands();
-                if (srcCmds != null) {
-                    if (StringUtils.isEmpty(srcFilename)) {
-                        srcFilename = eventSourcerUtils.getSrcFileName(String.valueOf(lastCounter)).toString();
-                        srcOutputStream = eventSourcerUtils.getFileOutputStream(srcFilename);
-                    }
-                    eventSourcerUtils.persistReplicationEvents(srcOutputStream, srcCmds);
-                }
-
-
-                Iterable<? extends org.apache.hive.hcatalog.api.repl.Command> dstCmds = task.getDstWhCommands();
-                if (dstCmds != null) {
-                    if (StringUtils.isEmpty(tgtFilename)) {
-                        tgtFilename = eventSourcerUtils.getTargetFileName(String.valueOf(lastCounter)).toString();
-                        tgtOutputStream = eventSourcerUtils.getFileOutputStream(tgtFilename);
-                    }
-                    eventSourcerUtils.persistReplicationEvents(tgtOutputStream, dstCmds);
-                }
-
-            } else {
-                LOG.error("Task is not actionable with event Id : {}", 
task.getEvent().getEventId());
-            }
-        }
-        // Close the stream
-        eventSourcerUtils.closeOutputStream(srcOutputStream);
-        eventSourcerUtils.closeOutputStream(tgtOutputStream);
-        EventSourcerUtils.updateEventMetadata(eventMetadata, dbName, tableName, srcFilename, tgtFilename);
-    }
-
-    public String persistToMetaFile(String jobName) throws Exception {
-        return eventSourcerUtils.persistToMetaFile(eventMetadata, jobName);
-    }
-
-    public void cleanUp() throws Exception {
-        if (sourceMetastoreClient != null) {
-            sourceMetastoreClient.close();
-        }
-    }
-}

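In getLastSavedEventId above, the lastEventsIdMap handed to the constructor is keyed by database name for DB-level replication and by "db.table" when a table name is present. A minimal sketch of that keying convention in isolation (the LastEventKey class and buildKey method are hypothetical):

    import org.apache.commons.lang3.StringUtils;

    // Hypothetical illustration of the lookup key convention used by
    // getLastSavedEventId: "<db>" for DB-level events, "<db>.<table>" otherwise.
    public final class LastEventKey {
        private LastEventKey() {
        }

        public static String buildKey(String dbName, String tableName) {
            return StringUtils.isNotEmpty(tableName) ? dbName + "." + tableName : dbName;
        }
    }
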
http://git-wip-us.apache.org/repos/asf/falcon/blob/6f5b476c/addons/hivedr/src/main/java/org/apache/falcon/hive/Partitioner.java
----------------------------------------------------------------------
diff --git 
a/addons/hivedr/src/main/java/org/apache/falcon/hive/Partitioner.java 
b/addons/hivedr/src/main/java/org/apache/falcon/hive/Partitioner.java
deleted file mode 100644
index 25b8bd6..0000000
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/Partitioner.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.hive;
-
-import org.apache.hive.hcatalog.api.repl.ReplicationTask;
-
-import java.util.Iterator;
-
-/**
- * Partition hive events.
- */
-public interface Partitioner {
-    /**
-     * Partition events.
-     *
-     * @param options                 Hive dr options.
-     * @param dbName                  Database name.
-     * @param replicationTaskIterator Repl task iterator.
-     * @return ReplicationEventMetadata
-     */
-    ReplicationEventMetadata partition(final HiveDROptions options,
-                                       final String dbName,
-                                       final Iterator<ReplicationTask> replicationTaskIterator) throws Exception;
-
-    boolean isPartitioningRequired(final HiveDROptions options);
-}

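The interface above carries two obligations: report whether partitioning applies to a job and, when it does, split the replication-task stream into per-table event metadata. A minimal pass-through sketch that satisfies the contract without doing any partitioning (the NoOpPartitioner class is hypothetical, not part of this module):

    import org.apache.falcon.hive.HiveDROptions;
    import org.apache.falcon.hive.Partitioner;
    import org.apache.falcon.hive.ReplicationEventMetadata;
    import org.apache.hive.hcatalog.api.repl.ReplicationTask;

    import java.util.Iterator;

    // Hypothetical no-op implementation: partitioning is never required, so
    // callers fall back to plain table-level event processing.
    public class NoOpPartitioner implements Partitioner {

        @Override
        public ReplicationEventMetadata partition(final HiveDROptions options,
                                                  final String dbName,
                                                  final Iterator<ReplicationTask> replicationTaskIterator)
            throws Exception {
            // Not reached when isPartitioningRequired() returns false.
            return new ReplicationEventMetadata();
        }

        @Override
        public boolean isPartitioningRequired(final HiveDROptions options) {
            return false;
        }
    }
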
http://git-wip-us.apache.org/repos/asf/falcon/blob/6f5b476c/addons/hivedr/src/main/java/org/apache/falcon/hive/ReplicationEventMetadata.java
----------------------------------------------------------------------
diff --git 
a/addons/hivedr/src/main/java/org/apache/falcon/hive/ReplicationEventMetadata.java
 
b/addons/hivedr/src/main/java/org/apache/falcon/hive/ReplicationEventMetadata.java
deleted file mode 100644
index 79e0ded..0000000
--- 
a/addons/hivedr/src/main/java/org/apache/falcon/hive/ReplicationEventMetadata.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.hive;
-
-import java.util.Map;
-import java.util.HashMap;
-
-/**
- * Replication event meta data class.
- */
-public class ReplicationEventMetadata {
-
-    private Map<String, String> eventFileMetadata = new HashMap<>();
-
-    public Map<String, String> getEventFileMetadata() {
-        return eventFileMetadata;
-    }
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/6f5b476c/addons/hivedr/src/main/java/org/apache/falcon/hive/exception/HiveReplicationException.java
----------------------------------------------------------------------
diff --git 
a/addons/hivedr/src/main/java/org/apache/falcon/hive/exception/HiveReplicationException.java
 
b/addons/hivedr/src/main/java/org/apache/falcon/hive/exception/HiveReplicationException.java
deleted file mode 100644
index 0baf6d8..0000000
--- 
a/addons/hivedr/src/main/java/org/apache/falcon/hive/exception/HiveReplicationException.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.hive.exception;
-
-/**
- * Wrapper class for HiveReplication exceptions.
- */
-public class HiveReplicationException extends Exception {
-
-    /**
-     * @param e Exception
-     */
-    public HiveReplicationException(Throwable e) {
-        super(e);
-    }
-
-    public HiveReplicationException(String message, Throwable e) {
-        super(message, e);
-    }
-
-    /**
-     * @param message - custom exception message
-     */
-    public HiveReplicationException(String message) {
-        super(message);
-    }
-
-    /**
-     *
-     */
-    private static final long serialVersionUID = -1475818869309247014L;
-
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/6f5b476c/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyCommitter.java
----------------------------------------------------------------------
diff --git 
a/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyCommitter.java
 
b/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyCommitter.java
deleted file mode 100644
index 98449f0..0000000
--- 
a/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyCommitter.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.hive.mapreduce;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-/**
- * Copy committer class.
- */
-public class CopyCommitter extends FileOutputCommitter {
-
-    private static final Logger LOG = LoggerFactory.getLogger(CopyCommitter.class);
-
-    /**
-     * Create a file output committer.
-     *
-     * @param outputPath the job's output path, or null if you want the output
-     *                   committer to act as a noop.
-     * @param context    the task's context
-     * @throws java.io.IOException
-     */
-    public CopyCommitter(Path outputPath,
-                         TaskAttemptContext context) throws IOException {
-        super(outputPath, context);
-    }
-
-    @Override
-    public void commitJob(JobContext jobContext) throws IOException {
-        Configuration conf = jobContext.getConfiguration();
-
-        try {
-            super.commitJob(jobContext);
-        } finally {
-            cleanup(conf);
-        }
-    }
-
-    private void cleanup(Configuration conf) {
-        // clean up staging and other data
-    }
-}

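CopyCommitter above only overrides commitJob to run a cleanup hook after the stock FileOutputCommitter commit. A committer like this is normally attached through the job's output format; a minimal sketch of that wiring, where the CopyOutputFormat class is hypothetical and not part of this module:

    import org.apache.falcon.hive.mapreduce.CopyCommitter;
    import org.apache.hadoop.mapreduce.OutputCommitter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

    import java.io.IOException;

    // Hypothetical output format that swaps the default FileOutputCommitter
    // for CopyCommitter, so commitJob() also triggers the cleanup step.
    public class CopyOutputFormat<K, V> extends TextOutputFormat<K, V> {

        private OutputCommitter committer;

        @Override
        public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException {
            if (committer == null) {
                committer = new CopyCommitter(getOutputPath(context), context);
            }
            return committer;
        }
    }

A driver would then register it with job.setOutputFormatClass(CopyOutputFormat.class).
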
http://git-wip-us.apache.org/repos/asf/falcon/blob/6f5b476c/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyMapper.java
----------------------------------------------------------------------
diff --git 
a/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyMapper.java 
b/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyMapper.java
deleted file mode 100644
index 08e0551..0000000
--- 
a/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyMapper.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.hive.mapreduce;
-
-import org.apache.falcon.hive.HiveDRArgs;
-import org.apache.falcon.hive.util.EventUtils;
-import org.apache.falcon.hive.util.HiveDRUtils;
-import org.apache.falcon.hive.util.ReplicationStatus;
-import org.apache.falcon.job.ReplicationJobCountersList;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.sql.SQLException;
-import java.util.List;
-
-/**
- * Map class for Hive DR.
- */
-public class CopyMapper extends Mapper<LongWritable, Text, Text, Text> {
-
-    private static final Logger LOG = LoggerFactory.getLogger(CopyMapper.class);
-    private EventUtils eventUtils;
-
-    @Override
-    protected void setup(Context context) throws IOException, InterruptedException {
-        eventUtils = new EventUtils(context.getConfiguration());
-        eventUtils.initializeFS();
-        try {
-            eventUtils.setupConnection();
-        } catch (Exception e) {
-            throw new IOException(e);
-        }
-    }
-
-    @Override
-    protected void map(LongWritable key, Text value,
-                       Context context) throws IOException, InterruptedException {
-        LOG.debug("Processing Event value: {}", value.toString());
-
-        try {
-            eventUtils.processEvents(value.toString());
-        } catch (Exception e) {
-            LOG.error("Exception in processing events:", e);
-            throw new IOException(e);
-        } finally {
-            cleanup(context);
-        }
-        List<ReplicationStatus> replicationStatusList = eventUtils.getListReplicationStatus();
-        if (replicationStatusList != null && !replicationStatusList.isEmpty()) {
-            for (ReplicationStatus rs : replicationStatusList) {
-                context.write(new Text(rs.getJobName()), new Text(rs.toString()));
-            }
-        }
-
-        // In case of export stage, populate custom counters
-        if (context.getConfiguration().get(HiveDRArgs.EXECUTION_STAGE.getName())
-                .equalsIgnoreCase(HiveDRUtils.ExecutionStage.EXPORT.name())
-                && !eventUtils.isCountersMapEmtpy()) {
-            context.getCounter(ReplicationJobCountersList.BYTESCOPIED).increment(
-                    eventUtils.getCounterValue(ReplicationJobCountersList.BYTESCOPIED.getName()));
-            context.getCounter(ReplicationJobCountersList.COPY).increment(
-                    eventUtils.getCounterValue(ReplicationJobCountersList.COPY.getName()));
-        }
-    }
-
-    protected void cleanup(Context context) throws IOException, InterruptedException {
-        LOG.info("Invoking cleanup process");
-        super.cleanup(context);
-        try {
-            if (context.getConfiguration().get(HiveDRArgs.EXECUTION_STAGE.getName())
-                    .equalsIgnoreCase(HiveDRUtils.ExecutionStage.IMPORT.name())) {
-                eventUtils.cleanEventsDirectory();
-            }
-        } catch (IOException e) {
-            LOG.error("Cleaning up of events directories failed", e);
-        } finally {
-            try {
-                eventUtils.closeConnection();
-            } catch (SQLException e) {
-                LOG.error("Closing the connections failed", e);
-            }
-        }
-    }
-}

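During the EXPORT stage the mapper above folds the byte and copy counts collected by EventUtils into the BYTESCOPIED and COPY job counters. A minimal sketch, assuming a completed org.apache.hadoop.mapreduce.Job, of how a driver could read those counters back (the CopyCounterLogger class is hypothetical):

    import org.apache.falcon.job.ReplicationJobCountersList;
    import org.apache.hadoop.mapreduce.Counters;
    import org.apache.hadoop.mapreduce.Job;

    import java.io.IOException;

    // Hypothetical driver-side helper: reads the counters that CopyMapper
    // increments during the export stage of a completed job.
    public final class CopyCounterLogger {
        private CopyCounterLogger() {
        }

        public static void logCopyCounters(Job completedJob) throws IOException {
            Counters counters = completedJob.getCounters();
            long bytesCopied = counters.findCounter(ReplicationJobCountersList.BYTESCOPIED).getValue();
            long copies = counters.findCounter(ReplicationJobCountersList.COPY).getValue();
            System.out.println("Copied " + bytesCopied + " bytes in " + copies + " copy operations");
        }
    }
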
http://git-wip-us.apache.org/repos/asf/falcon/blob/6f5b476c/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyReducer.java
----------------------------------------------------------------------
diff --git 
a/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyReducer.java 
b/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyReducer.java
deleted file mode 100644
index 50cb4b2..0000000
--- 
a/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyReducer.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.hive.mapreduce;
-
-
-import org.apache.falcon.hive.HiveDRArgs;
-import org.apache.falcon.hive.exception.HiveReplicationException;
-import org.apache.falcon.hive.util.DRStatusStore;
-import org.apache.falcon.hive.util.FileUtils;
-import org.apache.falcon.hive.util.HiveDRStatusStore;
-import org.apache.falcon.hive.util.ReplicationStatus;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Reducer;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-
-/**
- * Reducer class for Hive DR.
- */
-public class CopyReducer extends Reducer<Text, Text, Text, Text> {
-    private DRStatusStore hiveDRStore;
-
-    @Override
-    protected void setup(Context context) throws IOException, InterruptedException {
-        Configuration conf = context.getConfiguration();
-        FileSystem fs= FileSystem.get(FileUtils.getConfiguration(
-                conf.get(HiveDRArgs.TARGET_NN.getName()),
-                conf.get(HiveDRArgs.TARGET_NN_KERBEROS_PRINCIPAL.getName())));
-        hiveDRStore = new HiveDRStatusStore(fs);
-    }
-
-    private List<ReplicationStatus> sortStatusList(List<ReplicationStatus> replStatusList) {
-        Collections.sort(replStatusList, new Comparator<ReplicationStatus>() {
-            @Override
-            public int compare(ReplicationStatus r1, ReplicationStatus r2) {
-                return (int) (r1.getEventId() - r2.getEventId());
-            }
-        });
-        return replStatusList;
-    }
-
-    @Override
-    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
-        List<ReplicationStatus> replStatusList = new ArrayList<ReplicationStatus>();
-        ReplicationStatus rs;
-        try {
-            for (Text value : values) {
-                String[] fields = (value.toString()).split("\t");
-                rs = new ReplicationStatus(fields[0], fields[1], fields[2], fields[3], fields[4],
-                        ReplicationStatus.Status.valueOf(fields[5]), Long.parseLong(fields[6]));
-                replStatusList.add(rs);
-            }
-
-            hiveDRStore.updateReplicationStatus(key.toString(), sortStatusList(replStatusList));
-        } catch (HiveReplicationException e) {
-            throw new IOException(e);
-        }
-    }
-
-    @Override
-    protected void cleanup(Context context) throws IOException, InterruptedException {
-    }
-}

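One detail worth noting in the reducer above: its comparator casts the difference of two long event ids to int, which can wrap when the ids are more than Integer.MAX_VALUE apart. An overflow-safe sketch of the same ordering, not part of this change, would delegate to Long.compare:

    import org.apache.falcon.hive.util.ReplicationStatus;

    import java.util.Comparator;

    // Sketch only: orders ReplicationStatus entries by event id without the
    // int-cast overflow risk of subtracting two longs.
    public class ReplicationStatusComparator implements Comparator<ReplicationStatus> {
        @Override
        public int compare(ReplicationStatus r1, ReplicationStatus r2) {
            return Long.compare(r1.getEventId(), r2.getEventId());
        }
    }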