HIVE-17100 : Improve HS2 operation logs for REPL commands (Sankar Hariappan, reviewed by Anishek Agarwal, Thejas Nair)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/70cb7f0b Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/70cb7f0b Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/70cb7f0b Branch: refs/heads/master Commit: 70cb7f0b22a7beb353a27576683ad824c4a4c007 Parents: a6b9cd5 Author: Sankar Hariappan <[email protected]> Authored: Wed Aug 30 17:22:04 2017 -0700 Committer: Thejas M Nair <[email protected]> Committed: Wed Aug 30 17:22:04 2017 -0700 ---------------------------------------------------------------------- .../listener/DummyRawStoreFailEvent.java | 7 + .../hadoop/hive/metastore/HiveMetaStore.java | 7 + .../hive/metastore/HiveMetaStoreClient.java | 7 + .../hadoop/hive/metastore/IMetaStoreClient.java | 11 + .../hadoop/hive/metastore/ObjectStore.java | 23 + .../apache/hadoop/hive/metastore/RawStore.java | 9 + .../hive/metastore/cache/CachedStore.java | 7 + .../hive/metastore/messaging/EventUtils.java | 13 + .../DummyRawStoreControlledCommit.java | 7 + .../DummyRawStoreForJdoConnection.java | 6 + ql/if/queryplan.thrift | 1 + ql/src/gen/thrift/gen-cpp/queryplan_types.cpp | 8 +- ql/src/gen/thrift/gen-cpp/queryplan_types.h | 3 +- .../hadoop/hive/ql/plan/api/StageType.java | 5 +- ql/src/gen/thrift/gen-php/Types.php | 2 + ql/src/gen/thrift/gen-py/queryplan/ttypes.py | 3 + ql/src/gen/thrift/gen-rb/queryplan_types.rb | 5 +- .../apache/hadoop/hive/ql/exec/TaskFactory.java | 3 + .../hadoop/hive/ql/exec/repl/ReplDumpTask.java | 39 +- .../hive/ql/exec/repl/ReplStateLogTask.java | 51 + .../hive/ql/exec/repl/ReplStateLogWork.java | 100 + .../ql/exec/repl/bootstrap/ReplLoadTask.java | 53 +- .../ql/exec/repl/bootstrap/ReplLoadWork.java | 3 +- .../filesystem/BootstrapEventsIterator.java | 56 +- .../filesystem/DatabaseEventsIterator.java | 4 + .../exec/repl/bootstrap/load/LoadFunction.java | 18 +- .../exec/repl/bootstrap/load/TaskTracker.java | 13 +- .../bootstrap/load/table/LoadPartitions.java | 65 +- 
.../repl/bootstrap/load/table/LoadTable.java | 45 +- .../ql/parse/ReplicationSemanticAnalyzer.java | 70 +- .../hadoop/hive/ql/parse/ReplicationSpec.java | 14 +- .../hadoop/hive/ql/parse/repl/ReplLogger.java | 36 + .../hadoop/hive/ql/parse/repl/ReplState.java | 57 + .../hadoop/hive/ql/parse/repl/dump/Utils.java | 17 +- .../repl/dump/log/BootstrapDumpLogger.java | 71 + .../repl/dump/log/IncrementalDumpLogger.java | 55 + .../repl/dump/log/state/BootstrapDumpBegin.java | 47 + .../repl/dump/log/state/BootstrapDumpEnd.java | 59 + .../dump/log/state/BootstrapDumpFunction.java | 44 + .../repl/dump/log/state/BootstrapDumpTable.java | 52 + .../dump/log/state/IncrementalDumpBegin.java | 43 + .../repl/dump/log/state/IncrementalDumpEnd.java | 54 + .../dump/log/state/IncrementalDumpEvent.java | 51 + .../repl/load/log/BootstrapLoadLogger.java | 66 + .../repl/load/log/IncrementalLoadLogger.java | 55 + .../repl/load/log/state/BootstrapLoadBegin.java | 51 + .../repl/load/log/state/BootstrapLoadEnd.java | 59 + .../load/log/state/BootstrapLoadFunction.java | 44 + .../repl/load/log/state/BootstrapLoadTable.java | 52 + .../load/log/state/IncrementalLoadBegin.java | 47 + .../repl/load/log/state/IncrementalLoadEnd.java | 54 + .../load/log/state/IncrementalLoadEvent.java | 51 + .../load/message/CreateFunctionHandler.java | 8 + .../parse/repl/load/message/MessageHandler.java | 9 +- .../hadoop/hive/ql/plan/ImportTableDesc.java | 53 +- .../gen/thrift/gen-cpp/ThriftHiveMetastore.cpp | 2515 ++++++++++-------- .../gen/thrift/gen-cpp/ThriftHiveMetastore.h | 126 + .../ThriftHiveMetastore_server.skeleton.cpp | 5 + .../gen/thrift/gen-cpp/hive_metastore_types.cpp | 1037 +++++--- .../gen/thrift/gen-cpp/hive_metastore_types.h | 89 + .../api/NotificationEventsCountRequest.java | 488 ++++ .../api/NotificationEventsCountResponse.java | 387 +++ .../hive/metastore/api/ThriftHiveMetastore.java | 858 ++++++ .../gen-php/metastore/ThriftHiveMetastore.php | 216 ++ .../src/gen/thrift/gen-php/metastore/Types.php | 
173 ++ .../hive_metastore/ThriftHiveMetastore-remote | 7 + .../hive_metastore/ThriftHiveMetastore.py | 185 ++ .../gen/thrift/gen-py/hive_metastore/ttypes.py | 149 ++ .../gen/thrift/gen-rb/hive_metastore_types.rb | 37 + .../gen/thrift/gen-rb/thrift_hive_metastore.rb | 54 + .../src/main/thrift/hive_metastore.thrift | 10 + 71 files changed, 6491 insertions(+), 1638 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/70cb7f0b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java ---------------------------------------------------------------------- diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java index 5d7cfad..8d861e4 100644 --- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java +++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java @@ -47,6 +47,8 @@ import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.NotificationEvent; import org.apache.hadoop.hive.metastore.api.NotificationEventRequest; import org.apache.hadoop.hive.metastore.api.NotificationEventResponse; +import org.apache.hadoop.hive.metastore.api.NotificationEventsCountRequest; +import org.apache.hadoop.hive.metastore.api.NotificationEventsCountResponse; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.PartitionEventType; import org.apache.hadoop.hive.metastore.api.PrincipalPrivilegeSet; @@ -841,6 +843,11 @@ public class DummyRawStoreFailEvent implements RawStore, Configurable { } @Override + public NotificationEventsCountResponse getNotificationEventsCount(NotificationEventsCountRequest rqst) { + return 
objectStore.getNotificationEventsCount(rqst); + } + + @Override public void flushCache() { objectStore.flushCache(); } http://git-wip-us.apache.org/repos/asf/hive/blob/70cb7f0b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index 55def04..5812a1b 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -7026,6 +7026,13 @@ public class HiveMetaStore extends ThriftHiveMetastore { } @Override + public NotificationEventsCountResponse get_notification_events_count(NotificationEventsCountRequest rqst) + throws TException { + RawStore ms = getMS(); + return ms.getNotificationEventsCount(rqst); + } + + @Override public FireEventResponse fire_listener_event(FireEventRequest rqst) throws TException { switch (rqst.getData().getSetField()) { case INSERT_DATA: http://git-wip-us.apache.org/repos/asf/hive/blob/70cb7f0b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java index 688f181..70451c4 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java @@ -2356,6 +2356,13 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable { return client.get_current_notificationEventId(); } + @InterfaceAudience.LimitedPrivate({"HCatalog"}) + @Override + public NotificationEventsCountResponse 
getNotificationEventsCount(NotificationEventsCountRequest rqst) + throws TException { + return client.get_notification_events_count(rqst); + } + @InterfaceAudience.LimitedPrivate({"Apache Hive, HCatalog"}) @Override public FireEventResponse fireListenerEvent(FireEventRequest rqst) throws TException { http://git-wip-us.apache.org/repos/asf/hive/blob/70cb7f0b/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java b/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java index 813a283..69a845c 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java @@ -76,6 +76,8 @@ import org.apache.hadoop.hive.metastore.api.NoSuchTxnException; import org.apache.hadoop.hive.metastore.api.NotNullConstraintsRequest; import org.apache.hadoop.hive.metastore.api.NotificationEvent; import org.apache.hadoop.hive.metastore.api.NotificationEventResponse; +import org.apache.hadoop.hive.metastore.api.NotificationEventsCountResponse; +import org.apache.hadoop.hive.metastore.api.NotificationEventsCountRequest; import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.PartitionEventType; @@ -1634,6 +1636,15 @@ public interface IMetaStoreClient { CurrentNotificationEventId getCurrentNotificationEventId() throws TException; /** + * Get the number of events from given eventID for the input database. + * @return number of events + * @throws TException + */ + @InterfaceAudience.LimitedPrivate({"HCatalog"}) + NotificationEventsCountResponse getNotificationEventsCount(NotificationEventsCountRequest rqst) + throws TException; + + /** * Request that the metastore fire an event. 
Currently this is only supported for DML * operations, since the metastore knows when DDL operations happen. * @param request http://git-wip-us.apache.org/repos/asf/hive/blob/70cb7f0b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java index efbdb8f..0db1bc0 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java @@ -98,6 +98,8 @@ import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.NotificationEvent; import org.apache.hadoop.hive.metastore.api.NotificationEventRequest; import org.apache.hadoop.hive.metastore.api.NotificationEventResponse; +import org.apache.hadoop.hive.metastore.api.NotificationEventsCountRequest; +import org.apache.hadoop.hive.metastore.api.NotificationEventsCountResponse; import org.apache.hadoop.hive.metastore.api.Order; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.PartitionEventType; @@ -8339,6 +8341,27 @@ public class ObjectStore implements RawStore, Configurable { } } + @Override + public NotificationEventsCountResponse getNotificationEventsCount(NotificationEventsCountRequest rqst) { + Long result = 0L; + boolean commited = false; + Query query = null; + try { + openTransaction(); + long fromEventId = rqst.getFromEventId(); + String inputDbName = rqst.getDbName(); + String queryStr = "select count(eventId) from " + MNotificationLog.class.getName() + + " where eventId > fromEventId && dbName == inputDbName"; + query = pm.newQuery(queryStr); + query.declareParameters("java.lang.Long fromEventId, java.lang.String inputDbName"); + result = (Long) query.execute(fromEventId, inputDbName); + commited = 
commitTransaction(); + return new NotificationEventsCountResponse(result.longValue()); + } finally { + rollbackAndCleanup(commited, query); + } + } + private MNotificationLog translateThriftToDb(NotificationEvent entry) { MNotificationLog dbEntry = new MNotificationLog(); dbEntry.setEventId(entry.getEventId()); http://git-wip-us.apache.org/repos/asf/hive/blob/70cb7f0b/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java index a2ae4c5..71982a0 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/RawStore.java @@ -46,6 +46,8 @@ import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.NotificationEvent; import org.apache.hadoop.hive.metastore.api.NotificationEventRequest; import org.apache.hadoop.hive.metastore.api.NotificationEventResponse; +import org.apache.hadoop.hive.metastore.api.NotificationEventsCountRequest; +import org.apache.hadoop.hive.metastore.api.NotificationEventsCountResponse; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.PartitionEventType; import org.apache.hadoop.hive.metastore.api.PrincipalPrivilegeSet; @@ -621,6 +623,13 @@ public interface RawStore extends Configurable { */ public CurrentNotificationEventId getCurrentNotificationEventId(); + /** + * Get the number of events corresponding to given database with fromEventId. + * This is intended for use by the repl commands to track the progress of incremental dump. + * @return + */ + public NotificationEventsCountResponse getNotificationEventsCount(NotificationEventsCountRequest rqst); + /* * Flush any catalog objects held by the metastore implementation. 
Note that this does not * flush statistics objects. This should be called at the beginning of each query. http://git-wip-us.apache.org/repos/asf/hive/blob/70cb7f0b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java index 697cc2e..93d1ba6 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java @@ -63,6 +63,8 @@ import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.NotificationEvent; import org.apache.hadoop.hive.metastore.api.NotificationEventRequest; import org.apache.hadoop.hive.metastore.api.NotificationEventResponse; +import org.apache.hadoop.hive.metastore.api.NotificationEventsCountRequest; +import org.apache.hadoop.hive.metastore.api.NotificationEventsCountResponse; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.PartitionEventType; import org.apache.hadoop.hive.metastore.api.PrincipalPrivilegeSet; @@ -1803,6 +1805,11 @@ public class CachedStore implements RawStore, Configurable { } @Override + public NotificationEventsCountResponse getNotificationEventsCount(NotificationEventsCountRequest rqst) { + return rawStore.getNotificationEventsCount(rqst); + } + + @Override public void flushCache() { rawStore.flushCache(); } http://git-wip-us.apache.org/repos/asf/hive/blob/70cb7f0b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventUtils.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventUtils.java 
index 8205c25..8f90c7a 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventUtils.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventUtils.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hive.metastore.messaging; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.NotificationEvent; +import org.apache.hadoop.hive.metastore.api.NotificationEventsCountRequest; import org.apache.hadoop.hive.metastore.messaging.event.filters.DatabaseAndTableFilter; import org.apache.thrift.TException; @@ -34,6 +35,7 @@ public class EventUtils { public interface NotificationFetcher { int getBatchSize() throws IOException; long getCurrentNotificationEventId() throws IOException; + long getDbNotificationEventsCount(long fromEventId, String dbName) throws IOException; List<NotificationEvent> getNextNotificationEvents( long pos, IMetaStoreClient.NotificationFilter filter) throws IOException; } @@ -75,6 +77,17 @@ public class EventUtils { } @Override + public long getDbNotificationEventsCount(long fromEventId, String dbName) throws IOException { + try { + NotificationEventsCountRequest rqst + = new NotificationEventsCountRequest(fromEventId, dbName); + return msc.getNotificationEventsCount(rqst).getEventsCount(); + } catch (TException e) { + throw new IOException(e); + } + } + + @Override public List<NotificationEvent> getNextNotificationEvents( long pos, IMetaStoreClient.NotificationFilter filter) throws IOException { try { http://git-wip-us.apache.org/repos/asf/hive/blob/70cb7f0b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java ---------------------------------------------------------------------- diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java index fdb2866..4db203d 
100644 --- a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java +++ b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java @@ -44,6 +44,8 @@ import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.NotificationEvent; import org.apache.hadoop.hive.metastore.api.NotificationEventRequest; import org.apache.hadoop.hive.metastore.api.NotificationEventResponse; +import org.apache.hadoop.hive.metastore.api.NotificationEventsCountRequest; +import org.apache.hadoop.hive.metastore.api.NotificationEventsCountResponse; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.PartitionEventType; import org.apache.hadoop.hive.metastore.api.PrincipalPrivilegeSet; @@ -793,6 +795,11 @@ public class DummyRawStoreControlledCommit implements RawStore, Configurable { } @Override + public NotificationEventsCountResponse getNotificationEventsCount(NotificationEventsCountRequest rqst) { + return objectStore.getNotificationEventsCount(rqst); + } + + @Override public void flushCache() { objectStore.flushCache(); } http://git-wip-us.apache.org/repos/asf/hive/blob/70cb7f0b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java ---------------------------------------------------------------------- diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java index f422c4e..fb16cfc 100644 --- a/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java +++ b/metastore/src/test/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java @@ -45,6 +45,8 @@ import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.NotificationEvent; import 
org.apache.hadoop.hive.metastore.api.NotificationEventRequest; import org.apache.hadoop.hive.metastore.api.NotificationEventResponse; +import org.apache.hadoop.hive.metastore.api.NotificationEventsCountRequest; +import org.apache.hadoop.hive.metastore.api.NotificationEventsCountResponse; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.PartitionEventType; import org.apache.hadoop.hive.metastore.api.PrincipalPrivilegeSet; @@ -809,6 +811,10 @@ public class DummyRawStoreForJdoConnection implements RawStore { return null; } + @Override + public NotificationEventsCountResponse getNotificationEventsCount(NotificationEventsCountRequest rqst) { + return null; + } public void flushCache() { http://git-wip-us.apache.org/repos/asf/hive/blob/70cb7f0b/ql/if/queryplan.thrift ---------------------------------------------------------------------- diff --git a/ql/if/queryplan.thrift b/ql/if/queryplan.thrift index 00b0200..aaf644a 100644 --- a/ql/if/queryplan.thrift +++ b/ql/if/queryplan.thrift @@ -102,6 +102,7 @@ enum StageType { COLUMNSTATS, REPL_DUMP, REPL_BOOTSTRAP_LOAD, + REPL_STATE_LOG } struct Stage { http://git-wip-us.apache.org/repos/asf/hive/blob/70cb7f0b/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp ---------------------------------------------------------------------- diff --git a/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp b/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp index f467da2..7262017 100644 --- a/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp +++ b/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp @@ -117,7 +117,8 @@ int _kStageTypeValues[] = { StageType::DEPENDENCY_COLLECTION, StageType::COLUMNSTATS, StageType::REPL_DUMP, - StageType::REPL_BOOTSTRAP_LOAD + StageType::REPL_BOOTSTRAP_LOAD, + StageType::REPL_STATE_LOG }; const char* _kStageTypeNames[] = { "CONDITIONAL", @@ -133,9 +134,10 @@ const char* _kStageTypeNames[] = { "DEPENDENCY_COLLECTION", "COLUMNSTATS", "REPL_DUMP", - "REPL_BOOTSTRAP_LOAD" + 
"REPL_BOOTSTRAP_LOAD", + "REPL_STATE_LOG" }; -const std::map<int, const char*> _StageType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(14, _kStageTypeValues, _kStageTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL)); +const std::map<int, const char*> _StageType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(15, _kStageTypeValues, _kStageTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL)); Adjacency::~Adjacency() throw() { http://git-wip-us.apache.org/repos/asf/hive/blob/70cb7f0b/ql/src/gen/thrift/gen-cpp/queryplan_types.h ---------------------------------------------------------------------- diff --git a/ql/src/gen/thrift/gen-cpp/queryplan_types.h b/ql/src/gen/thrift/gen-cpp/queryplan_types.h index ac87ef7..18dc867 100644 --- a/ql/src/gen/thrift/gen-cpp/queryplan_types.h +++ b/ql/src/gen/thrift/gen-cpp/queryplan_types.h @@ -95,7 +95,8 @@ struct StageType { DEPENDENCY_COLLECTION = 10, COLUMNSTATS = 11, REPL_DUMP = 12, - REPL_BOOTSTRAP_LOAD = 13 + REPL_BOOTSTRAP_LOAD = 13, + REPL_STATE_LOG = 14 }; }; http://git-wip-us.apache.org/repos/asf/hive/blob/70cb7f0b/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java ---------------------------------------------------------------------- diff --git a/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java b/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java index 11a8f6d..ed408d2 100644 --- a/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java +++ b/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java @@ -25,7 +25,8 @@ public enum StageType implements org.apache.thrift.TEnum { DEPENDENCY_COLLECTION(10), COLUMNSTATS(11), REPL_DUMP(12), - REPL_BOOTSTRAP_LOAD(13); + REPL_BOOTSTRAP_LOAD(13), + REPL_STATE_LOG(14); private final int value; @@ -74,6 +75,8 @@ public enum StageType implements org.apache.thrift.TEnum { return REPL_DUMP; case 13: return REPL_BOOTSTRAP_LOAD; + 
case 14: + return REPL_STATE_LOG; default: return null; } http://git-wip-us.apache.org/repos/asf/hive/blob/70cb7f0b/ql/src/gen/thrift/gen-php/Types.php ---------------------------------------------------------------------- diff --git a/ql/src/gen/thrift/gen-php/Types.php b/ql/src/gen/thrift/gen-php/Types.php index 68edfcd..bca2eee 100644 --- a/ql/src/gen/thrift/gen-php/Types.php +++ b/ql/src/gen/thrift/gen-php/Types.php @@ -116,6 +116,7 @@ final class StageType { const COLUMNSTATS = 11; const REPL_DUMP = 12; const REPL_BOOTSTRAP_LOAD = 13; + const REPL_STATE_LOG = 14; static public $__names = array( 0 => 'CONDITIONAL', 1 => 'COPY', @@ -131,6 +132,7 @@ final class StageType { 11 => 'COLUMNSTATS', 12 => 'REPL_DUMP', 13 => 'REPL_BOOTSTRAP_LOAD', + 14 => 'REPL_STATE_LOG', ); } http://git-wip-us.apache.org/repos/asf/hive/blob/70cb7f0b/ql/src/gen/thrift/gen-py/queryplan/ttypes.py ---------------------------------------------------------------------- diff --git a/ql/src/gen/thrift/gen-py/queryplan/ttypes.py b/ql/src/gen/thrift/gen-py/queryplan/ttypes.py index 6bf65af..1f0d627 100644 --- a/ql/src/gen/thrift/gen-py/queryplan/ttypes.py +++ b/ql/src/gen/thrift/gen-py/queryplan/ttypes.py @@ -162,6 +162,7 @@ class StageType: COLUMNSTATS = 11 REPL_DUMP = 12 REPL_BOOTSTRAP_LOAD = 13 + REPL_STATE_LOG = 14 _VALUES_TO_NAMES = { 0: "CONDITIONAL", @@ -178,6 +179,7 @@ class StageType: 11: "COLUMNSTATS", 12: "REPL_DUMP", 13: "REPL_BOOTSTRAP_LOAD", + 14: "REPL_STATE_LOG", } _NAMES_TO_VALUES = { @@ -195,6 +197,7 @@ class StageType: "COLUMNSTATS": 11, "REPL_DUMP": 12, "REPL_BOOTSTRAP_LOAD": 13, + "REPL_STATE_LOG": 14, } http://git-wip-us.apache.org/repos/asf/hive/blob/70cb7f0b/ql/src/gen/thrift/gen-rb/queryplan_types.rb ---------------------------------------------------------------------- diff --git a/ql/src/gen/thrift/gen-rb/queryplan_types.rb b/ql/src/gen/thrift/gen-rb/queryplan_types.rb index 2730dde..88d9c17 100644 --- a/ql/src/gen/thrift/gen-rb/queryplan_types.rb +++ 
b/ql/src/gen/thrift/gen-rb/queryplan_types.rb @@ -74,8 +74,9 @@ module StageType COLUMNSTATS = 11 REPL_DUMP = 12 REPL_BOOTSTRAP_LOAD = 13 - VALUE_MAP = {0 => "CONDITIONAL", 1 => "COPY", 2 => "DDL", 3 => "MAPRED", 4 => "EXPLAIN", 5 => "FETCH", 6 => "FUNC", 7 => "MAPREDLOCAL", 8 => "MOVE", 9 => "STATS", 10 => "DEPENDENCY_COLLECTION", 11 => "COLUMNSTATS", 12 => "REPL_DUMP", 13 => "REPL_BOOTSTRAP_LOAD"} - VALID_VALUES = Set.new([CONDITIONAL, COPY, DDL, MAPRED, EXPLAIN, FETCH, FUNC, MAPREDLOCAL, MOVE, STATS, DEPENDENCY_COLLECTION, COLUMNSTATS, REPL_DUMP, REPL_BOOTSTRAP_LOAD]).freeze + REPL_STATE_LOG = 14 + VALUE_MAP = {0 => "CONDITIONAL", 1 => "COPY", 2 => "DDL", 3 => "MAPRED", 4 => "EXPLAIN", 5 => "FETCH", 6 => "FUNC", 7 => "MAPREDLOCAL", 8 => "MOVE", 9 => "STATS", 10 => "DEPENDENCY_COLLECTION", 11 => "COLUMNSTATS", 12 => "REPL_DUMP", 13 => "REPL_BOOTSTRAP_LOAD", 14 => "REPL_STATE_LOG"} + VALID_VALUES = Set.new([CONDITIONAL, COPY, DDL, MAPRED, EXPLAIN, FETCH, FUNC, MAPREDLOCAL, MOVE, STATS, DEPENDENCY_COLLECTION, COLUMNSTATS, REPL_DUMP, REPL_BOOTSTRAP_LOAD, REPL_STATE_LOG]).freeze end class Adjacency http://git-wip-us.apache.org/repos/asf/hive/blob/70cb7f0b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java index 91ac4bf..fe9b624 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java @@ -27,6 +27,8 @@ import org.apache.hadoop.hive.ql.exec.mr.MapRedTask; import org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask; import org.apache.hadoop.hive.ql.exec.repl.ReplDumpTask; import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork; +import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogTask; +import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork; import 
org.apache.hadoop.hive.ql.exec.repl.bootstrap.ReplLoadTask; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.ReplLoadWork; import org.apache.hadoop.hive.ql.exec.spark.SparkTask; @@ -114,6 +116,7 @@ public final class TaskFactory { taskvec.add(new TaskTuple<SparkWork>(SparkWork.class, SparkTask.class)); taskvec.add(new TaskTuple<>(ReplDumpWork.class, ReplDumpTask.class)); taskvec.add(new TaskTuple<>(ReplLoadWork.class, ReplLoadTask.class)); + taskvec.add(new TaskTuple<>(ReplStateLogWork.class, ReplStateLogTask.class)); } private static ThreadLocal<Integer> tid = new ThreadLocal<Integer>() { http://git-wip-us.apache.org/repos/asf/hive/blob/70cb7f0b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java index 67a67fd..d3af0ed 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.DumpType; +import org.apache.hadoop.hive.ql.parse.repl.ReplLogger; import org.apache.hadoop.hive.ql.parse.repl.dump.HiveWrapper; import org.apache.hadoop.hive.ql.parse.repl.dump.TableExport; import org.apache.hadoop.hive.ql.parse.repl.dump.Utils; @@ -49,6 +50,8 @@ import org.apache.hadoop.hive.ql.parse.repl.dump.events.EventHandlerFactory; import org.apache.hadoop.hive.ql.parse.repl.dump.io.FunctionSerializer; import org.apache.hadoop.hive.ql.parse.repl.dump.io.JsonWriter; import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; +import org.apache.hadoop.hive.ql.parse.repl.dump.log.BootstrapDumpLogger; +import 
org.apache.hadoop.hive.ql.parse.repl.dump.log.IncrementalDumpLogger; import org.apache.hadoop.hive.ql.plan.api.StageType; import org.apache.thrift.TException; import org.slf4j.Logger; @@ -64,7 +67,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { private static final String FUNCTION_METADATA_FILE_NAME = "_metadata"; private Logger LOG = LoggerFactory.getLogger(ReplDumpTask.class); - private Logger REPL_STATE_LOG = LoggerFactory.getLogger("ReplState"); + private ReplLogger replLogger; @Override public String getName() { @@ -127,8 +130,9 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { String dbName = (null != work.dbNameOrPattern && !work.dbNameOrPattern.isEmpty()) ? work.dbNameOrPattern : "?"; - REPL_STATE_LOG - .info("Repl Dump: Started Repl Dump for DB: {}, Dump Type: INCREMENTAL", dbName); + replLogger = new IncrementalDumpLogger(dbName, dumpRoot.toString(), + evFetcher.getDbNotificationEventsCount(work.eventFrom, dbName)); + replLogger.startLog(); while (evIter.hasNext()) { NotificationEvent ev = evIter.next(); lastReplId = ev.getEventId(); @@ -136,7 +140,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { dumpEvent(ev, evRoot, cmRoot); } - REPL_STATE_LOG.info("Repl Dump: Completed Repl Dump for DB: {}", dbName); + replLogger.endLog(lastReplId.toString()); LOG.info("Done dumping events, preparing to return {},{}", dumpRoot.toUri(), lastReplId); Utils.writeOutput( @@ -159,10 +163,9 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { conf, getNewEventOnlyReplicationSpec(ev.getEventId()) ); - EventHandlerFactory.handlerFor(ev).handle(context); - REPL_STATE_LOG.info( - "Repl Dump: Dumped event with ID: {}, Type: {} and dumped metadata and data to path {}", - String.valueOf(ev.getEventId()), ev.getEventType(), evRoot.toUri().toString()); + EventHandler eventHandler = EventHandlerFactory.handlerFor(ev); + eventHandler.handle(context); + 
replLogger.eventLog(String.valueOf(ev.getEventId()), eventHandler.dumpType().toString()); } private ReplicationSpec getNewEventOnlyReplicationSpec(Long eventId) throws SemanticException { @@ -174,12 +177,13 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { private Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot) throws Exception { // bootstrap case Long bootDumpBeginReplId = getHive().getMSC().getCurrentNotificationEventId().getEventId(); + for (String dbName : Utils.matchesDb(getHive(), work.dbNameOrPattern)) { - REPL_STATE_LOG - .info("Repl Dump: Started analyzing Repl Dump for DB: {}, Dump Type: BOOTSTRAP", - dbName); LOG.debug("ReplicationSemanticAnalyzer: analyzeReplDump dumping db: " + dbName); - + replLogger = new BootstrapDumpLogger(dbName, dumpRoot.toString(), + Utils.getAllTables(getHive(), dbName).size(), + getHive().getAllFunctions().size()); + replLogger.startLog(); Path dbRoot = dumpDbMetadata(dbName, dumpRoot); dumpFunctionMetadata(dbName, dumpRoot); for (String tblName : Utils.matchesTbl(getHive(), dbName, work.tableNameOrPattern)) { @@ -187,6 +191,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { "analyzeReplDump dumping table: " + tblName + " to db root " + dbRoot.toUri()); dumpTable(dbName, tblName, dbRoot); } + replLogger.endLog(bootDumpBeginReplId.toString()); } Long bootDumpEndReplId = getHive().getMSC().getCurrentNotificationEventId().getEventId(); LOG.info("Bootstrap object dump phase took from {} to {}", bootDumpBeginReplId, @@ -228,7 +233,6 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { Path dumpPath = new Path(dbRoot, EximUtil.METADATA_NAME); HiveWrapper.Tuple<Database> database = new HiveWrapper(getHive(), dbName).database(); EximUtil.createDbExportDump(fs, dumpPath, database.object, database.replicationSpec); - REPL_STATE_LOG.info("Repl Dump: Dumped DB metadata"); return dbRoot; } @@ -240,9 +244,7 @@ public class 
ReplDumpTask extends Task<ReplDumpWork> implements Serializable { new TableExport.Paths(work.astRepresentationForErrorMsg, dbRoot, tblName, conf); String distCpDoAsUser = conf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER); new TableExport(exportPaths, ts, getNewReplicationSpec(), db, distCpDoAsUser, conf).write(); - REPL_STATE_LOG.info( - "Repl Dump: Analyzed dump for table/view: {}.{} and dumping metadata and data to path {}", - dbName, tblName, exportPaths.exportRootDir.toString()); + replLogger.tableLog(tblName, ts.tableHandle.getTableType()); } catch (InvalidTableException te) { // Bootstrap dump shouldn't fail if the table is dropped/renamed while dumping it. // Just log a debug message and skip it. @@ -295,7 +297,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { FunctionSerializer serializer = new FunctionSerializer(tuple.object, conf); serializer.writeTo(jsonWriter, tuple.replicationSpec); } - REPL_STATE_LOG.info("Repl Dump: Dumped metadata for function: {}", functionName); + replLogger.functionLog(functionName); } } @@ -303,8 +305,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { try { HiveWrapper.Tuple<Function> tuple = new HiveWrapper(getHive(), dbName).function(functionName); if (tuple.object.getResourceUris().isEmpty()) { - REPL_STATE_LOG.warn( - "Not replicating function: " + functionName + " as it seems to have been created " + LOG.warn("Not replicating function: " + functionName + " as it seems to have been created " + "without USING clause"); return null; } http://git-wip-us.apache.org/repos/asf/hive/blob/70cb7f0b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplStateLogTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplStateLogTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplStateLogTask.java new file mode 100644 index 0000000..614af54 --- /dev/null +++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplStateLogTask.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.repl; + +import org.apache.hadoop.hive.ql.DriverContext; +import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.plan.api.StageType; + +import java.io.Serializable; + +/** + * ReplStateLogTask. + * + * Exists for the sole purpose of reducing the number of dependency edges in the task graph. 
+ **/ +public class ReplStateLogTask extends Task<ReplStateLogWork> implements Serializable { + + private static final long serialVersionUID = 1L; + + @Override + public int execute(DriverContext driverContext) { + work.replStateLog(); + return 0; + } + + @Override + public StageType getType() { + return StageType.REPL_STATE_LOG; + } + + @Override + public String getName() { + return "REPL_STATE_LOG"; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/70cb7f0b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplStateLogWork.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplStateLogWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplStateLogWork.java new file mode 100644 index 0000000..6462fe5 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplStateLogWork.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.exec.repl; + +import org.apache.hadoop.hive.metastore.TableType; +import org.apache.hadoop.hive.ql.parse.ReplicationSpec; +import org.apache.hadoop.hive.ql.parse.repl.ReplLogger; +import org.apache.hadoop.hive.ql.plan.Explain; +import org.apache.hadoop.hive.ql.plan.Explain.Level; + +import java.io.Serializable; +import java.util.Map; + + +/** + * ReplStateLogWork + * + */ +@Explain(displayName = "Repl State Log", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) +public class ReplStateLogWork implements Serializable { + private static final long serialVersionUID = 1L; + private final ReplLogger replLogger; + private final LOG_TYPE logType; + private String eventId; + private String eventType; + private String tableName; + private TableType tableType; + private String functionName; + private String lastReplId; + + private enum LOG_TYPE { + TABLE, + FUNCTION, + EVENT, + END + } + + public ReplStateLogWork(ReplLogger replLogger, String eventId, String eventType) { + this.logType = LOG_TYPE.EVENT; + this.replLogger = replLogger; + this.eventId = eventId; + this.eventType = eventType; + } + + public ReplStateLogWork(ReplLogger replLogger, String tableName, TableType tableType) { + this.logType = LOG_TYPE.TABLE; + this.replLogger = replLogger; + this.tableName = tableName; + this.tableType = tableType; + } + + public ReplStateLogWork(ReplLogger replLogger, String functionName) { + this.logType = LOG_TYPE.FUNCTION; + this.replLogger = replLogger; + this.functionName = functionName; + } + + public ReplStateLogWork(ReplLogger replLogger, Map<String,String> dbProps) { + this.logType = LOG_TYPE.END; + this.replLogger = replLogger; + this.lastReplId = ReplicationSpec.getLastReplicatedStateFromParameters(dbProps); + } + + public void replStateLog() { + switch (logType) { + case TABLE: { + replLogger.tableLog(tableName, tableType); + break; + } + case FUNCTION: { + replLogger.functionLog(functionName); + break; + } + case 
EVENT: { + replLogger.eventLog(eventId, eventType); + break; + } + case END: { + replLogger.endLog(lastReplId); + break; + } + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/70cb7f0b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java index 6ea1754..cd31b17 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java @@ -18,9 +18,12 @@ package org.apache.hadoop.hive.ql.exec.repl.bootstrap; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.ql.DriverContext; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; +import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.BootstrapEvent; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.DatabaseEvent; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.FunctionEvent; @@ -35,6 +38,8 @@ import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table.LoadTable; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table.TableContext; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context; import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.parse.repl.ReplLogger; +import org.apache.hadoop.hive.ql.plan.ImportTableDesc; import org.apache.hadoop.hive.ql.plan.api.StageType; import java.io.Serializable; @@ -107,7 +112,8 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable { TableContext tableContext = new 
TableContext(dbTracker, work.dbNameToLoadIn, work.tableNameToLoadIn); TableEvent tableEvent = (TableEvent) next; - LoadTable loadTable = new LoadTable(tableEvent, context, tableContext, loadTaskTracker); + LoadTable loadTable = new LoadTable(tableEvent, context, iterator.replLogger(), + tableContext, loadTaskTracker); tableTracker = loadTable.tasks(); if (!scope.database) { scope.rootTasks.addAll(tableTracker.tasks()); @@ -124,8 +130,8 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable { // for a table we explicitly try to load partitions as there is no separate partitions events. LoadPartitions loadPartitions = - new LoadPartitions(context, loadTaskTracker, tableEvent, work.dbNameToLoadIn, - tableContext); + new LoadPartitions(context, iterator.replLogger(), loadTaskTracker, tableEvent, + work.dbNameToLoadIn, tableContext); TaskTracker partitionsTracker = loadPartitions.tasks(); partitionsPostProcessing(iterator, scope, loadTaskTracker, tableTracker, partitionsTracker); @@ -142,9 +148,8 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable { TableContext tableContext = new TableContext(dbTracker, work.dbNameToLoadIn, work.tableNameToLoadIn); LoadPartitions loadPartitions = - new LoadPartitions(context, tableContext, loadTaskTracker, event.asTableEvent(), - work.dbNameToLoadIn, - event.lastPartitionReplicated()); + new LoadPartitions(context, iterator.replLogger(), tableContext, loadTaskTracker, + event.asTableEvent(), work.dbNameToLoadIn, event.lastPartitionReplicated()); /* the tableTracker here should be a new instance and not an existing one as this can only happen when we break in between loading partitions. 
@@ -156,8 +161,8 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable { break; } case Function: { - LoadFunction loadFunction = - new LoadFunction(context, (FunctionEvent) next, work.dbNameToLoadIn, dbTracker); + LoadFunction loadFunction = new LoadFunction(context, iterator.replLogger(), + (FunctionEvent) next, work.dbNameToLoadIn, dbTracker); TaskTracker functionsTracker = loadFunction.tasks(); if (!scope.database) { scope.rootTasks.addAll(functionsTracker.tasks()); @@ -169,11 +174,16 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable { break; } } + + if (!iterator.currentDbHasNext()) { + createEndReplLogTask(context, scope, iterator.replLogger()); + } } boolean addAnotherLoadTask = iterator.hasNext() || loadTaskTracker.hasReplicationState(); createBuilderTask(scope.rootTasks, addAnotherLoadTask); if (!iterator.hasNext()) { loadTaskTracker.update(updateDatabaseLastReplID(maxTasks, context, scope)); + work.updateDbEventState(null); } this.childTasks = scope.rootTasks; LOG.info("Root Tasks / Total Tasks : {} / {} ", childTasks.size(), loadTaskTracker.numberOfTasks()); @@ -186,6 +196,19 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable { return 0; } + private Task<? extends Serializable> createEndReplLogTask(Context context, Scope scope, + ReplLogger replLogger) throws SemanticException { + Database dbInMetadata = work.databaseEvent(context.hiveConf).dbInMetadata(work.dbNameToLoadIn); + ReplStateLogWork replLogWork = new ReplStateLogWork(replLogger, dbInMetadata.getParameters()); + Task<ReplStateLogWork> replLogTask = TaskFactory.get(replLogWork, conf); + if (null == scope.rootTasks) { + scope.rootTasks.add(replLogTask); + } else { + dependency(scope.rootTasks, replLogTask); + } + return replLogTask; + } + /** * There was a database update done before and we want to make sure we update the last repl * id on this database as we are now going to switch to processing a new database. 
@@ -243,18 +266,20 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable { /** * add the dependency to the leaf node */ - private boolean dependency(List<Task<? extends Serializable>> tasks, - Task<ReplLoadWork> loadTask) { + public static boolean dependency(List<Task<? extends Serializable>> tasks, Task<?> tailTask) { if (tasks == null || tasks.isEmpty()) { return true; } for (Task<? extends Serializable> task : tasks) { - boolean dependency = dependency(task.getChildTasks(), loadTask); - if (dependency) { - task.addDependentTask(loadTask); + if (task == tailTask) { + continue; + } + boolean leafNode = dependency(task.getChildTasks(), tailTask); + if (leafNode) { + task.addDependentTask(tailTask); } } - return true; + return false; } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/70cb7f0b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java index eb18e5f..f51afe1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java @@ -38,7 +38,7 @@ public class ReplLoadWork implements Serializable { public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, String dbNameToLoadIn, String tableNameToLoadIn) throws IOException { this.tableNameToLoadIn = tableNameToLoadIn; - this.iterator = new BootstrapEventsIterator(dumpDirectory, hiveConf); + this.iterator = new BootstrapEventsIterator(dumpDirectory, dbNameToLoadIn, hiveConf); this.dbNameToLoadIn = dbNameToLoadIn; } @@ -61,7 +61,6 @@ public class ReplLoadWork implements Serializable { DatabaseEvent databaseEvent(HiveConf hiveConf) { DatabaseEvent databaseEvent = state.toEvent(hiveConf); - state = null; return 
databaseEvent; } http://git-wip-us.apache.org/repos/asf/hive/blob/70cb7f0b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java index 4e635ad..43a85f2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java @@ -17,13 +17,15 @@ */ package org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.fs.*; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.ReplicationState; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.BootstrapEvent; import org.apache.hadoop.hive.ql.parse.EximUtil; +import org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer; +import org.apache.hadoop.hive.ql.parse.repl.load.log.BootstrapLoadLogger; +import org.apache.hadoop.hive.ql.parse.repl.ReplLogger; import java.io.IOException; import java.util.Arrays; @@ -69,8 +71,13 @@ public class BootstrapEventsIterator implements Iterator<BootstrapEvent> { warehouse. 
*/ private Iterator<DatabaseEventsIterator> dbEventsIterator; + private final String dumpDirectory; + private final String dbNameToLoadIn; + private final HiveConf hiveConf; + private ReplLogger replLogger; - public BootstrapEventsIterator(String dumpDirectory, HiveConf hiveConf) throws IOException { + public BootstrapEventsIterator(String dumpDirectory, String dbNameToLoadIn, HiveConf hiveConf) + throws IOException { Path path = new Path(dumpDirectory); FileSystem fileSystem = path.getFileSystem(hiveConf); FileStatus[] fileStatuses = @@ -93,6 +100,9 @@ public class BootstrapEventsIterator implements Iterator<BootstrapEvent> { } }).collect(Collectors.toList()).iterator(); + this.dumpDirectory = dumpDirectory; + this.dbNameToLoadIn = dbNameToLoadIn; + this.hiveConf = hiveConf; } @Override @@ -101,6 +111,7 @@ public class BootstrapEventsIterator implements Iterator<BootstrapEvent> { if (currentDatabaseIterator == null) { if (dbEventsIterator.hasNext()) { currentDatabaseIterator = dbEventsIterator.next(); + initReplLogger(); } else { return false; } @@ -127,7 +138,44 @@ public class BootstrapEventsIterator implements Iterator<BootstrapEvent> { throw new UnsupportedOperationException("This operation is not supported"); } + public boolean currentDbHasNext() { + return ((currentDatabaseIterator != null) && (currentDatabaseIterator.hasNext())); + } + public void setReplicationState(ReplicationState replicationState) { this.currentDatabaseIterator.replicationState = replicationState; } + + public ReplLogger replLogger() { + return replLogger; + } + + private void initReplLogger() { + try { + Path dbDumpPath = currentDatabaseIterator.dbLevelPath(); + FileSystem fs = dbDumpPath.getFileSystem(hiveConf); + + long numTables = getSubDirs(fs, dbDumpPath).length; + long numFunctions = 0; + Path funcPath = new Path(dbDumpPath, ReplicationSemanticAnalyzer.FUNCTIONS_ROOT_DIR_NAME); + if (fs.exists(funcPath)) { + numFunctions = getSubDirs(fs, funcPath).length; + } + String dbName = 
StringUtils.isBlank(dbNameToLoadIn) ? dbDumpPath.getName() : dbNameToLoadIn; + replLogger = new BootstrapLoadLogger(dbName, dumpDirectory, numTables, numFunctions); + replLogger.startLog(); + } catch (IOException e) { + // Ignore the exception + } + } + + FileStatus[] getSubDirs(FileSystem fs, Path dirPath) throws IOException { + return fs.listStatus(dirPath, new PathFilter() { + @Override + public boolean accept(Path p) { + String name = p.getName(); + return !name.startsWith("_") && !name.startsWith("."); + } + }); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/70cb7f0b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java index 3100875..dc0e192 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java @@ -58,6 +58,10 @@ class DatabaseEventsIterator implements Iterator<BootstrapEvent> { remoteIterator = fileSystem.listFiles(dbLevelPath, true); } + public Path dbLevelPath() { + return this.dbLevelPath; + } + @Override public boolean hasNext() { try { http://git-wip-us.apache.org/repos/asf/hive/blob/70cb7f0b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java index e9b8711..8852a60 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java +++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/LoadFunction.java @@ -20,10 +20,14 @@ package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.Task; +import org.apache.hadoop.hive.ql.exec.TaskFactory; +import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.FunctionEvent; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context; import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.parse.repl.ReplLogger; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.ReplLoadTask; import org.apache.hadoop.hive.ql.parse.repl.load.message.CreateFunctionHandler; import org.apache.hadoop.hive.ql.parse.repl.load.message.MessageHandler; import org.slf4j.Logger; @@ -39,18 +43,27 @@ import static org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.stripQuotes; public class LoadFunction { private static final Logger LOG = LoggerFactory.getLogger(LoadFunction.class); private Context context; + private ReplLogger replLogger; private final FunctionEvent event; private final String dbNameToLoadIn; private final TaskTracker tracker; - public LoadFunction(Context context, FunctionEvent event, String dbNameToLoadIn, - TaskTracker existingTracker) { + public LoadFunction(Context context, ReplLogger replLogger, FunctionEvent event, + String dbNameToLoadIn, TaskTracker existingTracker) { this.context = context; + this.replLogger = replLogger; this.event = event; this.dbNameToLoadIn = dbNameToLoadIn; this.tracker = new TaskTracker(existingTracker); } + private void createFunctionReplLogTask(List<Task<? 
extends Serializable>> functionTasks, + String functionName) { + ReplStateLogWork replLogWork = new ReplStateLogWork(replLogger, functionName); + Task<ReplStateLogWork> replLogTask = TaskFactory.get(replLogWork, context.hiveConf); + ReplLoadTask.dependency(functionTasks, replLogTask); + } + public TaskTracker tasks() throws IOException, SemanticException { URI fromURI = EximUtil .getValidatedURI(context.hiveConf, stripQuotes(event.rootDir().toUri().toString())); @@ -63,6 +76,7 @@ public class LoadFunction { dbNameToLoadIn, null, fromPath.toString(), null, null, context.hiveConf, context.hiveDb, null, LOG) ); + createFunctionReplLogTask(tasks, handler.getFunctionName()); tasks.forEach(tracker::addTask); return tracker; } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/hive/blob/70cb7f0b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TaskTracker.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TaskTracker.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TaskTracker.java index f246b8a..374a03b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TaskTracker.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/TaskTracker.java @@ -59,14 +59,21 @@ public class TaskTracker { */ public void addTask(Task<? extends Serializable> task) { tasks.add(task); - updateTaskCount(task); + + List <Task<? extends Serializable>> visited = new ArrayList<>(); + updateTaskCount(task, visited); } - private void updateTaskCount(Task<? extends Serializable> task) { + public void updateTaskCount(Task<? extends Serializable> task, + List <Task<? extends Serializable>> visited) { numberOfTasks += 1; + visited.add(task); if (task.getChildTasks() != null) { for (Task<? 
extends Serializable> childTask : task.getChildTasks()) { - updateTaskCount(childTask); + if (visited.contains(childTask)) { + continue; + } + updateTaskCount(childTask, visited); } } } http://git-wip-us.apache.org/repos/asf/hive/blob/70cb7f0b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java index f088ba9..c944a13 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java @@ -26,8 +26,10 @@ import org.apache.hadoop.hive.ql.exec.ReplCopyTask; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.ReplicationState; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.ReplLoadTask; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.TaskTracker; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -36,6 +38,7 @@ import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.parse.repl.ReplLogger; import org.apache.hadoop.hive.ql.plan.AddPartitionDesc; import org.apache.hadoop.hive.ql.plan.DDLWork; import org.apache.hadoop.hive.ql.plan.ImportTableDesc; @@ -46,10 +49,7 @@ import 
org.slf4j.LoggerFactory; import java.io.IOException; import java.io.Serializable; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; +import java.util.*; import static org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.ReplicationState.PartitionState; import static org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer.isPartitioned; @@ -59,6 +59,7 @@ public class LoadPartitions { private static Logger LOG = LoggerFactory.getLogger(LoadPartitions.class); private final Context context; + private final ReplLogger replLogger; private final TableContext tableContext; private final TableEvent event; private final TaskTracker tracker; @@ -67,17 +68,19 @@ public class LoadPartitions { private final ImportTableDesc tableDesc; private Table table; - public LoadPartitions(Context context, TaskTracker tableTracker, TableEvent event, - String dbNameToLoadIn, TableContext tableContext) throws HiveException, IOException { - this(context, tableContext, tableTracker, event, dbNameToLoadIn, null); + public LoadPartitions(Context context, ReplLogger replLogger, TaskTracker tableTracker, + TableEvent event, String dbNameToLoadIn, + TableContext tableContext) throws HiveException, IOException { + this(context, replLogger, tableContext, tableTracker, event, dbNameToLoadIn, null); } - public LoadPartitions(Context context, TableContext tableContext, TaskTracker limiter, - TableEvent event, String dbNameToLoadIn, AddPartitionDesc lastReplicatedPartition) - throws HiveException, IOException { + public LoadPartitions(Context context, ReplLogger replLogger, TableContext tableContext, + TaskTracker limiter, TableEvent event, String dbNameToLoadIn, + AddPartitionDesc lastReplicatedPartition) throws HiveException, IOException { this.tracker = new TaskTracker(limiter); this.event = event; this.context = context; + this.replLogger = replLogger; this.lastReplicatedPartition = lastReplicatedPartition; this.tableContext = tableContext; @@ -98,6 
+101,21 @@ public class LoadPartitions { } } + private void createTableReplLogTask() throws SemanticException { + ReplStateLogWork replLogWork = new ReplStateLogWork(replLogger, + tableDesc.getTableName(), tableDesc.tableType()); + Task<ReplStateLogWork> replLogTask = TaskFactory.get(replLogWork, context.hiveConf); + + if (tracker.tasks().isEmpty()) { + tracker.addTask(replLogTask); + } else { + ReplLoadTask.dependency(tracker.tasks(), replLogTask); + + List<Task<? extends Serializable>> visited = new ArrayList<>(); + tracker.updateTaskCount(replLogTask, visited); + } + } + public TaskTracker tasks() throws SemanticException { try { /* @@ -113,7 +131,11 @@ public class LoadPartitions { table = new Table(tableDesc.getDatabaseName(), tableDesc.getTableName()); if (isPartitioned(tableDesc)) { updateReplicationState(initialReplicationState()); - return forNewTable(); + if (!forNewTable().hasReplicationState()) { + // Add ReplStateLogTask only if no pending table load tasks left for next cycle + createTableReplLogTask(); + } + return tracker; } } else { // existing @@ -122,7 +144,11 @@ public class LoadPartitions { List<AddPartitionDesc> partitionDescs = event.partitionDescriptions(tableDesc); if (!event.replicationSpec().isMetadataOnly() && !partitionDescs.isEmpty()) { updateReplicationState(initialReplicationState()); - return forExistingTable(lastReplicatedPartition); + if (!forExistingTable(lastReplicatedPartition).hasReplicationState()) { + // Add ReplStateLogTask only if no pending table load tasks left for next cycle + createTableReplLogTask(); + } + return tracker; } } } @@ -149,9 +175,11 @@ public class LoadPartitions { while (iterator.hasNext() && tracker.canAddMoreTasks()) { AddPartitionDesc addPartitionDesc = iterator.next(); tracker.addTask(addSinglePartition(table, addPartitionDesc)); - ReplicationState currentReplicationState = - new ReplicationState(new PartitionState(table.getTableName(), addPartitionDesc)); - 
updateReplicationState(currentReplicationState); + if (iterator.hasNext() && !tracker.canAddMoreTasks()) { + ReplicationState currentReplicationState = + new ReplicationState(new PartitionState(table.getTableName(), addPartitionDesc)); + updateReplicationState(currentReplicationState); + } } return tracker; } @@ -236,13 +264,16 @@ public class LoadPartitions { boolean encounteredTheLastReplicatedPartition = (lastPartitionReplicated == null); ReplicationSpec replicationSpec = event.replicationSpec(); LOG.debug("table partitioned"); - for (AddPartitionDesc addPartitionDesc : event.partitionDescriptions(tableDesc)) { + + Iterator<AddPartitionDesc> iterator = event.partitionDescriptions(tableDesc).iterator(); + while (iterator.hasNext()) { /* encounteredTheLastReplicatedPartition will be set, when we break creation of partition tasks for a table, as we have reached the limit of number of tasks we should create for execution. in this case on the next run we have to iterate over the partitions desc to reach the last replicated partition so that we can start replicating partitions after that. 
*/ + AddPartitionDesc addPartitionDesc = iterator.next(); if (encounteredTheLastReplicatedPartition && tracker.canAddMoreTasks()) { Map<String, String> partSpec = addPartitionDesc.getPartition(0).getPartSpec(); Partition ptn; @@ -257,7 +288,7 @@ public class LoadPartitions { if (replicationSpec.allowReplacementInto(ptn.getParameters())) { if (replicationSpec.isMetadataOnly()) { tracker.addTask(alterSinglePartition(addPartitionDesc, replicationSpec, ptn)); - if (!tracker.canAddMoreTasks()) { + if (iterator.hasNext() && !tracker.canAddMoreTasks()) { tracker.setReplicationState( new ReplicationState(new PartitionState(table.getTableName(), addPartitionDesc) ) http://git-wip-us.apache.org/repos/asf/hive/blob/70cb7f0b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java index cbb964a..a1187c4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.metastore.MetaStoreUtils; +import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.ql.ErrorMsg; @@ -26,6 +27,8 @@ import org.apache.hadoop.hive.ql.exec.ReplCopyTask; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.ReplLoadTask; 
import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.TaskTracker; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context; @@ -34,6 +37,7 @@ import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.parse.repl.ReplLogger; import org.apache.hadoop.hive.ql.plan.ImportTableDesc; import org.apache.hadoop.hive.ql.plan.LoadTableDesc; import org.apache.hadoop.hive.ql.plan.MoveWork; @@ -42,7 +46,9 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.Serializable; +import java.util.ArrayList; import java.util.HashSet; +import java.util.List; import java.util.TreeMap; import static org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer.isPartitioned; @@ -51,18 +57,36 @@ public class LoadTable { private final static Logger LOG = LoggerFactory.getLogger(LoadTable.class); // private final Helper helper; private final Context context; + private final ReplLogger replLogger; private final TableContext tableContext; private final TaskTracker tracker; private final TableEvent event; - public LoadTable(TableEvent event, Context context, TableContext tableContext, TaskTracker limiter) + public LoadTable(TableEvent event, Context context, ReplLogger replLogger, + TableContext tableContext, TaskTracker limiter) throws SemanticException, IOException { this.event = event; this.context = context; + this.replLogger = replLogger; this.tableContext = tableContext; this.tracker = new TaskTracker(limiter); } + private void createTableReplLogTask(String tableName, TableType tableType) throws SemanticException { + ReplStateLogWork replLogWork = new ReplStateLogWork(replLogger,tableName, tableType); + Task<ReplStateLogWork> replLogTask = TaskFactory.get(replLogWork, context.hiveConf); 
+ if (tracker.tasks().isEmpty()) { + tracker.addTask(replLogTask); + } else { + ReplLoadTask.dependency(tracker.tasks(), replLogTask); + + List<Task<? extends Serializable>> visited = new ArrayList<>(); + tracker.updateTaskCount(replLogTask, visited); + } + } + public TaskTracker tasks() throws SemanticException { // Path being passed to us is a table dump location. We go ahead and load it in as needed. // If tblName is null, then we default to the table name specified in _metadata, which is good. @@ -123,22 +147,27 @@ public class LoadTable { behave like a noop or a pure MD alter. */ if (table == null) { - return newTableTasks(tableDesc); + newTableTasks(tableDesc); } else { - return existingTableTasks(tableDesc, table, replicationSpec); + existingTableTasks(tableDesc, table, replicationSpec); + } + + if (!isPartitioned(tableDesc)) { + createTableReplLogTask(tableDesc.getTableName(), tableDesc.tableType()); } + return tracker; } catch (Exception e) { throw new SemanticException(e); } } - private TaskTracker existingTableTasks(ImportTableDesc tblDesc, Table table, + private void existingTableTasks(ImportTableDesc tblDesc, Table table, ReplicationSpec replicationSpec) { if (!table.isPartitioned()) { LOG.debug("table non-partitioned"); if (!replicationSpec.allowReplacementInto(table.getParameters())) { - return tracker; // silently return, table is newer than our replacement. + return; // silently return, table is newer than our replacement. } Task<?
extends Serializable> alterTableTask = alterTableTask(tblDesc, replicationSpec); @@ -151,10 +180,9 @@ public class LoadTable { tracker.addTask(alterTableTask); } } - return tracker; } - private TaskTracker newTableTasks(ImportTableDesc tblDesc) throws SemanticException { + private void newTableTasks(ImportTableDesc tblDesc) throws SemanticException { Table table; table = new Table(tblDesc.getDatabaseName(), tblDesc.getTableName()); // Either we're dropping and re-creating, or the table didn't exist, and we're creating. @@ -162,7 +190,7 @@ public class LoadTable { tblDesc.getCreateTableTask(new HashSet<>(), new HashSet<>(), context.hiveConf); if (event.replicationSpec().isMetadataOnly()) { tracker.addTask(createTableTask); - return tracker; + return; } if (!isPartitioned(tblDesc)) { LOG.debug("adding dependent CopyWork/MoveWork for table"); @@ -172,7 +200,6 @@ public class LoadTable { createTableTask.addDependentTask(loadTableTask); } tracker.addTask(createTableTask); - return tracker; } private String location(ImportTableDesc tblDesc, Database parentDb) http://git-wip-us.apache.org/repos/asf/hive/blob/70cb7f0b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java index 3e2c513..7794d3e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hive.ql.parse; +import io.netty.util.internal.StringUtil; import org.antlr.runtime.tree.Tree; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.FileStatus; @@ -28,6 +29,7 @@ import org.apache.hadoop.hive.ql.QueryState; import org.apache.hadoop.hive.ql.exec.Task; import 
org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork; +import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.ReplLoadWork; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -37,13 +39,13 @@ import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData; import org.apache.hadoop.hive.ql.parse.repl.load.EventDumpDirComparator; import org.apache.hadoop.hive.ql.parse.repl.load.UpdatedMetaDataTracker; import org.apache.hadoop.hive.ql.parse.repl.load.message.MessageHandler; +import org.apache.hadoop.hive.ql.parse.repl.load.log.IncrementalLoadLogger; +import org.apache.hadoop.hive.ql.parse.repl.ReplLogger; import org.apache.hadoop.hive.ql.plan.AlterDatabaseDesc; import org.apache.hadoop.hive.ql.plan.AlterTableDesc; import org.apache.hadoop.hive.ql.plan.DDLWork; import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork; import org.apache.hadoop.hive.ql.plan.PlanUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.FileNotFoundException; import java.io.Serializable; @@ -76,7 +78,6 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { private static final String dumpSchema = "dump_dir,last_repl_id#string,string"; public static final String FUNCTIONS_ROOT_DIR_NAME = "_functions"; - private final static Logger REPL_STATE_LOG = LoggerFactory.getLogger("ReplState"); ReplicationSemanticAnalyzer(QueryState queryState) throws SemanticException { super(queryState); @@ -329,12 +330,9 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { Task<? extends Serializable> evTaskRoot = TaskFactory.get(new DependencyCollectionWork(), conf); Task<? 
extends Serializable> taskChainTail = evTaskRoot; - int evstage = 0; - int evIter = 0; + ReplLogger replLogger = new IncrementalLoadLogger(dbNameOrPattern, + loadPath.toString(), dirsInLoadPath.length); - REPL_STATE_LOG.info("Repl Load: Started analyzing Repl load for DB: {} from path {}, Dump Type: INCREMENTAL", - (null != dbNameOrPattern && !dbNameOrPattern.isEmpty()) ? dbNameOrPattern : "?", - loadPath.toUri().toString()); for (FileStatus dir : dirsInLoadPath){ LOG.debug("Loading event from {} to {}.{}", dir.getPath().toUri(), dbNameOrPattern, tblNameOrPattern); @@ -359,17 +357,16 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { String locn = dir.getPath().toUri().toString(); DumpMetaData eventDmd = new DumpMetaData(new Path(locn), conf); - List<Task<? extends Serializable>> evTasks = analyzeEventLoad( - dbNameOrPattern, tblNameOrPattern, locn, taskChainTail, eventDmd); - evIter++; - REPL_STATE_LOG.info("Repl Load: Analyzed load for event {}/{} " + - "with ID: {}, Type: {}, Path: {}", - evIter, dirsInLoadPath.length, - dir.getPath().getName(), eventDmd.getDumpType().toString(), locn); - - LOG.debug("evstage#{} got {} tasks", evstage, evTasks!=null ? evTasks.size() : 0); + MessageHandler.Context context = new MessageHandler.Context(dbNameOrPattern, + tblNameOrPattern, locn, taskChainTail, + eventDmd, conf, db, ctx, LOG); + List<Task<? extends Serializable>> evTasks = analyzeEventLoad(context); + if ((evTasks != null) && (!evTasks.isEmpty())){ - Task<? extends Serializable> barrierTask = TaskFactory.get(new DependencyCollectionWork(), conf); + ReplStateLogWork replStateLogWork = new ReplStateLogWork(replLogger, + dir.getPath().getName(), + eventDmd.getDumpType().toString()); + Task<? extends Serializable> barrierTask = TaskFactory.get(replStateLogWork, conf); for (Task<? 
extends Serializable> t : evTasks){ t.addDependentTask(barrierTask); LOG.debug("Added {}:{} as a precursor of barrier task {}:{}", @@ -378,14 +375,23 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { LOG.debug("Updated taskChainTail from {}{} to {}{}", taskChainTail.getClass(), taskChainTail.getId(), barrierTask.getClass(), barrierTask.getId()); taskChainTail = barrierTask; - evstage++; } } + + // If any event is there and db name is known, then dump the start and end logs + if (!evTaskRoot.equals(taskChainTail)) { + Map<String, String> dbProps = new HashMap<>(); + dbProps.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), String.valueOf(dmd.getEventTo())); + ReplStateLogWork replStateLogWork = new ReplStateLogWork(replLogger, dbProps); + Task<? extends Serializable> barrierTask = TaskFactory.get(replStateLogWork, conf); + taskChainTail.addDependentTask(barrierTask); + LOG.debug("Added {}:{} as a precursor of barrier task {}:{}", + taskChainTail.getClass(), taskChainTail.getId(), + barrierTask.getClass(), barrierTask.getId()); + + replLogger.startLog(); + } rootTasks.add(evTaskRoot); - REPL_STATE_LOG.info("Repl Load: Completed analyzing Repl load for DB: {} from path {} and created import " + - "(DDL/COPY/MOVE) tasks", - (null != dbNameOrPattern && !dbNameOrPattern.isEmpty()) ? dbNameOrPattern : "?", - loadPath.toUri().toString()); } } catch (Exception e) { @@ -395,26 +401,22 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { } private List<Task<? extends Serializable>> analyzeEventLoad( - String dbName, String tblName, - String location, - Task<? 
extends Serializable> precursor, - DumpMetaData dmd) + MessageHandler.Context context) throws SemanticException { - MessageHandler.Context context = - new MessageHandler.Context(dbName, tblName, location, precursor, dmd, conf, db, ctx, LOG); - MessageHandler messageHandler = dmd.getDumpType().handler(); + MessageHandler messageHandler = context.dmd.getDumpType().handler(); List<Task<? extends Serializable>> tasks = messageHandler.handle(context); - if (precursor != null) { + if (context.precursor != null) { for (Task<? extends Serializable> t : tasks) { - precursor.addDependentTask(t); + context.precursor.addDependentTask(t); LOG.debug("Added {}:{} as a precursor of {}:{}", - precursor.getClass(), precursor.getId(), t.getClass(), t.getId()); + context.precursor.getClass(), context.precursor.getId(), t.getClass(), t.getId()); } } + inputs.addAll(messageHandler.readEntities()); outputs.addAll(messageHandler.writeEntities()); - return addUpdateReplStateTasks(StringUtils.isEmpty(tblName), + return addUpdateReplStateTasks(StringUtils.isEmpty(context.tableName), messageHandler.getUpdatedMetadata(), tasks); } http://git-wip-us.apache.org/repos/asf/hive/blob/70cb7f0b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java index 1c54d29..235a44c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java @@ -207,13 +207,6 @@ public class ReplicationSpec { }; } - private static String getLastReplicatedStateFromParameters(Map<String, String> m) { - if ((m != null) && (m.containsKey(KEY.CURR_STATE_ID.toString()))){ - return m.get(KEY.CURR_STATE_ID.toString()); - } - return null; - } - private void init(ASTNode node){ // -> ^(TOK_REPLICATION $replId $isMetadataOnly) 
isInReplicationScope = true; @@ -225,6 +218,13 @@ public class ReplicationSpec { } } + public static String getLastReplicatedStateFromParameters(Map<String, String> m) { + if ((m != null) && (m.containsKey(KEY.CURR_STATE_ID.toString()))){ + return m.get(KEY.CURR_STATE_ID.toString()); + } + return null; + } + /** * @return true if this statement is being run for the purposes of replication */ http://git-wip-us.apache.org/repos/asf/hive/blob/70cb7f0b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/ReplLogger.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/ReplLogger.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/ReplLogger.java new file mode 100644 index 0000000..6f8d5e0 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/ReplLogger.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.parse.repl; + +import org.apache.hadoop.hive.metastore.TableType; + +public abstract class ReplLogger { + + public ReplLogger() { + } + + public abstract void startLog(); + public abstract void endLog(String lastReplId); + + public void tableLog(String tableName, TableType tableType) { + } + public void functionLog(String funcName){ + } + public void eventLog(String eventId, String eventType) { + } +}
