This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit abd5e20bd815e91a131b31f7c81921f3dd6c12fd Author: Michael Blow <[email protected]> AuthorDate: Mon Feb 22 21:19:33 2021 -0500 [NO ISSUE][*DB][RT] Obtain ResourceIds in blocks, to reduce roundtrips between NC and CC Change-Id: I22840013b1f03255dfb487217bbbb75db420c42d Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/10125 Tested-by: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: Michael Blow <[email protected]> Reviewed-by: Ali Alsuliman <[email protected]> --- .../common/transactions/IResourceIdManager.java | 9 ++++ .../runtime/message/ResourceIdRequestMessage.java | 10 ++-- .../message/ResourceIdRequestResponseMessage.java | 16 +++++- .../transaction/GlobalResourceIdFactory.java | 61 ++++++++++++++++------ .../runtime/transaction/ResourceIdManager.java | 12 ++++- 5 files changed, 84 insertions(+), 24 deletions(-) diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IResourceIdManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IResourceIdManager.java index d36d383..1301dea 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IResourceIdManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IResourceIdManager.java @@ -20,10 +20,19 @@ package org.apache.asterix.common.transactions; public interface IResourceIdManager { + /** + * @return the created resource id, or <code>-1</code> if a resource cannot be created + */ long createResourceId(); boolean reported(String nodeId); void report(String nodeId, long maxResourceId); + /** + * @param blockSize the size of resource id block to create + * @return the starting id of contiguous block of resource ids, or <code>-1</code> if + * the resource block cannot be created + */ + long createResourceIdBlock(int blockSize); } diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java index fbfca55..6198acc 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java @@ -26,11 +26,13 @@ import org.apache.asterix.common.transactions.IResourceIdManager; import org.apache.hyracks.api.exceptions.HyracksDataException; public class ResourceIdRequestMessage implements ICcAddressedMessage { - private static final long serialVersionUID = 1L; + private static final long serialVersionUID = 2L; private final String src; + private final int blockSize; - public ResourceIdRequestMessage(String src) { + public ResourceIdRequestMessage(String src, int blockSize) { this.src = src; + this.blockSize = blockSize; } @Override @@ -40,11 +42,11 @@ public class ResourceIdRequestMessage implements ICcAddressedMessage { ResourceIdRequestResponseMessage response = new ResourceIdRequestResponseMessage(); IClusterStateManager clusterStateManager = appCtx.getClusterStateManager(); IResourceIdManager resourceIdManager = appCtx.getResourceIdManager(); - response.setResourceId(resourceIdManager.createResourceId()); + response.setResourceIdBlock(resourceIdManager.createResourceIdBlock(blockSize), blockSize); if (response.getResourceId() < 0) { if (!(clusterStateManager.isClusterActive())) { response.setException( - new Exception("Cannot generate global resource id when cluster is not active.")); + new Exception("Cannot generate global resource id(s) when cluster is not active.")); } else { response.setException(new Exception("One or more nodes has not reported max resource id.")); } diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestResponseMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestResponseMessage.java index 6a9ed35..05e6b12 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestResponseMessage.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestResponseMessage.java @@ -24,19 +24,30 @@ import org.apache.asterix.runtime.transaction.GlobalResourceIdFactory; import org.apache.hyracks.api.exceptions.HyracksDataException; public class ResourceIdRequestResponseMessage implements INcAddressedMessage { - private static final long serialVersionUID = 1L; + private static final long serialVersionUID = 2L; private long resourceId; + private int blockSize = 1; + private Exception exception; public long getResourceId() { return resourceId; } + public int getBlockSize() { + return blockSize; + } + public void setResourceId(long resourceId) { this.resourceId = resourceId; } + public void setResourceIdBlock(long resourceId, int blockSize) { + this.resourceId = resourceId; + this.blockSize = blockSize; + } + public Exception getException() { return exception; } @@ -52,6 +63,7 @@ public class ResourceIdRequestResponseMessage implements INcAddressedMessage { @Override public String toString() { - return ResourceIdRequestResponseMessage.class.getSimpleName(); + return "ResourceIdRequestResponseMessage{" + "resourceId=" + resourceId + ", blockSize=" + blockSize + + ", exception=" + exception + '}'; } } diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java index 78b1f17..2bd4f81 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java @@ -18,6 +18,7 @@ */ package org.apache.asterix.runtime.transaction; +import java.util.NoSuchElementException; import java.util.concurrent.LinkedBlockingQueue; import org.apache.asterix.common.messaging.api.INCMessageBroker; @@ -26,6 +27,12 @@ import org.apache.asterix.runtime.message.ResourceIdRequestResponseMessage; import org.apache.hyracks.api.application.INCServiceContext; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.common.file.IResourceIdFactory; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import it.unimi.dsi.fastutil.longs.LongArrayFIFOQueue; +import it.unimi.dsi.fastutil.longs.LongPriorityQueue; +import it.unimi.dsi.fastutil.longs.LongPriorityQueues; /** * A resource id factory that generates unique resource ids across all NCs by requesting @@ -33,7 +40,11 @@ import org.apache.hyracks.storage.common.file.IResourceIdFactory; */ public class GlobalResourceIdFactory implements IResourceIdFactory { + private static final Logger LOGGER = LogManager.getLogger(); + private static final int RESOURCE_ID_BLOCK_SIZE = 25; private final INCServiceContext serviceCtx; + private final LongPriorityQueue resourceIds = + LongPriorityQueues.synchronize(new LongArrayFIFOQueue(RESOURCE_ID_BLOCK_SIZE)); private final LinkedBlockingQueue<ResourceIdRequestResponseMessage> resourceIdResponseQ; private final String nodeId; @@ -44,33 +55,51 @@ public class GlobalResourceIdFactory implements IResourceIdFactory { } public void addNewIds(ResourceIdRequestResponseMessage resourceIdResponse) throws InterruptedException { + LOGGER.debug("rec'd block of ids: {}", resourceIdResponse); resourceIdResponseQ.put(resourceIdResponse); } @Override public long createId() throws HyracksDataException { try { - ResourceIdRequestResponseMessage reponse = null; - //if there already exists a response, use it - if (!resourceIdResponseQ.isEmpty()) { - synchronized (resourceIdResponseQ) { - if (!resourceIdResponseQ.isEmpty()) { - reponse = resourceIdResponseQ.take(); + final long resourceId = resourceIds.dequeueLong(); + if (resourceIds.isEmpty()) { + serviceCtx.getControllerService().getExecutor().submit(() -> { + try { + requestNewBlock(); + } catch (Exception e) { + LOGGER.warn("failed on preemptive block request", e); } - } + }); + } + return resourceId; + } catch (NoSuchElementException e) { + // fallthrough + } + try { + // if there already exists a response, use it + ResourceIdRequestResponseMessage response = resourceIdResponseQ.poll(); + if (response == null) { + requestNewBlock(); + response = resourceIdResponseQ.take(); } - //if no response available or it has an exception, request a new one - if (reponse == null || reponse.getException() != null) { - ResourceIdRequestMessage msg = new ResourceIdRequestMessage(nodeId); - ((INCMessageBroker) serviceCtx.getMessageBroker()).sendMessageToPrimaryCC(msg); - reponse = resourceIdResponseQ.take(); - if (reponse.getException() != null) { - throw HyracksDataException.create(reponse.getException()); - } + if (response.getException() != null) { + throw HyracksDataException.create(response.getException()); } - return reponse.getResourceId(); + // take the first id, queue the rest + final long startingId = response.getResourceId(); + for (int i = 1; i < response.getBlockSize(); i++) { + resourceIds.enqueue(startingId + i); + } + return startingId; } catch (Exception e) { throw HyracksDataException.create(e); } } + + protected void requestNewBlock() throws Exception { + // queue is empty; request a new block + ResourceIdRequestMessage msg = new ResourceIdRequestMessage(nodeId, RESOURCE_ID_BLOCK_SIZE); + ((INCMessageBroker) serviceCtx.getMessageBroker()).sendMessageToPrimaryCC(msg); + } } diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java index 5bcd5aa..8b4fd68 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java @@ -37,8 +37,11 @@ public class ResourceIdManager implements IResourceIdManager { @Override public long createResourceId() { - return csm.isClusterActive() || reportedNodes.containsAll(csm.getParticipantNodes(true)) - ? globalResourceId.incrementAndGet() : -1; + return readyState() ? globalResourceId.incrementAndGet() : -1; + } + + protected boolean readyState() { + return csm.isClusterActive() || reportedNodes.containsAll(csm.getParticipantNodes(true)); } @Override @@ -51,4 +54,9 @@ public class ResourceIdManager implements IResourceIdManager { globalResourceId.updateAndGet(prev -> Math.max(maxResourceId, prev)); reportedNodes.add(nodeId); } + + @Override + public long createResourceIdBlock(int blockSize) { + return readyState() ? globalResourceId.getAndAdd(blockSize) + 1 : -1; + } }
