HIVE-17100 : Improve HS2 operation logs for REPL commands (Sankar Hariappan, 
reviewed by Anishek Agarwal, Thejas Nair)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/70cb7f0b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/70cb7f0b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/70cb7f0b

Branch: refs/heads/master
Commit: 70cb7f0b22a7beb353a27576683ad824c4a4c007
Parents: a6b9cd5
Author: Sankar Hariappan <[email protected]>
Authored: Wed Aug 30 17:22:04 2017 -0700
Committer: Thejas M Nair <[email protected]>
Committed: Wed Aug 30 17:22:04 2017 -0700

----------------------------------------------------------------------
 .../listener/DummyRawStoreFailEvent.java        |    7 +
 .../hadoop/hive/metastore/HiveMetaStore.java    |    7 +
 .../hive/metastore/HiveMetaStoreClient.java     |    7 +
 .../hadoop/hive/metastore/IMetaStoreClient.java |   11 +
 .../hadoop/hive/metastore/ObjectStore.java      |   23 +
 .../apache/hadoop/hive/metastore/RawStore.java  |    9 +
 .../hive/metastore/cache/CachedStore.java       |    7 +
 .../hive/metastore/messaging/EventUtils.java    |   13 +
 .../DummyRawStoreControlledCommit.java          |    7 +
 .../DummyRawStoreForJdoConnection.java          |    6 +
 ql/if/queryplan.thrift                          |    1 +
 ql/src/gen/thrift/gen-cpp/queryplan_types.cpp   |    8 +-
 ql/src/gen/thrift/gen-cpp/queryplan_types.h     |    3 +-
 .../hadoop/hive/ql/plan/api/StageType.java      |    5 +-
 ql/src/gen/thrift/gen-php/Types.php             |    2 +
 ql/src/gen/thrift/gen-py/queryplan/ttypes.py    |    3 +
 ql/src/gen/thrift/gen-rb/queryplan_types.rb     |    5 +-
 .../apache/hadoop/hive/ql/exec/TaskFactory.java |    3 +
 .../hadoop/hive/ql/exec/repl/ReplDumpTask.java  |   39 +-
 .../hive/ql/exec/repl/ReplStateLogTask.java     |   51 +
 .../hive/ql/exec/repl/ReplStateLogWork.java     |  100 +
 .../ql/exec/repl/bootstrap/ReplLoadTask.java    |   53 +-
 .../ql/exec/repl/bootstrap/ReplLoadWork.java    |    3 +-
 .../filesystem/BootstrapEventsIterator.java     |   56 +-
 .../filesystem/DatabaseEventsIterator.java      |    4 +
 .../exec/repl/bootstrap/load/LoadFunction.java  |   18 +-
 .../exec/repl/bootstrap/load/TaskTracker.java   |   13 +-
 .../bootstrap/load/table/LoadPartitions.java    |   65 +-
 .../repl/bootstrap/load/table/LoadTable.java    |   45 +-
 .../ql/parse/ReplicationSemanticAnalyzer.java   |   70 +-
 .../hadoop/hive/ql/parse/ReplicationSpec.java   |   14 +-
 .../hadoop/hive/ql/parse/repl/ReplLogger.java   |   36 +
 .../hadoop/hive/ql/parse/repl/ReplState.java    |   57 +
 .../hadoop/hive/ql/parse/repl/dump/Utils.java   |   17 +-
 .../repl/dump/log/BootstrapDumpLogger.java      |   71 +
 .../repl/dump/log/IncrementalDumpLogger.java    |   55 +
 .../repl/dump/log/state/BootstrapDumpBegin.java |   47 +
 .../repl/dump/log/state/BootstrapDumpEnd.java   |   59 +
 .../dump/log/state/BootstrapDumpFunction.java   |   44 +
 .../repl/dump/log/state/BootstrapDumpTable.java |   52 +
 .../dump/log/state/IncrementalDumpBegin.java    |   43 +
 .../repl/dump/log/state/IncrementalDumpEnd.java |   54 +
 .../dump/log/state/IncrementalDumpEvent.java    |   51 +
 .../repl/load/log/BootstrapLoadLogger.java      |   66 +
 .../repl/load/log/IncrementalLoadLogger.java    |   55 +
 .../repl/load/log/state/BootstrapLoadBegin.java |   51 +
 .../repl/load/log/state/BootstrapLoadEnd.java   |   59 +
 .../load/log/state/BootstrapLoadFunction.java   |   44 +
 .../repl/load/log/state/BootstrapLoadTable.java |   52 +
 .../load/log/state/IncrementalLoadBegin.java    |   47 +
 .../repl/load/log/state/IncrementalLoadEnd.java |   54 +
 .../load/log/state/IncrementalLoadEvent.java    |   51 +
 .../load/message/CreateFunctionHandler.java     |    8 +
 .../parse/repl/load/message/MessageHandler.java |    9 +-
 .../hadoop/hive/ql/plan/ImportTableDesc.java    |   53 +-
 .../gen/thrift/gen-cpp/ThriftHiveMetastore.cpp  | 2515 ++++++++++--------
 .../gen/thrift/gen-cpp/ThriftHiveMetastore.h    |  126 +
 .../ThriftHiveMetastore_server.skeleton.cpp     |    5 +
 .../gen/thrift/gen-cpp/hive_metastore_types.cpp | 1037 +++++---
 .../gen/thrift/gen-cpp/hive_metastore_types.h   |   89 +
 .../api/NotificationEventsCountRequest.java     |  488 ++++
 .../api/NotificationEventsCountResponse.java    |  387 +++
 .../hive/metastore/api/ThriftHiveMetastore.java |  858 ++++++
 .../gen-php/metastore/ThriftHiveMetastore.php   |  216 ++
 .../src/gen/thrift/gen-php/metastore/Types.php  |  173 ++
 .../hive_metastore/ThriftHiveMetastore-remote   |    7 +
 .../hive_metastore/ThriftHiveMetastore.py       |  185 ++
 .../gen/thrift/gen-py/hive_metastore/ttypes.py  |  149 ++
 .../gen/thrift/gen-rb/hive_metastore_types.rb   |   37 +
 .../gen/thrift/gen-rb/thrift_hive_metastore.rb  |   54 +
 .../src/main/thrift/hive_metastore.thrift       |   10 +
 71 files changed, 6491 insertions(+), 1638 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/70cb7f0b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
----------------------------------------------------------------------
diff --git 
a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
 
b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
index 5d7cfad..8d861e4 100644
--- 
a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
+++ 
b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
@@ -47,6 +47,8 @@ import 
org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 import org.apache.hadoop.hive.metastore.api.NotificationEventRequest;
 import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
+import org.apache.hadoop.hive.metastore.api.NotificationEventsCountRequest;
+import org.apache.hadoop.hive.metastore.api.NotificationEventsCountResponse;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.PartitionEventType;
 import org.apache.hadoop.hive.metastore.api.PrincipalPrivilegeSet;
@@ -841,6 +843,11 @@ public class DummyRawStoreFailEvent implements RawStore, 
Configurable {
   }
 
   @Override
+  public NotificationEventsCountResponse 
getNotificationEventsCount(NotificationEventsCountRequest rqst) {
+    return objectStore.getNotificationEventsCount(rqst);
+  }
+
+  @Override
   public void flushCache() {
     objectStore.flushCache();
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/70cb7f0b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
----------------------------------------------------------------------
diff --git 
a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java 
b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index 55def04..5812a1b 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -7026,6 +7026,13 @@ public class HiveMetaStore extends ThriftHiveMetastore {
     }
 
     @Override
+    public NotificationEventsCountResponse 
get_notification_events_count(NotificationEventsCountRequest rqst)
+            throws TException {
+      RawStore ms = getMS();
+      return ms.getNotificationEventsCount(rqst);
+    }
+
+    @Override
     public FireEventResponse fire_listener_event(FireEventRequest rqst) throws 
TException {
       switch (rqst.getData().getSetField()) {
       case INSERT_DATA:

http://git-wip-us.apache.org/repos/asf/hive/blob/70cb7f0b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
----------------------------------------------------------------------
diff --git 
a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java 
b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
index 688f181..70451c4 100644
--- 
a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
+++ 
b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
@@ -2356,6 +2356,13 @@ public class HiveMetaStoreClient implements 
IMetaStoreClient, AutoCloseable {
     return client.get_current_notificationEventId();
   }
 
+  @InterfaceAudience.LimitedPrivate({"HCatalog"})
+  @Override
+  public NotificationEventsCountResponse 
getNotificationEventsCount(NotificationEventsCountRequest rqst)
+          throws TException {
+    return client.get_notification_events_count(rqst);
+  }
+
   @InterfaceAudience.LimitedPrivate({"Apache Hive, HCatalog"})
   @Override
   public FireEventResponse fireListenerEvent(FireEventRequest rqst) throws 
TException {

http://git-wip-us.apache.org/repos/asf/hive/blob/70cb7f0b/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
----------------------------------------------------------------------
diff --git 
a/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java 
b/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
index 813a283..69a845c 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
@@ -76,6 +76,8 @@ import 
org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
 import org.apache.hadoop.hive.metastore.api.NotNullConstraintsRequest;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
+import org.apache.hadoop.hive.metastore.api.NotificationEventsCountResponse;
+import org.apache.hadoop.hive.metastore.api.NotificationEventsCountRequest;
 import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.PartitionEventType;
@@ -1634,6 +1636,15 @@ public interface IMetaStoreClient {
   CurrentNotificationEventId getCurrentNotificationEventId() throws TException;
 
   /**
+   * Get the number of events from given eventID for the input database.
+   * @return number of events
+   * @throws TException
+   */
+  @InterfaceAudience.LimitedPrivate({"HCatalog"})
+  NotificationEventsCountResponse 
getNotificationEventsCount(NotificationEventsCountRequest rqst)
+          throws TException;
+
+  /**
    * Request that the metastore fire an event.  Currently this is only 
supported for DML
    * operations, since the metastore knows when DDL operations happen.
    * @param request

http://git-wip-us.apache.org/repos/asf/hive/blob/70cb7f0b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
----------------------------------------------------------------------
diff --git 
a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java 
b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index efbdb8f..0db1bc0 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -98,6 +98,8 @@ import 
org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 import org.apache.hadoop.hive.metastore.api.NotificationEventRequest;
 import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
+import org.apache.hadoop.hive.metastore.api.NotificationEventsCountRequest;
+import org.apache.hadoop.hive.metastore.api.NotificationEventsCountResponse;
 import org.apache.hadoop.hive.metastore.api.Order;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.PartitionEventType;
@@ -8339,6 +8341,27 @@ public class ObjectStore implements RawStore, 
Configurable {
     }
   }
 
+  @Override
+  public NotificationEventsCountResponse 
getNotificationEventsCount(NotificationEventsCountRequest rqst) {
+    Long result = 0L;
+    boolean commited = false;
+    Query query = null;
+    try {
+      openTransaction();
+      long fromEventId = rqst.getFromEventId();
+      String inputDbName = rqst.getDbName();
+      String queryStr = "select count(eventId) from " + 
MNotificationLog.class.getName()
+                + " where eventId > fromEventId && dbName == inputDbName";
+      query = pm.newQuery(queryStr);
+      query.declareParameters("java.lang.Long fromEventId, java.lang.String 
inputDbName");
+      result = (Long) query.execute(fromEventId, inputDbName);
+      commited = commitTransaction();
+      return new NotificationEventsCountResponse(result.longValue());
+    } finally {
+      rollbackAndCleanup(commited, query);
+    }
+  }
+
   private MNotificationLog translateThriftToDb(NotificationEvent entry) {
     MNotificationLog dbEntry = new MNotificationLog();
     dbEntry.setEventId(entry.getEventId());

http://git-wip-us.apache.org/repos/asf/hive/blob/70cb7f0b/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java 
b/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java
index a2ae4c5..71982a0 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java
@@ -46,6 +46,8 @@ import 
org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 import org.apache.hadoop.hive.metastore.api.NotificationEventRequest;
 import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
+import org.apache.hadoop.hive.metastore.api.NotificationEventsCountRequest;
+import org.apache.hadoop.hive.metastore.api.NotificationEventsCountResponse;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.PartitionEventType;
 import org.apache.hadoop.hive.metastore.api.PrincipalPrivilegeSet;
@@ -621,6 +623,13 @@ public interface RawStore extends Configurable {
    */
   public CurrentNotificationEventId getCurrentNotificationEventId();
 
+  /**
+   * Get the number of events corresponding to given database with fromEventId.
+   * This is intended for use by the repl commands to track the progress of 
incremental dump.
+   * @return
+   */
+  public NotificationEventsCountResponse 
getNotificationEventsCount(NotificationEventsCountRequest rqst);
+
   /*
    * Flush any catalog objects held by the metastore implementation.  Note 
that this does not
    * flush statistics objects.  This should be called at the beginning of each 
query.

http://git-wip-us.apache.org/repos/asf/hive/blob/70cb7f0b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
----------------------------------------------------------------------
diff --git 
a/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java 
b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
index 697cc2e..93d1ba6 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
@@ -63,6 +63,8 @@ import 
org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 import org.apache.hadoop.hive.metastore.api.NotificationEventRequest;
 import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
+import org.apache.hadoop.hive.metastore.api.NotificationEventsCountRequest;
+import org.apache.hadoop.hive.metastore.api.NotificationEventsCountResponse;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.PartitionEventType;
 import org.apache.hadoop.hive.metastore.api.PrincipalPrivilegeSet;
@@ -1803,6 +1805,11 @@ public class CachedStore implements RawStore, 
Configurable {
   }
 
   @Override
+  public NotificationEventsCountResponse 
getNotificationEventsCount(NotificationEventsCountRequest rqst) {
+    return rawStore.getNotificationEventsCount(rqst);
+  }
+
+  @Override
   public void flushCache() {
     rawStore.flushCache();
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/70cb7f0b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventUtils.java
----------------------------------------------------------------------
diff --git 
a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventUtils.java 
b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventUtils.java
index 8205c25..8f90c7a 100644
--- 
a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventUtils.java
+++ 
b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventUtils.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.metastore.messaging;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.api.NotificationEventsCountRequest;
 import 
org.apache.hadoop.hive.metastore.messaging.event.filters.DatabaseAndTableFilter;
 import org.apache.thrift.TException;
 
@@ -34,6 +35,7 @@ public class EventUtils {
   public interface NotificationFetcher {
     int getBatchSize() throws IOException;
     long getCurrentNotificationEventId() throws IOException;
+    long getDbNotificationEventsCount(long fromEventId, String dbName) throws 
IOException;
     List<NotificationEvent> getNextNotificationEvents(
         long pos, IMetaStoreClient.NotificationFilter filter) throws 
IOException;
   }
@@ -75,6 +77,17 @@ public class EventUtils {
     }
 
     @Override
+    public long getDbNotificationEventsCount(long fromEventId, String dbName) 
throws IOException {
+      try {
+        NotificationEventsCountRequest rqst
+                = new NotificationEventsCountRequest(fromEventId, dbName);
+        return msc.getNotificationEventsCount(rqst).getEventsCount();
+      } catch (TException e) {
+        throw new IOException(e);
+      }
+    }
+
+    @Override
     public List<NotificationEvent> getNextNotificationEvents(
         long pos, IMetaStoreClient.NotificationFilter filter) throws 
IOException {
       try {

http://git-wip-us.apache.org/repos/asf/hive/blob/70cb7f0b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
----------------------------------------------------------------------
diff --git 
a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
 
b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
index fdb2866..4db203d 100644
--- 
a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
+++ 
b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
@@ -44,6 +44,8 @@ import 
org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 import org.apache.hadoop.hive.metastore.api.NotificationEventRequest;
 import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
+import org.apache.hadoop.hive.metastore.api.NotificationEventsCountRequest;
+import org.apache.hadoop.hive.metastore.api.NotificationEventsCountResponse;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.PartitionEventType;
 import org.apache.hadoop.hive.metastore.api.PrincipalPrivilegeSet;
@@ -793,6 +795,11 @@ public class DummyRawStoreControlledCommit implements 
RawStore, Configurable {
   }
 
   @Override
+  public NotificationEventsCountResponse 
getNotificationEventsCount(NotificationEventsCountRequest rqst) {
+    return  objectStore.getNotificationEventsCount(rqst);
+  }
+
+  @Override
   public void flushCache() {
     objectStore.flushCache();
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/70cb7f0b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
----------------------------------------------------------------------
diff --git 
a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
 
b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
index f422c4e..fb16cfc 100644
--- 
a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
+++ 
b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
@@ -45,6 +45,8 @@ import 
org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 import org.apache.hadoop.hive.metastore.api.NotificationEventRequest;
 import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
+import org.apache.hadoop.hive.metastore.api.NotificationEventsCountRequest;
+import org.apache.hadoop.hive.metastore.api.NotificationEventsCountResponse;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.PartitionEventType;
 import org.apache.hadoop.hive.metastore.api.PrincipalPrivilegeSet;
@@ -809,6 +811,10 @@ public class DummyRawStoreForJdoConnection implements 
RawStore {
     return null;
   }
 
+  @Override
+  public NotificationEventsCountResponse 
getNotificationEventsCount(NotificationEventsCountRequest rqst) {
+    return null;
+  }
 
   public void flushCache() {
 

http://git-wip-us.apache.org/repos/asf/hive/blob/70cb7f0b/ql/if/queryplan.thrift
----------------------------------------------------------------------
diff --git a/ql/if/queryplan.thrift b/ql/if/queryplan.thrift
index 00b0200..aaf644a 100644
--- a/ql/if/queryplan.thrift
+++ b/ql/if/queryplan.thrift
@@ -102,6 +102,7 @@ enum StageType {
   COLUMNSTATS,
   REPL_DUMP,
   REPL_BOOTSTRAP_LOAD,
+  REPL_STATE_LOG
 }
 
 struct Stage {

http://git-wip-us.apache.org/repos/asf/hive/blob/70cb7f0b/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp
----------------------------------------------------------------------
diff --git a/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp 
b/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp
index f467da2..7262017 100644
--- a/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp
+++ b/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp
@@ -117,7 +117,8 @@ int _kStageTypeValues[] = {
   StageType::DEPENDENCY_COLLECTION,
   StageType::COLUMNSTATS,
   StageType::REPL_DUMP,
-  StageType::REPL_BOOTSTRAP_LOAD
+  StageType::REPL_BOOTSTRAP_LOAD,
+  StageType::REPL_STATE_LOG
 };
 const char* _kStageTypeNames[] = {
   "CONDITIONAL",
@@ -133,9 +134,10 @@ const char* _kStageTypeNames[] = {
   "DEPENDENCY_COLLECTION",
   "COLUMNSTATS",
   "REPL_DUMP",
-  "REPL_BOOTSTRAP_LOAD"
+  "REPL_BOOTSTRAP_LOAD",
+  "REPL_STATE_LOG"
 };
-const std::map<int, const char*> 
_StageType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(14, 
_kStageTypeValues, _kStageTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, 
NULL));
+const std::map<int, const char*> 
_StageType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(15, 
_kStageTypeValues, _kStageTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, 
NULL));
 
 
 Adjacency::~Adjacency() throw() {

http://git-wip-us.apache.org/repos/asf/hive/blob/70cb7f0b/ql/src/gen/thrift/gen-cpp/queryplan_types.h
----------------------------------------------------------------------
diff --git a/ql/src/gen/thrift/gen-cpp/queryplan_types.h 
b/ql/src/gen/thrift/gen-cpp/queryplan_types.h
index ac87ef7..18dc867 100644
--- a/ql/src/gen/thrift/gen-cpp/queryplan_types.h
+++ b/ql/src/gen/thrift/gen-cpp/queryplan_types.h
@@ -95,7 +95,8 @@ struct StageType {
     DEPENDENCY_COLLECTION = 10,
     COLUMNSTATS = 11,
     REPL_DUMP = 12,
-    REPL_BOOTSTRAP_LOAD = 13
+    REPL_BOOTSTRAP_LOAD = 13,
+    REPL_STATE_LOG = 14
   };
 };
 

http://git-wip-us.apache.org/repos/asf/hive/blob/70cb7f0b/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java
----------------------------------------------------------------------
diff --git 
a/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java
 
b/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java
index 11a8f6d..ed408d2 100644
--- 
a/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java
+++ 
b/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java
@@ -25,7 +25,8 @@ public enum StageType implements org.apache.thrift.TEnum {
   DEPENDENCY_COLLECTION(10),
   COLUMNSTATS(11),
   REPL_DUMP(12),
-  REPL_BOOTSTRAP_LOAD(13);
+  REPL_BOOTSTRAP_LOAD(13),
+  REPL_STATE_LOG(14);
 
   private final int value;
 
@@ -74,6 +75,8 @@ public enum StageType implements org.apache.thrift.TEnum {
         return REPL_DUMP;
       case 13:
         return REPL_BOOTSTRAP_LOAD;
+      case 14:
+        return REPL_STATE_LOG;
       default:
         return null;
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/70cb7f0b/ql/src/gen/thrift/gen-php/Types.php
----------------------------------------------------------------------
diff --git a/ql/src/gen/thrift/gen-php/Types.php 
b/ql/src/gen/thrift/gen-php/Types.php
index 68edfcd..bca2eee 100644
--- a/ql/src/gen/thrift/gen-php/Types.php
+++ b/ql/src/gen/thrift/gen-php/Types.php
@@ -116,6 +116,7 @@ final class StageType {
   const COLUMNSTATS = 11;
   const REPL_DUMP = 12;
   const REPL_BOOTSTRAP_LOAD = 13;
+  const REPL_STATE_LOG = 14;
   static public $__names = array(
     0 => 'CONDITIONAL',
     1 => 'COPY',
@@ -131,6 +132,7 @@ final class StageType {
     11 => 'COLUMNSTATS',
     12 => 'REPL_DUMP',
     13 => 'REPL_BOOTSTRAP_LOAD',
+    14 => 'REPL_STATE_LOG',
   );
 }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/70cb7f0b/ql/src/gen/thrift/gen-py/queryplan/ttypes.py
----------------------------------------------------------------------
diff --git a/ql/src/gen/thrift/gen-py/queryplan/ttypes.py 
b/ql/src/gen/thrift/gen-py/queryplan/ttypes.py
index 6bf65af..1f0d627 100644
--- a/ql/src/gen/thrift/gen-py/queryplan/ttypes.py
+++ b/ql/src/gen/thrift/gen-py/queryplan/ttypes.py
@@ -162,6 +162,7 @@ class StageType:
   COLUMNSTATS = 11
   REPL_DUMP = 12
   REPL_BOOTSTRAP_LOAD = 13
+  REPL_STATE_LOG = 14
 
   _VALUES_TO_NAMES = {
     0: "CONDITIONAL",
@@ -178,6 +179,7 @@ class StageType:
     11: "COLUMNSTATS",
     12: "REPL_DUMP",
     13: "REPL_BOOTSTRAP_LOAD",
+    14: "REPL_STATE_LOG",
   }
 
   _NAMES_TO_VALUES = {
@@ -195,6 +197,7 @@ class StageType:
     "COLUMNSTATS": 11,
     "REPL_DUMP": 12,
     "REPL_BOOTSTRAP_LOAD": 13,
+    "REPL_STATE_LOG": 14,
   }
 
 

http://git-wip-us.apache.org/repos/asf/hive/blob/70cb7f0b/ql/src/gen/thrift/gen-rb/queryplan_types.rb
----------------------------------------------------------------------
diff --git a/ql/src/gen/thrift/gen-rb/queryplan_types.rb 
b/ql/src/gen/thrift/gen-rb/queryplan_types.rb
index 2730dde..88d9c17 100644
--- a/ql/src/gen/thrift/gen-rb/queryplan_types.rb
+++ b/ql/src/gen/thrift/gen-rb/queryplan_types.rb
@@ -74,8 +74,9 @@ module StageType
   COLUMNSTATS = 11
   REPL_DUMP = 12
   REPL_BOOTSTRAP_LOAD = 13
-  VALUE_MAP = {0 => "CONDITIONAL", 1 => "COPY", 2 => "DDL", 3 => "MAPRED", 4 
=> "EXPLAIN", 5 => "FETCH", 6 => "FUNC", 7 => "MAPREDLOCAL", 8 => "MOVE", 9 => 
"STATS", 10 => "DEPENDENCY_COLLECTION", 11 => "COLUMNSTATS", 12 => "REPL_DUMP", 
13 => "REPL_BOOTSTRAP_LOAD"}
-  VALID_VALUES = Set.new([CONDITIONAL, COPY, DDL, MAPRED, EXPLAIN, FETCH, 
FUNC, MAPREDLOCAL, MOVE, STATS, DEPENDENCY_COLLECTION, COLUMNSTATS, REPL_DUMP, 
REPL_BOOTSTRAP_LOAD]).freeze
+  REPL_STATE_LOG = 14
+  VALUE_MAP = {0 => "CONDITIONAL", 1 => "COPY", 2 => "DDL", 3 => "MAPRED", 4 
=> "EXPLAIN", 5 => "FETCH", 6 => "FUNC", 7 => "MAPREDLOCAL", 8 => "MOVE", 9 => 
"STATS", 10 => "DEPENDENCY_COLLECTION", 11 => "COLUMNSTATS", 12 => "REPL_DUMP", 
13 => "REPL_BOOTSTRAP_LOAD", 14 => "REPL_STATE_LOG"}
+  VALID_VALUES = Set.new([CONDITIONAL, COPY, DDL, MAPRED, EXPLAIN, FETCH, 
FUNC, MAPREDLOCAL, MOVE, STATS, DEPENDENCY_COLLECTION, COLUMNSTATS, REPL_DUMP, 
REPL_BOOTSTRAP_LOAD, REPL_STATE_LOG]).freeze
 end
 
 class Adjacency

http://git-wip-us.apache.org/repos/asf/hive/blob/70cb7f0b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
index 91ac4bf..fe9b624 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
@@ -27,6 +27,8 @@ import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
 import org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask;
 import org.apache.hadoop.hive.ql.exec.repl.ReplDumpTask;
 import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork;
+import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogTask;
+import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.ReplLoadTask;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.ReplLoadWork;
 import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
@@ -114,6 +116,7 @@ public final class TaskFactory {
     taskvec.add(new TaskTuple<SparkWork>(SparkWork.class, SparkTask.class));
     taskvec.add(new TaskTuple<>(ReplDumpWork.class, ReplDumpTask.class));
     taskvec.add(new TaskTuple<>(ReplLoadWork.class, ReplLoadTask.class));
+    taskvec.add(new TaskTuple<>(ReplStateLogWork.class, 
ReplStateLogTask.class));
   }
 
   private static ThreadLocal<Integer> tid = new ThreadLocal<Integer>() {

http://git-wip-us.apache.org/repos/asf/hive/blob/70cb7f0b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index 67a67fd..d3af0ed 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
 import org.apache.hadoop.hive.ql.parse.repl.dump.HiveWrapper;
 import org.apache.hadoop.hive.ql.parse.repl.dump.TableExport;
 import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
@@ -49,6 +50,8 @@ import 
org.apache.hadoop.hive.ql.parse.repl.dump.events.EventHandlerFactory;
 import org.apache.hadoop.hive.ql.parse.repl.dump.io.FunctionSerializer;
 import org.apache.hadoop.hive.ql.parse.repl.dump.io.JsonWriter;
 import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
+import org.apache.hadoop.hive.ql.parse.repl.dump.log.BootstrapDumpLogger;
+import org.apache.hadoop.hive.ql.parse.repl.dump.log.IncrementalDumpLogger;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
@@ -64,7 +67,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> 
implements Serializable {
   private static final String FUNCTION_METADATA_FILE_NAME = "_metadata";
 
   private Logger LOG = LoggerFactory.getLogger(ReplDumpTask.class);
-  private Logger REPL_STATE_LOG = LoggerFactory.getLogger("ReplState");
+  private ReplLogger replLogger;
 
   @Override
   public String getName() {
@@ -127,8 +130,9 @@ public class ReplDumpTask extends Task<ReplDumpWork> 
implements Serializable {
     String dbName = (null != work.dbNameOrPattern && 
!work.dbNameOrPattern.isEmpty())
         ? work.dbNameOrPattern
         : "?";
-    REPL_STATE_LOG
-        .info("Repl Dump: Started Repl Dump for DB: {}, Dump Type: 
INCREMENTAL", dbName);
+    replLogger = new IncrementalDumpLogger(dbName, dumpRoot.toString(),
+            evFetcher.getDbNotificationEventsCount(work.eventFrom, dbName));
+    replLogger.startLog();
     while (evIter.hasNext()) {
       NotificationEvent ev = evIter.next();
       lastReplId = ev.getEventId();
@@ -136,7 +140,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> 
implements Serializable {
       dumpEvent(ev, evRoot, cmRoot);
     }
 
-    REPL_STATE_LOG.info("Repl Dump: Completed Repl Dump for DB: {}", dbName);
+    replLogger.endLog(lastReplId.toString());
 
     LOG.info("Done dumping events, preparing to return {},{}", 
dumpRoot.toUri(), lastReplId);
     Utils.writeOutput(
@@ -159,10 +163,9 @@ public class ReplDumpTask extends Task<ReplDumpWork> 
implements Serializable {
         conf,
         getNewEventOnlyReplicationSpec(ev.getEventId())
     );
-    EventHandlerFactory.handlerFor(ev).handle(context);
-    REPL_STATE_LOG.info(
-        "Repl Dump: Dumped event with ID: {}, Type: {} and dumped metadata and 
data to path {}",
-        String.valueOf(ev.getEventId()), ev.getEventType(), 
evRoot.toUri().toString());
+    EventHandler eventHandler = EventHandlerFactory.handlerFor(ev);
+    eventHandler.handle(context);
+    replLogger.eventLog(String.valueOf(ev.getEventId()), 
eventHandler.dumpType().toString());
   }
 
   private ReplicationSpec getNewEventOnlyReplicationSpec(Long eventId) throws 
SemanticException {
@@ -174,12 +177,13 @@ public class ReplDumpTask extends Task<ReplDumpWork> 
implements Serializable {
   private Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot) 
throws Exception {
     // bootstrap case
     Long bootDumpBeginReplId = 
getHive().getMSC().getCurrentNotificationEventId().getEventId();
+
     for (String dbName : Utils.matchesDb(getHive(), work.dbNameOrPattern)) {
-      REPL_STATE_LOG
-          .info("Repl Dump: Started analyzing Repl Dump for DB: {}, Dump Type: 
BOOTSTRAP",
-              dbName);
       LOG.debug("ReplicationSemanticAnalyzer: analyzeReplDump dumping db: " + 
dbName);
-
+      replLogger = new BootstrapDumpLogger(dbName, dumpRoot.toString(),
+              Utils.getAllTables(getHive(), dbName).size(),
+              getHive().getAllFunctions().size());
+      replLogger.startLog();
       Path dbRoot = dumpDbMetadata(dbName, dumpRoot);
       dumpFunctionMetadata(dbName, dumpRoot);
       for (String tblName : Utils.matchesTbl(getHive(), dbName, 
work.tableNameOrPattern)) {
@@ -187,6 +191,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> 
implements Serializable {
             "analyzeReplDump dumping table: " + tblName + " to db root " + 
dbRoot.toUri());
         dumpTable(dbName, tblName, dbRoot);
       }
+      replLogger.endLog(bootDumpBeginReplId.toString());
     }
     Long bootDumpEndReplId = 
getHive().getMSC().getCurrentNotificationEventId().getEventId();
     LOG.info("Bootstrap object dump phase took from {} to {}", 
bootDumpBeginReplId,
@@ -228,7 +233,6 @@ public class ReplDumpTask extends Task<ReplDumpWork> 
implements Serializable {
     Path dumpPath = new Path(dbRoot, EximUtil.METADATA_NAME);
     HiveWrapper.Tuple<Database> database = new HiveWrapper(getHive(), 
dbName).database();
     EximUtil.createDbExportDump(fs, dumpPath, database.object, 
database.replicationSpec);
-    REPL_STATE_LOG.info("Repl Dump: Dumped DB metadata");
     return dbRoot;
   }
 
@@ -240,9 +244,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> 
implements Serializable {
           new TableExport.Paths(work.astRepresentationForErrorMsg, dbRoot, 
tblName, conf);
       String distCpDoAsUser = 
conf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER);
       new TableExport(exportPaths, ts, getNewReplicationSpec(), db, 
distCpDoAsUser, conf).write();
-      REPL_STATE_LOG.info(
-          "Repl Dump: Analyzed dump for table/view: {}.{} and dumping metadata 
and data to path {}",
-          dbName, tblName, exportPaths.exportRootDir.toString());
+      replLogger.tableLog(tblName, ts.tableHandle.getTableType());
     } catch (InvalidTableException te) {
       // Bootstrap dump shouldn't fail if the table is dropped/renamed while 
dumping it.
       // Just log a debug message and skip it.
@@ -295,7 +297,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> 
implements Serializable {
         FunctionSerializer serializer = new FunctionSerializer(tuple.object, 
conf);
         serializer.writeTo(jsonWriter, tuple.replicationSpec);
       }
-      REPL_STATE_LOG.info("Repl Dump: Dumped metadata for function: {}", 
functionName);
+      replLogger.functionLog(functionName);
     }
   }
 
@@ -303,8 +305,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> 
implements Serializable {
     try {
       HiveWrapper.Tuple<Function> tuple = new HiveWrapper(getHive(), 
dbName).function(functionName);
       if (tuple.object.getResourceUris().isEmpty()) {
-        REPL_STATE_LOG.warn(
-            "Not replicating function: " + functionName + " as it seems to 
have been created "
+        LOG.warn("Not replicating function: " + functionName + " as it seems 
to have been created "
                 + "without USING clause");
         return null;
       }

http://git-wip-us.apache.org/repos/asf/hive/blob/70cb7f0b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplStateLogTask.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplStateLogTask.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplStateLogTask.java
new file mode 100644
index 0000000..614af54
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplStateLogTask.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.plan.api.StageType;
+
+import java.io.Serializable;
+
+/**
+ * ReplStateLogTask.
+ *
+ * Exists for the sole purpose of reducing the number of dependency edges in 
the task graph.
+ **/
+public class ReplStateLogTask extends Task<ReplStateLogWork> implements 
Serializable {
+
+  private static final long serialVersionUID = 1L;
+
+  @Override
+  public int execute(DriverContext driverContext) {
+    work.replStateLog();
+    return 0;
+  }
+
+  @Override
+  public StageType getType() {
+    return StageType.REPL_STATE_LOG;
+  }
+
+  @Override
+  public String getName() {
+    return "REPL_STATE_LOG";
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/70cb7f0b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplStateLogWork.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplStateLogWork.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplStateLogWork.java
new file mode 100644
index 0000000..6462fe5
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplStateLogWork.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
+import org.apache.hadoop.hive.ql.plan.Explain;
+import org.apache.hadoop.hive.ql.plan.Explain.Level;
+
+import java.io.Serializable;
+import java.util.Map;
+
+
+/**
+ * ReplStateLogWork
+ *
+ */
+@Explain(displayName = "Repl State Log", explainLevels = { Level.USER, 
Level.DEFAULT, Level.EXTENDED })
+public class ReplStateLogWork implements Serializable {
+  private static final long serialVersionUID = 1L;
+  private final ReplLogger replLogger;
+  private final LOG_TYPE logType;
+  private String eventId;
+  private String eventType;
+  private String tableName;
+  private TableType tableType;
+  private String functionName;
+  private String lastReplId;
+
+  private enum LOG_TYPE {
+    TABLE,
+    FUNCTION,
+    EVENT,
+    END
+  }
+
+  public ReplStateLogWork(ReplLogger replLogger, String eventId, String 
eventType) {
+    this.logType = LOG_TYPE.EVENT;
+    this.replLogger = replLogger;
+    this.eventId = eventId;
+    this.eventType = eventType;
+  }
+
+  public ReplStateLogWork(ReplLogger replLogger, String tableName, TableType 
tableType) {
+    this.logType = LOG_TYPE.TABLE;
+    this.replLogger = replLogger;
+    this.tableName = tableName;
+    this.tableType = tableType;
+  }
+
+  public ReplStateLogWork(ReplLogger replLogger, String functionName) {
+    this.logType = LOG_TYPE.FUNCTION;
+    this.replLogger = replLogger;
+    this.functionName = functionName;
+  }
+
+  public ReplStateLogWork(ReplLogger replLogger, Map<String,String> dbProps) {
+    this.logType = LOG_TYPE.END;
+    this.replLogger = replLogger;
+    this.lastReplId = 
ReplicationSpec.getLastReplicatedStateFromParameters(dbProps);
+  }
+
+  public void replStateLog() {
+    switch (logType) {
+      case TABLE: {
+        replLogger.tableLog(tableName, tableType);
+        break;
+      }
+      case FUNCTION: {
+        replLogger.functionLog(functionName);
+        break;
+      }
+      case EVENT: {
+        replLogger.eventLog(eventId, eventType);
+        break;
+      }
+      case END: {
+        replLogger.endLog(lastReplId);
+        break;
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/70cb7f0b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java
index 6ea1754..cd31b17 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java
@@ -18,9 +18,12 @@
 package org.apache.hadoop.hive.ql.exec.repl.bootstrap;
 
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.BootstrapEvent;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.DatabaseEvent;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.FunctionEvent;
@@ -35,6 +38,8 @@ import 
org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table.LoadTable;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table.TableContext;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
+import org.apache.hadoop.hive.ql.plan.ImportTableDesc;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 
 import java.io.Serializable;
@@ -107,7 +112,8 @@ public class ReplLoadTask extends Task<ReplLoadWork> 
implements Serializable {
           TableContext tableContext =
               new TableContext(dbTracker, work.dbNameToLoadIn, 
work.tableNameToLoadIn);
           TableEvent tableEvent = (TableEvent) next;
-          LoadTable loadTable = new LoadTable(tableEvent, context, 
tableContext, loadTaskTracker);
+          LoadTable loadTable = new LoadTable(tableEvent, context, 
iterator.replLogger(),
+                                              tableContext, loadTaskTracker);
           tableTracker = loadTable.tasks();
           if (!scope.database) {
             scope.rootTasks.addAll(tableTracker.tasks());
@@ -124,8 +130,8 @@ public class ReplLoadTask extends Task<ReplLoadWork> 
implements Serializable {
 
           // for a table we explicitly try to load partitions as there is no 
separate partitions events.
           LoadPartitions loadPartitions =
-              new LoadPartitions(context, loadTaskTracker, tableEvent, 
work.dbNameToLoadIn,
-                  tableContext);
+              new LoadPartitions(context, iterator.replLogger(), 
loadTaskTracker, tableEvent,
+                      work.dbNameToLoadIn, tableContext);
           TaskTracker partitionsTracker = loadPartitions.tasks();
           partitionsPostProcessing(iterator, scope, loadTaskTracker, 
tableTracker,
               partitionsTracker);
@@ -142,9 +148,8 @@ public class ReplLoadTask extends Task<ReplLoadWork> 
implements Serializable {
           TableContext tableContext = new TableContext(dbTracker, 
work.dbNameToLoadIn,
               work.tableNameToLoadIn);
           LoadPartitions loadPartitions =
-              new LoadPartitions(context, tableContext, loadTaskTracker, 
event.asTableEvent(),
-                  work.dbNameToLoadIn,
-                  event.lastPartitionReplicated());
+              new LoadPartitions(context, iterator.replLogger(), tableContext, 
loadTaskTracker,
+                      event.asTableEvent(), work.dbNameToLoadIn, 
event.lastPartitionReplicated());
           /*
                the tableTracker here should be a new instance and not an 
existing one as this can
                only happen when we break in between loading partitions.
@@ -156,8 +161,8 @@ public class ReplLoadTask extends Task<ReplLoadWork> 
implements Serializable {
           break;
         }
         case Function: {
-          LoadFunction loadFunction =
-              new LoadFunction(context, (FunctionEvent) next, 
work.dbNameToLoadIn, dbTracker);
+          LoadFunction loadFunction = new LoadFunction(context, 
iterator.replLogger(),
+                                              (FunctionEvent) next, 
work.dbNameToLoadIn, dbTracker);
           TaskTracker functionsTracker = loadFunction.tasks();
           if (!scope.database) {
             scope.rootTasks.addAll(functionsTracker.tasks());
@@ -169,11 +174,16 @@ public class ReplLoadTask extends Task<ReplLoadWork> 
implements Serializable {
           break;
         }
         }
+
+        if (!iterator.currentDbHasNext()) {
+          createEndReplLogTask(context, scope, iterator.replLogger());
+        }
       }
       boolean addAnotherLoadTask = iterator.hasNext() || 
loadTaskTracker.hasReplicationState();
       createBuilderTask(scope.rootTasks, addAnotherLoadTask);
       if (!iterator.hasNext()) {
         loadTaskTracker.update(updateDatabaseLastReplID(maxTasks, context, 
scope));
+        work.updateDbEventState(null);
       }
       this.childTasks = scope.rootTasks;
       LOG.info("Root Tasks / Total Tasks : {} / {} ", childTasks.size(), 
loadTaskTracker.numberOfTasks());
@@ -186,6 +196,19 @@ public class ReplLoadTask extends Task<ReplLoadWork> 
implements Serializable {
     return 0;
   }
 
+  private Task<? extends Serializable> createEndReplLogTask(Context context, 
Scope scope,
+                                                  ReplLogger replLogger) 
throws SemanticException {
+    Database dbInMetadata = 
work.databaseEvent(context.hiveConf).dbInMetadata(work.dbNameToLoadIn);
+    ReplStateLogWork replLogWork = new ReplStateLogWork(replLogger, 
dbInMetadata.getParameters());
+    Task<ReplStateLogWork> replLogTask = TaskFactory.get(replLogWork, conf);
+    if (null == scope.rootTasks) {
+      scope.rootTasks.add(replLogTask);
+    } else {
+      dependency(scope.rootTasks, replLogTask);
+    }
+    return replLogTask;
+  }
+
   /**
    * There was a database update done before and we want to make sure we 
update the last repl
    * id on this database as we are now going to switch to processing a new 
database.
@@ -243,18 +266,20 @@ public class ReplLoadTask extends Task<ReplLoadWork> 
implements Serializable {
   /**
    * add the dependency to the leaf node
    */
-  private boolean dependency(List<Task<? extends Serializable>> tasks,
-      Task<ReplLoadWork> loadTask) {
+  public static boolean dependency(List<Task<? extends Serializable>> tasks, 
Task<?> tailTask) {
     if (tasks == null || tasks.isEmpty()) {
       return true;
     }
     for (Task<? extends Serializable> task : tasks) {
-      boolean dependency = dependency(task.getChildTasks(), loadTask);
-      if (dependency) {
-        task.addDependentTask(loadTask);
+      if (task == tailTask) {
+        continue;
+      }
+      boolean leafNode = dependency(task.getChildTasks(), tailTask);
+      if (leafNode) {
+        task.addDependentTask(tailTask);
       }
     }
-    return true;
+    return false;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/70cb7f0b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java
index eb18e5f..f51afe1 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java
@@ -38,7 +38,7 @@ public class ReplLoadWork implements Serializable {
   public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, String 
dbNameToLoadIn,
       String tableNameToLoadIn) throws IOException {
     this.tableNameToLoadIn = tableNameToLoadIn;
-    this.iterator = new BootstrapEventsIterator(dumpDirectory, hiveConf);
+    this.iterator = new BootstrapEventsIterator(dumpDirectory, dbNameToLoadIn, 
hiveConf);
     this.dbNameToLoadIn = dbNameToLoadIn;
   }
 
@@ -61,7 +61,6 @@ public class ReplLoadWork implements Serializable {
 
   DatabaseEvent databaseEvent(HiveConf hiveConf) {
     DatabaseEvent databaseEvent = state.toEvent(hiveConf);
-    state = null;
     return databaseEvent;
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/70cb7f0b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java
index 4e635ad..43a85f2 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java
@@ -17,13 +17,15 @@
  */
 package org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem;
 
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.*;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.ReplicationState;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.BootstrapEvent;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer;
+import org.apache.hadoop.hive.ql.parse.repl.load.log.BootstrapLoadLogger;
+import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -69,8 +71,13 @@ public class BootstrapEventsIterator implements 
Iterator<BootstrapEvent> {
       warehouse.
    */
   private Iterator<DatabaseEventsIterator> dbEventsIterator;
+  private final String dumpDirectory;
+  private final String dbNameToLoadIn;
+  private final HiveConf hiveConf;
+  private ReplLogger replLogger;
 
-  public BootstrapEventsIterator(String dumpDirectory, HiveConf hiveConf) 
throws IOException {
+  public BootstrapEventsIterator(String dumpDirectory, String dbNameToLoadIn, 
HiveConf hiveConf)
+          throws IOException {
     Path path = new Path(dumpDirectory);
     FileSystem fileSystem = path.getFileSystem(hiveConf);
     FileStatus[] fileStatuses =
@@ -93,6 +100,9 @@ public class BootstrapEventsIterator implements 
Iterator<BootstrapEvent> {
       }
     }).collect(Collectors.toList()).iterator();
 
+    this.dumpDirectory = dumpDirectory;
+    this.dbNameToLoadIn = dbNameToLoadIn;
+    this.hiveConf = hiveConf;
   }
 
   @Override
@@ -101,6 +111,7 @@ public class BootstrapEventsIterator implements 
Iterator<BootstrapEvent> {
       if (currentDatabaseIterator == null) {
         if (dbEventsIterator.hasNext()) {
           currentDatabaseIterator = dbEventsIterator.next();
+          initReplLogger();
         } else {
           return false;
         }
@@ -127,7 +138,44 @@ public class BootstrapEventsIterator implements 
Iterator<BootstrapEvent> {
     throw new UnsupportedOperationException("This operation is not supported");
   }
 
+  public boolean currentDbHasNext() {
+    return ((currentDatabaseIterator != null) && 
(currentDatabaseIterator.hasNext()));
+  }
+
   public void setReplicationState(ReplicationState replicationState) {
     this.currentDatabaseIterator.replicationState = replicationState;
   }
+
+  public ReplLogger replLogger() {
+    return replLogger;
+  }
+
+  private void initReplLogger() {
+    try {
+      Path dbDumpPath = currentDatabaseIterator.dbLevelPath();
+      FileSystem fs = dbDumpPath.getFileSystem(hiveConf);
+
+      long numTables = getSubDirs(fs, dbDumpPath).length;
+      long numFunctions = 0;
+      Path funcPath = new Path(dbDumpPath, 
ReplicationSemanticAnalyzer.FUNCTIONS_ROOT_DIR_NAME);
+      if (fs.exists(funcPath)) {
+        numFunctions = getSubDirs(fs, funcPath).length;
+      }
+      String dbName = StringUtils.isBlank(dbNameToLoadIn) ? 
dbDumpPath.getName() : dbNameToLoadIn;
+      replLogger = new BootstrapLoadLogger(dbName, dumpDirectory, numTables, 
numFunctions);
+      replLogger.startLog();
+    } catch (IOException e) {
+      // Ignore the exception
+    }
+  }
+
+  FileStatus[] getSubDirs(FileSystem fs, Path dirPath) throws IOException {
+    return fs.listStatus(dirPath, new PathFilter() {
+      @Override
+      public boolean accept(Path p) {
+        String name = p.getName();
+        return !name.startsWith("_") && !name.startsWith(".");
+      }
+    });
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/70cb7f0b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java
index 3100875..dc0e192 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java
@@ -58,6 +58,10 @@ class DatabaseEventsIterator implements 
Iterator<BootstrapEvent> {
     remoteIterator = fileSystem.listFiles(dbLevelPath, true);
   }
 
+  public Path dbLevelPath() {
+    return this.dbLevelPath;
+  }
+
   @Override
   public boolean hasNext() {
     try {

http://git-wip-us.apache.org/repos/asf/hive/blob/70cb7f0b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java
index e9b8711..8852a60 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java
@@ -20,10 +20,14 @@ package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.FunctionEvent;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.ReplLoadTask;
 import org.apache.hadoop.hive.ql.parse.repl.load.message.CreateFunctionHandler;
 import org.apache.hadoop.hive.ql.parse.repl.load.message.MessageHandler;
 import org.slf4j.Logger;
@@ -39,18 +43,27 @@ import static 
org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.stripQuotes;
 public class LoadFunction {
   private static final Logger LOG = 
LoggerFactory.getLogger(LoadFunction.class);
   private Context context;
+  private ReplLogger replLogger;
   private final FunctionEvent event;
   private final String dbNameToLoadIn;
   private final TaskTracker tracker;
 
-  public LoadFunction(Context context, FunctionEvent event, String 
dbNameToLoadIn,
-      TaskTracker existingTracker) {
+  public LoadFunction(Context context, ReplLogger replLogger, FunctionEvent 
event,
+                      String dbNameToLoadIn, TaskTracker existingTracker) {
     this.context = context;
+    this.replLogger = replLogger;
     this.event = event;
     this.dbNameToLoadIn = dbNameToLoadIn;
     this.tracker = new TaskTracker(existingTracker);
   }
 
+  private void createFunctionReplLogTask(List<Task<? extends Serializable>> 
functionTasks,
+                                         String functionName) {
+    ReplStateLogWork replLogWork = new ReplStateLogWork(replLogger, 
functionName);
+    Task<ReplStateLogWork> replLogTask = TaskFactory.get(replLogWork, 
context.hiveConf);
+    ReplLoadTask.dependency(functionTasks, replLogTask);
+  }
+
   public TaskTracker tasks() throws IOException, SemanticException {
     URI fromURI = EximUtil
         .getValidatedURI(context.hiveConf, 
stripQuotes(event.rootDir().toUri().toString()));
@@ -63,6 +76,7 @@ public class LoadFunction {
               dbNameToLoadIn, null, fromPath.toString(), null, null, 
context.hiveConf,
               context.hiveDb, null, LOG)
       );
+      createFunctionReplLogTask(tasks, handler.getFunctionName());
       tasks.forEach(tracker::addTask);
       return tracker;
     } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/hive/blob/70cb7f0b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TaskTracker.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TaskTracker.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TaskTracker.java
index f246b8a..374a03b 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TaskTracker.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TaskTracker.java
@@ -59,14 +59,21 @@ public class TaskTracker {
    */
   public void addTask(Task<? extends Serializable> task) {
     tasks.add(task);
-    updateTaskCount(task);
+
+    List <Task<? extends Serializable>> visited = new ArrayList<>();
+    updateTaskCount(task, visited);
   }
 
-  private void updateTaskCount(Task<? extends Serializable> task) {
+  public void updateTaskCount(Task<? extends Serializable> task,
+                              List <Task<? extends Serializable>> visited) {
     numberOfTasks += 1;
+    visited.add(task);
     if (task.getChildTasks() != null) {
       for (Task<? extends Serializable> childTask : task.getChildTasks()) {
-        updateTaskCount(childTask);
+        if (visited.contains(childTask)) {
+          continue;
+        }
+        updateTaskCount(childTask, visited);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/70cb7f0b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
index f088ba9..c944a13 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
@@ -26,8 +26,10 @@ import org.apache.hadoop.hive.ql.exec.ReplCopyTask;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.ReplicationState;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.ReplLoadTask;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.TaskTracker;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -36,6 +38,7 @@ import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
 import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
 import org.apache.hadoop.hive.ql.plan.DDLWork;
 import org.apache.hadoop.hive.ql.plan.ImportTableDesc;
@@ -46,10 +49,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 import static 
org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.ReplicationState.PartitionState;
 import static 
org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer.isPartitioned;
@@ -59,6 +59,7 @@ public class LoadPartitions {
   private static Logger LOG = LoggerFactory.getLogger(LoadPartitions.class);
 
   private final Context context;
+  private final ReplLogger replLogger;
   private final TableContext tableContext;
   private final TableEvent event;
   private final TaskTracker tracker;
@@ -67,17 +68,19 @@ public class LoadPartitions {
   private final ImportTableDesc tableDesc;
   private Table table;
 
-  public LoadPartitions(Context context, TaskTracker tableTracker, TableEvent 
event,
-      String dbNameToLoadIn, TableContext tableContext) throws HiveException, 
IOException {
-    this(context, tableContext, tableTracker, event, dbNameToLoadIn, null);
+  public LoadPartitions(Context context, ReplLogger replLogger, TaskTracker 
tableTracker,
+                        TableEvent event, String dbNameToLoadIn,
+                        TableContext tableContext) throws HiveException, 
IOException {
+    this(context, replLogger, tableContext, tableTracker, event, 
dbNameToLoadIn, null);
   }
 
-  public LoadPartitions(Context context, TableContext tableContext, 
TaskTracker limiter,
-      TableEvent event, String dbNameToLoadIn, AddPartitionDesc 
lastReplicatedPartition)
-      throws HiveException, IOException {
+  public LoadPartitions(Context context, ReplLogger replLogger, TableContext 
tableContext,
+                        TaskTracker limiter, TableEvent event, String 
dbNameToLoadIn,
+                        AddPartitionDesc lastReplicatedPartition) throws 
HiveException, IOException {
     this.tracker = new TaskTracker(limiter);
     this.event = event;
     this.context = context;
+    this.replLogger = replLogger;
     this.lastReplicatedPartition = lastReplicatedPartition;
     this.tableContext = tableContext;
 
@@ -98,6 +101,21 @@ public class LoadPartitions {
     }
   }
 
  /**
   * Adds a task that logs the replication state for the current table
   * ({@code tableDesc}) to the tracker, scheduled after all load tasks
   * already registered there.
   *
   * @throws SemanticException if the log task cannot be created
   */
  private void createTableReplLogTask() throws SemanticException {
    ReplStateLogWork replLogWork = new ReplStateLogWork(replLogger,
                                            tableDesc.getTableName(), tableDesc.tableType());
    Task<ReplStateLogWork> replLogTask = TaskFactory.get(replLogWork, context.hiveConf);

    if (tracker.tasks().isEmpty()) {
      // No load tasks pending for this table: the log task can run on its own.
      tracker.addTask(replLogTask);
    } else {
      // Wire the log task as a dependent of the tracked load tasks so it
      // fires only after they complete.
      ReplLoadTask.dependency(tracker.tasks(), replLogTask);

      // NOTE(review): presumably re-counts tasks reachable from replLogTask so
      // the tracker's budget reflects the new node; 'visited' guards against
      // counting shared children twice — confirm against TaskTracker.
      List<Task<? extends Serializable>> visited = new ArrayList<>();
      tracker.updateTaskCount(replLogTask, visited);
    }
  }
+
   public TaskTracker tasks() throws SemanticException {
     try {
       /*
@@ -113,7 +131,11 @@ public class LoadPartitions {
         table = new Table(tableDesc.getDatabaseName(), 
tableDesc.getTableName());
         if (isPartitioned(tableDesc)) {
           updateReplicationState(initialReplicationState());
-          return forNewTable();
+          if (!forNewTable().hasReplicationState()) {
+            // Add ReplStateLogTask only if no pending table load tasks left 
for next cycle
+            createTableReplLogTask();
+          }
+          return tracker;
         }
       } else {
         // existing
@@ -122,7 +144,11 @@ public class LoadPartitions {
           List<AddPartitionDesc> partitionDescs = 
event.partitionDescriptions(tableDesc);
           if (!event.replicationSpec().isMetadataOnly() && 
!partitionDescs.isEmpty()) {
             updateReplicationState(initialReplicationState());
-            return forExistingTable(lastReplicatedPartition);
+            if 
(!forExistingTable(lastReplicatedPartition).hasReplicationState()) {
+              // Add ReplStateLogTask only if no pending table load tasks left 
for next cycle
+              createTableReplLogTask();
+            }
+            return tracker;
           }
         }
       }
@@ -149,9 +175,11 @@ public class LoadPartitions {
     while (iterator.hasNext() && tracker.canAddMoreTasks()) {
       AddPartitionDesc addPartitionDesc = iterator.next();
       tracker.addTask(addSinglePartition(table, addPartitionDesc));
-      ReplicationState currentReplicationState =
-          new ReplicationState(new PartitionState(table.getTableName(), 
addPartitionDesc));
-      updateReplicationState(currentReplicationState);
+      if (iterator.hasNext() && !tracker.canAddMoreTasks()) {
+        ReplicationState currentReplicationState =
+                new ReplicationState(new PartitionState(table.getTableName(), 
addPartitionDesc));
+        updateReplicationState(currentReplicationState);
+      }
     }
     return tracker;
   }
@@ -236,13 +264,16 @@ public class LoadPartitions {
     boolean encounteredTheLastReplicatedPartition = (lastPartitionReplicated 
== null);
     ReplicationSpec replicationSpec = event.replicationSpec();
     LOG.debug("table partitioned");
-    for (AddPartitionDesc addPartitionDesc : 
event.partitionDescriptions(tableDesc)) {
+
+    Iterator<AddPartitionDesc> iterator = 
event.partitionDescriptions(tableDesc).iterator();
+    while (iterator.hasNext()) {
       /*
       encounteredTheLastReplicatedPartition will be set, when we break 
creation of partition tasks
       for a table, as we have reached the limit of number of tasks we should 
create for execution.
       in this case on the next run we have to iterate over the partitions desc 
to reach the last replicated
       partition so that we can start replicating partitions after that.
        */
+      AddPartitionDesc addPartitionDesc = iterator.next();
       if (encounteredTheLastReplicatedPartition && tracker.canAddMoreTasks()) {
         Map<String, String> partSpec = 
addPartitionDesc.getPartition(0).getPartSpec();
         Partition ptn;
@@ -257,7 +288,7 @@ public class LoadPartitions {
           if (replicationSpec.allowReplacementInto(ptn.getParameters())) {
             if (replicationSpec.isMetadataOnly()) {
               tracker.addTask(alterSinglePartition(addPartitionDesc, 
replicationSpec, ptn));
-              if (!tracker.canAddMoreTasks()) {
+              if (iterator.hasNext() && !tracker.canAddMoreTasks()) {
                 tracker.setReplicationState(
                     new ReplicationState(new 
PartitionState(table.getTableName(), addPartitionDesc)
                     )

http://git-wip-us.apache.org/repos/asf/hive/blob/70cb7f0b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
index cbb964a..a1187c4 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
@@ -19,6 +19,7 @@ package 
org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.ql.ErrorMsg;
@@ -26,6 +27,8 @@ import org.apache.hadoop.hive.ql.exec.ReplCopyTask;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.ReplLoadTask;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.TaskTracker;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context;
@@ -34,6 +37,7 @@ import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
 import org.apache.hadoop.hive.ql.plan.ImportTableDesc;
 import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
@@ -42,7 +46,9 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.HashSet;
+import java.util.List;
 import java.util.TreeMap;
 
 import static 
org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer.isPartitioned;
@@ -51,18 +57,36 @@ public class LoadTable {
   private final static Logger LOG = LoggerFactory.getLogger(LoadTable.class);
   //  private final Helper helper;
   private final Context context;
+  private final ReplLogger replLogger;
   private final TableContext tableContext;
   private final TaskTracker tracker;
   private final TableEvent event;
 
-  public LoadTable(TableEvent event, Context context, TableContext 
tableContext, TaskTracker limiter)
+  public LoadTable(TableEvent event, Context context, ReplLogger replLogger,
+                   TableContext tableContext, TaskTracker limiter)
       throws SemanticException, IOException {
     this.event = event;
     this.context = context;
+    this.replLogger = replLogger;
     this.tableContext = tableContext;
     this.tracker = new TaskTracker(limiter);
   }
 
+  private void createTableReplLogTask(String tableName, TableType tableType) 
throws SemanticException {
+    ReplStateLogWork replLogWork = new ReplStateLogWork(replLogger,tableName, 
tableType);
+    Task<ReplStateLogWork> replLogTask = TaskFactory.get(replLogWork, 
context.hiveConf);
+    ReplLoadTask.dependency(tracker.tasks(), replLogTask);
+
+    if (tracker.tasks().isEmpty()) {
+      tracker.addTask(replLogTask);
+    } else {
+      ReplLoadTask.dependency(tracker.tasks(), replLogTask);
+
+      List<Task<? extends Serializable>> visited = new ArrayList<>();
+      tracker.updateTaskCount(replLogTask, visited);
+    }
+  }
+
   public TaskTracker tasks() throws SemanticException {
     // Path being passed to us is a table dump location. We go ahead and load 
it in as needed.
     // If tblName is null, then we default to the table name specified in 
_metadata, which is good.
@@ -123,22 +147,27 @@ public class LoadTable {
      behave like a noop or a pure MD alter.
   */
       if (table == null) {
-        return newTableTasks(tableDesc);
+        newTableTasks(tableDesc);
       } else {
-        return existingTableTasks(tableDesc, table, replicationSpec);
+        existingTableTasks(tableDesc, table, replicationSpec);
+      }
+
+      if (!isPartitioned(tableDesc)) {
+        createTableReplLogTask(tableDesc.getTableName(), 
tableDesc.tableType());
       }
+      return tracker;
     } catch (Exception e) {
       throw new SemanticException(e);
     }
   }
 
-  private TaskTracker existingTableTasks(ImportTableDesc tblDesc, Table table,
+  private void existingTableTasks(ImportTableDesc tblDesc, Table table,
       ReplicationSpec replicationSpec) {
     if (!table.isPartitioned()) {
 
       LOG.debug("table non-partitioned");
       if (!replicationSpec.allowReplacementInto(table.getParameters())) {
-        return tracker; // silently return, table is newer than our 
replacement.
+        return; // silently return, table is newer than our replacement.
       }
 
       Task<? extends Serializable> alterTableTask = alterTableTask(tblDesc, 
replicationSpec);
@@ -151,10 +180,9 @@ public class LoadTable {
         tracker.addTask(alterTableTask);
       }
     }
-    return tracker;
   }
 
-  private TaskTracker newTableTasks(ImportTableDesc tblDesc) throws 
SemanticException {
+  private void newTableTasks(ImportTableDesc tblDesc) throws SemanticException 
{
     Table table;
     table = new Table(tblDesc.getDatabaseName(), tblDesc.getTableName());
     // Either we're dropping and re-creating, or the table didn't exist, and 
we're creating.
@@ -162,7 +190,7 @@ public class LoadTable {
         tblDesc.getCreateTableTask(new HashSet<>(), new HashSet<>(), 
context.hiveConf);
     if (event.replicationSpec().isMetadataOnly()) {
       tracker.addTask(createTableTask);
-      return tracker;
+      return;
     }
     if (!isPartitioned(tblDesc)) {
       LOG.debug("adding dependent CopyWork/MoveWork for table");
@@ -172,7 +200,6 @@ public class LoadTable {
       createTableTask.addDependentTask(loadTableTask);
     }
     tracker.addTask(createTableTask);
-    return tracker;
   }
 
   private String location(ImportTableDesc tblDesc, Database parentDb)

http://git-wip-us.apache.org/repos/asf/hive/blob/70cb7f0b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 3e2c513..7794d3e 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.parse;
 
+import io.netty.util.internal.StringUtil;
 import org.antlr.runtime.tree.Tree;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.FileStatus;
@@ -28,6 +29,7 @@ import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork;
+import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.ReplLoadWork;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -37,13 +39,13 @@ import 
org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
 import org.apache.hadoop.hive.ql.parse.repl.load.EventDumpDirComparator;
 import org.apache.hadoop.hive.ql.parse.repl.load.UpdatedMetaDataTracker;
 import org.apache.hadoop.hive.ql.parse.repl.load.message.MessageHandler;
+import org.apache.hadoop.hive.ql.parse.repl.load.log.IncrementalLoadLogger;
+import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
 import org.apache.hadoop.hive.ql.plan.AlterDatabaseDesc;
 import org.apache.hadoop.hive.ql.plan.AlterTableDesc;
 import org.apache.hadoop.hive.ql.plan.DDLWork;
 import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.FileNotFoundException;
 import java.io.Serializable;
@@ -76,7 +78,6 @@ public class ReplicationSemanticAnalyzer extends 
BaseSemanticAnalyzer {
   private static final String dumpSchema = 
"dump_dir,last_repl_id#string,string";
 
   public static final String FUNCTIONS_ROOT_DIR_NAME = "_functions";
-  private final static Logger REPL_STATE_LOG = 
LoggerFactory.getLogger("ReplState");
 
   ReplicationSemanticAnalyzer(QueryState queryState) throws SemanticException {
     super(queryState);
@@ -329,12 +330,9 @@ public class ReplicationSemanticAnalyzer extends 
BaseSemanticAnalyzer {
         Task<? extends Serializable> evTaskRoot = TaskFactory.get(new 
DependencyCollectionWork(), conf);
         Task<? extends Serializable> taskChainTail = evTaskRoot;
 
-        int evstage = 0;
-        int evIter = 0;
+        ReplLogger replLogger = new IncrementalLoadLogger(dbNameOrPattern,
+                loadPath.toString(), dirsInLoadPath.length);
 
-        REPL_STATE_LOG.info("Repl Load: Started analyzing Repl load for DB: {} 
from path {}, Dump Type: INCREMENTAL",
-                (null != dbNameOrPattern && !dbNameOrPattern.isEmpty()) ? 
dbNameOrPattern : "?",
-                loadPath.toUri().toString());
         for (FileStatus dir : dirsInLoadPath){
           LOG.debug("Loading event from {} to {}.{}", dir.getPath().toUri(), 
dbNameOrPattern, tblNameOrPattern);
 
@@ -359,17 +357,16 @@ public class ReplicationSemanticAnalyzer extends 
BaseSemanticAnalyzer {
 
           String locn = dir.getPath().toUri().toString();
           DumpMetaData eventDmd = new DumpMetaData(new Path(locn), conf);
-          List<Task<? extends Serializable>> evTasks = analyzeEventLoad(
-              dbNameOrPattern, tblNameOrPattern, locn, taskChainTail, 
eventDmd);
-          evIter++;
-          REPL_STATE_LOG.info("Repl Load: Analyzed load for event {}/{} " +
-                              "with ID: {}, Type: {}, Path: {}",
-                              evIter, dirsInLoadPath.length,
-                              dir.getPath().getName(), 
eventDmd.getDumpType().toString(), locn);
-
-          LOG.debug("evstage#{} got {} tasks", evstage, evTasks!=null ? 
evTasks.size() : 0);
+          MessageHandler.Context context = new 
MessageHandler.Context(dbNameOrPattern,
+                                                          tblNameOrPattern, 
locn, taskChainTail,
+                                                          eventDmd, conf, db, 
ctx, LOG);
+          List<Task<? extends Serializable>> evTasks = 
analyzeEventLoad(context);
+
           if ((evTasks != null) && (!evTasks.isEmpty())){
-            Task<? extends Serializable> barrierTask = TaskFactory.get(new 
DependencyCollectionWork(), conf);
+            ReplStateLogWork replStateLogWork = new 
ReplStateLogWork(replLogger,
+                                                          
dir.getPath().getName(),
+                                                          
eventDmd.getDumpType().toString());
+            Task<? extends Serializable> barrierTask = 
TaskFactory.get(replStateLogWork, conf);
             for (Task<? extends Serializable> t : evTasks){
               t.addDependentTask(barrierTask);
               LOG.debug("Added {}:{} as a precursor of barrier task {}:{}",
@@ -378,14 +375,23 @@ public class ReplicationSemanticAnalyzer extends 
BaseSemanticAnalyzer {
             LOG.debug("Updated taskChainTail from {}{} to {}{}",
                 taskChainTail.getClass(), taskChainTail.getId(), 
barrierTask.getClass(), barrierTask.getId());
             taskChainTail = barrierTask;
-            evstage++;
           }
         }
+
+        // If any event is there and db name is known, then dump the start and 
end logs
+        if (!evTaskRoot.equals(taskChainTail)) {
+          Map<String, String> dbProps = new HashMap<>();
+          dbProps.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), 
String.valueOf(dmd.getEventTo()));
+          ReplStateLogWork replStateLogWork = new ReplStateLogWork(replLogger, 
dbProps);
+          Task<? extends Serializable> barrierTask = 
TaskFactory.get(replStateLogWork, conf);
+          taskChainTail.addDependentTask(barrierTask);
+          LOG.debug("Added {}:{} as a precursor of barrier task {}:{}",
+                  taskChainTail.getClass(), taskChainTail.getId(),
+                  barrierTask.getClass(), barrierTask.getId());
+
+          replLogger.startLog();
+        }
         rootTasks.add(evTaskRoot);
-        REPL_STATE_LOG.info("Repl Load: Completed analyzing Repl load for DB: 
{} from path {} and created import " +
-                            "(DDL/COPY/MOVE) tasks",
-                            (null != dbNameOrPattern && 
!dbNameOrPattern.isEmpty()) ? dbNameOrPattern : "?",
-                            loadPath.toUri().toString());
       }
 
     } catch (Exception e) {
@@ -395,26 +401,22 @@ public class ReplicationSemanticAnalyzer extends 
BaseSemanticAnalyzer {
   }
 
   private List<Task<? extends Serializable>> analyzeEventLoad(
-      String dbName, String tblName,
-      String location,
-      Task<? extends Serializable> precursor,
-      DumpMetaData dmd)
+          MessageHandler.Context context)
       throws SemanticException {
-    MessageHandler.Context context =
-        new MessageHandler.Context(dbName, tblName, location, precursor, dmd, 
conf, db, ctx, LOG);
-    MessageHandler messageHandler = dmd.getDumpType().handler();
+    MessageHandler messageHandler = context.dmd.getDumpType().handler();
     List<Task<? extends Serializable>> tasks = messageHandler.handle(context);
 
-    if (precursor != null) {
+    if (context.precursor != null) {
       for (Task<? extends Serializable> t : tasks) {
-        precursor.addDependentTask(t);
+        context.precursor.addDependentTask(t);
         LOG.debug("Added {}:{} as a precursor of {}:{}",
-            precursor.getClass(), precursor.getId(), t.getClass(), t.getId());
+                context.precursor.getClass(), context.precursor.getId(), 
t.getClass(), t.getId());
       }
     }
+
     inputs.addAll(messageHandler.readEntities());
     outputs.addAll(messageHandler.writeEntities());
-    return addUpdateReplStateTasks(StringUtils.isEmpty(tblName),
+    return addUpdateReplStateTasks(StringUtils.isEmpty(context.tableName),
                             messageHandler.getUpdatedMetadata(), tasks);
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/70cb7f0b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
index 1c54d29..235a44c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
@@ -207,13 +207,6 @@ public class ReplicationSpec {
     };
   }
 
-  private static String getLastReplicatedStateFromParameters(Map<String, 
String> m) {
-    if ((m != null) && (m.containsKey(KEY.CURR_STATE_ID.toString()))){
-      return m.get(KEY.CURR_STATE_ID.toString());
-    }
-    return null;
-  }
-
   private void init(ASTNode node){
     // -> ^(TOK_REPLICATION $replId $isMetadataOnly)
     isInReplicationScope = true;
@@ -225,6 +218,13 @@ public class ReplicationSpec {
     }
   }
 
+  public static String getLastReplicatedStateFromParameters(Map<String, 
String> m) {
+    if ((m != null) && (m.containsKey(KEY.CURR_STATE_ID.toString()))){
+      return m.get(KEY.CURR_STATE_ID.toString());
+    }
+    return null;
+  }
+
   /**
    * @return true if this statement is being run for the purposes of 
replication
    */

http://git-wip-us.apache.org/repos/asf/hive/blob/70cb7f0b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/ReplLogger.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/ReplLogger.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/ReplLogger.java
new file mode 100644
index 0000000..6f8d5e0
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/ReplLogger.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl;
+
+import org.apache.hadoop.hive.metastore.TableType;
+
+public abstract class ReplLogger {
+
+  public ReplLogger() {
+  }
+
+  public abstract void startLog();
+  public abstract void endLog(String lastReplId);
+
+  public void tableLog(String tableName, TableType tableType) {
+  }
+  public void functionLog(String funcName){
+  }
+  public void eventLog(String eventId, String eventType) {
+  }
+}

Reply via email to