This is an automated email from the ASF dual-hosted git repository.
dishatalreja pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push:
new 4eb05db53 ATLAS-5101: Enable concurrent import requests and retry
locking (#439)
4eb05db53 is described below
commit 4eb05db53f3ff1b7803deb0a116e46b4398689ce
Author: jackhalfalltrades <[email protected]>
AuthorDate: Tue Sep 23 12:35:11 2025 -0500
ATLAS-5101: Enable concurrent import requests and retry locking (#439)
Co-authored-by: Chandra Kanth Peravelli
<[email protected]>
---
.../atlas/repository/impexp/ImportService.java | 9 +--
.../store/graph/v2/AsyncImportTaskExecutor.java | 76 +++++++++++++++++-----
.../graph/v2/AsyncImportTaskExecutorTest.java | 55 ++++++++++++++++
.../apache/atlas/web/resources/AdminResource.java | 18 +----
4 files changed, 123 insertions(+), 35 deletions(-)
diff --git
a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
index f8e8b024f..0b502515c 100644
---
a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
+++
b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
@@ -247,7 +247,7 @@ public class ImportService implements AsyncImporter {
@Override
public void onImportTypeDef(AtlasTypesDef typesDef, String importId)
throws AtlasBaseException {
- LOG.info("==> onImportTypeDef(typesDef={}, importId={})", typesDef,
importId);
+ LOG.info("==> onImportTypeDef(importId={})", importId);
AtlasAsyncImportRequest importRequest =
asyncImportService.fetchImportRequestByImportId(importId);
@@ -272,13 +272,13 @@ public class ImportService implements AsyncImporter {
asyncImportService.updateImportRequest(importRequest);
- LOG.info("<== onImportTypeDef(typesDef={}, importResult={})",
typesDef, importRequest.getImportResult());
+ LOG.info("<== onImportTypeDef()");
}
}
@Override
public Boolean onImportEntity(AtlasEntityWithExtInfo entityWithExtInfo,
String importId, int position) throws AtlasBaseException {
- LOG.info("==> onImportEntity(entityWithExtInfo={}, importId={},
position={})", entityWithExtInfo, importId, position);
+ LOG.info("==> onImportEntity(importId={}, position={})", importId,
position);
AtlasAsyncImportRequest importRequest =
asyncImportService.fetchImportRequestByImportId(importId);
@@ -309,6 +309,7 @@ public class ImportService implements AsyncImporter {
importRequest.getImportDetails().setImportProgress(resp.right);
} catch (AtlasBaseException abe) {
+ LOG.warn("Failed to import entity: {} at position: {} for import:
{}", entityWithExtInfo.getEntity().getGuid(), position, importId, abe);
failedEntitiesCounter += 1;
importRequest.getImportDetails().setFailedEntitiesCount(failedEntitiesCounter);
@@ -324,7 +325,7 @@ public class ImportService implements AsyncImporter {
asyncImportService.updateImportRequest(importRequest);
- LOG.info("<== onImportEntity(entityWithExtInfo={}, importId={},
position={})", entityWithExtInfo, importId, position);
+ LOG.info("<== onImportEntity(importId={}, position={})", importId,
position);
}
if (importRequest.getImportDetails().getPublishedEntityCount() <=
diff --git
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AsyncImportTaskExecutor.java
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AsyncImportTaskExecutor.java
index a6f781cfb..cb6bebdee 100644
---
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AsyncImportTaskExecutor.java
+++
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AsyncImportTaskExecutor.java
@@ -44,6 +44,7 @@ import javax.inject.Inject;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.Callable;
import static
org.apache.atlas.notification.NotificationInterface.NotificationType.ASYNC_IMPORT;
@@ -52,6 +53,8 @@ public class AsyncImportTaskExecutor {
private static final Logger LOG =
LoggerFactory.getLogger(AsyncImportTaskExecutor.class);
private static final String MESSAGE_SOURCE =
AsyncImportTaskExecutor.class.getSimpleName();
+ private static final int MAX_RETRIES = 3;
+ private static final int BASE_BACKOFF_MS = 500;
private final AsyncImportService importService;
private final NotificationInterface notificationInterface;
@@ -91,14 +94,14 @@ public class AsyncImportTaskExecutor {
}
public void publishTypeDefNotification(AtlasAsyncImportRequest
importRequest, AtlasTypesDef atlasTypesDef) throws AtlasBaseException {
- LOG.info("==> publishTypeDefNotification(importRequest={},
atlasTypesDef={})", importRequest, atlasTypesDef);
+ LOG.info("==> publishTypeDefNotification()");
try {
HookNotification typeDefImportNotification = new
ImportNotification.AtlasTypesDefImportNotification(importRequest.getImportId(),
importRequest.getImportResult().getUserName(), atlasTypesDef);
sendToTopic(importRequest.getTopicName(),
typeDefImportNotification);
} finally {
- LOG.info("<==
publishTypeDefNotification(atlasAsyncImportRequest={})", importRequest);
+ LOG.info("<== publishTypeDefNotification()");
}
}
@@ -144,13 +147,13 @@ public class AsyncImportTaskExecutor {
} finally {
notificationInterface.closeProducer(ASYNC_IMPORT,
importRequest.getTopicName());
- LOG.info("<== publishImportRequest(atlasAsyncImportRequest={})",
importRequest);
+ LOG.info("<== publishImportRequest()");
}
}
@VisibleForTesting
void publishEntityNotification(AtlasAsyncImportRequest importRequest,
EntityImportStream entityImportStream) {
- LOG.info("==> publishEntityNotification(atlasAsyncImportRequest={})",
importRequest);
+ LOG.info("==> publishEntityNotification()");
int publishedEntityCounter =
importRequest.getImportDetails().getPublishedEntityCount();
int failedEntityCounter =
importRequest.getImportDetails().getFailedEntitiesCount();
@@ -186,7 +189,7 @@ public class AsyncImportTaskExecutor {
importService.updateImportRequest(importRequest);
- LOG.info("<==
publishEntityNotification(atlasAsyncImportRequest={})", importRequest);
+ LOG.info("<== publishEntityNotification()");
}
}
}
@@ -195,13 +198,13 @@ public class AsyncImportTaskExecutor {
void skipToStartEntityPosition(AtlasAsyncImportRequest importRequest,
EntityImportStream entityImportStream) {
int startEntityPosition =
importRequest.getImportTrackingInfo().getStartEntityPosition();
- LOG.info("==> skipToStartEntityPosition(atlasAsyncImportRequest={}):
position={}", importRequest, startEntityPosition);
+ LOG.info("==> skipToPosition(importId={}): startEntityPosition={}",
importRequest.getImportId(), startEntityPosition);
while (entityImportStream.hasNext() && startEntityPosition >
entityImportStream.getPosition()) {
entityImportStream.next();
}
- LOG.info("<== skipToStartEntityPosition(atlasAsyncImportRequest={}):
position={}", importRequest, startEntityPosition);
+ LOG.info("<== skipToPosition()");
}
@VisibleForTesting
@@ -223,20 +226,22 @@ public class AsyncImportTaskExecutor {
newImportRequest.setReceivedTime(System.currentTimeMillis());
newImportRequest.getImportDetails().setTotalEntitiesCount(totalEntities);
newImportRequest.getImportDetails().setCreationOrder(creationOrder);
-
- importService.saveImportRequest(newImportRequest);
-
- LOG.info("registerRequest(importId={}): registered new request
{}", importId, newImportRequest);
-
- return newImportRequest;
+ return withRetry(() -> {
+ importService.saveImportRequest(newImportRequest);
+ LOG.info("registerRequest(importId={}): registered new
request", importId);
+ return newImportRequest; }, importId);
} else if (ObjectUtils.equals(existingImportRequest.getStatus(),
ImportStatus.STAGING)) {
// if we are resuming staging, we need to update the latest
request received at
existingImportRequest.setReceivedTime(System.currentTimeMillis());
- importService.updateImportRequest(existingImportRequest);
+ return withRetry(() -> {
+ importService.updateImportRequest(existingImportRequest);
+ LOG.info("registerRequest(importId={}): resumed {}",
importId, existingImportRequest);
+ return existingImportRequest;
+ }, importId);
}
- // handle request in STAGING / WAITING / PROCESSING status as
resume
+ // handle request in WAITING / PROCESSING status as resume
LOG.info("registerRequest(importId={}): not a new request,
resuming {}", importId, existingImportRequest);
return existingImportRequest;
@@ -249,6 +254,47 @@ public class AsyncImportTaskExecutor {
}
}
+    /**
+     * Runs {@code action}, retrying it when the failure is a JanusGraph locking conflict.
+     *
+     * A failure counts as a locking conflict when any throwable in its cause chain has the
+     * class name {@code org.janusgraph.diskstorage.locking.PermanentLockingException}; the
+     * class is matched by name rather than referenced directly. At most {@code MAX_RETRIES}
+     * attempts are made, sleeping {@code BASE_BACKOFF_MS * attemptNumber} milliseconds
+     * between attempts. Any other failure is not retried.
+     *
+     * If the thread is interrupted while backing off, the interrupt flag is restored and
+     * retrying stops; the failure that triggered the backoff is then reported.
+     *
+     * @param action   the operation to execute
+     * @param importId id of the import request, used only in log messages
+     * @return the value returned by {@code action}
+     * @throws AtlasBaseException the original exception if it already is an
+     *         {@code AtlasBaseException}; otherwise a new one with error code
+     *         {@code IMPORT_REGISTRATION_FAILED} wrapping the failure
+     */
+    private <T> T withRetry(Callable<T> action, String importId) throws AtlasBaseException {
+        int attempt = 0;
+
+        while (true) {
+            try {
+                return action.call();
+            } catch (Exception e) {
+                // detect JanusGraph lock contention by walking the cause chain
+                boolean lockingConflict = false;
+
+                for (Throwable c = e; c != null; c = c.getCause()) {
+                    if ("org.janusgraph.diskstorage.locking.PermanentLockingException".equals(c.getClass().getName())) {
+                        lockingConflict = true;
+                        break;
+                    }
+                }
+
+                boolean canRetry = lockingConflict && attempt < (MAX_RETRIES - 1);
+
+                if (canRetry) {
+                    long backoff = (long) BASE_BACKOFF_MS * (attempt + 1);
+
+                    LOG.warn("Lock conflict for importId={} on attempt {}/{}, backing off {} ms", importId, attempt + 1, MAX_RETRIES, backoff);
+
+                    try {
+                        Thread.sleep(backoff);
+
+                        attempt++;
+
+                        continue; // next attempt
+                    } catch (InterruptedException ie) {
+                        // Do not swallow the interrupt: restore the flag for callers and stop
+                        // retrying by falling through to the failure handling below.
+                        Thread.currentThread().interrupt();
+
+                        LOG.warn("Interrupted while backing off for importId={}; abandoning retries", importId);
+                    }
+                }
+
+                // Non-retryable, last attempt failed, or backoff was interrupted
+                LOG.error("Failed to process importId={} on attempt {}/{}", importId, attempt + 1, MAX_RETRIES, e);
+
+                if (e instanceof AtlasBaseException) {
+                    throw (AtlasBaseException) e;
+                }
+
+                throw new AtlasBaseException(AtlasErrorCode.IMPORT_REGISTRATION_FAILED, e);
+            }
+        }
+    }
+
private void sendToTopic(String topic, HookNotification notification)
throws AtlasBaseException {
try {
notificationInterface.send(topic,
Collections.singletonList(notification), messageSource);
diff --git
a/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/AsyncImportTaskExecutorTest.java
b/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/AsyncImportTaskExecutorTest.java
index 6c731d016..1fe38885e 100644
---
a/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/AsyncImportTaskExecutorTest.java
+++
b/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/AsyncImportTaskExecutorTest.java
@@ -30,6 +30,7 @@ import org.apache.atlas.notification.NotificationException;
import org.apache.atlas.notification.NotificationInterface;
import org.apache.atlas.repository.impexp.AsyncImportService;
import
org.apache.atlas.repository.store.graph.v2.asyncimport.ImportTaskListener;
+import org.janusgraph.diskstorage.locking.PermanentLockingException;
import org.mockito.Mock;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
@@ -450,6 +451,60 @@ public class AsyncImportTaskExecutorTest {
assertEquals(exception.getAtlasErrorCode(),
AtlasErrorCode.IMPORT_REGISTRATION_FAILED);
}
+    /**
+     * A save that fails once with a wrapped PermanentLockingException and then succeeds
+     * must be attempted twice, and registerRequest must still return a request.
+     */
+    @Test
+    public void testWithRetrySucceedsAfterLockingConflict() throws Exception {
+        AtlasImportResult result = mock(AtlasImportResult.class);
+        // constructed as in the original test; not referenced by the assertions below
+        AtlasAsyncImportRequest newRequest = new AtlasAsyncImportRequest(result);
+
+        // the locking failure arrives wrapped in a RuntimeException
+        RuntimeException lockConflict = new RuntimeException(new PermanentLockingException("lock conflict"));
+
+        // no existing request for this id, so registerRequest takes the "new request" path
+        when(importService.fetchImportRequestByImportId("import-id")).thenReturn(null);
+
+        // first save attempt throws, second one goes through
+        doThrow(lockConflict)
+                .doNothing()
+                .when(importService).saveImportRequest(any(AtlasAsyncImportRequest.class));
+
+        AtlasAsyncImportRequest registered = asyncImportTaskExecutor.registerRequest(result, "import-id", 5, Collections.emptyList());
+
+        assertNotNull(registered);
+        verify(importService, times(2)).saveImportRequest(any(AtlasAsyncImportRequest.class));
+    }
+
+    /**
+     * When every save attempt fails with a wrapped PermanentLockingException, registerRequest
+     * must give up after three attempts and report IMPORT_REGISTRATION_FAILED.
+     */
+    @Test
+    public void testWithRetryFailsAfterMaxLockingConflicts() throws Exception {
+        AtlasImportResult result       = mock(AtlasImportResult.class);
+        RuntimeException  lockConflict = new RuntimeException(new PermanentLockingException("lock conflict"));
+
+        // no existing request for this id, so registerRequest takes the "new request" path
+        when(importService.fetchImportRequestByImportId("import-id")).thenReturn(null);
+
+        // every save attempt hits the lock conflict
+        doThrow(lockConflict).when(importService).saveImportRequest(any(AtlasAsyncImportRequest.class));
+
+        AtlasBaseException thrown = expectThrows(
+                AtlasBaseException.class,
+                () -> asyncImportTaskExecutor.registerRequest(result, "import-id", 5, Collections.emptyList()));
+
+        assertEquals(thrown.getAtlasErrorCode(), AtlasErrorCode.IMPORT_REGISTRATION_FAILED);
+        verify(importService, times(3)).saveImportRequest(any(AtlasAsyncImportRequest.class));
+    }
+
+    /**
+     * A failure that is not a JanusGraph locking conflict must surface as
+     * IMPORT_REGISTRATION_FAILED and must not be retried.
+     */
+    @Test
+    public void testWithRetryFailsOnNonLockingException() throws Exception {
+        AtlasImportResult result = mock(AtlasImportResult.class);
+
+        // Fail with a different exception
+        doThrow(new RuntimeException("Unexpected error"))
+                .when(importService).saveImportRequest(any(AtlasAsyncImportRequest.class));
+
+        when(importService.fetchImportRequestByImportId("import-id")).thenReturn(null);
+
+        AtlasBaseException ex = expectThrows(
+                AtlasBaseException.class,
+                () -> asyncImportTaskExecutor.registerRequest(result, "import-id", 5, Collections.emptyList()));
+
+        assertEquals(ex.getAtlasErrorCode(), AtlasErrorCode.IMPORT_REGISTRATION_FAILED);
+        // Without this check the test would still pass if every exception were retried.
+        verify(importService, times(1)).saveImportRequest(any(AtlasAsyncImportRequest.class));
+    }
+
@Test
public void testAbortAsyncImportRequest() throws AtlasBaseException {
AtlasAsyncImportRequest mockImportRequest =
mock(AtlasAsyncImportRequest.class);
diff --git
a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
index 7064cc0da..22e51f77c 100755
--- a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
+++ b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
@@ -703,19 +703,9 @@ public class AdminResource {
AtlasAuthorizationUtils.verifyAccess(new
AtlasAdminAccessRequest(AtlasPrivilege.ADMIN_IMPORT), "asyncImportData");
AtlasAsyncImportRequest asyncImportRequest;
- boolean releaseExportImportLockOnCompletion = false;
-
try {
- AtlasImportRequest request =
AtlasType.fromJson(jsonData, AtlasImportRequest.class);
- boolean preventMultipleRequests = request != null &&
request.getOptions() != null &&
!request.getOptions().containsKey(AtlasImportRequest.OPTION_KEY_REPLICATED_FROM);
-
- if (preventMultipleRequests) {
- acquireExportImportLock("import");
-
- releaseExportImportLockOnCompletion = true;
- }
-
- asyncImportRequest = importService.run(request, inputStream,
Servlets.getUserName(httpServletRequest),
Servlets.getHostName(httpServletRequest),
AtlasAuthorizationUtils.getRequestIpAddress(httpServletRequest));
+ AtlasImportRequest request = AtlasType.fromJson(jsonData,
AtlasImportRequest.class);
+ asyncImportRequest = importService.run(request,
inputStream, Servlets.getUserName(httpServletRequest),
Servlets.getHostName(httpServletRequest),
AtlasAuthorizationUtils.getRequestIpAddress(httpServletRequest));
} catch (AtlasBaseException excp) {
if
(excp.getAtlasErrorCode().equals(AtlasErrorCode.IMPORT_ATTEMPTING_EMPTY_ZIP)) {
LOG.info(excp.getMessage());
@@ -731,10 +721,6 @@ public class AdminResource {
throw new AtlasBaseException(excp);
} finally {
- if (releaseExportImportLockOnCompletion) {
- releaseExportImportLock();
- }
-
LOG.debug("<== AdminResource.importAsync(binary)");
}