This is an automated email from the ASF dual-hosted git repository. amestry pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/atlas.git
commit 9cdc31d5851db61296c7ff07ecb9bc85c26939fa Author: Ashutosh Mestry <[email protected]> AuthorDate: Wed Dec 9 14:43:54 2020 -0800 ATLAS-4068: Export/Import: Conditionally Support Simpultaneous Operations. Signed-off-by: Ashutosh Mestry <[email protected]> --- .../apache/atlas/web/resources/AdminResource.java | 79 ++++++++++++++-------- 1 file changed, 52 insertions(+), 27 deletions(-) diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java index f8c953b..b20b404 100755 --- a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java +++ b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java @@ -22,7 +22,6 @@ import com.sun.jersey.multipart.FormDataParam; import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasErrorCode; -import org.apache.atlas.RequestContext; import org.apache.atlas.authorize.AtlasAdminAccessRequest; import org.apache.atlas.authorize.AtlasAuthorizationUtils; import org.apache.atlas.authorize.AtlasEntityAccessRequest; @@ -102,7 +101,6 @@ import java.io.InputStream; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -392,7 +390,12 @@ public class AdminResource { AtlasAuthorizationUtils.verifyAccess(new AtlasAdminAccessRequest(AtlasPrivilege.ADMIN_EXPORT), "export"); - acquireExportImportLock("export"); + boolean preventMultipleRequests = request != null + && !(request.getOptions().containsKey(AtlasExportRequest.OPTION_SKIP_LINEAGE) + || request.getOptions().containsKey(AtlasExportRequest.OPTION_KEY_REPLICATED_TO)); + if (preventMultipleRequests) { + acquireExportImportLock("export"); + } ZipSink exportSink = null; boolean isSuccessful = false; @@ -419,22 +422,15 @@ public class AdminResource { throw new AtlasBaseException(excp); } finally { - releaseExportImportLock(); + if (preventMultipleRequests) { + releaseExportImportLock(); + } if (exportSink != null) { exportSink.close(); } - if (isSuccessful && CollectionUtils.isNotEmpty(result.getRequest().getItemsToExport())) { - - Map<String, Object> optionMap = result.getRequest().getOptions(); - optionMap.put(OPERATION_STATUS, result.getOperationStatus().name()); - String params = AtlasJson.toJson(optionMap); - - List<AtlasObjectId> objectIds = result.getRequest().getItemsToExport(); - - auditImportExportOperations(objectIds, AuditOperation.EXPORT, params); - } + addToExportOperationAudits(isSuccessful, result); if (LOG.isDebugEnabled()) { LOG.debug("<== AdminResource.export()"); @@ -454,11 +450,15 @@ public class AdminResource { AtlasAuthorizationUtils.verifyAccess(new AtlasAdminAccessRequest(AtlasPrivilege.ADMIN_IMPORT), "importData"); - acquireExportImportLock("import"); AtlasImportResult result = null; + boolean preventMultipleRequests = true; try { AtlasImportRequest request = AtlasType.fromJson(jsonData, AtlasImportRequest.class); + preventMultipleRequests = request != null && !request.getOptions().containsKey(AtlasImportRequest.OPTION_KEY_REPLICATED_FROM); + if (preventMultipleRequests) { + acquireExportImportLock("import"); + } result = importService.run(inputStream, request, Servlets.getUserName(httpServletRequest), Servlets.getHostName(httpServletRequest), @@ -477,20 +477,16 @@ public class AdminResource { throw new AtlasBaseException(excp); } finally { - releaseExportImportLock(); + if (preventMultipleRequests) { + releaseExportImportLock(); + } if (LOG.isDebugEnabled()) { LOG.debug("<== AdminResource.importData(binary)"); } } - List<AtlasObjectId> objectIds = result.getExportResult().getRequest().getItemsToExport(); - - Map<String, Object> optionMap = new HashMap<>(); - optionMap.put(OPERATION_STATUS, result.getOperationStatus().name()); - String params = AtlasJson.toJson(optionMap); - - auditImportExportOperations(objectIds, AuditOperation.IMPORT, params); + addToImportOperationAudits(result); return result; } @@ -536,13 +532,17 @@ public class AdminResource { } AtlasAuthorizationUtils.verifyAccess(new AtlasAdminAccessRequest(AtlasPrivilege.ADMIN_IMPORT), "importFile"); - - acquireExportImportLock("importFile"); - + boolean preventMultipleRequests = true; AtlasImportResult result; try { AtlasImportRequest request = AtlasType.fromJson(jsonData, AtlasImportRequest.class); + preventMultipleRequests = request != null && request.getOptions().containsKey(AtlasImportRequest.OPTION_KEY_REPLICATED_FROM); + + if (preventMultipleRequests) { + acquireExportImportLock("importFile"); + } + result = importService.run(request, AtlasAuthorizationUtils.getCurrentUserName(), Servlets.getHostName(httpServletRequest), AtlasAuthorizationUtils.getRequestIpAddress(httpServletRequest)); @@ -559,7 +559,9 @@ public class AdminResource { throw new AtlasBaseException(excp); } finally { - releaseExportImportLock(); + if (preventMultipleRequests) { + releaseExportImportLock(); + } if (LOG.isDebugEnabled()) { LOG.debug("<== AdminResource.importFile()"); @@ -770,6 +772,29 @@ public class AdminResource { importExportOperationLock.lock(); } + private void addToImportOperationAudits(AtlasImportResult result) throws AtlasBaseException { + List<AtlasObjectId> objectIds = result.getExportResult().getRequest().getItemsToExport(); + + Map<String, Object> optionMap = new HashMap<>(); + optionMap.put(OPERATION_STATUS, result.getOperationStatus().name()); + String params = AtlasJson.toJson(optionMap); + + auditImportExportOperations(objectIds, AuditOperation.IMPORT, params); + } + + private void addToExportOperationAudits(boolean isSuccessful, AtlasExportResult result) throws AtlasBaseException { + if (isSuccessful && CollectionUtils.isNotEmpty(result.getRequest().getItemsToExport())) { + + Map<String, Object> optionMap = result.getRequest().getOptions(); + optionMap.put(OPERATION_STATUS, result.getOperationStatus().name()); + String params = AtlasJson.toJson(optionMap); + + List<AtlasObjectId> objectIds = result.getRequest().getItemsToExport(); + + auditImportExportOperations(objectIds, AuditOperation.EXPORT, params); + } + } + private void auditImportExportOperations(List<AtlasObjectId> objectIds, AuditOperation auditOperation, String params) throws AtlasBaseException { Map<String, Long> entityCountByType = objectIds.stream().collect(Collectors.groupingBy(AtlasObjectId::getTypeName, Collectors.counting()));
