http://git-wip-us.apache.org/repos/asf/falcon/blob/6f5b476c/addons/hivedr/src/main/java/org/apache/falcon/hive/util/DBReplicationStatus.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/DBReplicationStatus.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/DBReplicationStatus.java
deleted file mode 100644
index 6dceb8e..0000000
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/DBReplicationStatus.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.hive.util;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.falcon.hive.exception.HiveReplicationException;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Class to store replication status of a DB and it's tables.
- */
-public class DBReplicationStatus {
-
-    private static final Logger LOG = LoggerFactory.getLogger(DBReplicationStatus.class);
-    private static final String DB_STATUS = "db_status";
-    private static final String TABLE_STATUS = "table_status";
-
-    private Map<String, ReplicationStatus> tableStatuses = new HashMap<String, ReplicationStatus>();
-    private ReplicationStatus databaseStatus;
-
-    public DBReplicationStatus(ReplicationStatus dbStatus) throws HiveReplicationException {
-        setDatabaseStatus(dbStatus);
-    }
-
-    public DBReplicationStatus(ReplicationStatus dbStatus,
-                               Map<String, ReplicationStatus> tableStatuses) throws HiveReplicationException {
-        /*
-        The order of set method calls is important to ensure tables that do not belong to same db
-        are not added to this DBReplicationStatus
-         */
-        setDatabaseStatus(dbStatus);
-        setTableStatuses(tableStatuses);
-    }
-
-    // Serialize
-    public String toJsonString() throws HiveReplicationException {
-        JSONObject retObject = new JSONObject();
-        JSONObject tableStatus = new JSONObject();
-        try {
-            for (Map.Entry<String, ReplicationStatus> status : tableStatuses.entrySet()) {
-                tableStatus.put(status.getKey(), status.getValue().toJsonObject());
-            }
-            retObject.put(DB_STATUS, databaseStatus.toJsonObject());
-            retObject.put(TABLE_STATUS, tableStatus);
-            return retObject.toString(ReplicationStatus.INDENT_FACTOR);
-        } catch (JSONException e) {
-            throw new HiveReplicationException("Unable to serialize Database Replication Status", e);
-        }
-    }
-
-    // de-serialize
-    public DBReplicationStatus(String jsonString) throws HiveReplicationException {
-        try {
-            JSONObject object = new JSONObject(jsonString);
-            ReplicationStatus dbstatus = new ReplicationStatus(object.get(DB_STATUS).toString());
-            setDatabaseStatus(dbstatus);
-
-            JSONObject tableJson = object.getJSONObject(TABLE_STATUS);
-            Iterator keys = tableJson.keys();
-            while(keys.hasNext()) {
-                String key = keys.next().toString();
-                ReplicationStatus value = new ReplicationStatus(tableJson.get(key).toString());
-                if (value.getDatabase().equals(dbstatus.getDatabase())) {
-                    tableStatuses.put(key.toLowerCase(), value);
-                } else {
-                    throw new HiveReplicationException("Unable to create DBReplicationStatus from JsonString. "
-                            + "Cannot set status for table " + value.getDatabase() + "." + value.getTable()
-                            + ", It does not belong to DB " + dbstatus.getDatabase());
-                }
-            }
-        } catch (JSONException e) {
-            throw new HiveReplicationException("Unable to create DBReplicationStatus from JsonString", e);
-        }
-    }
-
-    public Map<String, ReplicationStatus> getTableStatuses() {
-        return tableStatuses;
-    }
-
-    public ReplicationStatus getTableStatus(String tableName) throws HiveReplicationException {
-        tableName = tableName.toLowerCase();
-        if (tableStatuses.containsKey(tableName)) {
-            return tableStatuses.get(tableName);
-        }
-        return new ReplicationStatus(databaseStatus.getSourceUri(), databaseStatus.getTargetUri(),
-                databaseStatus.getJobName(), databaseStatus.getDatabase(),
-                tableName, ReplicationStatus.Status.INIT, -1);
-    }
-
-    public Iterator<ReplicationStatus> getTableStatusIterator() {
-        List<ReplicationStatus> resultSet = new ArrayList<ReplicationStatus>();
-        for (Map.Entry<String, ReplicationStatus> entry : tableStatuses.entrySet()) {
-            resultSet.add(entry.getValue());
-        }
-        return resultSet.iterator();
-    }
-
-    private void setTableStatuses(Map<String, ReplicationStatus> tableStatuses) throws HiveReplicationException {
-        for (Map.Entry<String, ReplicationStatus> entry : tableStatuses.entrySet()) {
-            if (!entry.getValue().getDatabase().equals(databaseStatus.getDatabase())) {
-                throw new HiveReplicationException("Cannot set status for table " + entry.getValue().getDatabase()
-                        + "." + entry.getValue().getTable() + ", It does not belong to DB "
-                        + databaseStatus.getDatabase());
-            } else {
-                this.tableStatuses.put(entry.getKey().toLowerCase(), entry.getValue());
-            }
-        }
-    }
-
-    public ReplicationStatus getDatabaseStatus() {
-        return databaseStatus;
-    }
-
-    private void setDatabaseStatus(ReplicationStatus databaseStatus) {
-        this.databaseStatus = databaseStatus;
-    }
-
-    /**
-     * Update DB status from table statuses.
-            case 1) All tables replicated successfully.
-                Take the largest successful eventId and set dbReplStatus as success
-            case 2) One or many tables failed to replicate
-                Take the smallest eventId amongst the failed tables and set dbReplStatus as failed.
-     */
-    public void updateDbStatusFromTableStatuses() throws HiveReplicationException {
-        if (tableStatuses.size() == 0) {
-            // nothing to do
-            return;
-        }
-
-        databaseStatus.setStatus(ReplicationStatus.Status.SUCCESS);
-        long successEventId = databaseStatus.getEventId();
-        long failedEventId = -1;
-
-        for (Map.Entry<String, ReplicationStatus> entry : tableStatuses.entrySet()) {
-            long eventId = entry.getValue().getEventId();
-            if (entry.getValue().getStatus().equals(ReplicationStatus.Status.SUCCESS)) {
-                if (eventId > successEventId) {
-                    successEventId = eventId;
-                }
-            } else if (entry.getValue().getStatus().equals(ReplicationStatus.Status.FAILURE)) {
-                databaseStatus.setStatus(ReplicationStatus.Status.FAILURE);
-                if (eventId < failedEventId || failedEventId == -1) {
-                    failedEventId = eventId;
-                }
-            } //else , if table status is Status.INIT, it should not change lastEventId of DB
-        }
-
-        String log = "Updating DB Status based on table replication status. Status : "
-                + databaseStatus.getStatus().toString() + ", eventId : ";
-        if (databaseStatus.getStatus().equals(ReplicationStatus.Status.SUCCESS)) {
-            databaseStatus.setEventId(successEventId);
-            LOG.info(log + String.valueOf(successEventId));
-        } else if (databaseStatus.getStatus().equals(ReplicationStatus.Status.FAILURE)) {
-            databaseStatus.setEventId(failedEventId);
-            LOG.error(log + String.valueOf(failedEventId));
-        }
-
-    }
-
-    public void updateDbStatus(ReplicationStatus status) throws HiveReplicationException {
-        if (StringUtils.isNotEmpty(status.getTable())) {
-            throw new HiveReplicationException("Cannot update DB Status. This is table level status.");
-        }
-
-        if (this.databaseStatus.getDatabase().equals(status.getDatabase())) {
-            this.databaseStatus = status;
-        } else {
-            throw new HiveReplicationException("Cannot update Database Status. StatusDB "
-                    + status.getDatabase() + " does not match current DB "
-                    +  this.databaseStatus.getDatabase());
-        }
-    }
-
-    public void updateTableStatus(ReplicationStatus status) throws HiveReplicationException {
-        if (StringUtils.isEmpty(status.getTable())) {
-            throw new HiveReplicationException("Cannot update Table Status. Table name is empty.");
-        }
-
-        if (this.databaseStatus.getDatabase().equals(status.getDatabase())) {
-            this.tableStatuses.put(status.getTable(), status);
-        } else {
-            throw new HiveReplicationException("Cannot update Table Status. TableDB "
-                    + status.getDatabase() + " does not match current DB "
-                    +  this.databaseStatus.getDatabase());
-        }
-    }
-}
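
For context on the class removed above: updateDbStatusFromTableStatuses() rolls table-level statuses up to a database-level status, taking the largest successful eventId when every table replicated and the smallest eventId among the failures otherwise. A minimal, self-contained sketch of just that roll-up rule, using a hypothetical TableState helper rather than the removed ReplicationStatus API:

    import java.util.Arrays;
    import java.util.List;

    final class RollUpSketch {
        // Illustrative stand-in for a table-level replication status.
        static final class TableState {
            final boolean failed;
            final long eventId;
            TableState(boolean failed, long eventId) { this.failed = failed; this.eventId = eventId; }
        }

        // All tables succeeded -> largest successful eventId wins;
        // any table failed     -> smallest eventId among the failed tables wins.
        static long rollUp(List<TableState> tables, long dbEventId) {
            long success = dbEventId;
            long failed = -1;
            for (TableState t : tables) {
                if (t.failed) {
                    if (failed == -1 || t.eventId < failed) {
                        failed = t.eventId;
                    }
                } else if (t.eventId > success) {
                    success = t.eventId;
                }
            }
            return failed == -1 ? success : failed;
        }

        public static void main(String[] args) {
            System.out.println(rollUp(Arrays.asList(
                    new TableState(false, 120), new TableState(true, 90)), 100)); // prints 90
        }
    }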

http://git-wip-us.apache.org/repos/asf/falcon/blob/6f5b476c/addons/hivedr/src/main/java/org/apache/falcon/hive/util/DRStatusStore.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/DRStatusStore.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/DRStatusStore.java
deleted file mode 100644
index cf6b7ad..0000000
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/DRStatusStore.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.hive.util;
-
-import org.apache.falcon.hive.exception.HiveReplicationException;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
-
-import java.util.Iterator;
-import java.util.List;
-
-/**
- * Abstract class for Data Replication Status Store.
- */
-public abstract class DRStatusStore {
-
-    public static final String BASE_DEFAULT_STORE_PATH = "/apps/data-mirroring/";
-    public static final FsPermission DEFAULT_STORE_PERMISSION =
-            new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.NONE);
-
-    private static String storeGroup = "users";
-
-
-    /**
-     * Update replication status of a table(s)/db after replication job jobName completes.
-     * @param jobName Name of the replication job.
-     * @param statusList List of replication statuses of db/tables replicated by jobName.
-     */
-    public abstract void updateReplicationStatus(String jobName, List<ReplicationStatus> statusList)
-        throws HiveReplicationException;
-
-    /**
-     * Get Replication status for a database.
-     * @param source Replication source uri.
-     * @param target Replication target uri.
-     * @param jobName Name of the replication job.
-     * @param database Name of the target database.
-     * @return ReplicationStatus
-     * destination commands for each table
-     */
-    public abstract ReplicationStatus getReplicationStatus(String source, String target,
-                                                           String jobName, String database)
-        throws HiveReplicationException;
-
-    /**
-     * Get Replication status for a table.
-     * @param source Replication source uri.
-     * @param target Replication target uri.
-     * @param jobName Name of the replication job.
-     * @param database Name of the target database.
-     * @param table Name of the target table.
-     * @return ReplicationStatus
-     * destination commands for each table
-     */
-    public abstract ReplicationStatus getReplicationStatus(String source, String target,
-                                                           String jobName, String database,
-                                                           String table) throws HiveReplicationException;
-
-    /**
-     * Get Replication status of all tables in a database.
-     * @param source Replication source uri.
-     * @param target Replication target uri.
-     * @param jobName Name of the replication job.
-     * @param database Name of the target database.
-     * @return Iterator
-     * destination commands for each table
-     */
-    public abstract Iterator<ReplicationStatus> getTableReplicationStatusesInDb(String source, String target,
-                                                                                String jobName, String database)
-        throws HiveReplicationException;
-
-
-    /**
-     * Delete a replication job.
-     * @param jobName Name of the replication job.
-     * @param database Name of the target database.
-     * destination commands for each table
-     */
-    public abstract void deleteReplicationStatus(String jobName, String database) throws HiveReplicationException;
-
-    public static String getStoreGroup() {
-        return storeGroup;
-    }
-
-    public static void setStoreGroup(String group) {
-        storeGroup = group;
-    }
-}
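
The abstract store above is the contract the replication job uses to persist and query per-table progress. A hedged sketch of a caller exercising that contract; the concrete store instance and the literal URIs and names are illustrative (an HDFS-backed implementation, HiveDRStatusStore, is removed further down in this commit):

    import java.util.Arrays;

    import org.apache.falcon.hive.util.DRStatusStore;
    import org.apache.falcon.hive.util.ReplicationStatus;

    public class StoreUsageSketch {
        static void recordRun(DRStatusStore store) throws Exception {
            // One table finished successfully at event 120 (values illustrative).
            ReplicationStatus ok = new ReplicationStatus("hdfs://source-nn", "hdfs://target-nn",
                    "drJob1", "salesdb", "orders", ReplicationStatus.Status.SUCCESS, 120);
            store.updateReplicationStatus("drJob1", Arrays.asList(ok));

            // The database-level status reflects the roll-up of its tables.
            ReplicationStatus db = store.getReplicationStatus("hdfs://source-nn", "hdfs://target-nn",
                    "drJob1", "salesdb");
            System.out.println(db.getStatus() + " at event " + db.getEventId());
        }
    }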

http://git-wip-us.apache.org/repos/asf/falcon/blob/6f5b476c/addons/hivedr/src/main/java/org/apache/falcon/hive/util/DelimiterUtils.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/DelimiterUtils.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/DelimiterUtils.java
deleted file mode 100644
index 3b3156f..0000000
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/DelimiterUtils.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.hive.util;
-
-/**
- * Public delimiters used for event processing.
- */
-public final class DelimiterUtils {
-    public static final String FIELD_DELIM = "\u0001";
-    public static final String NEWLINE_DELIM = System.getProperty("line.separator");
-    public static final String TAB_DELIM = "\t";
-
-    private DelimiterUtils() {}
-}

http://git-wip-us.apache.org/repos/asf/falcon/blob/6f5b476c/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventSourcerUtils.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventSourcerUtils.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventSourcerUtils.java
deleted file mode 100644
index fb695d0..0000000
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventSourcerUtils.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.hive.util;
-
-
-import org.apache.commons.codec.binary.Base64;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.falcon.hive.ReplicationEventMetadata;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hive.hcatalog.api.repl.Command;
-import org.apache.hive.hcatalog.api.repl.ReplicationUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.Map;
-
-/**
- * Utility methods for event sourcer.
- */
-public class EventSourcerUtils {
-
-    private static final String METAFILE_EXTENSION = ".meta";
-    private static final String SRCFILE_EXTENSION = ".src";
-    private static final String TGTFILE_EXTENSION = ".tgt";
-    private Path eventsInputDirPath;
-    private final boolean shouldKeepHistory;
-    private final FileSystem jobFS;
-
-    private static final Logger LOG = LoggerFactory.getLogger(EventSourcerUtils.class);
-
-    public EventSourcerUtils(final Configuration conf, final boolean shouldKeepHistory,
-                             final String jobName) throws Exception {
-        this.shouldKeepHistory = shouldKeepHistory;
-        jobFS = FileSystem.get(conf);
-        init(jobName);
-    }
-
-    private void init(final String jobName) throws Exception {
-        // Create base dir to store events on cluster where job is running
-        Path dir = new Path(FileUtils.DEFAULT_EVENT_STORE_PATH);
-        // Validate base path
-        FileUtils.validatePath(jobFS, new Path(DRStatusStore.BASE_DEFAULT_STORE_PATH));
-
-        if (!jobFS.exists(dir)) {
-            if (!jobFS.mkdirs(dir)) {
-                throw new Exception("Creating directory failed: " + dir);
-            }
-        }
-
-        eventsInputDirPath = new Path(FileUtils.DEFAULT_EVENT_STORE_PATH, jobName);
-
-        if (!jobFS.exists(eventsInputDirPath)) {
-            if (!jobFS.mkdirs(eventsInputDirPath)) {
-                throw new Exception("Creating directory failed: " + eventsInputDirPath);
-            }
-        }
-    }
-
-    public OutputStream getFileOutputStream(final String path) throws Exception {
-        return FileSystem.create(jobFS, new Path(path), FileUtils.FS_PERMISSION_700);
-    }
-
-    public void closeOutputStream(OutputStream out) throws IOException {
-        if (out != null) {
-            try {
-                out.flush();
-            } finally {
-                IOUtils.closeQuietly(out);
-            }
-        }
-    }
-
-    public void persistReplicationEvents(final OutputStream out,
-                                         final java.lang.Iterable
-                                                 <? extends org.apache.hive.hcatalog.api.repl.Command> cmds)
-        throws Exception {
-        for (Command cmd : cmds) {
-            persistReplicationEvents(out, cmd);
-        }
-    }
-
-    public void persistReplicationEvents(final OutputStream out,
-                                         final Command cmd) throws Exception {
-        out.write(ReplicationUtils.serializeCommand(cmd).getBytes());
-        LOG.debug("HiveDR Serialized Repl Command : {}", cmd);
-        out.write(DelimiterUtils.NEWLINE_DELIM.getBytes());
-    }
-
-    public String persistToMetaFile(final ReplicationEventMetadata data, final String identifier) throws IOException {
-        if (data != null && data.getEventFileMetadata() != null && !data.getEventFileMetadata().isEmpty()) {
-            Path metaFilename = new Path(eventsInputDirPath.toString(), identifier + METAFILE_EXTENSION);
-            OutputStream out = null;
-
-            try {
-                out = FileSystem.create(jobFS, metaFilename, FileUtils.FS_PERMISSION_700);
-
-                for (Map.Entry<String, String> entry : data.getEventFileMetadata().entrySet()) {
-                    out.write(entry.getKey().getBytes());
-                    out.write(DelimiterUtils.FIELD_DELIM.getBytes());
-                    out.write(entry.getValue().getBytes());
-                    out.write(DelimiterUtils.NEWLINE_DELIM.getBytes());
-                }
-                out.flush();
-            } finally {
-                IOUtils.closeQuietly(out);
-            }
-            return jobFS.makeQualified(metaFilename).toString();
-        } else {
-            return null;
-        }
-    }
-
-    public static void updateEventMetadata(ReplicationEventMetadata data, final String dbName, final String tableName,
-                                           final String srcFilename, final String tgtFilename) {
-        if (data == null || data.getEventFileMetadata() == null) {
-            return;
-        }
-        StringBuilder key = new StringBuilder();
-
-        if (StringUtils.isNotEmpty(dbName)) {
-            key.append(Base64.encodeBase64URLSafeString(dbName.toLowerCase().getBytes()));
-        }
-        key.append(DelimiterUtils.FIELD_DELIM);
-        if (StringUtils.isNotEmpty(tableName)) {
-            key.append(Base64.encodeBase64URLSafeString(tableName.toLowerCase().getBytes()));
-        }
-
-        StringBuilder value = new StringBuilder();
-        if (StringUtils.isNotEmpty(srcFilename)) {
-            value.append(srcFilename);
-        }
-        value.append(DelimiterUtils.FIELD_DELIM);
-
-        if (StringUtils.isNotEmpty(tgtFilename)) {
-            value.append(tgtFilename);
-        }
-
-        data.getEventFileMetadata().put(key.toString(), value.toString());
-    }
-
-    public static void updateEventMetadata(ReplicationEventMetadata data, final ReplicationEventMetadata inputData) {
-        if (data == null || data.getEventFileMetadata() == null || inputData == null
-                || inputData.getEventFileMetadata() == null || inputData.getEventFileMetadata().isEmpty()) {
-            return;
-        }
-
-        data.getEventFileMetadata().putAll(inputData.getEventFileMetadata());
-    }
-
-    public Path getSrcFileName(final String identifier) {
-        return jobFS.makeQualified(new Path(eventsInputDirPath, identifier + SRCFILE_EXTENSION));
-    }
-
-    public Path getTargetFileName(final String identifier) {
-        return jobFS.makeQualified(new Path(eventsInputDirPath, identifier + TGTFILE_EXTENSION));
-    }
-
-    public void cleanUpEventInputDir() {
-        if (!shouldKeepHistory) {
-            try {
-                jobFS.delete(eventsInputDirPath, true);
-                eventsInputDirPath = null;
-            } catch (IOException e) {
-                LOG.error("Unable to cleanup: {}", eventsInputDirPath, e);
-            }
-        }
-    }
-}
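
The utilities above write one metadata record per replication event: updateEventMetadata() builds a key of Base64-encoded db and table names joined by DelimiterUtils.FIELD_DELIM and a value of source and target event-file paths joined the same way, and persistToMetaFile() writes key, FIELD_DELIM, value, one record per NEWLINE_DELIM. A small sketch of that record layout and of how a consumer such as EventUtils.processEvents() splits it back apart; the database, table, and path values here are made up:

    import org.apache.commons.codec.binary.Base64;

    public final class EventRecordSketch {
        private static final String FIELD_DELIM = "\u0001";   // same value as DelimiterUtils.FIELD_DELIM

        public static void main(String[] args) throws Exception {
            // Build a record the way updateEventMetadata() does: Base64 db/table, then the two file paths.
            String record = Base64.encodeBase64URLSafeString("salesdb".getBytes("UTF-8"))
                    + FIELD_DELIM + Base64.encodeBase64URLSafeString("orders".getBytes("UTF-8"))
                    + FIELD_DELIM + "/apps/data-mirroring/Events/drJob1/1.src"
                    + FIELD_DELIM + "/apps/data-mirroring/Events/drJob1/1.tgt";

            // Split it back apart the way processEvents() does.
            String[] fields = record.split(FIELD_DELIM);
            String dbName = new String(Base64.decodeBase64(fields[0]), "UTF-8");
            String tableName = new String(Base64.decodeBase64(fields[1]), "UTF-8");
            System.out.println(dbName + "." + tableName + " export=" + fields[2] + " import=" + fields[3]);
        }
    }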

http://git-wip-us.apache.org/repos/asf/falcon/blob/6f5b476c/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java
deleted file mode 100644
index d075bfb..0000000
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java
+++ /dev/null
@@ -1,393 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.hive.util;
-
-import org.apache.commons.codec.binary.Base64;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.falcon.hive.HiveDRArgs;
-import org.apache.falcon.hive.exception.HiveReplicationException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobStatus;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.tools.DistCp;
-import org.apache.hadoop.tools.DistCpOptions;
-import org.apache.hive.hcatalog.api.repl.Command;
-import org.apache.hive.hcatalog.api.repl.ReplicationUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-/**
- * Utility class to handle Hive events for data-mirroring.
- */
-public class EventUtils {
-    private static final String DRIVER_NAME = "org.apache.hive.jdbc.HiveDriver";
-    private static final int TIMEOUT_IN_SECS = 300;
-    private static final String JDBC_PREFIX = "jdbc:";
-    private static final int RETRY_ATTEMPTS = 3;
-
-    private Configuration conf = null;
-    private String sourceHiveServer2Uri = null;
-    private String sourceDatabase = null;
-    private String sourceNN = null;
-    private String sourceNNKerberosPrincipal = null;
-    private String jobNN = null;
-    private String jobNNKerberosPrincipal = null;
-    private String targetHiveServer2Uri = null;
-    private String targetStagingPath = null;
-    private String targetNN = null;
-    private String targetNNKerberosPrincipal = null;
-    private String fullyQualifiedTargetStagingPath = null;
-    private List<Path> sourceCleanUpList = null;
-    private List<Path> targetCleanUpList = null;
-    private static final Logger LOG = LoggerFactory.getLogger(EventUtils.class);
-
-    private FileSystem sourceFileSystem = null;
-    private FileSystem jobFileSystem = null;
-    private FileSystem targetFileSystem = null;
-    private Connection sourceConnection = null;
-    private Connection targetConnection = null;
-    private Statement sourceStatement = null;
-    private Statement targetStatement = null;
-
-    private Map<String, Long> countersMap = null;
-
-    private List<ReplicationStatus> listReplicationStatus;
-
-    public EventUtils(Configuration conf) {
-        this.conf = conf;
-        sourceHiveServer2Uri = conf.get(HiveDRArgs.SOURCE_HS2_URI.getName());
-        sourceDatabase = conf.get(HiveDRArgs.SOURCE_DATABASE.getName());
-        sourceNN = conf.get(HiveDRArgs.SOURCE_NN.getName());
-        sourceNNKerberosPrincipal = conf.get(HiveDRArgs.SOURCE_NN_KERBEROS_PRINCIPAL.getName());
-        jobNN = conf.get(HiveDRArgs.JOB_CLUSTER_NN.getName());
-        jobNNKerberosPrincipal = conf.get(HiveDRArgs.JOB_CLUSTER_NN_KERBEROS_PRINCIPAL.getName());
-        targetHiveServer2Uri = conf.get(HiveDRArgs.TARGET_HS2_URI.getName());
-        targetStagingPath = conf.get(HiveDRArgs.TARGET_STAGING_PATH.getName())
-                + File.separator + conf.get(HiveDRArgs.JOB_NAME.getName());
-        targetNN = conf.get(HiveDRArgs.TARGET_NN.getName());
-        targetNNKerberosPrincipal = conf.get(HiveDRArgs.TARGET_NN_KERBEROS_PRINCIPAL.getName());
-        sourceCleanUpList = new ArrayList<Path>();
-        targetCleanUpList = new ArrayList<Path>();
-        countersMap = new HashMap<>();
-    }
-
-    public void setupConnection() throws Exception {
-        Class.forName(DRIVER_NAME);
-        DriverManager.setLoginTimeout(TIMEOUT_IN_SECS);
-        String authTokenString = ";auth=delegationToken";
-        //To bypass findbugs check, need to store empty password in Properties.
-        Properties password = new Properties();
-        password.put("password", "");
-        String user = "";
-
-        UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
-        if (currentUser != null) {
-            user = currentUser.getShortUserName();
-        }
-
-        if (conf.get(HiveDRArgs.EXECUTION_STAGE.getName())
-                .equalsIgnoreCase(HiveDRUtils.ExecutionStage.EXPORT.name())) {
-            String connString = JDBC_PREFIX + sourceHiveServer2Uri + "/" + sourceDatabase;
-            if (StringUtils.isNotEmpty(conf.get(HiveDRArgs.SOURCE_HIVE2_KERBEROS_PRINCIPAL.getName()))) {
-                connString += authTokenString;
-            }
-            sourceConnection = DriverManager.getConnection(connString, user, password.getProperty("password"));
-            sourceStatement = sourceConnection.createStatement();
-        } else {
-            String connString = JDBC_PREFIX + targetHiveServer2Uri + "/" + sourceDatabase;
-            if (StringUtils.isNotEmpty(conf.get(HiveDRArgs.TARGET_HIVE2_KERBEROS_PRINCIPAL.getName()))) {
-                connString += authTokenString;
-            }
-            targetConnection = DriverManager.getConnection(connString, user, password.getProperty("password"));
-            targetStatement = targetConnection.createStatement();
-        }
-    }
-
-    public void initializeFS() throws IOException {
-        LOG.info("Initializing staging directory");
-        fullyQualifiedTargetStagingPath = new Path(targetNN, targetStagingPath).toString();
-        sourceFileSystem = FileSystem.get(FileUtils.getConfiguration(sourceNN, sourceNNKerberosPrincipal));
-        jobFileSystem = FileSystem.get(FileUtils.getConfiguration(jobNN, jobNNKerberosPrincipal));
-        targetFileSystem = FileSystem.get(FileUtils.getConfiguration(targetNN, targetNNKerberosPrincipal));
-    }
-
-    private String readEvents(Path eventFileName) throws IOException {
-        StringBuilder eventString = new StringBuilder();
-        BufferedReader in = new BufferedReader(new InputStreamReader(jobFileSystem.open(eventFileName)));
-        try {
-            String line;
-            while ((line=in.readLine())!=null) {
-                eventString.append(line);
-                eventString.append(DelimiterUtils.NEWLINE_DELIM);
-            }
-        } catch (Exception e) {
-            throw new IOException(e);
-        } finally {
-            IOUtils.closeQuietly(in);
-        }
-
-        return eventString.toString();
-    }
-
-    public void processEvents(String event) throws Exception {
-        listReplicationStatus = new ArrayList<ReplicationStatus>();
-        String[] eventSplit = event.split(DelimiterUtils.FIELD_DELIM);
-        String dbName = new String(Base64.decodeBase64(eventSplit[0]), "UTF-8");
-        String tableName = new String(Base64.decodeBase64(eventSplit[1]), "UTF-8");
-        String exportEventStr;
-        String importEventStr;
-        if (conf.get(HiveDRArgs.EXECUTION_STAGE.getName())
-                .equalsIgnoreCase(HiveDRUtils.ExecutionStage.EXPORT.name())) {
-            exportEventStr = readEvents(new Path(eventSplit[2]));
-            if (StringUtils.isNotEmpty(exportEventStr)) {
-                LOG.info("Process the export statements for db {} table {}", dbName, tableName);
-                processCommands(exportEventStr, dbName, tableName, sourceStatement, sourceCleanUpList, false);
-                if (!sourceCleanUpList.isEmpty()) {
-                    invokeCopy(sourceCleanUpList);
-                }
-            }
-        } else if (conf.get(HiveDRArgs.EXECUTION_STAGE.getName())
-                .equalsIgnoreCase(HiveDRUtils.ExecutionStage.IMPORT.name())) {
-            importEventStr = readEvents(new Path(eventSplit[3]));
-            if (StringUtils.isNotEmpty(importEventStr)) {
-                LOG.info("Process the import statements for db {} table {}", dbName, tableName);
-                processCommands(importEventStr, dbName, tableName, targetStatement, targetCleanUpList, true);
-            }
-        }
-    }
-
-    public List<ReplicationStatus> getListReplicationStatus() {
-        return listReplicationStatus;
-    }
-
-    private void processCommands(String eventStr, String dbName, String tableName, Statement sqlStmt,
-                                 List<Path> cleanUpList, boolean isImportStatements)
-        throws SQLException, HiveReplicationException, IOException {
-        String[] commandList = eventStr.split(DelimiterUtils.NEWLINE_DELIM);
-        List<Command> deserializeCommand = new ArrayList<Command>();
-        for (String command : commandList) {
-            Command cmd = ReplicationUtils.deserializeCommand(command);
-            deserializeCommand.add(cmd);
-            List<String> cleanupLocations = cmd.cleanupLocationsAfterEvent();
-            cleanUpList.addAll(getCleanUpPaths(cleanupLocations));
-        }
-        for (Command cmd : deserializeCommand) {
-            try {
-                LOG.debug("Executing command : {} : {} ", cmd.getEventId(), cmd.toString());
-                executeCommand(cmd, dbName, tableName, sqlStmt, isImportStatements, 0);
-            } catch (Exception e) {
-                // clean up locations before failing.
-                cleanupEventLocations(sourceCleanUpList, sourceFileSystem);
-                cleanupEventLocations(targetCleanUpList, targetFileSystem);
-                throw new HiveReplicationException("Could not process replication command for "
-                        + " DB Name:" + dbName + ", Table Name:" + tableName, e);
-            }
-        }
-    }
-
-    private void executeCommand(Command cmd, String dbName, String tableName,
-                                Statement sqlStmt, boolean isImportStatements, int attempt)
-        throws HiveReplicationException, SQLException, IOException {
-        for (final String stmt : cmd.get()) {
-            executeSqlStatement(cmd, dbName, tableName, sqlStmt, stmt, isImportStatements, attempt);
-        }
-        if (isImportStatements) {
-            addReplicationStatus(ReplicationStatus.Status.SUCCESS, dbName, tableName, cmd.getEventId());
-        }
-    }
-
-    private void executeSqlStatement(Command cmd, String dbName, String tableName,
-                                     Statement sqlStmt, String stmt, boolean isImportStatements, int attempt)
-        throws HiveReplicationException, SQLException, IOException {
-        try {
-            sqlStmt.execute(stmt);
-        } catch (SQLException sqeOuter) {
-            // Retry if command is retriable.
-            if (attempt < RETRY_ATTEMPTS && cmd.isRetriable()) {
-                if (isImportStatements) {
-                    try {
-                        
cleanupEventLocations(getCleanUpPaths(cmd.cleanupLocationsPerRetry()), 
targetFileSystem);
-                    } catch (IOException ioe) {
-                        // Clean up failed before retry on target. Update 
failure status and return
-                        addReplicationStatus(ReplicationStatus.Status.FAILURE, 
dbName,
-                                tableName, cmd.getEventId());
-                        throw ioe;
-                    }
-                } else {
-                    cleanupEventLocations(getCleanUpPaths(cmd.cleanupLocationsPerRetry()), sourceFileSystem);
-                }
-                executeCommand(cmd, dbName, tableName, sqlStmt, isImportStatements, ++attempt);
-                return; // Retry succeeded, return without throwing an exception.
-            }
-            // If we reached here, retries have failed.
-            LOG.error("SQL Exception: {}", sqeOuter);
-            undoCommand(cmd, dbName, tableName, sqlStmt, isImportStatements);
-            if (isImportStatements) {
-                addReplicationStatus(ReplicationStatus.Status.FAILURE, dbName, tableName, cmd.getEventId());
-            }
-            throw sqeOuter;
-        }
-    }
-
-    private static List<Path> getCleanUpPaths(List<String> cleanupLocations) {
-        List<Path> cleanupLocationPaths = new ArrayList<Path>();
-        for (String cleanupLocation : cleanupLocations) {
-            cleanupLocationPaths.add(new Path(cleanupLocation));
-        }
-        return cleanupLocationPaths;
-    }
-
-    private void undoCommand(Command cmd, String dbName,
-                             String tableName, Statement sqlStmt, boolean isImportStatements)
-        throws SQLException, HiveReplicationException {
-        if (cmd.isUndoable()) {
-            try {
-                List<String> undoCommands = cmd.getUndo();
-                LOG.debug("Undo command: {}", StringUtils.join(undoCommands.toArray()));
-                if (undoCommands.size() != 0) {
-                    for (final String undoStmt : undoCommands) {
-                        sqlStmt.execute(undoStmt);
-                    }
-                }
-            } catch (SQLException sqeInner) {
-                if (isImportStatements) {
-                    addReplicationStatus(ReplicationStatus.Status.FAILURE, dbName,
-                            tableName, cmd.getEventId());
-                }
-                LOG.error("SQL Exception: {}", sqeInner);
-                throw sqeInner;
-            }
-        }
-    }
-
-    private void addReplicationStatus(ReplicationStatus.Status status, String dbName, String tableName, long eventId)
-        throws HiveReplicationException {
-        try {
-            String drJobName = conf.get(HiveDRArgs.JOB_NAME.getName());
-            ReplicationStatus rs = new ReplicationStatus(conf.get(HiveDRArgs.SOURCE_CLUSTER.getName()),
-                    conf.get(HiveDRArgs.TARGET_CLUSTER.getName()), drJobName, dbName, tableName, status, eventId);
-            listReplicationStatus.add(rs);
-        } catch (HiveReplicationException hre) {
-            throw new HiveReplicationException("Could not update replication status store for "
-                    + " EventId:" + eventId
-                    + " DB Name:" + dbName
-                    + " Table Name:" + tableName
-                    + hre.toString());
-        }
-    }
-
-    public void invokeCopy(List<Path> srcStagingPaths) throws Exception {
-        DistCpOptions options = getDistCpOptions(srcStagingPaths);
-        DistCp distCp = new DistCp(conf, options);
-        LOG.info("Started DistCp with source Path: {} \ttarget path: {}", StringUtils.join(srcStagingPaths.toArray()),
-                fullyQualifiedTargetStagingPath);
-        Job distcpJob = distCp.execute();
-        LOG.info("Distp Hadoop job: {}", distcpJob.getJobID().toString());
-        LOG.info("Completed DistCp");
-        if (distcpJob.getStatus().getState() == JobStatus.State.SUCCEEDED) {
-            countersMap = HiveDRUtils.fetchReplicationCounters(conf, distcpJob);
-        }
-    }
-
-    public DistCpOptions getDistCpOptions(List<Path> srcStagingPaths) {
-        /*
-         * Add the fully qualified sourceNameNode to srcStagingPath uris. This will
-         * ensure DistCp will succeed when the job is run on target cluster.
-         */
-        List<Path> fullyQualifiedSrcStagingPaths = new ArrayList<Path>();
-        for (Path srcPath : srcStagingPaths) {
-            fullyQualifiedSrcStagingPaths.add(new Path(sourceNN, srcPath.toString()));
-        }
-        fullyQualifiedSrcStagingPaths.toArray(new Path[fullyQualifiedSrcStagingPaths.size()]);
-
-        DistCpOptions distcpOptions = new DistCpOptions(fullyQualifiedSrcStagingPaths,
-                new Path(fullyQualifiedTargetStagingPath));
-        /* setSyncFolder to false to retain dir structure as in source at the target. If set to true all files will be
-        copied to the same staging sir at target resulting in DuplicateFileException in DistCp.
-        */
-
-        distcpOptions.setSyncFolder(false);
-        distcpOptions.setBlocking(true);
-        distcpOptions.setMaxMaps(Integer.valueOf(conf.get(HiveDRArgs.DISTCP_MAX_MAPS.getName())));
-        distcpOptions.setMapBandwidth(Integer.valueOf(conf.get(HiveDRArgs.DISTCP_MAP_BANDWIDTH.getName())));
-        return distcpOptions;
-    }
-
-    public Long getCounterValue(String counterKey) {
-        return countersMap.get(counterKey);
-    }
-
-    public boolean isCountersMapEmtpy() {
-        return countersMap.size() == 0 ? true : false;
-    }
-
-    public void cleanEventsDirectory() throws IOException {
-        LOG.info("Cleaning staging directory");
-        cleanupEventLocations(sourceCleanUpList, sourceFileSystem);
-        cleanupEventLocations(targetCleanUpList, targetFileSystem);
-    }
-
-    private void cleanupEventLocations(List<Path> cleanupList, FileSystem fileSystem)
-        throws IOException {
-        for (Path cleanUpPath : cleanupList) {
-            try {
-                fileSystem.delete(cleanUpPath, true);
-            } catch (IOException ioe) {
-                LOG.error("Cleaning up of staging directory {} failed {}", cleanUpPath, ioe.toString());
-                throw ioe;
-            }
-        }
-
-    }
-
-    public void closeConnection() throws SQLException {
-        if (sourceStatement != null) {
-            sourceStatement.close();
-        }
-
-        if (targetStatement != null) {
-            targetStatement.close();
-        }
-
-        if (sourceConnection != null) {
-            sourceConnection.close();
-        }
-        if (targetConnection != null) {
-            targetConnection.close();
-        }
-    }
-}
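
One detail worth calling out in the removed EventUtils: executeSqlStatement() retries a failed statement up to RETRY_ATTEMPTS times when the HCatalog Command reports isRetriable(), cleaning up per-retry staging locations between attempts, and only after the retries are exhausted does it run the command's undo statements and rethrow. A simplified, iterative sketch of that control flow (the real code is recursive and also records a FAILURE ReplicationStatus on the import side):

    import java.sql.SQLException;
    import java.sql.Statement;
    import java.util.List;

    final class RetrySketch {
        private static final int RETRY_ATTEMPTS = 3;   // same bound as the removed EventUtils

        static void execute(Statement stmt, String sql, boolean retriable, List<String> undo)
                throws SQLException {
            for (int attempt = 0; ; attempt++) {
                try {
                    stmt.execute(sql);
                    return;                            // success, possibly after retries
                } catch (SQLException e) {
                    if (retriable && attempt < RETRY_ATTEMPTS) {
                        continue;                      // the real code also cleans up staging paths here
                    }
                    for (String u : undo) {            // best-effort undo, as in undoCommand()
                        stmt.execute(u);
                    }
                    throw e;
                }
            }
        }
    }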

http://git-wip-us.apache.org/repos/asf/falcon/blob/6f5b476c/addons/hivedr/src/main/java/org/apache/falcon/hive/util/FileUtils.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/FileUtils.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/FileUtils.java
deleted file mode 100644
index 6bd6319..0000000
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/FileUtils.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.hive.util;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
-
-import java.io.File;
-import java.io.IOException;
-
-/**
- * Utility class to validate HDFS files.
- */
-public final class FileUtils {
-
-    public static final String DEFAULT_EVENT_STORE_PATH = DRStatusStore.BASE_DEFAULT_STORE_PATH
-            + File.separator + "Events";
-    public static final FsPermission FS_PERMISSION_700 = new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
-
-
-    private FileUtils() {}
-
-    public static Configuration getConfiguration(final String writeEP, final String nnKerberosPrincipal) {
-        Configuration conf = new Configuration();
-        conf.set("fs.defaultFS", writeEP);
-        if (StringUtils.isNotEmpty(nnKerberosPrincipal)) {
-            conf.set("dfs.namenode.kerberos.principal", nnKerberosPrincipal);
-        }
-        return conf;
-    }
-
-    public static void validatePath(final FileSystem fileSystem, final Path basePath) throws IOException {
-        if (!fileSystem.exists(basePath)) {
-            throw new IOException("Please create base dir " + fileSystem.getUri() + basePath
-                    + ". Please set group to " + DRStatusStore.getStoreGroup()
-                    + " and permissions to " + DRStatusStore.DEFAULT_STORE_PERMISSION.toString());
-        }
-
-        if (!fileSystem.getFileStatus(basePath).getPermission().equals(DRStatusStore.DEFAULT_STORE_PERMISSION)
-                || !fileSystem.getFileStatus(basePath).getGroup().equalsIgnoreCase(DRStatusStore.getStoreGroup())) {
-            throw new IOException("Base dir " + fileSystem.getUri() + basePath
-                    + " does not have correct ownership/permissions."
-                    + " Please set group to " + DRStatusStore.getStoreGroup()
-                    + " and permissions to " + DRStatusStore.DEFAULT_STORE_PERMISSION.toString());
-        }
-
-    }
-}
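
The removed FileUtils.getConfiguration() only sets two things on a fresh Hadoop Configuration: the namenode endpoint as fs.defaultFS and, for secure clusters, dfs.namenode.kerberos.principal; EventUtils.initializeFS() then hands the result to FileSystem.get(). A standalone sketch under those assumptions (the endpoint and principal values are placeholders):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class FsConfigSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://source-nn:8020");                   // placeholder endpoint
            conf.set("dfs.namenode.kerberos.principal", "nn/_HOST@EXAMPLE.COM"); // placeholder principal

            FileSystem fs = FileSystem.get(conf);
            // The base store path whose group/permissions validatePath() checks.
            System.out.println(fs.exists(new Path("/apps/data-mirroring/")));
        }
    }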

http://git-wip-us.apache.org/repos/asf/falcon/blob/6f5b476c/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRStatusStore.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRStatusStore.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRStatusStore.java
deleted file mode 100644
index 900afe8..0000000
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRStatusStore.java
+++ /dev/null
@@ -1,315 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.hive.util;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.falcon.hive.exception.HiveReplicationException;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-/**
- * DRStatusStore implementation for hive.
- */
-public class HiveDRStatusStore extends DRStatusStore {
-
-    private static final Logger LOG = LoggerFactory.getLogger(DRStatusStore.class);
-    private FileSystem fileSystem;
-
-    private static final String DEFAULT_STORE_PATH = BASE_DEFAULT_STORE_PATH + "hiveReplicationStatusStore/";
-    private static final FsPermission DEFAULT_STATUS_DIR_PERMISSION =
-            new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.NONE);
-
-    private static final String LATEST_FILE = "latest.json";
-    private static final int FILE_ROTATION_LIMIT = 10;
-    private static final int FILE_ROTATION_TIME = 86400000; // 1 day
-
-
-    public HiveDRStatusStore(FileSystem targetFileSystem) throws IOException {
-        init(targetFileSystem);
-    }
-
-    public HiveDRStatusStore(FileSystem targetFileSystem, String group) throws IOException {
-        HiveDRStatusStore.setStoreGroup(group);
-        init(targetFileSystem);
-    }
-
-    private void init(FileSystem targetFileSystem) throws IOException {
-        this.fileSystem = targetFileSystem;
-        Path basePath = new Path(BASE_DEFAULT_STORE_PATH);
-        FileUtils.validatePath(fileSystem, basePath);
-
-        Path storePath = new Path(DEFAULT_STORE_PATH);
-        if (!fileSystem.exists(storePath)) {
-            if (!FileSystem.mkdirs(fileSystem, storePath, DEFAULT_STORE_PERMISSION)) {
-                throw new IOException("mkdir failed for " + DEFAULT_STORE_PATH);
-            }
-        } else {
-            if (!fileSystem.getFileStatus(storePath).getPermission().equals(DEFAULT_STORE_PERMISSION)) {
-                throw new IOException("Base dir " + DEFAULT_STORE_PATH + "does not have correct permissions. "
-                        + "Please set to 777");
-            }
-        }
-    }
-
-     /**
-        get all DB updated by the job. get all current table statuses for the DB merge the latest repl
-        status with prev table repl statuses. If all statuses are success, store the status as success
-        with largest eventId for the DB else store status as failure for the DB and lowest eventId.
-     */
-    @Override
-    public void updateReplicationStatus(String jobName, List<ReplicationStatus> statusList)
-        throws HiveReplicationException {
-        Map<String, DBReplicationStatus> dbStatusMap = new HashMap<String, DBReplicationStatus>();
-        for (ReplicationStatus status : statusList) {
-            if (!status.getJobName().equals(jobName)) {
-                String error = "JobName for status does not match current job \"" + jobName
-                        + "\". Status is " + status.toJsonString();
-                LOG.error(error);
-                throw new HiveReplicationException(error);
-            }
-
-            // init dbStatusMap and tableStatusMap from existing statuses.
-            if (!dbStatusMap.containsKey(status.getDatabase())) {
-                DBReplicationStatus dbStatus = getDbReplicationStatus(status.getSourceUri(), status.getTargetUri(),
-                        status.getJobName(), status.getDatabase());
-                dbStatusMap.put(status.getDatabase(), dbStatus);
-            }
-
-            // update existing statuses with new status for db/tables
-            if (StringUtils.isEmpty(status.getTable())) { // db level replication status.
-                dbStatusMap.get(status.getDatabase()).updateDbStatus(status);
-            } else { // table level replication status
-                dbStatusMap.get(status.getDatabase()).updateTableStatus(status);
-            }
-        }
-        // write to disk
-        for (Map.Entry<String, DBReplicationStatus> entry : dbStatusMap.entrySet()) {
-            writeStatusFile(entry.getValue());
-        }
-    }
-
-    @Override
-    public ReplicationStatus getReplicationStatus(String source, String target, String jobName, String database)
-        throws HiveReplicationException {
-        return getReplicationStatus(source, target, jobName, database, null);
-    }
-
-
-    public ReplicationStatus getReplicationStatus(String source, String target,
-                                                  String jobName, String database,
-                                                  String table) throws HiveReplicationException {
-        if (StringUtils.isEmpty(table)) {
-            return getDbReplicationStatus(source, target, jobName, database).getDatabaseStatus();
-        } else {
-            return getDbReplicationStatus(source, target, jobName, database).getTableStatus(table);
-        }
-    }
-
-    @Override
-    public Iterator<ReplicationStatus> getTableReplicationStatusesInDb(String source, String target,
-                                                                       String jobName, String database)
-        throws HiveReplicationException {
-        DBReplicationStatus dbReplicationStatus = getDbReplicationStatus(source, target, jobName, database);
-        return dbReplicationStatus.getTableStatusIterator();
-    }
-
-    @Override
-    public void deleteReplicationStatus(String jobName, String database) throws HiveReplicationException {
-        Path deletePath = getStatusDirPath(database, jobName);
-        try {
-            if (fileSystem.exists(deletePath)) {
-                fileSystem.delete(deletePath, true);
-            }
-        } catch (IOException e) {
-            throw new HiveReplicationException("Failed to delete status for Job "
-                    + jobName + " and DB "+ database, e);
-        }
-
-    }
-
-    private DBReplicationStatus getDbReplicationStatus(String source, String target, String jobName,
-                                                       String database) throws HiveReplicationException{
-        DBReplicationStatus dbReplicationStatus = null;
-        Path statusDirPath = getStatusDirPath(database, jobName);
-        // check if database name or jobName can contain chars not allowed by hdfs dir/file naming.
-        // if yes, use md5 of the same for dir names. prefer to use actual db names for readability.
-
-        try {
-            if (fileSystem.exists(statusDirPath)) {
-                dbReplicationStatus = readStatusFile(statusDirPath);
-            }
-            if (null == dbReplicationStatus) {
-                // Init replication state for this database
-                ReplicationStatus initDbStatus = new ReplicationStatus(source, target, jobName,
-                        database, null, ReplicationStatus.Status.INIT, -1);
-                dbReplicationStatus = new DBReplicationStatus(initDbStatus);
-                if (!FileSystem.mkdirs(fileSystem, statusDirPath, DEFAULT_STATUS_DIR_PERMISSION)) {
-                    String error = "mkdir failed for " + statusDirPath.toString();
-                    LOG.error(error);
-                    throw new HiveReplicationException(error);
-                }
-                writeStatusFile(dbReplicationStatus);
-            }
-            return dbReplicationStatus;
-        } catch (IOException e) {
-            String error = "Failed to get ReplicationStatus for job " + jobName;
-            LOG.error(error);
-            throw new HiveReplicationException(error);
-        }
-    }
-
-    private Path getStatusDirPath(DBReplicationStatus dbReplicationStatus) {
-        ReplicationStatus status = dbReplicationStatus.getDatabaseStatus();
-        return getStatusDirPath(status.getDatabase(), status.getJobName());
-    }
-
-    public Path getStatusDirPath(String database, String jobName) {
-        return new Path(DEFAULT_STORE_PATH + "/" + database.toLowerCase() + "/" + jobName);
-    }
-
-    private void writeStatusFile(DBReplicationStatus dbReplicationStatus) throws HiveReplicationException {
-        dbReplicationStatus.updateDbStatusFromTableStatuses();
-        String statusDir = getStatusDirPath(dbReplicationStatus).toString();
-        try {
-            Path latestFile = new Path(statusDir + "/" + LATEST_FILE);
-            if (fileSystem.exists(latestFile)) {
-                Path renamedFile = new Path(statusDir + "/"
-                        + String.valueOf(fileSystem.getFileStatus(latestFile).getModificationTime()) + ".json");
-                fileSystem.rename(latestFile, renamedFile);
-            }
-
-            FSDataOutputStream stream = FileSystem.create(fileSystem, latestFile, DEFAULT_STATUS_DIR_PERMISSION);
-            stream.write(dbReplicationStatus.toJsonString().getBytes());
-            stream.close();
-
-        } catch (IOException e) {
-            String error = "Failed to write latest Replication status into dir " + statusDir;
-            LOG.error(error);
-            throw new HiveReplicationException(error);
-        }
-
-        rotateStatusFiles(new Path(statusDir), FILE_ROTATION_LIMIT, FILE_ROTATION_TIME);
-    }
-
-    public void rotateStatusFiles(Path statusDir, int numFiles, int maxFileAge) throws HiveReplicationException {
-
-        List<String> fileList = new ArrayList<String>();
-        long now = System.currentTimeMillis();
-        try {
-            RemoteIterator<LocatedFileStatus> fileIterator = fileSystem.listFiles(statusDir, false);
-            while (fileIterator.hasNext()) {
-                fileList.add(fileIterator.next().getPath().toString());
-            }
-            if (fileList.size() > (numFiles+1)) {
-                // delete some files, as long as they are older than the time.
-                Collections.sort(fileList);
-                for (String file : fileList.subList(0, (fileList.size() - numFiles + 1))) {
-                    long modTime = fileSystem.getFileStatus(new Path(file)).getModificationTime();
-                    if ((now - modTime) > maxFileAge) {
-                        Path deleteFilePath = new Path(file);
-                        if (fileSystem.exists(deleteFilePath)) {
-                            fileSystem.delete(deleteFilePath, false);
-                        }
-                    }
-                }
-            }
-        } catch (IOException e) {
-            String error = "Failed to rotate status files in dir " + statusDir.toString();
-            LOG.error(error);
-            throw new HiveReplicationException(error);
-        }
-    }
-
-    private DBReplicationStatus readStatusFile(Path statusDirPath) throws HiveReplicationException {
-        try {
-            Path statusFile = new Path(statusDirPath.toString() + "/" + LATEST_FILE);
-            if ((!fileSystem.exists(statusDirPath)) || (!fileSystem.exists(statusFile))) {
-                return null;
-            } else {
-                return new DBReplicationStatus(IOUtils.toString(fileSystem.open(statusFile)));
-            }
-        } catch (IOException e) {
-            String error = "Failed to read latest Replication status from dir " + statusDirPath.toString();
-            LOG.error(error);
-            throw new HiveReplicationException(error);
-        }
-    }
-
-    public void checkForReplicationConflict(String newSource, String jobName,
-                                             String database, String table) throws HiveReplicationException {
-        try {
-            Path globPath = new Path(DEFAULT_STORE_PATH + "/" + database.toLowerCase() + "/*/latest.json");
-            FileStatus[] files = fileSystem.globStatus(globPath);
-            for(FileStatus file : files) {
-                DBReplicationStatus dbFileStatus = new DBReplicationStatus(IOUtils.toString(
-                        fileSystem.open(file.getPath())));
-                ReplicationStatus existingJob = dbFileStatus.getDatabaseStatus();
-
-                if (!(newSource.equals(existingJob.getSourceUri()))) {
-                    throw new HiveReplicationException("Two different sources are attempting to replicate to same db "
-                            + database + ". New Source = " + newSource
-                            + ", Existing Source = " + existingJob.getSourceUri());
-                } // two different sources replicating to same DB. Conflict
-                if (jobName.equals(existingJob.getJobName())) {
-                    continue;
-                } // same job, no conflict.
-
-                if (StringUtils.isEmpty(table)) {
-                    // When it is DB level replication, two different jobs cannot replicate to same DB
-                    throw new HiveReplicationException("Two different jobs are attempting to replicate to same db "
-                            + database.toLowerCase() + ". New Job = " + jobName
-                            + ", Existing Job = " + existingJob.getJobName());
-                }
-
-                /*
-                At this point, it is different table level jobs replicating from same newSource to same target. This is
-                allowed as long as the target tables are different. For example, job1 can replicate db1.table1 and
-                job2 can replicate db1.table2.  Both jobs cannot replicate to same table.
-                 */
-                for(Map.Entry<String, ReplicationStatus> entry : dbFileStatus.getTableStatuses().entrySet()) {
-                    if (table.equals(entry.getKey())) {
-                        throw new HiveReplicationException("Two different jobs are trying to replicate to same table "
-                                + entry.getKey() + ". New job = " + jobName
-                                + ", Existing job = " + existingJob.getJobName());
-                    }
-                }
-            }
-        } catch (IOException e) {
-            throw new HiveReplicationException("Failed to read status files for DB "
-                    + database, e);
-        }
-    }
-}
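
For context, a minimal sketch of how callers typically consume the status store removed above; the store instance, cluster URIs, job, database and table names below are illustrative placeholders, not part of this patch:

    // Illustrative only: "store" is assumed to be an initialized instance of the store above.
    ReplicationStatus tableStatus = store.getReplicationStatus(
            "hive://source-cluster:10000", "hive://target-cluster:10000",
            "salesDrJob", "salesdb", "orders");
    if (tableStatus.getStatus() == ReplicationStatus.Status.SUCCESS) {
        long lastEventId = tableStatus.getEventId();   // -1 means nothing has been replicated yet
    }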

http://git-wip-us.apache.org/repos/asf/falcon/blob/6f5b476c/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRUtils.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRUtils.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRUtils.java
deleted file mode 100644
index dff0803..0000000
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRUtils.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.hive.util;
-
-import org.apache.falcon.job.JobCounters;
-import org.apache.falcon.job.JobCountersHandler;
-import org.apache.falcon.job.JobType;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.util.Shell;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Hive replication utility class.
- */
-public final class HiveDRUtils {
-    /**
-     * Enum for Hive replication type.
-     */
-    public enum ReplicationType {
-        TABLE,
-        DB
-    }
-
-    /**
-     * Enum for hive-dr action type.
-     */
-    public enum ExecutionStage {
-        IMPORT,
-        EXPORT,
-        LASTEVENTS
-    }
-
-    private static final String ALL_TABLES = "*";
-
-    public static final String SEPARATOR = File.separator;
-
-    private HiveDRUtils() {}
-
-    public static ReplicationType getReplicationType(List<String> sourceTables) {
-        return (sourceTables.size() == 1 && sourceTables.get(0).equals(ALL_TABLES)) ? ReplicationType.DB
-                : ReplicationType.TABLE;
-    }
-
-    public static Configuration getDefaultConf() throws IOException {
-        Configuration conf = new Configuration();
-        conf.addResource(new Path("file:///", System.getProperty("oozie.action.conf.xml")));
-        String delegationToken = getFilePathFromEnv("HADOOP_TOKEN_FILE_LOCATION");
-        if (delegationToken != null) {
-            conf.set("mapreduce.job.credentials.binary", delegationToken);
-            conf.set("tez.credentials.path", delegationToken);
-        }
-        return conf;
-    }
-
-    public static String getFilePathFromEnv(String env) {
-        String path = System.getenv(env);
-        if (path != null && Shell.WINDOWS) {
-            // In Windows, file paths are enclosed in \" so remove them here
-            // to avoid path errors
-            if (path.charAt(0) == '"') {
-                path = path.substring(1);
-            }
-            if (path.charAt(path.length() - 1) == '"') {
-                path = path.substring(0, path.length() - 1);
-            }
-        }
-        return path;
-    }
-
-    public static Map<String, Long> fetchReplicationCounters(Configuration conf,
-                                                             Job job) throws IOException, InterruptedException {
-        JobCounters hiveReplicationCounters = JobCountersHandler.getCountersType(
-                JobType.HIVEREPLICATION.name());
-        hiveReplicationCounters.obtainJobCounters(conf, job, true);
-        return hiveReplicationCounters.getCountersMap();
-    }
-}
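
A short sketch of the table-list convention that getReplicationType above encodes; the lists are made-up examples and the usual java.util.Arrays/List imports are assumed:

    // A single "*" entry means the whole database is replicated.
    List<String> wholeDb = Arrays.asList("*");
    HiveDRUtils.ReplicationType dbLevel = HiveDRUtils.getReplicationType(wholeDb);       // DB

    // Any explicit table list is treated as table-level replication.
    List<String> someTables = Arrays.asList("orders", "customers");
    HiveDRUtils.ReplicationType tableLevel = HiveDRUtils.getReplicationType(someTables); // TABLE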

http://git-wip-us.apache.org/repos/asf/falcon/blob/6f5b476c/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveMetastoreUtils.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveMetastoreUtils.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveMetastoreUtils.java
deleted file mode 100644
index ea19f09..0000000
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveMetastoreUtils.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.hive.util;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hive.hcatalog.api.HCatClient;
-import org.apache.hive.hcatalog.api.repl.exim.EximReplicationTaskFactory;
-import org.apache.hive.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-
-/**
- * Create hive metastore client for user.
- */
-public final class HiveMetastoreUtils {
-
-    private static final Logger LOG = LoggerFactory.getLogger(HiveMetastoreUtils.class);
-
-    private HiveMetastoreUtils() {}
-
-    public static HCatClient initializeHiveMetaStoreClient(String metastoreUri, String metastorePrincipal,
-                                                    String hive2Principal) throws Exception {
-        try {
-            HiveConf hcatConf = createHiveConf(HiveDRUtils.getDefaultConf(),
-                    metastoreUri, metastorePrincipal, hive2Principal);
-            HCatClient client = HCatClient.create(hcatConf);
-            return client;
-        } catch (IOException e) {
-            throw new Exception("Exception creating HCatClient: " + e.getMessage(), e);
-        }
-    }
-
-    private static HiveConf createHiveConf(Configuration conf, String metastoreUrl, String metastorePrincipal,
-                                           String hive2Principal) throws IOException {
-        JobConf jobConf = new JobConf(conf);
-        String delegationToken = HiveDRUtils.getFilePathFromEnv("HADOOP_TOKEN_FILE_LOCATION");
-        if (delegationToken != null) {
-            Credentials credentials = Credentials.readTokenStorageFile(new File(delegationToken), conf);
-            jobConf.setCredentials(credentials);
-            UserGroupInformation.getCurrentUser().addCredentials(credentials);
-        }
-
-        HiveConf hcatConf = new HiveConf(jobConf, HiveConf.class);
-
-        hcatConf.setVar(HiveConf.ConfVars.METASTOREURIS, metastoreUrl);
-        hcatConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 3);
-        hcatConf.set(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK.varname,
-                HCatSemanticAnalyzer.class.getName());
-        hcatConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
-
-        hcatConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
-        hcatConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
-        hcatConf.set(HiveConf.ConfVars.HIVE_REPL_TASK_FACTORY.varname, EximReplicationTaskFactory.class.getName());
-        if (StringUtils.isNotEmpty(metastorePrincipal)) {
-            hcatConf.set(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname, metastorePrincipal);
-            hcatConf.set(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname, "true");
-            hcatConf.set(HiveConf.ConfVars.METASTORE_EXECUTE_SET_UGI.varname, "true");
-            hcatConf.set("hadoop.rpc.protection", "authentication");
-        }
-        if (StringUtils.isNotEmpty(hive2Principal)) {
-            hcatConf.set(HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL.varname, hive2Principal);
-            hcatConf.set(HiveConf.ConfVars.HIVE_SERVER2_AUTHENTICATION.varname, "kerberos");
-        }
-
-        return hcatConf;
-    }
-
-}
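
For reference, a hedged sketch of the call pattern for the metastore helper above; the thrift URI and Kerberos principals are placeholders, and both principals may be empty on non-secure clusters:

    HCatClient client = HiveMetastoreUtils.initializeHiveMetaStoreClient(
            "thrift://metastore.example.com:9083",   // placeholder metastore URI
            "hive/_HOST@EXAMPLE.COM",                // placeholder metastore principal
            "hive/_HOST@EXAMPLE.COM");               // placeholder HiveServer2 principal
    try {
        // use the client, e.g. to list databases or build replication tasks
    } finally {
        client.close();
    }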

http://git-wip-us.apache.org/repos/asf/falcon/blob/6f5b476c/addons/hivedr/src/main/java/org/apache/falcon/hive/util/ReplicationStatus.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/ReplicationStatus.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/ReplicationStatus.java
deleted file mode 100644
index bb33772..0000000
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/ReplicationStatus.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.hive.util;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.falcon.hive.exception.HiveReplicationException;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-
-/**
- * Object to store replication status of a DB or a table.
- */
-public class ReplicationStatus {
-
-    public static final int INDENT_FACTOR = 4;
-    private static final String SOURCE = "sourceUri";
-    private static final String TARGET = "targetUri";
-    private static final String JOB_NAME = "jobName";
-    private static final String DATABASE = "database";
-    private static final String TABLE = "table";
-    private static final String EVENT_ID = "eventId";
-    private static final String STATUS_KEY = "status";
-    private static final String STATUS_LOG = "statusLog";
-
-    /**
-     * Replication Status enum.
-     */
-    public static enum Status {
-        INIT,
-        SUCCESS,
-        FAILURE
-    }
-
-    private String sourceUri;
-    private String targetUri;
-    private String jobName;
-    private String database;
-    private String table;
-    private Status status = Status.SUCCESS;
-    private long eventId = -1;
-    private String log;
-
-    //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
-    public ReplicationStatus(String sourceUri, String targetUri, String jobName,
-                             String database, String table,
-                             ReplicationStatus.Status status, long eventId) throws HiveReplicationException {
-        init(sourceUri, targetUri, jobName, database, table, status, eventId, null);
-    }
-
-    private void init(String source, String target, String job,
-                      String dbName, String tableName, ReplicationStatus.Status replStatus,
-                      long eventNum, String logStr) throws HiveReplicationException {
-        setSourceUri(source);
-        setTargetUri(target);
-        setJobName(job);
-        setDatabase(dbName);
-        setTable(tableName);
-        setStatus(replStatus);
-        setEventId(eventNum);
-        setLog(logStr);
-    }
-    //RESUME CHECKSTYLE CHECK ParameterNumberCheck
-
-    public ReplicationStatus(String jsonString) throws HiveReplicationException {
-        try {
-            JSONObject object = new JSONObject(jsonString);
-            Status objectStatus;
-            try {
-                objectStatus = ReplicationStatus.Status.valueOf(object.getString(STATUS_KEY).toUpperCase());
-            } catch (IllegalArgumentException e1) {
-                throw new HiveReplicationException("Unable to deserialize jsonString to ReplicationStatus."
-                        + " Invalid status " + object.getString(STATUS_KEY), e1);
-            }
-
-            init(object.getString(SOURCE), object.getString(TARGET), object.getString(JOB_NAME),
-                    object.getString(DATABASE), object.has(TABLE) ? object.getString(TABLE) : null,
-                    objectStatus, object.has(EVENT_ID) ? object.getLong(EVENT_ID) : -1,
-                    object.has(STATUS_LOG) ? object.getString(STATUS_LOG) : null);
-        } catch (JSONException e) {
-            throw new HiveReplicationException("Unable to deserialize jsonString to ReplicationStatus ", e);
-        }
-
-    }
-
-    public String toJsonString() throws HiveReplicationException {
-        try {
-            return toJsonObject().toString(INDENT_FACTOR);
-        } catch (JSONException e) {
-            throw new HiveReplicationException("Unable to serialize 
ReplicationStatus ", e);
-        }
-    }
-
-    public JSONObject toJsonObject() throws HiveReplicationException {
-        JSONObject jsonObject = new JSONObject();
-        try {
-            jsonObject.put(SOURCE, this.sourceUri);
-            jsonObject.put(TARGET, this.targetUri);
-            jsonObject.put(JOB_NAME, this.jobName);
-            jsonObject.put(DATABASE, this.database);
-            if (StringUtils.isNotEmpty(this.table)) {
-                jsonObject.put(TABLE, this.table);
-            }
-            jsonObject.put(STATUS_KEY, this.status.name());
-            if (this.eventId > -1) {
-                jsonObject.put(EVENT_ID, this.eventId);
-            } else {
-                jsonObject.put(EVENT_ID, -1);
-            }
-            if (StringUtils.isNotEmpty(this.log)) {
-                jsonObject.put(STATUS_LOG, this.log);
-            }
-            return jsonObject;
-        } catch (JSONException e) {
-            throw new HiveReplicationException("Unable to serialize 
ReplicationStatus ", e);
-        }
-    }
-
-    public String getSourceUri() {
-        return this.sourceUri;
-    }
-
-    public void setSourceUri(String source) throws HiveReplicationException {
-        validateString(SOURCE, source);
-        this.sourceUri = source;
-    }
-
-    public String getTargetUri() {
-        return this.targetUri;
-    }
-
-    public void setTargetUri(String target) throws HiveReplicationException {
-        validateString(TARGET, target);
-        this.targetUri = target;
-    }
-
-    public String getJobName() {
-        return this.jobName;
-    }
-
-    public void setJobName(String jobName) throws HiveReplicationException {
-        validateString(JOB_NAME, jobName);
-        this.jobName = jobName;
-    }
-
-    public String getDatabase() {
-        return this.database;
-    }
-
-    public void setDatabase(String database) throws HiveReplicationException {
-        validateString(DATABASE, database);
-        this.database = database.toLowerCase();
-    }
-
-    public String getTable() {
-        return this.table;
-    }
-
-    public void setTable(String table) {
-        this.table = (table == null) ? null : table.toLowerCase();
-    }
-
-    public Status getStatus() {
-        return this.status;
-    }
-
-    public void setStatus(Status status) throws HiveReplicationException {
-        if (status != null) {
-            this.status = status;
-        } else {
-            throw new HiveReplicationException("Failed to set 
ReplicationStatus. Input \""
-                    + STATUS_KEY + "\" cannot be empty");
-        }
-    }
-
-    public long getEventId() {
-        return this.eventId;
-    }
-
-    public void setEventId(long eventId) throws HiveReplicationException {
-        if (eventId > -1) {
-            this.eventId = eventId;
-        }
-    }
-
-    public String getLog() {
-        return this.log;
-    }
-
-    public void setLog(String log) {
-        this.log = log;
-    }
-
-    private void validateString(String inputName, String input) throws HiveReplicationException {
-        if (StringUtils.isEmpty(input)) {
-            throw new HiveReplicationException("Failed to set ReplicationStatus. Input \""
-                    + inputName + "\" cannot be empty");
-        }
-    }
-
-    public String toString() {
-        return sourceUri + "\t" + targetUri + "\t" + jobName + "\t"
-                + database + "\t"+ table + "\t" + status + "\t"+ eventId;
-    }
-
-}
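
A small sketch of the JSON round trip that ReplicationStatus above supports; the URIs, job name and event id are illustrative values:

    ReplicationStatus original = new ReplicationStatus(
            "hive://source-cluster:10000", "hive://target-cluster:10000",
            "salesDrJob", "salesdb", "orders",
            ReplicationStatus.Status.SUCCESS, 250);
    String json = original.toJsonString();             // pretty-printed using INDENT_FACTOR
    ReplicationStatus restored = new ReplicationStatus(json);
    // restored carries the same source/target, database, table, status and eventId as "original".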

http://git-wip-us.apache.org/repos/asf/falcon/blob/6f5b476c/addons/hivedr/src/main/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/resources/log4j.xml b/addons/hivedr/src/main/resources/log4j.xml
deleted file mode 100644
index f83a9a9..0000000
--- a/addons/hivedr/src/main/resources/log4j.xml
+++ /dev/null
@@ -1,54 +0,0 @@
-<?xml version="1.0" encoding="UTF-8" ?>
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements.  See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership.  The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
-  -->
-
-<!--
-    This is used for falcon packaging only.
-  -->
-
-<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
-
-<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
-    <appender name="console" class="org.apache.log4j.ConsoleAppender">
-        <param name="Target" value="System.out"/>
-        <layout class="org.apache.log4j.PatternLayout">
-            <param name="ConversionPattern" value="%d %-5p - [%t:%x] ~ %m 
(%c{1}:%L)%n"/>
-        </layout>
-    </appender>
-
-    <logger name="org.apache.falcon" additivity="false">
-        <level value="debug"/>
-        <appender-ref ref="console"/>
-    </logger>
-
-    <logger name="org.apache.hadoop" additivity="false">
-        <level value="info"/>
-        <appender-ref ref="console"/>
-    </logger>
-
-    <logger name="org.apache.hadoop.hive" additivity="false">
-        <level value="info"/>
-        <appender-ref ref="console"/>
-    </logger>
-
-    <root>
-        <priority value="info"/>
-        <appender-ref ref="console"/>
-    </root>
-
-</log4j:configuration>
