This is an automated email from the ASF dual-hosted git repository.
prasanthj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 3a8edc0 HIVE-22290: ObjectStore.cleanWriteNotificationEvents and
ObjectStore.cleanupEvents OutOfMemory on large number of pending events (#1484)
3a8edc0 is described below
commit 3a8edc02f542d1dc7c6f715e4b7f11e30bf65c83
Author: Naresh P R <[email protected]>
AuthorDate: Tue Sep 15 20:05:37 2020 -0700
HIVE-22290: ObjectStore.cleanWriteNotificationEvents and
ObjectStore.cleanupEvents OutOfMemory on large number of pending events (#1484)
---
.../apache/hadoop/hive/metastore/ObjectStore.java | 81 ++++++++++++++++------
1 file changed, 60 insertions(+), 21 deletions(-)
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index f866b94..f12ce84 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -10426,9 +10426,41 @@ public class ObjectStore implements RawStore, Configurable {
int tooOld = (tmp > Integer.MAX_VALUE) ? 0 : (int) tmp;
query = pm.newQuery(MTxnWriteNotificationLog.class, "eventTime <
tooOld");
query.declareParameters("java.lang.Integer tooOld");
- Collection<MTxnWriteNotificationLog> toBeRemoved = (Collection)
query.execute(tooOld);
- if (CollectionUtils.isNotEmpty(toBeRemoved)) {
+
+ int max_events = MetastoreConf.getIntVar(conf,
MetastoreConf.ConfVars.EVENT_CLEAN_MAX_EVENTS);
+ max_events = max_events > 0 ? max_events : Integer.MAX_VALUE;
+ query.setRange(0, max_events);
+ query.setOrdering("txnId ascending");
+
+ List<MTxnWriteNotificationLog> toBeRemoved = (List)
query.execute(tooOld);
+ int iteration = 0;
+ int eventCount = 0;
+ long minTxnId = 0;
+ long minEventTime = 0;
+ long maxTxnId = 0;
+ long maxEventTime = 0;
+ while (CollectionUtils.isNotEmpty(toBeRemoved)) {
+ int listSize = toBeRemoved.size();
+ if (iteration == 0) {
+ MTxnWriteNotificationLog firstNotification = toBeRemoved.get(0);
+ minTxnId = firstNotification.getTxnId();
+ minEventTime = firstNotification.getEventTime();
+ }
+ MTxnWriteNotificationLog lastNotification = toBeRemoved.get(listSize -
1);
+ maxTxnId = lastNotification.getTxnId();
+ maxEventTime = lastNotification.getEventTime();
+
pm.deletePersistentAll(toBeRemoved);
+ eventCount += listSize;
+ iteration++;
+ toBeRemoved = (List) query.execute(tooOld);
+ }
+ if (iteration == 0) {
+ LOG.info("No WriteNotification events found to be cleaned with
eventTime < {}.", tooOld);
+ } else {
+ LOG.info("WriteNotification Cleaned {} events with eventTime < {} in
{} iteration, " +
+ "minimum txnId {} (with eventTime {}) and maximum txnId {} (with
eventTime {})",
+ eventCount, tooOld, iteration, minTxnId, minEventTime, maxTxnId,
maxEventTime);
}
commited = commitTransaction();
} finally {
@@ -10617,26 +10649,33 @@ public class ObjectStore implements RawStore, Configurable {
query.setOrdering("eventId ascending");
List<MNotificationLog> toBeRemoved = (List) query.execute(tooOld);
- if (toBeRemoved == null || toBeRemoved.size() == 0) {
- LOG.info("No events found to be cleaned with eventTime < {}.", tooOld);
- } else {
- NotificationEvent firstEvent = translateDbToThrift(toBeRemoved.get(0));
- long minEventId = firstEvent.getEventId();
- long minEventTime = firstEvent.getEventTime();
- long maxEventId = minEventId;
- long maxEventTime = minEventTime;
- if (toBeRemoved.size() > 1) {
- NotificationEvent lastEvent =
- translateDbToThrift(toBeRemoved.get(toBeRemoved.size() - 1));
- maxEventId = lastEvent.getEventId();
- maxEventTime = lastEvent.getEventTime();
- }
- LOG.info("Cleaned {} events with eventTime < {}, minimum eventId {}
(with eventTime {}) " +
- "and maximum eventId {} (with eventTime {})",
- toBeRemoved.size(), tooOld, minEventId, minEventTime,
maxEventId, maxEventTime);
- }
- if (CollectionUtils.isNotEmpty(toBeRemoved)) {
+ int iteration = 0;
+ int eventCount = 0;
+ long minEventId = 0;
+ long minEventTime = 0;
+ long maxEventId = 0;
+ long maxEventTime = 0;
+ while (CollectionUtils.isNotEmpty(toBeRemoved)) {
+ int listSize = toBeRemoved.size();
+ if (iteration == 0) {
+ MNotificationLog firstNotification = toBeRemoved.get(0);
+ minEventId = firstNotification.getEventId();
+ minEventTime = firstNotification.getEventTime();
+ }
+ MNotificationLog lastNotification = toBeRemoved.get(listSize - 1);
+ maxEventId = lastNotification.getEventId();
+ maxEventTime = lastNotification.getEventTime();
pm.deletePersistentAll(toBeRemoved);
+ eventCount += listSize;
+ iteration++;
+ toBeRemoved = (List) query.execute(tooOld);
+ }
+ if (iteration == 0) {
+ LOG.info("No Notification events found to be cleaned with eventTime <
{}.", tooOld);
+ } else {
+ LOG.info("Notification Cleaned {} events with eventTime < {} in {}
iteration, " +
+ "minimum eventId {} (with eventTime {}) and maximum eventId {}
(with eventTime {})",
+ eventCount, tooOld, iteration, minEventId, minEventTime,
maxEventId, maxEventTime);
}
commited = commitTransaction();
} finally {