This is an automated email from the ASF dual-hosted git repository.

wyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 1319480d43 [ASTERIXDB-3346][MTD] Fix GlobalResourceIdFactory race cond.
1319480d43 is described below

commit 1319480d43225deb2d972fc2cce949b28714ec5b
Author: Wail Alkowaileet <[email protected]>
AuthorDate: Mon Jan 29 09:20:46 2024 -0800

    [ASTERIXDB-3346][MTD] Fix GlobalResourceIdFactory race cond.
    
    - user model changes: no
    - storage format changes: no
    - interface changes: no
    
    Details:
    Creating a new ID can wait indefinitely due to
    a race condition (see ASTERIXDB-3346 for more details).
    
    Change-Id: Ic7ff15abbc70277a9f1ae340314335253aa23308
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18144
    Reviewed-by: Wail Alkowaileet <[email protected]>
    Reviewed-by: Ali Alsuliman <[email protected]>
    Tested-by: Jenkins <[email protected]>
    Integration-Tests: Wail Alkowaileet <[email protected]>
---
 .../transaction/GlobalResourceIdFactory.java       | 183 +++++++++++++++------
 1 file changed, 137 insertions(+), 46 deletions(-)

diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
index 30877d95b2..86829374ae 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
@@ -18,9 +18,12 @@
  */
 package org.apache.asterix.runtime.transaction;
 
-import java.util.NoSuchElementException;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.common.messaging.api.INCMessageBroker;
 import org.apache.asterix.runtime.message.ResourceIdRequestMessage;
 import org.apache.asterix.runtime.message.ResourceIdRequestResponseMessage;
@@ -32,7 +35,6 @@ import org.apache.logging.log4j.Logger;
 
 import it.unimi.dsi.fastutil.longs.LongArrayFIFOQueue;
 import it.unimi.dsi.fastutil.longs.LongPriorityQueue;
-import it.unimi.dsi.fastutil.longs.LongPriorityQueues;
 
 /**
  * A resource id factory that generates unique resource ids across all NCs by 
requesting
@@ -41,23 +43,44 @@ import it.unimi.dsi.fastutil.longs.LongPriorityQueues;
 public class GlobalResourceIdFactory implements IResourceIdFactory {
 
     private static final Logger LOGGER = LogManager.getLogger();
+    private static final long INVALID_ID = -1L;
+    /**
+     * Maximum number of attempts to request a new block of IDs
+     */
+    private static final int MAX_NUMBER_OF_ATTEMPTS = 3;
+    /**
+     * Time threshold to consider a block request was lost
+     */
+    private static final long WAIT_FOR_REQUEST_TIME_THRESHOLD_NS = 
TimeUnit.SECONDS.toNanos(2);
+    /**
+     * Wait time by threads waiting for the response with the new block
+     */
+    private static final long WAIT_FOR_BLOCK_ID_TIME_MS = 
TimeUnit.SECONDS.toMillis(2);
     private final INCServiceContext serviceCtx;
     private final LongPriorityQueue resourceIds;
-    private final LinkedBlockingQueue<ResourceIdRequestResponseMessage> 
resourceIdResponseQ;
     private final String nodeId;
     private final int initialBlockSize;
     private final int maxBlockSize;
+    /**
+     * Current number of failed block requests
+     */
+    private final AtomicInteger numberOfFailedRequests;
+    /**
+     * Last time a block request was initiated
+     */
+    private final AtomicLong requestTime;
     private int currentBlockSize;
     private volatile boolean reset = false;
 
     public GlobalResourceIdFactory(INCServiceContext serviceCtx, int 
initialBlockSize) {
         this.serviceCtx = serviceCtx;
-        this.resourceIdResponseQ = new LinkedBlockingQueue<>();
         this.nodeId = serviceCtx.getNodeId();
         this.initialBlockSize = initialBlockSize;
         maxBlockSize = initialBlockSize * 2;
         currentBlockSize = initialBlockSize;
-        resourceIds = LongPriorityQueues.synchronize(new 
LongArrayFIFOQueue(initialBlockSize));
+        resourceIds = new LongArrayFIFOQueue(initialBlockSize);
+        numberOfFailedRequests = new AtomicInteger();
+        requestTime = new AtomicLong();
     }
 
     public synchronized void addNewIds(ResourceIdRequestResponseMessage 
resourceIdResponse)
@@ -70,67 +93,135 @@ public class GlobalResourceIdFactory implements 
IResourceIdFactory {
                     currentBlockSize);
             return;
         }
-        resourceIdResponseQ.put(resourceIdResponse);
+        populateIDs(resourceIdResponse);
     }
 
     @Override
     public long createId() throws HyracksDataException {
+        // Reset IDs if requested to reset
+        resetIDsIfNeeded();
+        // Get a new ID if possible or request a new block
+        long id = getID();
+        while (id == INVALID_ID) {
+            // All IDs in the previous block were consumed, wait for the new 
block
+            waitForID();
+            // Retry getting a new ID again
+            id = getID();
+        }
+
+        return id;
+    }
+
+    @Override
+    public synchronized void reset() {
+        reset = true;
+        currentBlockSize += 1;
+        if (currentBlockSize > maxBlockSize) {
+            currentBlockSize = initialBlockSize;
+        }
+        LOGGER.debug("current resource ids block size: {}", currentBlockSize);
+    }
+
+    private void populateIDs(ResourceIdRequestResponseMessage response) {
+        synchronized (resourceIds) {
+            long startingId = response.getResourceId();
+            for (int i = 0; i < response.getBlockSize(); i++) {
+                resourceIds.enqueue(startingId + i);
+            }
+            // Notify all waiting threads that a new block of IDs was acquired
+            resourceIds.notifyAll();
+        }
+    }
+
+    private void resetIDsIfNeeded() throws HyracksDataException {
         synchronized (resourceIds) {
             if (reset) {
                 resourceIds.clear();
-                resourceIdResponseQ.clear();
                 reset = false;
+                // Request the initial block
+                requestNewBlock();
             }
         }
-        try {
-            final long resourceId = resourceIds.dequeueLong();
-            if (resourceIds.isEmpty()) {
-                serviceCtx.getControllerService().getExecutor().submit(() -> {
-                    try {
-                        requestNewBlock();
-                    } catch (Exception e) {
-                        LOGGER.warn("failed on preemptive block request", e);
-                    }
-                });
+    }
+
+    private long getID() throws HyracksDataException {
+        long id = INVALID_ID;
+        // Record the time at which getID was called
+        long time = System.nanoTime();
+        int size;
+        synchronized (resourceIds) {
+            size = resourceIds.size();
+            if (size > 0) {
+                id = resourceIds.dequeueLong();
             }
-            return resourceId;
-        } catch (NoSuchElementException e) {
-            // fallthrough
         }
+        if (size == 1 || size == 0 && shouldRequestNewBlock(time)) {
+            // The last ID was taken. Preemptively request a new block.
+            // Or the last request failed, retry
+            // Or waiting time for the response exceeded the maximum waiting 
time threshold
+            requestNewBlock();
+        }
+
+        return id;
+    }
+
+    private void waitForID() throws HyracksDataException {
+        long time = System.nanoTime();
         try {
-            // if there already exists a response, use it
-            ResourceIdRequestResponseMessage response = 
resourceIdResponseQ.poll();
-            if (response == null) {
-                requestNewBlock();
-                response = resourceIdResponseQ.take();
-            }
-            if (response.getException() != null) {
-                throw HyracksDataException.create(response.getException());
-            }
-            // take the first id, queue the rest
-            final long startingId = response.getResourceId();
-            for (int i = 1; i < response.getBlockSize(); i++) {
-                resourceIds.enqueue(startingId + i);
+            synchronized (resourceIds) {
+                while (resourceIds.isEmpty() && !shouldRequestNewBlock(time)) {
+                    resourceIds.wait(WAIT_FOR_BLOCK_ID_TIME_MS);
+                    time = System.nanoTime();
+                }
             }
-            return startingId;
         } catch (Exception e) {
             throw HyracksDataException.create(e);
         }
     }
 
-    @Override
-    public synchronized void reset() {
-        reset = true;
-        currentBlockSize += 1;
-        if (currentBlockSize > maxBlockSize) {
-            currentBlockSize = initialBlockSize;
+    private boolean shouldRequestNewBlock(long time) {
+        int failures = numberOfFailedRequests.get();
+        long timeDiff = time - requestTime.get();
+        if (failures > 0 || timeDiff >= WAIT_FOR_REQUEST_TIME_THRESHOLD_NS) {
+            long thresholdSec = 
TimeUnit.NANOSECONDS.toSeconds(WAIT_FOR_REQUEST_TIME_THRESHOLD_NS);
+            long timeDiffSec = TimeUnit.NANOSECONDS.toSeconds(timeDiff);
+            LOGGER.warn(
+                    "Preemptive requests are either failed or lost "
+                            + "(failures:{}, number-of-failures-threshold: 
{}),"
+                            + " (time-since-last-request: {}s, time-threshold: 
{}s)",
+                    failures, MAX_NUMBER_OF_ATTEMPTS, timeDiffSec, 
thresholdSec);
+            return true;
         }
-        LOGGER.debug("current resource ids block size: {}", currentBlockSize);
+        return false;
     }
 
-    protected synchronized void requestNewBlock() throws Exception {
-        // queue is empty; request a new block
-        ResourceIdRequestMessage msg = new ResourceIdRequestMessage(nodeId, 
currentBlockSize);
-        ((INCMessageBroker) 
serviceCtx.getMessageBroker()).sendMessageToPrimaryCC(msg);
+    private synchronized void requestNewBlock() throws HyracksDataException {
+        int attempts = numberOfFailedRequests.get();
+        if (attempts >= MAX_NUMBER_OF_ATTEMPTS) {
+            synchronized (resourceIds) {
+                // Notify all waiting threads so they can fail as well
+                resourceIds.notifyAll();
+            }
+            throw new RuntimeDataException(ErrorCode.ILLEGAL_STATE, "New block 
request was attempted (" + attempts
+                    + " times) - exceeding the maximum number of allowed 
retries. See the logs for more information.");
+        }
+
+        requestTime.set(System.nanoTime());
+        serviceCtx.getControllerService().getExecutor().submit(() -> {
+            try {
+                ResourceIdRequestMessage msg = new 
ResourceIdRequestMessage(nodeId, currentBlockSize);
+                ((INCMessageBroker) 
serviceCtx.getMessageBroker()).sendMessageToPrimaryCC(msg);
+                // Reset the number of failures
+                numberOfFailedRequests.set(0);
+            } catch (Exception e) {
+                LOGGER.warn("failed to request a new block", e);
+                // Increment the number of failures
+                numberOfFailedRequests.incrementAndGet();
+                synchronized (resourceIds) {
+                    // Notify a waiting thread (if any) to request a new block
+                    resourceIds.notify();
+                }
+            }
+        });
     }
 }

Reply via email to