This is an automated email from the ASF dual-hosted git repository.

mblow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit abd5e20bd815e91a131b31f7c81921f3dd6c12fd
Author: Michael Blow <[email protected]>
AuthorDate: Mon Feb 22 21:19:33 2021 -0500

    [NO ISSUE][*DB][RT] Obtain ResourceIds in blocks, to reduce roundtrips between NC and CC
    
    Change-Id: I22840013b1f03255dfb487217bbbb75db420c42d
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/10125
    Tested-by: Jenkins <[email protected]>
    Integration-Tests: Jenkins <[email protected]>
    Reviewed-by: Michael Blow <[email protected]>
    Reviewed-by: Ali Alsuliman <[email protected]>
---
 .../common/transactions/IResourceIdManager.java    |  9 ++++
 .../runtime/message/ResourceIdRequestMessage.java  | 10 ++--
 .../message/ResourceIdRequestResponseMessage.java  | 16 +++++-
 .../transaction/GlobalResourceIdFactory.java       | 61 ++++++++++++++++------
 .../runtime/transaction/ResourceIdManager.java     | 12 ++++-
 5 files changed, 84 insertions(+), 24 deletions(-)

diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IResourceIdManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IResourceIdManager.java
index d36d383..1301dea 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IResourceIdManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IResourceIdManager.java
@@ -20,10 +20,19 @@ package org.apache.asterix.common.transactions;
 
 public interface IResourceIdManager {
 
+    /**
+     * @return the created resource id, or <code>-1</code> if a resource cannot be created
+     */
     long createResourceId();
 
     boolean reported(String nodeId);
 
     void report(String nodeId, long maxResourceId);
 
+    /**
+     * @param blockSize the size of resource id block to create
+     * @return the starting id of contiguous block of resource ids, or <code>-1</code> if
+     *         the resource block cannot be created
+     */
+    long createResourceIdBlock(int blockSize);
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
index fbfca55..6198acc 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java
@@ -26,11 +26,13 @@ import org.apache.asterix.common.transactions.IResourceIdManager;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public class ResourceIdRequestMessage implements ICcAddressedMessage {
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 2L;
     private final String src;
+    private final int blockSize;
 
-    public ResourceIdRequestMessage(String src) {
+    public ResourceIdRequestMessage(String src, int blockSize) {
         this.src = src;
+        this.blockSize = blockSize;
     }
 
     @Override
@@ -40,11 +42,11 @@ public class ResourceIdRequestMessage implements ICcAddressedMessage {
             ResourceIdRequestResponseMessage response = new ResourceIdRequestResponseMessage();
             IClusterStateManager clusterStateManager = appCtx.getClusterStateManager();
             IResourceIdManager resourceIdManager = appCtx.getResourceIdManager();
-            response.setResourceId(resourceIdManager.createResourceId());
+            response.setResourceIdBlock(resourceIdManager.createResourceIdBlock(blockSize), blockSize);
             if (response.getResourceId() < 0) {
                 if (!(clusterStateManager.isClusterActive())) {
                     response.setException(
-                            new Exception("Cannot generate global resource id when cluster is not active."));
+                            new Exception("Cannot generate global resource id(s) when cluster is not active."));
                 } else {
                     response.setException(new Exception("One or more nodes has not reported max resource id."));
                 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestResponseMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestResponseMessage.java
index 6a9ed35..05e6b12 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestResponseMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestResponseMessage.java
@@ -24,19 +24,30 @@ import org.apache.asterix.runtime.transaction.GlobalResourceIdFactory;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public class ResourceIdRequestResponseMessage implements INcAddressedMessage {
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 2L;
 
     private long resourceId;
+    private int blockSize = 1;
+
     private Exception exception;
 
     public long getResourceId() {
         return resourceId;
     }
 
+    public int getBlockSize() {
+        return blockSize;
+    }
+
     public void setResourceId(long resourceId) {
         this.resourceId = resourceId;
     }
 
+    public void setResourceIdBlock(long resourceId, int blockSize) {
+        this.resourceId = resourceId;
+        this.blockSize = blockSize;
+    }
+
     public Exception getException() {
         return exception;
     }
@@ -52,6 +63,7 @@ public class ResourceIdRequestResponseMessage implements INcAddressedMessage {
 
     @Override
     public String toString() {
-        return ResourceIdRequestResponseMessage.class.getSimpleName();
+        return "ResourceIdRequestResponseMessage{" + "resourceId=" + resourceId + ", blockSize=" + blockSize
+                + ", exception=" + exception + '}';
     }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
index 78b1f17..2bd4f81 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.runtime.transaction;
 
+import java.util.NoSuchElementException;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.asterix.common.messaging.api.INCMessageBroker;
@@ -26,6 +27,12 @@ import org.apache.asterix.runtime.message.ResourceIdRequestResponseMessage;
 import org.apache.hyracks.api.application.INCServiceContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.common.file.IResourceIdFactory;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import it.unimi.dsi.fastutil.longs.LongArrayFIFOQueue;
+import it.unimi.dsi.fastutil.longs.LongPriorityQueue;
+import it.unimi.dsi.fastutil.longs.LongPriorityQueues;
 
 /**
  * A resource id factory that generates unique resource ids across all NCs by requesting
@@ -33,7 +40,11 @@ import org.apache.hyracks.storage.common.file.IResourceIdFactory;
  */
 public class GlobalResourceIdFactory implements IResourceIdFactory {
 
+    private static final Logger LOGGER = LogManager.getLogger();
+    private static final int RESOURCE_ID_BLOCK_SIZE = 25;
     private final INCServiceContext serviceCtx;
+    private final LongPriorityQueue resourceIds =
+            LongPriorityQueues.synchronize(new LongArrayFIFOQueue(RESOURCE_ID_BLOCK_SIZE));
     private final LinkedBlockingQueue<ResourceIdRequestResponseMessage> resourceIdResponseQ;
     private final String nodeId;
 
@@ -44,33 +55,51 @@ public class GlobalResourceIdFactory implements IResourceIdFactory {
     }
 
     public void addNewIds(ResourceIdRequestResponseMessage resourceIdResponse) throws InterruptedException {
+        LOGGER.debug("rec'd block of ids: {}", resourceIdResponse);
         resourceIdResponseQ.put(resourceIdResponse);
     }
 
     @Override
     public long createId() throws HyracksDataException {
         try {
-            ResourceIdRequestResponseMessage reponse = null;
-            //if there already exists a response, use it
-            if (!resourceIdResponseQ.isEmpty()) {
-                synchronized (resourceIdResponseQ) {
-                    if (!resourceIdResponseQ.isEmpty()) {
-                        reponse = resourceIdResponseQ.take();
+            final long resourceId = resourceIds.dequeueLong();
+            if (resourceIds.isEmpty()) {
+                serviceCtx.getControllerService().getExecutor().submit(() -> {
+                    try {
+                        requestNewBlock();
+                    } catch (Exception e) {
+                        LOGGER.warn("failed on preemptive block request", e);
                     }
-                }
+                });
+            }
+            return resourceId;
+        } catch (NoSuchElementException e) {
+            // fallthrough
+        }
+        try {
+            // if there already exists a response, use it
+            ResourceIdRequestResponseMessage response = resourceIdResponseQ.poll();
+            if (response == null) {
+                requestNewBlock();
+                response = resourceIdResponseQ.take();
             }
-            //if no response available or it has an exception, request a new one
-            if (reponse == null || reponse.getException() != null) {
-                ResourceIdRequestMessage msg = new ResourceIdRequestMessage(nodeId);
-                ((INCMessageBroker) serviceCtx.getMessageBroker()).sendMessageToPrimaryCC(msg);
-                reponse = resourceIdResponseQ.take();
-                if (reponse.getException() != null) {
-                    throw HyracksDataException.create(reponse.getException());
-                }
+            if (response.getException() != null) {
+                throw HyracksDataException.create(response.getException());
             }
-            return reponse.getResourceId();
+            // take the first id, queue the rest
+            final long startingId = response.getResourceId();
+            for (int i = 1; i < response.getBlockSize(); i++) {
+                resourceIds.enqueue(startingId + i);
+            }
+            return startingId;
         } catch (Exception e) {
             throw HyracksDataException.create(e);
         }
     }
+
+    protected void requestNewBlock() throws Exception {
+        // queue is empty; request a new block
+        ResourceIdRequestMessage msg = new ResourceIdRequestMessage(nodeId, RESOURCE_ID_BLOCK_SIZE);
+        ((INCMessageBroker) serviceCtx.getMessageBroker()).sendMessageToPrimaryCC(msg);
+    }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java
index 5bcd5aa..8b4fd68 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java
@@ -37,8 +37,11 @@ public class ResourceIdManager implements IResourceIdManager {
 
     @Override
     public long createResourceId() {
-        return csm.isClusterActive() || reportedNodes.containsAll(csm.getParticipantNodes(true))
-                ? globalResourceId.incrementAndGet() : -1;
+        return readyState() ? globalResourceId.incrementAndGet() : -1;
+    }
+
+    protected boolean readyState() {
+        return csm.isClusterActive() || reportedNodes.containsAll(csm.getParticipantNodes(true));
     }
 
     @Override
@@ -51,4 +54,9 @@ public class ResourceIdManager implements IResourceIdManager {
         globalResourceId.updateAndGet(prev -> Math.max(maxResourceId, prev));
         reportedNodes.add(nodeId);
     }
+
+    @Override
+    public long createResourceIdBlock(int blockSize) {
+        return readyState() ? globalResourceId.getAndAdd(blockSize) + 1 : -1;
+    }
 }

Reply via email to