This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 0f19b1ed40fbeb5ed859051b12b844abb3e2e9f2 Author: Murtadha Hubail <[email protected]> AuthorDate: Sat May 20 19:03:33 2023 +0300 [ASTERIXDB-3188][*DB] Clear cached global resource ids on NCs - user model changes: no - storage format changes: no - interface changes: yes Details: - Ensure any cached global resource ids on NCs are cleared after reporting max used resource id to CC. Change-Id: I3286915bcf313a88aaa465fa28ffeb56b3e9a0cd Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17541 Reviewed-by: Ali Alsuliman <[email protected]> Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> --- .../message/RegistrationTasksResponseMessage.java | 3 ++ .../asterix/common/utils/NcLocalCounters.java | 8 +++++ .../transaction/GlobalResourceIdFactory.java | 39 +++++++++++++++++++--- .../PersistentLocalResourceRepository.java | 5 ++- .../storage/common/file/IResourceIdFactory.java | 10 ++++-- 5 files changed, 57 insertions(+), 8 deletions(-) diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java index f0f0470c20..bee5ff9c26 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java @@ -70,6 +70,9 @@ public class RegistrationTasksResponseMessage extends CcIdentifiedMessage } NcLocalCounters localCounter = success ? NcLocalCounters.collect(getCcId(), (NodeControllerService) appCtx.getServiceContext().getControllerService()) : null; + if (localCounter != null) { + LOGGER.debug("returning local counters to cc: {}", localCounter); + } // wrap the returned partitions in a hash set to make it serializable Set<Integer> nodeActivePartitions = new HashSet<>(appCtx.getReplicaManager().getPartitions()); NCLifecycleTaskReportMessage result = diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/NcLocalCounters.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/NcLocalCounters.java index 8956b93043..ea4e7ad6b1 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/NcLocalCounters.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/NcLocalCounters.java @@ -25,6 +25,7 @@ import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties; import org.apache.hyracks.api.control.CcId; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.control.nc.NodeControllerService; +import org.apache.hyracks.storage.common.file.IResourceIdFactory; public class NcLocalCounters implements Serializable { private static final long serialVersionUID = 3798954558299915995L; @@ -41,6 +42,7 @@ public class NcLocalCounters implements Serializable { public static NcLocalCounters collect(CcId ccId, NodeControllerService ncs) throws HyracksDataException { final INcApplicationContext appContext = (INcApplicationContext) ncs.getApplicationContext(); + resetGlobalCounters(ncs, appContext); long maxResourceId = Math.max(appContext.getLocalResourceRepository().maxId(), MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID); long maxTxnId = appContext.getMaxTxnId(); @@ -65,4 +67,10 @@ public class NcLocalCounters implements Serializable { return "NcLocalCounters{" + "maxResourceId=" + maxResourceId + ", maxTxnId=" + maxTxnId + ", maxJobId=" + maxJobId + '}'; } + + private static void resetGlobalCounters(NodeControllerService ncs, INcApplicationContext appContext) { + IResourceIdFactory resourceIdFactory = + appContext.getStorageComponentProvider().getStorageManager().getResourceIdFactory(ncs.getContext()); + resourceIdFactory.reset(); + } } diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java index 2bd4f81765..908663fe63 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java @@ -41,26 +41,45 @@ import it.unimi.dsi.fastutil.longs.LongPriorityQueues; public class GlobalResourceIdFactory implements IResourceIdFactory { private static final Logger LOGGER = LogManager.getLogger(); - private static final int RESOURCE_ID_BLOCK_SIZE = 25; + private static final int RESOURCE_ID_INITIAL_BLOCK_SIZE = 24; + private static final int MAX_BLOCK_SIZE = 35; private final INCServiceContext serviceCtx; private final LongPriorityQueue resourceIds = - LongPriorityQueues.synchronize(new LongArrayFIFOQueue(RESOURCE_ID_BLOCK_SIZE)); + LongPriorityQueues.synchronize(new LongArrayFIFOQueue(RESOURCE_ID_INITIAL_BLOCK_SIZE)); private final LinkedBlockingQueue<ResourceIdRequestResponseMessage> resourceIdResponseQ; private final String nodeId; + private volatile boolean reset = false; + private int currentBlockSize; public GlobalResourceIdFactory(INCServiceContext serviceCtx) { this.serviceCtx = serviceCtx; this.resourceIdResponseQ = new LinkedBlockingQueue<>(); this.nodeId = serviceCtx.getNodeId(); + this.currentBlockSize = RESOURCE_ID_INITIAL_BLOCK_SIZE; } - public void addNewIds(ResourceIdRequestResponseMessage resourceIdResponse) throws InterruptedException { + public synchronized void addNewIds(ResourceIdRequestResponseMessage resourceIdResponse) + throws InterruptedException { LOGGER.debug("rec'd block of ids: {}", resourceIdResponse); + // to ensure any block that was requested before a reset call isn't processed, we will ignore blocks where their + // block size doesn't match the current block size + if (resourceIdResponse.getBlockSize() != currentBlockSize) { + LOGGER.debug("dropping outdated block size of resource ids: {}, current block size: {}", resourceIdResponse, + currentBlockSize); + return; + } resourceIdResponseQ.put(resourceIdResponse); } @Override public long createId() throws HyracksDataException { + synchronized (resourceIds) { + if (reset) { + resourceIds.clear(); + resourceIdResponseQ.clear(); + reset = false; + } + } try { final long resourceId = resourceIds.dequeueLong(); if (resourceIds.isEmpty()) { @@ -97,9 +116,19 @@ public class GlobalResourceIdFactory implements IResourceIdFactory { } } - protected void requestNewBlock() throws Exception { + @Override + public synchronized void reset() { + reset = true; + currentBlockSize += 1; + if (currentBlockSize > MAX_BLOCK_SIZE) { + currentBlockSize = RESOURCE_ID_INITIAL_BLOCK_SIZE; + } + LOGGER.debug("current resource ids block size: {}", currentBlockSize); + } + + protected synchronized void requestNewBlock() throws Exception { // queue is empty; request a new block - ResourceIdRequestMessage msg = new ResourceIdRequestMessage(nodeId, RESOURCE_ID_BLOCK_SIZE); + ResourceIdRequestMessage msg = new ResourceIdRequestMessage(nodeId, currentBlockSize); ((INCMessageBroker) serviceCtx.getMessageBroker()).sendMessageToPrimaryCC(msg); } } diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java index bb3cde5944..f9bf175d93 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java @@ -253,7 +253,10 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito for (File file : files) { final LocalResource localResource = readLocalResource(file); if (filter.test(localResource)) { - resourcesMap.put(localResource.getId(), localResource); + LocalResource duplicate = resourcesMap.putIfAbsent(localResource.getId(), localResource); + if (duplicate != null) { + LOGGER.warn("found duplicate resource ids {} and {}", localResource, duplicate); + } } } } catch (IOException e) { diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/IResourceIdFactory.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/IResourceIdFactory.java index 9f675401cc..5e38b16456 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/IResourceIdFactory.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/IResourceIdFactory.java @@ -20,12 +20,18 @@ package org.apache.hyracks.storage.common.file; import org.apache.hyracks.api.exceptions.HyracksDataException; -@FunctionalInterface public interface IResourceIdFactory { /** * @return A unique id - * @throws Exception + * @throws HyracksDataException */ long createId() throws HyracksDataException; + + /** + * Resets this factory to the last value used + */ + default void reset() { + // no op + } }
