This is an automated email from the ASF dual-hosted git repository.
wyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push:
new 1319480d43 [ASTERIXDB-3346][MTD] Fix GlobalResourceIdFactory race cond.
1319480d43 is described below
commit 1319480d43225deb2d972fc2cce949b28714ec5b
Author: Wail Alkowaileet <[email protected]>
AuthorDate: Mon Jan 29 09:20:46 2024 -0800
[ASTERIXDB-3346][MTD] Fix GlobalResourceIdFactory race cond.
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
Creating a new ID can wait indefinitely due to
a race condition (see ASTERIXDB-3346 for more details).
Change-Id: Ic7ff15abbc70277a9f1ae340314335253aa23308
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18144
Reviewed-by: Wail Alkowaileet <[email protected]>
Reviewed-by: Ali Alsuliman <[email protected]>
Tested-by: Jenkins <[email protected]>
Integration-Tests: Wail Alkowaileet <[email protected]>
---
.../transaction/GlobalResourceIdFactory.java | 183 +++++++++++++++------
1 file changed, 137 insertions(+), 46 deletions(-)
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
index 30877d95b2..86829374ae 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java
@@ -18,9 +18,12 @@
*/
package org.apache.asterix.runtime.transaction;
-import java.util.NoSuchElementException;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.common.messaging.api.INCMessageBroker;
import org.apache.asterix.runtime.message.ResourceIdRequestMessage;
import org.apache.asterix.runtime.message.ResourceIdRequestResponseMessage;
@@ -32,7 +35,6 @@ import org.apache.logging.log4j.Logger;
import it.unimi.dsi.fastutil.longs.LongArrayFIFOQueue;
import it.unimi.dsi.fastutil.longs.LongPriorityQueue;
-import it.unimi.dsi.fastutil.longs.LongPriorityQueues;
/**
* A resource id factory that generates unique resource ids across all NCs by
requesting
@@ -41,23 +43,44 @@ import it.unimi.dsi.fastutil.longs.LongPriorityQueues;
public class GlobalResourceIdFactory implements IResourceIdFactory {
private static final Logger LOGGER = LogManager.getLogger();
+ private static final long INVALID_ID = -1L;
+ /**
+ * Maximum number of attempts to request a new block of IDs
+ */
+ private static final int MAX_NUMBER_OF_ATTEMPTS = 3;
+ /**
+ * Time threshold after which a block request is considered lost
+ */
+ private static final long WAIT_FOR_REQUEST_TIME_THRESHOLD_NS =
TimeUnit.SECONDS.toNanos(2);
+ /**
+ * Wait time by threads waiting for the response with the new block
+ */
+ private static final long WAIT_FOR_BLOCK_ID_TIME_MS =
TimeUnit.SECONDS.toMillis(2);
private final INCServiceContext serviceCtx;
private final LongPriorityQueue resourceIds;
- private final LinkedBlockingQueue<ResourceIdRequestResponseMessage>
resourceIdResponseQ;
private final String nodeId;
private final int initialBlockSize;
private final int maxBlockSize;
+ /**
+ * Current number of failed block requests
+ */
+ private final AtomicInteger numberOfFailedRequests;
+ /**
+ * Last time a block request was initiated
+ */
+ private final AtomicLong requestTime;
private int currentBlockSize;
private volatile boolean reset = false;
public GlobalResourceIdFactory(INCServiceContext serviceCtx, int
initialBlockSize) {
this.serviceCtx = serviceCtx;
- this.resourceIdResponseQ = new LinkedBlockingQueue<>();
this.nodeId = serviceCtx.getNodeId();
this.initialBlockSize = initialBlockSize;
maxBlockSize = initialBlockSize * 2;
currentBlockSize = initialBlockSize;
- resourceIds = LongPriorityQueues.synchronize(new
LongArrayFIFOQueue(initialBlockSize));
+ resourceIds = new LongArrayFIFOQueue(initialBlockSize);
+ numberOfFailedRequests = new AtomicInteger();
+ requestTime = new AtomicLong();
}
public synchronized void addNewIds(ResourceIdRequestResponseMessage
resourceIdResponse)
@@ -70,67 +93,135 @@ public class GlobalResourceIdFactory implements
IResourceIdFactory {
currentBlockSize);
return;
}
- resourceIdResponseQ.put(resourceIdResponse);
+ populateIDs(resourceIdResponse);
}
@Override
public long createId() throws HyracksDataException {
+ // Reset IDs if a reset was requested
+ resetIDsIfNeeded();
+ // Get a new ID if possible or request a new block
+ long id = getID();
+ while (id == INVALID_ID) {
+ // All IDs in the previous block were consumed, wait for the new
block
+ waitForID();
+ // Retry getting a new ID again
+ id = getID();
+ }
+
+ return id;
+ }
+
+ @Override
+ public synchronized void reset() {
+ reset = true;
+ currentBlockSize += 1;
+ if (currentBlockSize > maxBlockSize) {
+ currentBlockSize = initialBlockSize;
+ }
+ LOGGER.debug("current resource ids block size: {}", currentBlockSize);
+ }
+
+ private void populateIDs(ResourceIdRequestResponseMessage response) {
+ synchronized (resourceIds) {
+ long startingId = response.getResourceId();
+ for (int i = 0; i < response.getBlockSize(); i++) {
+ resourceIds.enqueue(startingId + i);
+ }
+ // Notify all waiting threads that a new block of IDs was acquired
+ resourceIds.notifyAll();
+ }
+ }
+
+ private void resetIDsIfNeeded() throws HyracksDataException {
synchronized (resourceIds) {
if (reset) {
resourceIds.clear();
- resourceIdResponseQ.clear();
reset = false;
+ // Request the initial block
+ requestNewBlock();
}
}
- try {
- final long resourceId = resourceIds.dequeueLong();
- if (resourceIds.isEmpty()) {
- serviceCtx.getControllerService().getExecutor().submit(() -> {
- try {
- requestNewBlock();
- } catch (Exception e) {
- LOGGER.warn("failed on preemptive block request", e);
- }
- });
+ }
+
+ private long getID() throws HyracksDataException {
+ long id = INVALID_ID;
+ // Record the time at which getID was called
+ long time = System.nanoTime();
+ int size;
+ synchronized (resourceIds) {
+ size = resourceIds.size();
+ if (size > 0) {
+ id = resourceIds.dequeueLong();
}
- return resourceId;
- } catch (NoSuchElementException e) {
- // fallthrough
}
+ if (size == 1 || size == 0 && shouldRequestNewBlock(time)) {
+ // The last ID was taken. Preemptively request a new block.
+ // Or the last request failed, retry
+ // Or waiting time for the response exceeded the maximum waiting
time threshold
+ requestNewBlock();
+ }
+
+ return id;
+ }
+
+ private void waitForID() throws HyracksDataException {
+ long time = System.nanoTime();
try {
- // if there already exists a response, use it
- ResourceIdRequestResponseMessage response =
resourceIdResponseQ.poll();
- if (response == null) {
- requestNewBlock();
- response = resourceIdResponseQ.take();
- }
- if (response.getException() != null) {
- throw HyracksDataException.create(response.getException());
- }
- // take the first id, queue the rest
- final long startingId = response.getResourceId();
- for (int i = 1; i < response.getBlockSize(); i++) {
- resourceIds.enqueue(startingId + i);
+ synchronized (resourceIds) {
+ while (resourceIds.isEmpty() && !shouldRequestNewBlock(time)) {
+ resourceIds.wait(WAIT_FOR_BLOCK_ID_TIME_MS);
+ time = System.nanoTime();
+ }
}
- return startingId;
} catch (Exception e) {
throw HyracksDataException.create(e);
}
}
- @Override
- public synchronized void reset() {
- reset = true;
- currentBlockSize += 1;
- if (currentBlockSize > maxBlockSize) {
- currentBlockSize = initialBlockSize;
+ private boolean shouldRequestNewBlock(long time) {
+ int failures = numberOfFailedRequests.get();
+ long timeDiff = time - requestTime.get();
+ if (failures > 0 || timeDiff >= WAIT_FOR_REQUEST_TIME_THRESHOLD_NS) {
+ long thresholdSec =
TimeUnit.NANOSECONDS.toSeconds(WAIT_FOR_REQUEST_TIME_THRESHOLD_NS);
+ long timeDiffSec = TimeUnit.NANOSECONDS.toSeconds(timeDiff);
+ LOGGER.warn(
+ "Preemptive requests are either failed or lost "
+ + "(failures:{}, number-of-failures-threshold:
{}),"
+ + " (time-since-last-request: {}s, time-threshold:
{}s)",
+ failures, MAX_NUMBER_OF_ATTEMPTS, timeDiffSec,
thresholdSec);
+ return true;
}
- LOGGER.debug("current resource ids block size: {}", currentBlockSize);
+ return false;
}
- protected synchronized void requestNewBlock() throws Exception {
- // queue is empty; request a new block
- ResourceIdRequestMessage msg = new ResourceIdRequestMessage(nodeId,
currentBlockSize);
- ((INCMessageBroker)
serviceCtx.getMessageBroker()).sendMessageToPrimaryCC(msg);
+ private synchronized void requestNewBlock() throws HyracksDataException {
+ int attempts = numberOfFailedRequests.get();
+ if (attempts >= MAX_NUMBER_OF_ATTEMPTS) {
+ synchronized (resourceIds) {
+ // Notify all waiting threads so they can fail as well
+ resourceIds.notifyAll();
+ }
+ throw new RuntimeDataException(ErrorCode.ILLEGAL_STATE, "New block
request was attempted (" + attempts
+ + " times) - exceeding the maximum number of allowed
retries. See the logs for more information.");
+ }
+
+ requestTime.set(System.nanoTime());
+ serviceCtx.getControllerService().getExecutor().submit(() -> {
+ try {
+ ResourceIdRequestMessage msg = new
ResourceIdRequestMessage(nodeId, currentBlockSize);
+ ((INCMessageBroker)
serviceCtx.getMessageBroker()).sendMessageToPrimaryCC(msg);
+ // Reset the number of failures
+ numberOfFailedRequests.set(0);
+ } catch (Exception e) {
+ LOGGER.warn("failed to request a new block", e);
+ // Increment the number of failures
+ numberOfFailedRequests.incrementAndGet();
+ synchronized (resourceIds) {
+ // Notify a waiting thread (if any) to request a new block
+ resourceIds.notify();
+ }
+ }
+ });
}
}