This is an automated email from the ASF dual-hosted git repository.
mahesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 54a0ab9bdb9 HIVE-26225: Delete operations in
ObjectStore.cleanWriteNotificationEvents should be performed in different
transactions (Haymant Mangla, reviewed by Mahesh Kumar Behera)
54a0ab9bdb9 is described below
commit 54a0ab9bdb9dc2da1a55c595ff0916d5c79d7cd4
Author: Haymant Mangla <[email protected]>
AuthorDate: Wed May 18 07:18:00 2022 +0530
HIVE-26225: Delete operations in ObjectStore.cleanWriteNotificationEvents
should be performed in different transactions (Haymant Mangla, reviewed by
Mahesh Kumar Behera)
---
.../apache/hadoop/hive/metastore/ObjectStore.java | 116 +++++++--------------
.../org/apache/hadoop/hive/metastore/RawStore.java | 2 +-
2 files changed, 40 insertions(+), 78 deletions(-)
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index 9b5ef82fdc2..83afdce22ea 100644
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -24,7 +24,6 @@ import static
org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCa
import static
org.apache.hadoop.hive.metastore.utils.StringUtils.normalizeIdentifier;
import java.io.IOException;
-import java.lang.reflect.Constructor;
import java.net.InetAddress;
import java.net.URI;
import java.nio.ByteBuffer;
@@ -37,7 +36,6 @@ import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
-import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -56,7 +54,6 @@ import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
import javax.jdo.JDODataStoreException;
@@ -11542,54 +11539,7 @@ public class ObjectStore implements RawStore,
Configurable {
@Override
public void cleanWriteNotificationEvents(int olderThan) {
- boolean commited = false;
- Query query = null;
- try {
- openTransaction();
- long tmp = System.currentTimeMillis() / 1000 - olderThan;
- int tooOld = (tmp > Integer.MAX_VALUE) ? 0 : (int) tmp;
- query = pm.newQuery(MTxnWriteNotificationLog.class, "eventTime <
tooOld");
- query.declareParameters("java.lang.Integer tooOld");
-
- int max_events = MetastoreConf.getIntVar(conf,
MetastoreConf.ConfVars.EVENT_CLEAN_MAX_EVENTS);
- max_events = max_events > 0 ? max_events : Integer.MAX_VALUE;
- query.setRange(0, max_events);
- query.setOrdering("txnId ascending");
-
- List<MTxnWriteNotificationLog> toBeRemoved = (List)
query.execute(tooOld);
- int iteration = 0;
- int eventCount = 0;
- long minTxnId = 0;
- long minEventTime = 0;
- long maxTxnId = 0;
- long maxEventTime = 0;
- while (CollectionUtils.isNotEmpty(toBeRemoved)) {
- int listSize = toBeRemoved.size();
- if (iteration == 0) {
- MTxnWriteNotificationLog firstNotification = toBeRemoved.get(0);
- minTxnId = firstNotification.getTxnId();
- minEventTime = firstNotification.getEventTime();
- }
- MTxnWriteNotificationLog lastNotification = toBeRemoved.get(listSize -
1);
- maxTxnId = lastNotification.getTxnId();
- maxEventTime = lastNotification.getEventTime();
-
- pm.deletePersistentAll(toBeRemoved);
- eventCount += listSize;
- iteration++;
- toBeRemoved = (List) query.execute(tooOld);
- }
- if (iteration == 0) {
- LOG.info("No WriteNotification events found to be cleaned with
eventTime < {}.", tooOld);
- } else {
- LOG.info("WriteNotification Cleaned {} events with eventTime < {} in
{} iteration, " +
- "minimum txnId {} (with eventTime {}) and maximum txnId {} (with
eventTime {})",
- eventCount, tooOld, iteration, minTxnId, minEventTime, maxTxnId,
maxEventTime);
- }
- commited = commitTransaction();
- } finally {
- rollbackAndCleanup(commited, query);
- }
+ cleanOlderEvents(olderThan, MTxnWriteNotificationLog.class,
"TxnWriteNotificationLog");
}
@Override
@@ -11756,6 +11706,10 @@ public class ObjectStore implements RawStore,
Configurable {
@Override
public void cleanNotificationEvents(int olderThan) {
+ cleanOlderEvents(olderThan, MNotificationLog.class, "NotificationLog");
+ }
+
+ private void cleanOlderEvents(int olderThan, Class table, String tableName) {
final int eventBatchSize = MetastoreConf.getIntVar(conf,
MetastoreConf.ConfVars.EVENT_CLEAN_MAX_EVENTS);
final long ageSec = olderThan;
@@ -11766,62 +11720,70 @@ public class ObjectStore implements RawStore,
Configurable {
final Optional<Integer> batchSize = (eventBatchSize > 0) ?
Optional.of(eventBatchSize) : Optional.empty();
final long start = System.nanoTime();
- int deleteCount = doCleanNotificationEvents(tooOld, batchSize);
+ int deleteCount = doCleanNotificationEvents(tooOld, batchSize, table,
tableName);
if (deleteCount == 0) {
- LOG.info("No Notification events found to be cleaned with eventTime <
{}", tooOld);
+ LOG.info("No {} events found to be cleaned with eventTime < {}",
tableName, tooOld);
} else {
int batchCount = 0;
do {
- batchCount = doCleanNotificationEvents(tooOld, batchSize);
+ batchCount = doCleanNotificationEvents(tooOld, batchSize, table,
tableName);
deleteCount += batchCount;
} while (batchCount > 0);
}
final long finish = System.nanoTime();
- LOG.info("Deleted {} notification events older than epoch:{} in {}ms",
deleteCount, tooOld,
- TimeUnit.NANOSECONDS.toMillis(finish - start));
+ LOG.info("Deleted {} {} events older than epoch:{} in {}ms", deleteCount,
tableName, tooOld,
+ TimeUnit.NANOSECONDS.toMillis(finish - start));
}
- private int doCleanNotificationEvents(final int ageSec, final
Optional<Integer> batchSize) {
+ private <T> int doCleanNotificationEvents(final int ageSec, final
Optional<Integer> batchSize, Class<T> tableClass, String tableName) {
final Transaction tx = pm.currentTransaction();
int eventsCount = 0;
try {
+ String key = null;
tx.begin();
- try (Query query = pm.newQuery(MNotificationLog.class, "eventTime <=
tooOld")) {
+ try (Query query = pm.newQuery(tableClass, "eventTime <= tooOld")) {
query.declareParameters("java.lang.Integer tooOld");
- query.setOrdering("eventId ascending");
+ if (MNotificationLog.class.equals(tableClass)) {
+ key = "eventId";
+ } else if (MTxnWriteNotificationLog.class.equals(tableClass)) {
+ key = "txnId";
+ }
+ query.setOrdering(key + " ascending");
if (batchSize.isPresent()) {
query.setRange(0, batchSize.get());
}
- List<MNotificationLog> events = (List) query.execute(ageSec);
+ List<T> events = (List) query.execute(ageSec);
if (CollectionUtils.isNotEmpty(events)) {
eventsCount = events.size();
-
if (LOG.isDebugEnabled()) {
int minEventTime, maxEventTime;
- long minEventId, maxEventId;
- Iterator<MNotificationLog> iter = events.iterator();
- MNotificationLog firstNotification = iter.next();
-
- minEventTime = maxEventTime = firstNotification.getEventTime();
- minEventId = maxEventId = firstNotification.getEventId();
-
- while (iter.hasNext()) {
- MNotificationLog notification = iter.next();
- minEventTime = Math.min(minEventTime,
notification.getEventTime());
- maxEventTime = Math.max(maxEventTime,
notification.getEventTime());
- minEventId = Math.min(minEventId, notification.getEventId());
- maxEventId = Math.max(maxEventId, notification.getEventId());
+ long minId, maxId;
+ T firstNotification = events.get(0);
+ T lastNotification = events.get(eventsCount - 1);
+ if (MNotificationLog.class.equals(tableClass)) {
+ minEventTime =
((MNotificationLog)firstNotification).getEventTime();
+ minId = ((MNotificationLog)firstNotification).getEventId();
+ maxEventTime =
((MNotificationLog)lastNotification).getEventTime();
+ maxId = ((MNotificationLog)lastNotification).getEventId();
+ } else if (MTxnWriteNotificationLog.class.equals(tableClass)) {
+ minEventTime =
((MTxnWriteNotificationLog)firstNotification).getEventTime();
+ minId = ((MTxnWriteNotificationLog)firstNotification).getTxnId();
+ maxEventTime =
((MTxnWriteNotificationLog)lastNotification).getEventTime();
+ maxId = ((MTxnWriteNotificationLog)lastNotification).getTxnId();
+ } else {
+ throw new RuntimeException("Cleaning of older " + tableName + "
events failed. " +
+ "Reason: Unknown table encountered " +
tableClass.getName());
}
LOG.debug(
- "Remove notification batch of {} events with eventTime < {},
min eventId {}, max eventId {}, min eventTime {}, max eventTime {}",
- eventsCount, ageSec, minEventId, maxEventId, minEventTime,
maxEventTime);
+ "Remove {} batch of {} events with eventTime < {}, min {}:
{}, max {}: {}, min eventTime {}, max eventTime {}",
+ tableName, eventsCount, ageSec, key, minId, key, maxId,
minEventTime, maxEventTime);
}
pm.deletePersistentAll(events);
@@ -11830,7 +11792,7 @@ public class ObjectStore implements RawStore,
Configurable {
tx.commit();
} catch (Exception e) {
- LOG.error("Unable to delete batch of notification events", e);
+ LOG.error("Unable to delete batch of " + tableName + " events", e);
eventsCount = 0;
} finally {
if (tx.isActive()) {
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/RawStore.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/RawStore.java
index fac5b836e2b..9ec66c48bbf 100644
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/RawStore.java
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/RawStore.java
@@ -2090,7 +2090,7 @@ public interface RawStore extends Configurable {
/**
* Remove older notification events.
- * @param olderThan Remove any events older than a given number of seconds
+ * @param olderThan Remove any events older or equal to a given number of
seconds
*/
void cleanWriteNotificationEvents(int olderThan);