This is an automated email from the ASF dual-hosted git repository.

mahesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 54a0ab9bdb9 HIVE-26225: Delete operations in 
ObjectStore.cleanWriteNotificationEvents should be performed in different 
transactions (Haymant Mangla, reviewed by Mahesh Kumar Behera)
54a0ab9bdb9 is described below

commit 54a0ab9bdb9dc2da1a55c595ff0916d5c79d7cd4
Author: Haymant Mangla <[email protected]>
AuthorDate: Wed May 18 07:18:00 2022 +0530

    HIVE-26225: Delete operations in ObjectStore.cleanWriteNotificationEvents 
should be performed in different transactions (Haymant Mangla, reviewed by 
Mahesh Kumar Behera)
---
 .../apache/hadoop/hive/metastore/ObjectStore.java  | 116 +++++++--------------
 .../org/apache/hadoop/hive/metastore/RawStore.java |   2 +-
 2 files changed, 40 insertions(+), 78 deletions(-)

diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index 9b5ef82fdc2..83afdce22ea 100644
--- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -24,7 +24,6 @@ import static 
org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCa
 import static 
org.apache.hadoop.hive.metastore.utils.StringUtils.normalizeIdentifier;
 
 import java.io.IOException;
-import java.lang.reflect.Constructor;
 import java.net.InetAddress;
 import java.net.URI;
 import java.nio.ByteBuffer;
@@ -37,7 +36,6 @@ import java.time.LocalDateTime;
 import java.time.ZoneId;
 import java.time.ZonedDateTime;
 import java.time.format.DateTimeFormatter;
-import java.util.Arrays;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -56,7 +54,6 @@ import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 import java.util.regex.Pattern;
 
 import javax.jdo.JDODataStoreException;
@@ -11542,54 +11539,7 @@ public class ObjectStore implements RawStore, 
Configurable {
 
   @Override
   public void cleanWriteNotificationEvents(int olderThan) {
-    boolean commited = false;
-    Query query = null;
-    try {
-      openTransaction();
-      long tmp = System.currentTimeMillis() / 1000 - olderThan;
-      int tooOld = (tmp > Integer.MAX_VALUE) ? 0 : (int) tmp;
-      query = pm.newQuery(MTxnWriteNotificationLog.class, "eventTime < 
tooOld");
-      query.declareParameters("java.lang.Integer tooOld");
-
-      int max_events = MetastoreConf.getIntVar(conf, 
MetastoreConf.ConfVars.EVENT_CLEAN_MAX_EVENTS);
-      max_events = max_events > 0 ? max_events : Integer.MAX_VALUE;
-      query.setRange(0, max_events);
-      query.setOrdering("txnId ascending");
-
-      List<MTxnWriteNotificationLog> toBeRemoved = (List) 
query.execute(tooOld);
-      int iteration = 0;
-      int eventCount = 0;
-      long minTxnId = 0;
-      long minEventTime = 0;
-      long maxTxnId = 0;
-      long maxEventTime = 0;
-      while (CollectionUtils.isNotEmpty(toBeRemoved)) {
-        int listSize = toBeRemoved.size();
-        if (iteration == 0) {
-          MTxnWriteNotificationLog firstNotification = toBeRemoved.get(0);
-          minTxnId = firstNotification.getTxnId();
-          minEventTime = firstNotification.getEventTime();
-        }
-        MTxnWriteNotificationLog lastNotification = toBeRemoved.get(listSize - 
1);
-        maxTxnId = lastNotification.getTxnId();
-        maxEventTime = lastNotification.getEventTime();
-
-        pm.deletePersistentAll(toBeRemoved);
-        eventCount += listSize;
-        iteration++;
-        toBeRemoved = (List) query.execute(tooOld);
-      }
-      if (iteration == 0) {
-        LOG.info("No WriteNotification events found to be cleaned with 
eventTime < {}.", tooOld);
-      } else {
-        LOG.info("WriteNotification Cleaned {} events with eventTime < {} in 
{} iteration, " +
-            "minimum txnId {} (with eventTime {}) and maximum txnId {} (with 
eventTime {})",
-            eventCount, tooOld, iteration, minTxnId, minEventTime, maxTxnId, 
maxEventTime);
-      }
-      commited = commitTransaction();
-    } finally {
-      rollbackAndCleanup(commited, query);
-    }
+    cleanOlderEvents(olderThan, MTxnWriteNotificationLog.class, 
"TxnWriteNotificationLog");
   }
 
   @Override
@@ -11756,6 +11706,10 @@ public class ObjectStore implements RawStore, 
Configurable {
 
   @Override
   public void cleanNotificationEvents(int olderThan) {
+    cleanOlderEvents(olderThan, MNotificationLog.class, "NotificationLog");
+  }
+
+  private void cleanOlderEvents(int olderThan, Class table, String tableName) {
     final int eventBatchSize = MetastoreConf.getIntVar(conf, 
MetastoreConf.ConfVars.EVENT_CLEAN_MAX_EVENTS);
 
     final long ageSec = olderThan;
@@ -11766,62 +11720,70 @@ public class ObjectStore implements RawStore, 
Configurable {
     final Optional<Integer> batchSize = (eventBatchSize > 0) ? 
Optional.of(eventBatchSize) : Optional.empty();
 
     final long start = System.nanoTime();
-    int deleteCount = doCleanNotificationEvents(tooOld, batchSize);
+    int deleteCount = doCleanNotificationEvents(tooOld, batchSize, table, 
tableName);
 
     if (deleteCount == 0) {
-      LOG.info("No Notification events found to be cleaned with eventTime < 
{}", tooOld);
+      LOG.info("No {} events found to be cleaned with eventTime < {}", 
tableName, tooOld);
     } else {
       int batchCount = 0;
       do {
-        batchCount = doCleanNotificationEvents(tooOld, batchSize);
+        batchCount = doCleanNotificationEvents(tooOld, batchSize, table, 
tableName);
         deleteCount += batchCount;
       } while (batchCount > 0);
     }
 
     final long finish = System.nanoTime();
 
-    LOG.info("Deleted {} notification events older than epoch:{} in {}ms", 
deleteCount, tooOld,
-        TimeUnit.NANOSECONDS.toMillis(finish - start));
+    LOG.info("Deleted {} {} events older than epoch:{} in {}ms", deleteCount, 
tableName, tooOld,
+            TimeUnit.NANOSECONDS.toMillis(finish - start));
   }
 
-  private int doCleanNotificationEvents(final int ageSec, final 
Optional<Integer> batchSize) {
+  private <T> int doCleanNotificationEvents(final int ageSec, final 
Optional<Integer> batchSize, Class<T> tableClass, String tableName) {
     final Transaction tx = pm.currentTransaction();
     int eventsCount = 0;
 
     try {
+      String key = null;
       tx.begin();
 
-      try (Query query = pm.newQuery(MNotificationLog.class, "eventTime <= 
tooOld")) {
+      try (Query query = pm.newQuery(tableClass, "eventTime <= tooOld")) {
         query.declareParameters("java.lang.Integer tooOld");
-        query.setOrdering("eventId ascending");
+        if (MNotificationLog.class.equals(tableClass)) {
+          key = "eventId";
+        } else if (MTxnWriteNotificationLog.class.equals(tableClass)) {
+          key = "txnId";
+        }
+        query.setOrdering(key + " ascending");
         if (batchSize.isPresent()) {
           query.setRange(0, batchSize.get());
         }
 
-        List<MNotificationLog> events = (List) query.execute(ageSec);
+        List<T> events = (List) query.execute(ageSec);
         if (CollectionUtils.isNotEmpty(events)) {
           eventsCount = events.size();
-
           if (LOG.isDebugEnabled()) {
             int minEventTime, maxEventTime;
-            long minEventId, maxEventId;
-            Iterator<MNotificationLog> iter = events.iterator();
-            MNotificationLog firstNotification = iter.next();
-
-            minEventTime = maxEventTime = firstNotification.getEventTime();
-            minEventId = maxEventId = firstNotification.getEventId();
-
-            while (iter.hasNext()) {
-              MNotificationLog notification = iter.next();
-              minEventTime = Math.min(minEventTime, 
notification.getEventTime());
-              maxEventTime = Math.max(maxEventTime, 
notification.getEventTime());
-              minEventId = Math.min(minEventId, notification.getEventId());
-              maxEventId = Math.max(maxEventId, notification.getEventId());
+            long minId, maxId;
+            T firstNotification = events.get(0);
+            T lastNotification = events.get(eventsCount - 1);
+            if (MNotificationLog.class.equals(tableClass)) {
+              minEventTime = 
((MNotificationLog)firstNotification).getEventTime();
+              minId = ((MNotificationLog)firstNotification).getEventId();
+              maxEventTime = 
((MNotificationLog)lastNotification).getEventTime();
+              maxId = ((MNotificationLog)lastNotification).getEventId();
+            } else if (MTxnWriteNotificationLog.class.equals(tableClass)) {
+              minEventTime = 
((MTxnWriteNotificationLog)firstNotification).getEventTime();
+              minId = ((MTxnWriteNotificationLog)firstNotification).getTxnId();
+              maxEventTime = 
((MTxnWriteNotificationLog)lastNotification).getEventTime();
+              maxId = ((MTxnWriteNotificationLog)lastNotification).getTxnId();
+            } else {
+              throw new RuntimeException("Cleaning of older " + tableName + " 
events failed. " +
+                      "Reason: Unknown table encountered " + 
tableClass.getName());
             }
 
             LOG.debug(
-                "Remove notification batch of {} events with eventTime < {}, 
min eventId {}, max eventId {}, min eventTime {}, max eventTime {}",
-                eventsCount, ageSec, minEventId, maxEventId, minEventTime, 
maxEventTime);
+                    "Remove {} batch of {} events with eventTime < {}, min {}: 
{}, max {}: {}, min eventTime {}, max eventTime {}",
+                    tableName, eventsCount, ageSec, key, minId, key, maxId, 
minEventTime, maxEventTime);
           }
 
           pm.deletePersistentAll(events);
@@ -11830,7 +11792,7 @@ public class ObjectStore implements RawStore, 
Configurable {
 
       tx.commit();
     } catch (Exception e) {
-      LOG.error("Unable to delete batch of notification events", e);
+      LOG.error("Unable to delete batch of " + tableName + " events", e);
       eventsCount = 0;
     } finally {
       if (tx.isActive()) {
diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/RawStore.java
 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/RawStore.java
index fac5b836e2b..9ec66c48bbf 100644
--- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/RawStore.java
+++ 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/RawStore.java
@@ -2090,7 +2090,7 @@ public interface RawStore extends Configurable {
 
   /**
    * Remove older notification events.
-   * @param olderThan Remove any events older than a given number of seconds
+   * @param olderThan Remove any events older than or equal to a given number of 
seconds
    */
   void cleanWriteNotificationEvents(int olderThan);
 

Reply via email to