This is an automated email from the ASF dual-hosted git repository.

dishatalreja pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/master by this push:
     new 4eb05db53 ATLAS-5101: Enable concurrent import requests and retry 
locking (#439)
4eb05db53 is described below

commit 4eb05db53f3ff1b7803deb0a116e46b4398689ce
Author: jackhalfalltrades <[email protected]>
AuthorDate: Tue Sep 23 12:35:11 2025 -0500

    ATLAS-5101: Enable concurrent import requests and retry locking (#439)
    
    Co-authored-by: Chandra Kanth Peravelli 
<[email protected]>
---
 .../atlas/repository/impexp/ImportService.java     |  9 +--
 .../store/graph/v2/AsyncImportTaskExecutor.java    | 76 +++++++++++++++++-----
 .../graph/v2/AsyncImportTaskExecutorTest.java      | 55 ++++++++++++++++
 .../apache/atlas/web/resources/AdminResource.java  | 18 +----
 4 files changed, 123 insertions(+), 35 deletions(-)

diff --git 
a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
 
b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
index f8e8b024f..0b502515c 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
@@ -247,7 +247,7 @@ public class ImportService implements AsyncImporter {
 
     @Override
     public void onImportTypeDef(AtlasTypesDef typesDef, String importId) 
throws AtlasBaseException {
-        LOG.info("==> onImportTypeDef(typesDef={}, importId={})", typesDef, 
importId);
+        LOG.info("==> onImportTypeDef(importId={})", importId);
 
         AtlasAsyncImportRequest importRequest = 
asyncImportService.fetchImportRequestByImportId(importId);
 
@@ -272,13 +272,13 @@ public class ImportService implements AsyncImporter {
 
             asyncImportService.updateImportRequest(importRequest);
 
-            LOG.info("<== onImportTypeDef(typesDef={}, importResult={})", 
typesDef, importRequest.getImportResult());
+            LOG.info("<== onImportTypeDef()");
         }
     }
 
     @Override
     public Boolean onImportEntity(AtlasEntityWithExtInfo entityWithExtInfo, 
String importId, int position) throws AtlasBaseException {
-        LOG.info("==> onImportEntity(entityWithExtInfo={}, importId={}, 
position={})", entityWithExtInfo, importId, position);
+        LOG.info("==> onImportEntity(importId={}, position={})", importId, 
position);
 
         AtlasAsyncImportRequest importRequest = 
asyncImportService.fetchImportRequestByImportId(importId);
 
@@ -309,6 +309,7 @@ public class ImportService implements AsyncImporter {
 
             importRequest.getImportDetails().setImportProgress(resp.right);
         } catch (AtlasBaseException abe) {
+            LOG.warn("Failed to import entity: {} at position: {} for import: 
{}", entityWithExtInfo.getEntity().getGuid(), position, importId, abe);
             failedEntitiesCounter += 1;
 
             
importRequest.getImportDetails().setFailedEntitiesCount(failedEntitiesCounter);
@@ -324,7 +325,7 @@ public class ImportService implements AsyncImporter {
 
             asyncImportService.updateImportRequest(importRequest);
 
-            LOG.info("<== onImportEntity(entityWithExtInfo={}, importId={}, 
position={})", entityWithExtInfo, importId, position);
+            LOG.info("<== onImportEntity(importId={}, position={})", importId, 
position);
         }
 
         if (importRequest.getImportDetails().getPublishedEntityCount() <=
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AsyncImportTaskExecutor.java
 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AsyncImportTaskExecutor.java
index a6f781cfb..cb6bebdee 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AsyncImportTaskExecutor.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AsyncImportTaskExecutor.java
@@ -44,6 +44,7 @@ import javax.inject.Inject;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.Callable;
 
 import static 
org.apache.atlas.notification.NotificationInterface.NotificationType.ASYNC_IMPORT;
 
@@ -52,6 +53,8 @@ public class AsyncImportTaskExecutor {
     private static final Logger LOG = 
LoggerFactory.getLogger(AsyncImportTaskExecutor.class);
 
     private static final String MESSAGE_SOURCE = 
AsyncImportTaskExecutor.class.getSimpleName();
+    private static final int MAX_RETRIES       = 3;
+    private static final int BASE_BACKOFF_MS   = 500;
 
     private final AsyncImportService    importService;
     private final NotificationInterface notificationInterface;
@@ -91,14 +94,14 @@ public class AsyncImportTaskExecutor {
     }
 
     public void publishTypeDefNotification(AtlasAsyncImportRequest 
importRequest, AtlasTypesDef atlasTypesDef) throws AtlasBaseException {
-        LOG.info("==> publishTypeDefNotification(importRequest={}, 
atlasTypesDef={})", importRequest, atlasTypesDef);
+        LOG.info("==> publishTypeDefNotification()");
 
         try {
             HookNotification typeDefImportNotification = new 
ImportNotification.AtlasTypesDefImportNotification(importRequest.getImportId(), 
importRequest.getImportResult().getUserName(), atlasTypesDef);
 
             sendToTopic(importRequest.getTopicName(), 
typeDefImportNotification);
         } finally {
-            LOG.info("<== 
publishTypeDefNotification(atlasAsyncImportRequest={})", importRequest);
+            LOG.info("<== publishTypeDefNotification()");
         }
     }
 
@@ -144,13 +147,13 @@ public class AsyncImportTaskExecutor {
         } finally {
             notificationInterface.closeProducer(ASYNC_IMPORT, 
importRequest.getTopicName());
 
-            LOG.info("<== publishImportRequest(atlasAsyncImportRequest={})", 
importRequest);
+            LOG.info("<== publishImportRequest()");
         }
     }
 
     @VisibleForTesting
     void publishEntityNotification(AtlasAsyncImportRequest importRequest, 
EntityImportStream entityImportStream) {
-        LOG.info("==> publishEntityNotification(atlasAsyncImportRequest={})", 
importRequest);
+        LOG.info("==> publishEntityNotification()");
 
         int publishedEntityCounter = 
importRequest.getImportDetails().getPublishedEntityCount();
         int failedEntityCounter    = 
importRequest.getImportDetails().getFailedEntitiesCount();
@@ -186,7 +189,7 @@ public class AsyncImportTaskExecutor {
 
                 importService.updateImportRequest(importRequest);
 
-                LOG.info("<== 
publishEntityNotification(atlasAsyncImportRequest={})", importRequest);
+                LOG.info("<== publishEntityNotification()");
             }
         }
     }
@@ -195,13 +198,13 @@ public class AsyncImportTaskExecutor {
     void skipToStartEntityPosition(AtlasAsyncImportRequest importRequest, 
EntityImportStream entityImportStream) {
         int startEntityPosition = 
importRequest.getImportTrackingInfo().getStartEntityPosition();
 
-        LOG.info("==> skipToStartEntityPosition(atlasAsyncImportRequest={}): 
position={}", importRequest, startEntityPosition);
+        LOG.info("==> skipToPosition(importId={}): startEntityPosition={}", 
importRequest.getImportId(), startEntityPosition);
 
         while (entityImportStream.hasNext() && startEntityPosition > 
entityImportStream.getPosition()) {
             entityImportStream.next();
         }
 
-        LOG.info("<== skipToStartEntityPosition(atlasAsyncImportRequest={}): 
position={}", importRequest, startEntityPosition);
+        LOG.info("<== skipToPosition()");
     }
 
     @VisibleForTesting
@@ -223,20 +226,22 @@ public class AsyncImportTaskExecutor {
                 newImportRequest.setReceivedTime(System.currentTimeMillis());
                 
newImportRequest.getImportDetails().setTotalEntitiesCount(totalEntities);
                 
newImportRequest.getImportDetails().setCreationOrder(creationOrder);
-
-                importService.saveImportRequest(newImportRequest);
-
-                LOG.info("registerRequest(importId={}): registered new request 
{}", importId, newImportRequest);
-
-                return newImportRequest;
+                return withRetry(() -> {
+                    importService.saveImportRequest(newImportRequest);
+                    LOG.info("registerRequest(importId={}): registered new 
request", importId);
+                    return newImportRequest; }, importId);
             } else if (ObjectUtils.equals(existingImportRequest.getStatus(), 
ImportStatus.STAGING)) {
                 // if we are resuming staging, we need to update the latest 
request received at
                 
existingImportRequest.setReceivedTime(System.currentTimeMillis());
 
-                importService.updateImportRequest(existingImportRequest);
+                return withRetry(() -> {
+                    importService.updateImportRequest(existingImportRequest);
+                    LOG.info("registerRequest(importId={}): resumed {}", 
importId, existingImportRequest);
+                    return existingImportRequest;
+                }, importId);
             }
 
-            // handle request in STAGING / WAITING / PROCESSING status as 
resume
+            // handle request in / WAITING / PROCESSING status as resume
             LOG.info("registerRequest(importId={}): not a new request, 
resuming {}", importId, existingImportRequest);
 
             return existingImportRequest;
@@ -249,6 +254,47 @@ public class AsyncImportTaskExecutor {
         }
     }
 
+    // retry to handle JanusGraph locking conflicts
+    private <T> T withRetry(Callable<T> action, String importId) throws 
AtlasBaseException {
+        int attempt = 0;
+
+        while (true) {
+            try {
+                return action.call();
+            } catch (Exception e) {
+                // detect JanusGraph lock contention by walking the cause chain
+                boolean lockingConflict = false;
+                for (Throwable c = e; c != null; c = c.getCause()) {
+                    if 
("org.janusgraph.diskstorage.locking.PermanentLockingException"
+                            .equals(c.getClass().getName())) {
+                        lockingConflict = true;
+                        break;
+                    }
+                }
+
+                boolean canRetry = lockingConflict && attempt < (MAX_RETRIES - 
1);
+                if (canRetry) {
+                    long backoff = (long) BASE_BACKOFF_MS * (attempt + 1);
+                    LOG.warn("Lock conflict for importId={} on attempt {}/{}, 
backing off {} ms",
+                            importId, attempt + 1, MAX_RETRIES, backoff);
+                    try {
+                        Thread.sleep(backoff);
+                    } catch (InterruptedException ignored) {
+                    }
+                    attempt++;
+                    continue; // next attempt
+                }
+
+                // Non-retryable OR last attempt failed
+                LOG.error("Failed to process importId={} on attempt {}/{}", 
importId, attempt + 1, MAX_RETRIES, e);
+                if (e instanceof AtlasBaseException) {
+                    throw (AtlasBaseException) e;
+                }
+                throw new 
AtlasBaseException(AtlasErrorCode.IMPORT_REGISTRATION_FAILED, e);
+            }
+        }
+    }
+
     private void sendToTopic(String topic, HookNotification notification) 
throws AtlasBaseException {
         try {
             notificationInterface.send(topic, 
Collections.singletonList(notification), messageSource);
diff --git 
a/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/AsyncImportTaskExecutorTest.java
 
b/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/AsyncImportTaskExecutorTest.java
index 6c731d016..1fe38885e 100644
--- 
a/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/AsyncImportTaskExecutorTest.java
+++ 
b/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/AsyncImportTaskExecutorTest.java
@@ -30,6 +30,7 @@ import org.apache.atlas.notification.NotificationException;
 import org.apache.atlas.notification.NotificationInterface;
 import org.apache.atlas.repository.impexp.AsyncImportService;
 import 
org.apache.atlas.repository.store.graph.v2.asyncimport.ImportTaskListener;
+import org.janusgraph.diskstorage.locking.PermanentLockingException;
 import org.mockito.Mock;
 import org.mockito.MockedStatic;
 import org.mockito.Mockito;
@@ -450,6 +451,60 @@ public class AsyncImportTaskExecutorTest {
         assertEquals(exception.getAtlasErrorCode(), 
AtlasErrorCode.IMPORT_REGISTRATION_FAILED);
     }
 
+    @Test
+    public void testWithRetrySucceedsAfterLockingConflict() throws Exception {
+        AtlasImportResult result = mock(AtlasImportResult.class);
+        AtlasAsyncImportRequest newRequest = new 
AtlasAsyncImportRequest(result);
+
+        // First call fails with a PermanentLockingException, second succeeds
+        doThrow(new RuntimeException(new PermanentLockingException("lock 
conflict")))
+                .doNothing()
+                
.when(importService).saveImportRequest(any(AtlasAsyncImportRequest.class));
+
+        
when(importService.fetchImportRequestByImportId("import-id")).thenReturn(null);
+
+        AtlasAsyncImportRequest response =
+                asyncImportTaskExecutor.registerRequest(result, "import-id", 
5, Collections.emptyList());
+
+        assertNotNull(response);
+        verify(importService, 
times(2)).saveImportRequest(any(AtlasAsyncImportRequest.class));
+    }
+
+    @Test
+    public void testWithRetryFailsAfterMaxLockingConflicts() throws Exception {
+        AtlasImportResult result = mock(AtlasImportResult.class);
+
+        // Always fail with PermanentLockingException
+        doThrow(new RuntimeException(new PermanentLockingException("lock 
conflict")))
+                
.when(importService).saveImportRequest(any(AtlasAsyncImportRequest.class));
+
+        
when(importService.fetchImportRequestByImportId("import-id")).thenReturn(null);
+
+        AtlasBaseException ex = expectThrows(
+                AtlasBaseException.class,
+                () -> asyncImportTaskExecutor.registerRequest(result, 
"import-id", 5, Collections.emptyList()));
+
+        assertEquals(ex.getAtlasErrorCode(), 
AtlasErrorCode.IMPORT_REGISTRATION_FAILED);
+        verify(importService, 
times(3)).saveImportRequest(any(AtlasAsyncImportRequest.class));
+    }
+
+    @Test
+    public void testWithRetryFailsOnNonLockingException() throws Exception {
+        AtlasImportResult result = mock(AtlasImportResult.class);
+
+        // Fail with a different exception
+        doThrow(new RuntimeException("Unexpected error"))
+                
.when(importService).saveImportRequest(any(AtlasAsyncImportRequest.class));
+
+        
when(importService.fetchImportRequestByImportId("import-id")).thenReturn(null);
+
+        AtlasBaseException ex = expectThrows(
+                AtlasBaseException.class,
+                () -> asyncImportTaskExecutor.registerRequest(result, 
"import-id", 5, Collections.emptyList()));
+
+        assertEquals(ex.getAtlasErrorCode(), 
AtlasErrorCode.IMPORT_REGISTRATION_FAILED);
+    }
+
     @Test
     public void testAbortAsyncImportRequest() throws AtlasBaseException {
         AtlasAsyncImportRequest mockImportRequest = 
mock(AtlasAsyncImportRequest.class);
diff --git 
a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java 
b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
index 7064cc0da..22e51f77c 100755
--- a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
+++ b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java
@@ -703,19 +703,9 @@ public class AdminResource {
         AtlasAuthorizationUtils.verifyAccess(new 
AtlasAdminAccessRequest(AtlasPrivilege.ADMIN_IMPORT), "asyncImportData");
 
         AtlasAsyncImportRequest asyncImportRequest;
-        boolean                 releaseExportImportLockOnCompletion = false;
-
         try {
-            AtlasImportRequest request                 = 
AtlasType.fromJson(jsonData, AtlasImportRequest.class);
-            boolean            preventMultipleRequests = request != null && 
request.getOptions() != null && 
!request.getOptions().containsKey(AtlasImportRequest.OPTION_KEY_REPLICATED_FROM);
-
-            if (preventMultipleRequests) {
-                acquireExportImportLock("import");
-
-                releaseExportImportLockOnCompletion = true;
-            }
-
-            asyncImportRequest = importService.run(request, inputStream, 
Servlets.getUserName(httpServletRequest), 
Servlets.getHostName(httpServletRequest), 
AtlasAuthorizationUtils.getRequestIpAddress(httpServletRequest));
+            AtlasImportRequest request  = AtlasType.fromJson(jsonData, 
AtlasImportRequest.class);
+            asyncImportRequest          = importService.run(request, 
inputStream, Servlets.getUserName(httpServletRequest), 
Servlets.getHostName(httpServletRequest), 
AtlasAuthorizationUtils.getRequestIpAddress(httpServletRequest));
         } catch (AtlasBaseException excp) {
             if 
(excp.getAtlasErrorCode().equals(AtlasErrorCode.IMPORT_ATTEMPTING_EMPTY_ZIP)) {
                 LOG.info(excp.getMessage());
@@ -731,10 +721,6 @@ public class AdminResource {
 
             throw new AtlasBaseException(excp);
         } finally {
-            if (releaseExportImportLockOnCompletion) {
-                releaseExportImportLock();
-            }
-
             LOG.debug("<== AdminResource.importAsync(binary)");
         }
 

Reply via email to