This is an automated email from the ASF dual-hosted git repository.

mblow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit 0f19b1ed40fbeb5ed859051b12b844abb3e2e9f2
Author: Murtadha Hubail <[email protected]>
AuthorDate: Sat May 20 19:03:33 2023 +0300

    [ASTERIXDB-3188][*DB] Clear cached global resource ids on NCs
    
    - user model changes: no
    - storage format changes: no
    - interface changes: yes
    
    Details:
    
    - Ensure any cached global resource ids on NCs are cleared after
      reporting the max used resource id to the CC.
    
    Change-Id: I3286915bcf313a88aaa465fa28ffeb56b3e9a0cd
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17541
    Reviewed-by: Ali Alsuliman <[email protected]>
    Integration-Tests: Jenkins <[email protected]>
    Tested-by: Jenkins <[email protected]>
---
 .../message/RegistrationTasksResponseMessage.java  |  3 ++
 .../asterix/common/utils/NcLocalCounters.java      |  8 +++++
 .../transaction/GlobalResourceIdFactory.java       | 39 +++++++++++++++++++---
 .../PersistentLocalResourceRepository.java         |  5 ++-
 .../storage/common/file/IResourceIdFactory.java    | 10 ++++--
 5 files changed, 57 insertions(+), 8 deletions(-)

diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
index f0f0470c20..bee5ff9c26 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
@@ -70,6 +70,9 @@ public class RegistrationTasksResponseMessage extends 
CcIdentifiedMessage
             }
             NcLocalCounters localCounter = success ? 
NcLocalCounters.collect(getCcId(),
                     (NodeControllerService) 
appCtx.getServiceContext().getControllerService()) : null;
+            if (localCounter != null) {
+                LOGGER.debug("returning local counters to cc: {}", 
localCounter);
+            }
             // wrap the returned partitions in a hash set to make it 
serializable
             Set<Integer> nodeActivePartitions = new 
HashSet<>(appCtx.getReplicaManager().getPartitions());
             NCLifecycleTaskReportMessage result =
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/NcLocalCounters.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/NcLocalCounters.java
index 8956b93043..ea4e7ad6b1 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/NcLocalCounters.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/NcLocalCounters.java
@@ -25,6 +25,7 @@ import 
org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
 import org.apache.hyracks.api.control.CcId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.hyracks.storage.common.file.IResourceIdFactory;
 
 public class NcLocalCounters implements Serializable {
     private static final long serialVersionUID = 3798954558299915995L;
@@ -41,6 +42,7 @@ public class NcLocalCounters implements Serializable {
 
     public static NcLocalCounters collect(CcId ccId, NodeControllerService 
ncs) throws HyracksDataException {
         final INcApplicationContext appContext = (INcApplicationContext) 
ncs.getApplicationContext();
+        resetGlobalCounters(ncs, appContext);
         long maxResourceId = 
Math.max(appContext.getLocalResourceRepository().maxId(),
                 
MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID);
         long maxTxnId = appContext.getMaxTxnId();
@@ -65,4 +67,10 @@ public class NcLocalCounters implements Serializable {
         return "NcLocalCounters{" + "maxResourceId=" + maxResourceId + ", 
maxTxnId=" + maxTxnId + ", maxJobId="
                 + maxJobId + '}';
     }
+
+    private static void resetGlobalCounters(NodeControllerService ncs, 
INcApplicationContext appContext) {
+        IResourceIdFactory resourceIdFactory =
+                
appContext.getStorageComponentProvider().getStorageManager().getResourceIdFactory(ncs.getContext());
+        resourceIdFactory.reset();
+    }
 }
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
index 2bd4f81765..908663fe63 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
@@ -41,26 +41,45 @@ import it.unimi.dsi.fastutil.longs.LongPriorityQueues;
 public class GlobalResourceIdFactory implements IResourceIdFactory {
 
     private static final Logger LOGGER = LogManager.getLogger();
-    private static final int RESOURCE_ID_BLOCK_SIZE = 25;
+    private static final int RESOURCE_ID_INITIAL_BLOCK_SIZE = 24;
+    private static final int MAX_BLOCK_SIZE = 35;
     private final INCServiceContext serviceCtx;
     private final LongPriorityQueue resourceIds =
-            LongPriorityQueues.synchronize(new 
LongArrayFIFOQueue(RESOURCE_ID_BLOCK_SIZE));
+            LongPriorityQueues.synchronize(new 
LongArrayFIFOQueue(RESOURCE_ID_INITIAL_BLOCK_SIZE));
     private final LinkedBlockingQueue<ResourceIdRequestResponseMessage> 
resourceIdResponseQ;
     private final String nodeId;
+    private volatile boolean reset = false;
+    private int currentBlockSize;
 
     public GlobalResourceIdFactory(INCServiceContext serviceCtx) {
         this.serviceCtx = serviceCtx;
         this.resourceIdResponseQ = new LinkedBlockingQueue<>();
         this.nodeId = serviceCtx.getNodeId();
+        this.currentBlockSize = RESOURCE_ID_INITIAL_BLOCK_SIZE;
     }
 
-    public void addNewIds(ResourceIdRequestResponseMessage resourceIdResponse) 
throws InterruptedException {
+    public synchronized void addNewIds(ResourceIdRequestResponseMessage 
resourceIdResponse)
+            throws InterruptedException {
         LOGGER.debug("rec'd block of ids: {}", resourceIdResponse);
+        // to ensure any block that was requested before a reset call isn't 
processed, we will ignore blocks where their
+        // block size doesn't match the current block size
+        if (resourceIdResponse.getBlockSize() != currentBlockSize) {
+            LOGGER.debug("dropping outdated block size of resource ids: {}, 
current block size: {}", resourceIdResponse,
+                    currentBlockSize);
+            return;
+        }
         resourceIdResponseQ.put(resourceIdResponse);
     }
 
     @Override
     public long createId() throws HyracksDataException {
+        synchronized (resourceIds) {
+            if (reset) {
+                resourceIds.clear();
+                resourceIdResponseQ.clear();
+                reset = false;
+            }
+        }
         try {
             final long resourceId = resourceIds.dequeueLong();
             if (resourceIds.isEmpty()) {
@@ -97,9 +116,19 @@ public class GlobalResourceIdFactory implements 
IResourceIdFactory {
         }
     }
 
-    protected void requestNewBlock() throws Exception {
+    @Override
+    public synchronized void reset() {
+        reset = true;
+        currentBlockSize += 1;
+        if (currentBlockSize > MAX_BLOCK_SIZE) {
+            currentBlockSize = RESOURCE_ID_INITIAL_BLOCK_SIZE;
+        }
+        LOGGER.debug("current resource ids block size: {}", currentBlockSize);
+    }
+
+    protected synchronized void requestNewBlock() throws Exception {
         // queue is empty; request a new block
-        ResourceIdRequestMessage msg = new ResourceIdRequestMessage(nodeId, 
RESOURCE_ID_BLOCK_SIZE);
+        ResourceIdRequestMessage msg = new ResourceIdRequestMessage(nodeId, 
currentBlockSize);
         ((INCMessageBroker) 
serviceCtx.getMessageBroker()).sendMessageToPrimaryCC(msg);
     }
 }
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index bb3cde5944..f9bf175d93 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -253,7 +253,10 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
                 for (File file : files) {
                     final LocalResource localResource = 
readLocalResource(file);
                     if (filter.test(localResource)) {
-                        resourcesMap.put(localResource.getId(), localResource);
+                        LocalResource duplicate = 
resourcesMap.putIfAbsent(localResource.getId(), localResource);
+                        if (duplicate != null) {
+                            LOGGER.warn("found duplicate resource ids {} and 
{}", localResource, duplicate);
+                        }
                     }
                 }
             } catch (IOException e) {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/IResourceIdFactory.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/IResourceIdFactory.java
index 9f675401cc..5e38b16456 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/IResourceIdFactory.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/IResourceIdFactory.java
@@ -20,12 +20,18 @@ package org.apache.hyracks.storage.common.file;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
-@FunctionalInterface
 public interface IResourceIdFactory {
 
     /**
      * @return A unique id
-     * @throws Exception
+     * @throws HyracksDataException
      */
     long createId() throws HyracksDataException;
+
+    /**
+     * Resets this factory to the last value used
+     */
+    default void reset() {
+        // no op
+    }
 }

Reply via email to