mneethiraj commented on code in PR #307:
URL: https://github.com/apache/atlas/pull/307#discussion_r1994164078


##########
atlas-examples/sample-app/src/main/java/org/apache/atlas/examples/sampleapp/SampleApp.java:
##########
@@ -75,6 +75,17 @@ public static void main(String[] args) throws Exception {
             sampleApp.glossaryExample();
 
             entityExample.deleteEntities();
+
+            //Async Import Examples
+            AsyncImportApiExample asyncImportApiExample = new 
AsyncImportApiExample(sampleApp.getClient());
+
+            asyncImportApiExample.testImportAsyncWithZip();
+
+            asyncImportApiExample.testGetAsyncImportStatus();
+
+            String testImportId = "24cbff65a7ed60e02d099ce78cb06efd";

Review Comment:
   Instead of a hardcoded importId, consider updating `testImportAsyncWithZip()` 
to return `AtlasAsyncImportRequest`, and using that ID in subsequent calls.
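
A minimal sketch of this suggestion, assuming simplified stand-ins: `ImportRequest` plays the role of `AtlasAsyncImportRequest`, and `testGetAsyncImportStatusById()` is a hypothetical name for whichever example method needs the ID. The real example would call the Atlas client instead of generating an ID locally.

```java
import java.util.UUID;

// Hypothetical stand-in for AtlasAsyncImportRequest; only the ID matters here.
class ImportRequest {
    private final String importId;

    ImportRequest(String importId) {
        this.importId = importId;
    }

    String getImportId() {
        return importId;
    }
}

class AsyncImportExampleSketch {
    // Returns the request (instead of void) so callers can reuse its ID.
    ImportRequest testImportAsyncWithZip() {
        // Assumption: a real implementation would call the client's importAsync(...) here.
        return new ImportRequest(UUID.randomUUID().toString().replace("-", ""));
    }

    // Hypothetical follow-up call that takes the ID returned by the server.
    String testGetAsyncImportStatusById(String importId) {
        return "status requested for importId=" + importId;
    }

    public static void main(String[] args) {
        AsyncImportExampleSketch example = new AsyncImportExampleSketch();
        ImportRequest request = example.testImportAsyncWithZip();

        // No hardcoded ID: the value comes from the previous call.
        System.out.println(example.testGetAsyncImportStatusById(request.getImportId()));
    }
}
```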



##########
notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java:
##########
@@ -58,6 +59,24 @@ public 
AtlasKafkaConsumer(AtlasNotificationMessageDeserializer<T> deserializer,
         this.pollTimeoutMilliSeconds = pollTimeoutMilliSeconds;
     }
 
+    @Override
+    public Set<TopicPartition> getTopicPartition() {
+        Set<TopicPartition> ret = null;
+        if (kafkaConsumer != null) {
+            ret = kafkaConsumer.assignment();
+        }
+        return ret;
+    }
+
+    @Override
+    public Set<String> subscription() {
+        Set<String> ret = null;

Review Comment:
   Consider simplifying with:
   ```
     return kafkaConsumer != null ? kafkaConsumer.subscription() : null;
   ```



##########
notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java:
##########
@@ -429,6 +450,43 @@ private KafkaProducer getOrCreateProducerByCriteria(Object 
producerCriteria, Map
         return ret;
     }
 
+    @Override
+    public void addTopicToNotificationType(NotificationType notificationType, 
String topic) {
+        String[] topics = CONSUMER_TOPICS_MAP.get(notificationType);

Review Comment:
   Consider changing the value type of `CONSUMER_TOPICS_MAP` from `String[]` to 
`List<String>`, to simplify adding and removing topics.
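
A rough sketch of what a list-valued map could look like. The enum below is a simplified stand-in for `NotificationType` (the `ASYNC_IMPORT` value is illustrative only), and the class and method names are assumptions, not the existing `KafkaNotification` API.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

class ConsumerTopics {
    // Simplified stand-in for Atlas's NotificationType; values are illustrative.
    enum NotificationType { HOOK, ASYNC_IMPORT }

    private final Map<NotificationType, List<String>> topicsByType = new EnumMap<>(NotificationType.class);

    // Adds the topic once; no array copying is needed with a List value.
    synchronized void addTopic(NotificationType type, String topic) {
        List<String> topics = topicsByType.computeIfAbsent(type, k -> new ArrayList<>());

        if (!topics.contains(topic)) {
            topics.add(topic);
        }
    }

    // Removes the topic if present; a missing type is a no-op.
    synchronized void removeTopic(NotificationType type, String topic) {
        List<String> topics = topicsByType.get(type);

        if (topics != null) {
            topics.remove(topic);
        }
    }

    // Returns a copy so callers cannot modify the internal list.
    synchronized List<String> getTopics(NotificationType type) {
        return new ArrayList<>(topicsByType.getOrDefault(type, new ArrayList<>()));
    }
}
```

The methods are `synchronized` only to keep the sketch safe if topics are added from import threads; the actual locking strategy would depend on how the map is used elsewhere.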



##########
intg/src/main/java/org/apache/atlas/AtlasErrorCode.java:
##########
@@ -247,7 +249,12 @@ public enum AtlasErrorCode {
     FAILED_TO_UPLOAD(500, "ATLAS-500-00-015", "Error occurred while uploading 
the file: {0}"),
     FAILED_TO_CREATE_GLOSSARY_TERM(500, "ATLAS-500-00-016", "Error occurred 
while creating glossary term: {0}"),
     FAILED_TO_UPDATE_GLOSSARY_TERM(500, "ATLAS-500-00-017", "Error occurred 
while updating glossary term: {0}"),
-    NOTIFICATION_EXCEPTION(500, "ATLAS-500-00-018", "{0}");
+    NOTIFICATION_EXCEPTION(500, "ATLAS-500-00-018", "{0}"),
+    IMPORT_UPDATE_FAILED(500, "ATLAS-500-00-019", "Failed to update import: 
{0}"),
+    IMPORT_REGISTRATION_FAILED(500, "ATLAS-500-00-020", "Failed to register 
import request"),
+    IMPORT_FAILED(500, "ATLAS-500-00-021", "Given import {0} failed"),

Review Comment:
   I suggest removing the word "Given" from the error messages:
   - `Given import {0} failed` => `Import with id={0} failed`
   - `Failed to abort given import {0}` => `Failed to abort import id={0}`
   -  `Failed to add given import {0} to request queue, please try again later` 
=> `Failed to add import id={0} to request queue, please try again later`

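To show how the reworded messages would read once the parameter is filled in, here is a small sketch. It assumes the `{0}` placeholders are expanded with `java.text.MessageFormat`; the class and constant names are illustrative and not part of `AtlasErrorCode`.

```java
import java.text.MessageFormat;

class ImportErrorMessages {
    // Reworded templates from the suggestion above.
    static final String IMPORT_FAILED  = "Import with id={0} failed";
    static final String ABORT_FAILED   = "Failed to abort import id={0}";
    static final String ENQUEUE_FAILED = "Failed to add import id={0} to request queue, please try again later";

    // Expands {0}, {1}, ... placeholders in the template.
    static String format(String template, Object... params) {
        return MessageFormat.format(template, params);
    }

    public static void main(String[] args) {
        System.out.println(format(IMPORT_FAILED, "abc123"));
        System.out.println(format(ABORT_FAILED, "abc123"));
        System.out.println(format(ENQUEUE_FAILED, "abc123"));
    }
}
```
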


##########
notification/src/main/java/org/apache/atlas/notification/spool/Spooler.java:
##########
@@ -81,6 +82,10 @@ public void sendInternal(NotificationType type, List<String> 
messages) {
         }
     }
 
+    @Override
+    public void sendInternal(String topic, List<String> messages) throws 
NotificationException {
+    }

Review Comment:
   No implementation needed?



##########
repository/src/main/java/org/apache/atlas/repository/impexp/AsyncImportService.java:
##########
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.impexp;
+
+import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.SortOrder;
+import org.apache.atlas.annotation.GraphTransaction;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasAsyncImportRequest;
+import org.apache.atlas.repository.ogm.DataAccess;
+import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.ObjectUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+import javax.inject.Inject;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static 
org.apache.atlas.model.impexp.AtlasAsyncImportRequest.ASYNC_IMPORT_TYPE_NAME;
+import static 
org.apache.atlas.model.impexp.AtlasAsyncImportRequest.ImportStatus.ABORTED;
+import static 
org.apache.atlas.model.impexp.AtlasAsyncImportRequest.ImportStatus.PROCESSING;
+import static 
org.apache.atlas.model.impexp.AtlasAsyncImportRequest.ImportStatus.WAITING;
+
+@Service
+public class AsyncImportService {
+    private static final Logger LOG = 
LoggerFactory.getLogger(AsyncImportService.class);
+
+    private final DataAccess dataAccess;
+
+    @Inject
+    public AsyncImportService(DataAccess dataAccess) {
+        this.dataAccess = dataAccess;
+    }
+
+    public synchronized AtlasAsyncImportRequest 
fetchImportRequestByImportId(String importId) {
+        try {
+            AtlasAsyncImportRequest request = new AtlasAsyncImportRequest();
+            request.setImportId(importId);
+
+            return dataAccess.load(request);
+        } catch (Exception e) {
+            LOG.error("Error fetching request with importId: {}", importId, e);
+            return null;
+        }
+    }
+
+    public void saveImportRequest(AtlasAsyncImportRequest importRequest) 
throws AtlasBaseException {
+        try {
+            dataAccess.save(importRequest);
+            LOG.debug("Save request ID: {} request: {}", 
importRequest.getImportId(), importRequest.toString());
+        } catch (AtlasBaseException e) {
+            throw e;

Review Comment:
   Add an error log here, similar to line 84 below.
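
Something along these lines: log with context, then rethrow. This sketch uses `java.util.logging` and a hypothetical `Store` interface only to stay self-contained; the actual class would keep its SLF4J `LOG` and `DataAccess`.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

class SaveWithErrorLog {
    private static final Logger LOG = Logger.getLogger(SaveWithErrorLog.class.getName());

    // Hypothetical stand-in for DataAccess.save(...).
    interface Store {
        void save(String importId) throws Exception;
    }

    static void saveImportRequest(Store store, String importId) throws Exception {
        try {
            store.save(importId);
        } catch (Exception e) {
            // Log the failure with the import ID before propagating it to the caller.
            LOG.log(Level.SEVERE, "Failed to save import request: importId=" + importId, e);

            throw e;
        }
    }
}
```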



##########
repository/src/main/java/org/apache/atlas/repository/impexp/AsyncImportService.java:
##########
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.impexp;
+
+import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.SortOrder;
+import org.apache.atlas.annotation.GraphTransaction;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasAsyncImportRequest;
+import org.apache.atlas.repository.ogm.DataAccess;
+import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.ObjectUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+import javax.inject.Inject;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static 
org.apache.atlas.model.impexp.AtlasAsyncImportRequest.ASYNC_IMPORT_TYPE_NAME;
+import static 
org.apache.atlas.model.impexp.AtlasAsyncImportRequest.ImportStatus.ABORTED;
+import static 
org.apache.atlas.model.impexp.AtlasAsyncImportRequest.ImportStatus.PROCESSING;
+import static 
org.apache.atlas.model.impexp.AtlasAsyncImportRequest.ImportStatus.WAITING;
+
+@Service
+public class AsyncImportService {
+    private static final Logger LOG = 
LoggerFactory.getLogger(AsyncImportService.class);
+
+    private final DataAccess dataAccess;
+
+    @Inject
+    public AsyncImportService(DataAccess dataAccess) {
+        this.dataAccess = dataAccess;
+    }
+
+    public synchronized AtlasAsyncImportRequest 
fetchImportRequestByImportId(String importId) {
+        try {
+            AtlasAsyncImportRequest request = new AtlasAsyncImportRequest();
+            request.setImportId(importId);
+
+            return dataAccess.load(request);
+        } catch (Exception e) {
+            LOG.error("Error fetching request with importId: {}", importId, e);
+            return null;
+        }
+    }
+
+    public void saveImportRequest(AtlasAsyncImportRequest importRequest) 
throws AtlasBaseException {
+        try {
+            dataAccess.save(importRequest);
+            LOG.debug("Save request ID: {} request: {}", 
importRequest.getImportId(), importRequest.toString());
+        } catch (AtlasBaseException e) {
+            throw e;
+        }
+    }
+
+    public void updateImportRequest(AtlasAsyncImportRequest importRequest) {
+        try {
+            saveImportRequest(importRequest);
+        } catch (AtlasBaseException abe) {
+            LOG.error("Failed to update import: {} with request: {}", 
importRequest.getImportId(), importRequest.toString());
+        }
+    }
+
+    public synchronized List<String> fetchInProgressImportIds() {
+        List<String> guids = 
AtlasGraphUtilsV2.findEntityGUIDsByType(ASYNC_IMPORT_TYPE_NAME, 
SortOrder.ASCENDING);

Review Comment:
   Instead of retrieving all AtlasAsyncImportRequest instances and looking for 
those in PROCESSING status (line 107 below), consider adding a method to only 
retrieve the guids of requests having the relevant status - something along the lines of:
   
   ```
   AtlasGraphUtilsV2.findByTypeAndAttributes(ASYNC_IMPORT_TYPE_NAME, 
Collections.singletonMap(AtlasAsyncImportRequestDTO.STATUS_PROPERTY, 
PROCESSING))
   ```
   
   Such method could be used in `fetchQueuedImportRequests()` as well, to 
retrieve requests in `WAITING` status.
   
   Also, existing method `findEntityGUIDsByType()` performs sorting by 
`qualifiedName` attribute; is that useful here? Perhaps `createTime` might be 
more useful for import-requests?
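
To illustrate the intended result (only matching guids, ordered by creation time), here is an in-memory sketch. `StoredRequest` and `findGuidsByStatus()` are hypothetical; in the actual patch the filtering would ideally happen in the graph query rather than in Java.

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

class ImportRequestQuery {
    // Subset of the statuses defined in AtlasAsyncImportRequest.ImportStatus.
    enum ImportStatus { WAITING, PROCESSING, SUCCESSFUL }

    // Hypothetical, simplified view of a persisted import request.
    static final class StoredRequest {
        final String       guid;
        final ImportStatus status;
        final long         createTime;

        StoredRequest(String guid, ImportStatus status, long createTime) {
            this.guid       = guid;
            this.status     = status;
            this.createTime = createTime;
        }
    }

    // Returns guids of requests in the given status, oldest first.
    static List<String> findGuidsByStatus(List<StoredRequest> all, ImportStatus status) {
        return all.stream()
                .filter(r -> r.status == status)
                .sorted(Comparator.comparingLong((StoredRequest r) -> r.createTime))
                .map(r -> r.guid)
                .collect(Collectors.toList());
    }
}
```

The same helper would serve `fetchQueuedImportRequests()` by passing `WAITING`.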



##########
intg/src/main/java/org/apache/atlas/model/notification/ImportNotification.java:
##########
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.model.notification;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
+import org.apache.atlas.model.typedef.AtlasTypesDef;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+
+import java.io.Serializable;
+
+import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
+import static 
com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
+
+/**
+ * Class representing atlas import notification, extending HookNotification.
+ */
+@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = 
PUBLIC_ONLY, fieldVisibility = NONE)
+@JsonSerialize(include = JsonSerialize.Inclusion.ALWAYS)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@XmlRootElement
+@XmlAccessorType(XmlAccessType.PROPERTY)
+public class ImportNotification extends HookNotification implements 
Serializable {
+    private static final long serialVersionUID = 1L;
+
+    public ImportNotification() {
+    }
+
+    public ImportNotification(HookNotificationType type, String user) {
+        super(type, user);
+    }
+
+    @Override
+    public String toString() {
+        return toString(new StringBuilder()).toString();
+    }
+
+    public StringBuilder toString(StringBuilder sb) {
+        if (sb == null) {
+            sb = new StringBuilder();
+        }
+
+        sb.append("ImportNotification{");
+        sb.append("type=").append(type);
+        sb.append(", user=").append(user);
+        sb.append("}");
+
+        return sb;
+    }
+
+    /**
+     * Notification for type definitions import
+     */
+    @JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = 
PUBLIC_ONLY, fieldVisibility = NONE)
+    @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)

Review Comment:
   `@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)` is deprecated. 
Replace it with: `@JsonInclude(JsonInclude.Include.NON_NULL)`



##########
intg/src/main/java/org/apache/atlas/model/impexp/AtlasAsyncImportRequest.java:
##########
@@ -0,0 +1,400 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.model.impexp;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.atlas.model.AtlasBaseModelObject;
+import org.apache.atlas.utils.AtlasEntityUtil;
+
+import java.io.Serializable;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.TimeZone;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
+import static 
com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
+
+@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = 
PUBLIC_ONLY, fieldVisibility = NONE)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)

Review Comment:
   `@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)` is deprecated. 
Replace it with: `@JsonInclude(JsonInclude.Include.NON_NULL)`
   



##########
notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java:
##########
@@ -58,6 +59,24 @@ public 
AtlasKafkaConsumer(AtlasNotificationMessageDeserializer<T> deserializer,
         this.pollTimeoutMilliSeconds = pollTimeoutMilliSeconds;
     }
 
+    @Override
+    public Set<TopicPartition> getTopicPartition() {
+        Set<TopicPartition> ret = null;

Review Comment:
   Consider simplifying with:
   ```
     return kafkaConsumer != null ? kafkaConsumer.assignment() : null;
   ```



##########
client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java:
##########
@@ -140,6 +144,15 @@ public class AtlasClientV2 extends AtlasBaseClient {
     //IndexRecovery APIs
     private static final String INDEX_RECOVERY_URI = BASE_URI + 
"v2/indexrecovery";
 
+    // Async Import APIs
+    private static final String ASYNC_IMPORT_URI = BASE_URI + 
"admin/asyncImport";

Review Comment:
   I suggest keeping URLs and URL parts case-insensitive. Replace `asyncImport` 
with `async/import` in REST API endpoints.



##########
client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java:
##########
@@ -1039,6 +1052,41 @@ public API formatPathWithParameter(API api, String... 
params) {
         return formatPathParameters(api, params);
     }
 
+    private FormDataBodyPart getImportRequestBodyPart(AtlasImportRequest 
request) {
+        return new FormDataBodyPart(IMPORT_REQUEST_PARAMTER, 
AtlasType.toJson(request), MediaType.APPLICATION_JSON_TYPE);
+    }
+
+    public AtlasAsyncImportRequest importAsync(AtlasImportRequest request, 
InputStream stream) throws AtlasServiceException {
+        return performAsyncImport(getImportRequestBodyPart(request),
+                new StreamDataBodyPart(IMPORT_DATA_PARAMETER, stream));
+    }
+
+    private AtlasAsyncImportRequest performAsyncImport(BodyPart requestPart, 
BodyPart filePart) throws AtlasServiceException {
+        MultiPart multipartEntity = new FormDataMultiPart()
+                .bodyPart(requestPart)
+                .bodyPart(filePart);
+
+        return callAPI(API_V2.ASYNC_IMPORT, AtlasAsyncImportRequest.class, 
multipartEntity);
+    }
+
+    public List<Map<String, Object>> getAsyncImportStatus() throws 
AtlasServiceException {

Review Comment:
   In a deployment having a large number of async imports, the return value 
from this method could be large. To avoid potential performance impact, this 
method should support pagination by returning `PList<Map<String, Object>>`. 
Also, I suggest replacing `Map<String, Object>` with a class like 
`AsyncImportStatus`.
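
A sketch of the shape I have in mind. `Page` below is a simplified stand-in for Atlas's `PList`, and the fields of `AsyncImportStatus` are assumptions; the real class should carry whatever the status API needs to expose.

```java
import java.util.ArrayList;
import java.util.List;

class AsyncImportStatusPaging {
    // Hypothetical typed replacement for Map<String, Object>.
    static final class AsyncImportStatus {
        final String importId;
        final String status;

        AsyncImportStatus(String importId, String status) {
            this.importId = importId;
            this.status   = status;
        }
    }

    // Simplified stand-in for a paged-list wrapper such as PList.
    static final class Page<T> {
        final List<T> list;
        final int     startIndex;
        final int     pageSize;
        final long    totalCount;

        Page(List<T> list, int startIndex, int pageSize, long totalCount) {
            this.list       = list;
            this.startIndex = startIndex;
            this.pageSize   = pageSize;
            this.totalCount = totalCount;
        }
    }

    // Returns at most 'limit' items starting at 'offset'; out-of-range values are clamped.
    static <T> Page<T> paginate(List<T> all, int offset, int limit) {
        int from = Math.min(Math.max(offset, 0), all.size());
        int to   = (int) Math.min((long) from + Math.max(limit, 0), all.size());

        return new Page<>(new ArrayList<>(all.subList(from, to)), from, limit, all.size());
    }
}
```

A client call could then take `offset` and `limit` parameters and return one page per request, with `totalCount` telling the caller whether more pages remain.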



##########
webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java:
##########
@@ -443,6 +453,27 @@ public int getHandlerOrder() {
         return HandlerOrder.NOTIFICATION_HOOK_CONSUMER.getOrder();
     }
 
+    public void closeImportConsumer(String importId, String topic) {
+        try {
+            LOG.info("==> closeImportConsumer(importId={}, topic={})", 
importId, topic);
+            String consumerName = ATLAS_IMPORT_CONSUMER_THREAD_PREFIX + 
importId;
+            ListIterator<HookConsumer> consumersIterator = 
consumers.listIterator();
+            while (consumersIterator.hasNext()) {
+                HookConsumer consumer = consumersIterator.next();
+                if (consumer.getName().contains(consumerName)) {

Review Comment:
   `contains(consumerName)` => `startsWith(consumerName)`
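
A small illustration of the difference. The prefix value and the consumer names below are made up for the example; the actual value of `ATLAS_IMPORT_CONSUMER_THREAD_PREFIX` is not visible in this diff.

```java
import java.util.List;
import java.util.stream.Collectors;

class ConsumerNameMatch {
    // Assumed value, for illustration only.
    static final String PREFIX = "atlas-import-consumer-thread-";

    // Matches any name that has the consumer name anywhere inside it.
    static List<String> matchByContains(List<String> names, String importId) {
        String consumerName = PREFIX + importId;

        return names.stream().filter(n -> n.contains(consumerName)).collect(Collectors.toList());
    }

    // Matches only names that begin with the consumer name.
    static List<String> matchByStartsWith(List<String> names, String importId) {
        String consumerName = PREFIX + importId;

        return names.stream().filter(n -> n.startsWith(consumerName)).collect(Collectors.toList());
    }
}
```

With names `atlas-import-consumer-thread-abc` and `old-atlas-import-consumer-thread-abc`, `contains` selects both while `startsWith` selects only the first, so an unrelated consumer is less likely to be closed by mistake.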



##########
intg/src/main/java/org/apache/atlas/model/impexp/AtlasAsyncImportRequest.java:
##########
@@ -0,0 +1,400 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.model.impexp;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.atlas.model.AtlasBaseModelObject;
+import org.apache.atlas.utils.AtlasEntityUtil;
+
+import java.io.Serializable;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.TimeZone;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
+import static 
com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
+
+@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = 
PUBLIC_ONLY, fieldVisibility = NONE)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class AtlasAsyncImportRequest extends AtlasBaseModelObject implements 
Serializable {
+    private static final long serialVersionUID = 1L;
+    public static final String ASYNC_IMPORT_TYPE_NAME = 
"__AtlasAsyncImportRequest";
+    public static final String ASYNC_IMPORT_TOPIC_PREFIX = "ATLAS_IMPORT_";
+    public static final String REQUEST_ID_PREFIX_PROPERTY = "async_import_";
+
+    public enum ImportStatus {
+        STAGING("STAGING"),
+        WAITING("WAITING"),
+        PROCESSING("PROCESSING"),
+        SUCCESSFUL("SUCCESSFUL"),
+        PARTIAL_SUCCESS("PARTIAL_SUCCESS"),
+        ABORTED("ABORTED"),
+        FAILED("FAILED");
+
+        private final String status;
+
+        ImportStatus(String status) {
+            this.status = status;
+        }
+
+        public String getStatus() {
+            return status;
+        }
+
+        @Override
+        public String toString() {
+            return status;
+        }
+    }
+
+    @JsonIgnore

Review Comment:
   It looks like the fields `requestId` and `skipTo` are not necessary in REST 
APIs. If so, I suggest removing them from the model class and using another class 
with these fields for internal tracking, including persisting to the store.



##########
intg/src/main/java/org/apache/atlas/model/impexp/AtlasAsyncImportRequest.java:
##########
@@ -0,0 +1,400 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.model.impexp;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.atlas.model.AtlasBaseModelObject;
+import org.apache.atlas.utils.AtlasEntityUtil;
+
+import java.io.Serializable;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.TimeZone;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
+import static 
com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
+
+@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = 
PUBLIC_ONLY, fieldVisibility = NONE)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class AtlasAsyncImportRequest extends AtlasBaseModelObject implements 
Serializable {
+    private static final long serialVersionUID = 1L;
+    public static final String ASYNC_IMPORT_TYPE_NAME = 
"__AtlasAsyncImportRequest";
+    public static final String ASYNC_IMPORT_TOPIC_PREFIX = "ATLAS_IMPORT_";
+    public static final String REQUEST_ID_PREFIX_PROPERTY = "async_import_";
+
+    public enum ImportStatus {
+        STAGING("STAGING"),
+        WAITING("WAITING"),
+        PROCESSING("PROCESSING"),
+        SUCCESSFUL("SUCCESSFUL"),
+        PARTIAL_SUCCESS("PARTIAL_SUCCESS"),
+        ABORTED("ABORTED"),
+        FAILED("FAILED");
+
+        private final String status;
+
+        ImportStatus(String status) {
+            this.status = status;
+        }
+
+        public String getStatus() {
+            return status;
+        }
+
+        @Override
+        public String toString() {
+            return status;
+        }
+    }
+
+    @JsonIgnore
+    private String requestId;
+
+    @JsonProperty("importId")

Review Comment:
   Are `@JsonProperty` annotations necessary when the field name is the same as 
the value passed to the annotation?



##########
client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java:
##########
@@ -1039,6 +1052,41 @@ public API formatPathWithParameter(API api, String... 
params) {
         return formatPathParameters(api, params);
     }
 
+    private FormDataBodyPart getImportRequestBodyPart(AtlasImportRequest 
request) {
+        return new FormDataBodyPart(IMPORT_REQUEST_PARAMTER, 
AtlasType.toJson(request), MediaType.APPLICATION_JSON_TYPE);
+    }
+
+    public AtlasAsyncImportRequest importAsync(AtlasImportRequest request, 
InputStream stream) throws AtlasServiceException {
+        return performAsyncImport(getImportRequestBodyPart(request),
+                new StreamDataBodyPart(IMPORT_DATA_PARAMETER, stream));
+    }
+
+    private AtlasAsyncImportRequest performAsyncImport(BodyPart requestPart, 
BodyPart filePart) throws AtlasServiceException {

Review Comment:
   To be consistent with the rest of the Atlas code base, keep `private` methods 
after all `public`, `protected` and `package private` methods.
   
   



##########
webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java:
##########
@@ -454,63 +485,85 @@ void startInternal(Configuration configuration, 
ExecutorService executorService)
 
         if (!HAConfiguration.isHAEnabled(configuration)) {
             LOG.info("HA is disabled, starting consumers inline.");
+            startHookConsumers(executorService);
+        }
+    }
 
-            startConsumers(executorService);
+    public void startImportNotificationConsumer(NotificationType 
notificationType, String importId, String topic) {
+        if (topic != null) {
+            notificationInterface.addTopicToNotificationType(notificationType, 
topic);
         }
+        List<NotificationConsumer<HookNotification>> notificationConsumers = 
notificationInterface.createConsumers(notificationType, 1);
+        List<HookConsumer> hookConsumers = new ArrayList<>();
+        for (final NotificationConsumer<HookNotification> consumer : 
notificationConsumers) {
+            String hookConsumerName = ATLAS_IMPORT_CONSUMER_THREAD_PREFIX + 
importId;
+            HookConsumer hookConsumer = new HookConsumer(hookConsumerName, 
consumer);
+            hookConsumers.add(hookConsumer);
+        }
+        startConsumers(executors, hookConsumers);
     }
 
-    private void startConsumers(ExecutorService executorService) {
+    private void startHookConsumers(ExecutorService executorService) {
         int                                                           
numThreads                  = 
applicationProperties.getInt(CONSUMER_THREADS_PROPERTY, 1);
         Map<NotificationConsumer<HookNotification>, NotificationType> 
notificationConsumersByType = new HashMap<>();
-
         List<NotificationConsumer<HookNotification>> notificationConsumers = 
notificationInterface.createConsumers(NotificationType.HOOK, numThreads);
-
         for (NotificationConsumer<HookNotification> notificationConsumer : 
notificationConsumers) {
             notificationConsumersByType.put(notificationConsumer, 
NotificationType.HOOK);
         }
-
         if (AtlasHook.isHookMsgsSortEnabled) {
             List<NotificationConsumer<HookNotification>> 
unsortedNotificationConsumers = 
notificationInterface.createConsumers(NotificationType.HOOK_UNSORTED, 
numThreads);
 
             for (NotificationConsumer<HookNotification> 
unsortedNotificationConsumer : unsortedNotificationConsumers) {
                 notificationConsumersByType.put(unsortedNotificationConsumer, 
NotificationType.HOOK_UNSORTED);
             }
         }
-
-        if (executorService == null) {
-            executorService = 
Executors.newFixedThreadPool(notificationConsumersByType.size(), new 
ThreadFactoryBuilder().setNameFormat(THREADNAME_PREFIX + " thread-%d").build());
-        }
-
-        executors = executorService;
-
+        List<HookConsumer> hookConsumers = new ArrayList<>();
         for (final NotificationConsumer<HookNotification> consumer : 
notificationConsumersByType.keySet()) {
             String hookConsumerName = ATLAS_HOOK_CONSUMER_THREAD_NAME;
-
             if 
(notificationConsumersByType.get(consumer).equals(NotificationType.HOOK_UNSORTED))
 {
                 hookConsumerName = ATLAS_HOOK_UNSORTED_CONSUMER_THREAD_NAME;
             }
-
             HookConsumer hookConsumer = new HookConsumer(hookConsumerName, 
consumer);
+            hookConsumers.add(hookConsumer);
+        }
+        startConsumers(executorService, hookConsumers);
+    }
 
-            consumers.add(hookConsumer);
-            executors.submit(hookConsumer);
+    private void startConsumers(ExecutorService executorService, 
List<HookConsumer> hookConsumers) {
+        if (consumers == null) {
+            consumers = new ArrayList<>();
+        }
+        if (executorService == null) {
+            executorService = new ThreadPoolExecutor(
+                    0, // Core pool size
+                    Integer.MAX_VALUE, // Maximum pool size (dynamic scaling)
+                    60L, TimeUnit.SECONDS, // Idle thread timeout
+                    new SynchronousQueue<>(), // Direct handoff queue
+                    new ThreadFactoryBuilder().setNameFormat(THREADNAME_PREFIX 
+ " thread-%d").build());
+        }
+        executors = executorService;

Review Comment:
   Review all writes to `executors`, i.e. in the following methods:
   - startInternal()
   - startConsumers()
   - stop()
   
   Prior to this patch, these methods were called only once per Atlas 
instance. With this patch, `executors` can be overwritten on every call to 
async import. Review the usage carefully and update it to avoid inappropriate 
overwrites.
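   
   One possible shape for such a guard is sketched below. `ExecutorHolder` and 
`initExecutors()` are hypothetical names used only for illustration, not part 
of the patch; the point is that an existing, still-running pool is never 
replaced. `Executors.newCachedThreadPool()` is used here because it has the 
same pool settings as the `ThreadPoolExecutor` in the diff (core size 0, 
60-second idle timeout, `SynchronousQueue`); the thread-name format is omitted.
   
   ```java
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   
   // Hypothetical stand-in for the consumer class; only the guard on `executors` matters here.
   class ExecutorHolder {
       private ExecutorService executors; // shared pool, created at most once while running
   
       // Returns the pool in use. An existing live pool is kept; the candidate is ignored in that case.
       synchronized ExecutorService initExecutors(ExecutorService candidate) {
           if (executors == null || executors.isShutdown()) {
               executors = (candidate != null) ? candidate : Executors.newCachedThreadPool();
           }
           return executors;
       }
   
       // Shuts the pool down and clears the reference so a later start can create a new one.
       synchronized void stop() {
           if (executors != null) {
               executors.shutdownNow();
               executors = null;
           }
       }
   }
   ```
   
   With a guard like this, a second call made for an async import reuses the 
pool created at startup instead of replacing it.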



##########
intg/src/main/java/org/apache/atlas/model/notification/ImportNotification.java:
##########
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.model.notification;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
+import org.apache.atlas.model.typedef.AtlasTypesDef;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+
+import java.io.Serializable;
+
+import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
+import static 
com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
+
+/**
+ * Class representing atlas import notification, extending HookNotification.
+ */
+@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = 
PUBLIC_ONLY, fieldVisibility = NONE)
+@JsonSerialize(include = JsonSerialize.Inclusion.ALWAYS)

Review Comment:
   `@JsonSerialize(include = JsonSerialize.Inclusion.ALWAYS)` is deprecated. 
Replace it with: `@JsonInclude(JsonInclude.Include.ALWAYS)`



##########
intg/src/main/java/org/apache/atlas/model/notification/ImportNotification.java:
##########
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.model.notification;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
+import org.apache.atlas.model.typedef.AtlasTypesDef;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+
+import java.io.Serializable;
+
+import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
+import static 
com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
+
+/**
+ * Class representing atlas import notification, extending HookNotification.
+ */
+@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = 
PUBLIC_ONLY, fieldVisibility = NONE)
+@JsonSerialize(include = JsonSerialize.Inclusion.ALWAYS)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@XmlRootElement
+@XmlAccessorType(XmlAccessType.PROPERTY)
+public class ImportNotification extends HookNotification implements 
Serializable {
+    private static final long serialVersionUID = 1L;
+
+    public ImportNotification() {
+    }
+
+    public ImportNotification(HookNotificationType type, String user) {
+        super(type, user);
+    }
+
+    @Override
+    public String toString() {
+        return toString(new StringBuilder()).toString();
+    }
+
+    public StringBuilder toString(StringBuilder sb) {
+        if (sb == null) {
+            sb = new StringBuilder();
+        }
+
+        sb.append("ImportNotification{");
+        sb.append("type=").append(type);
+        sb.append(", user=").append(user);
+        sb.append("}");
+
+        return sb;
+    }
+
+    /**
+     * Notification for type definitions import
+     */
+    @JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = 
PUBLIC_ONLY, fieldVisibility = NONE)
+    @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+    @JsonIgnoreProperties(ignoreUnknown = true)
+    @XmlRootElement
+    @XmlAccessorType(XmlAccessType.PROPERTY)
+    public static class AtlasTypeDefImportNotification extends 
HookNotification implements Serializable {
+        private static final long serialVersionUID = 1L;
+
+        @JsonProperty
+        private String importId;
+
+        @JsonProperty
+        private AtlasTypesDef typeDefinitionMap;
+
+        public AtlasTypeDefImportNotification() {
+        }
+
+        public AtlasTypeDefImportNotification(String importId, String user, 
AtlasTypesDef typeDefinitionMap) {
+            super(HookNotificationType.IMPORT_TYPE_DEF, user);
+            this.typeDefinitionMap = typeDefinitionMap;
+            this.importId = importId;
+        }
+
+        public AtlasTypesDef getTypeDefinitionMap() {
+            return typeDefinitionMap;
+        }
+
+        public String getImportId() {
+            return importId;
+        }
+
+        @Override
+        public String toString() {
+            return "importId=" + importId + (typeDefinitionMap == null ? 
"null" : typeDefinitionMap.toString());
+        }
+    }
+
+    /**
+     * Notification for entities import
+     */
+    @JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = 
PUBLIC_ONLY, fieldVisibility = NONE)
+    @JsonSerialize(include = JsonSerialize.Inclusion.ALWAYS)

Review Comment:
   `@JsonSerialize(include = JsonSerialize.Inclusion.ALWAYS)` is deprecated. 
Replace it with: `@JsonInclude(JsonInclude.Include.ALWAYS)`



##########
notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java:
##########
@@ -205,6 +214,16 @@ public void close() {
         LOG.info("<== KafkaNotification.close()");
     }
 
+    @Override
+    public void closeConsumer(NotificationType notificationType) {
+        List<KafkaConsumer> notificationConsumers = 
this.consumers.get(notificationType);
+        for (final KafkaConsumer consumer : notificationConsumers) {

Review Comment:
   Can `notificationConsumers` be `null` here? If yes, update to handle this 
case. Also, consider using `remove()`, instead of `get()`.
   
   ```
     List<KafkaConsumer> notificationConsumers = 
this.consumers.remove(notificationType);
   
     if (notificationConsumers != null) {
       notificationConsumers.forEach(consumer -> {
         consumer.unsubscribe();
         consumer.close();
       });
     }
   ```



##########
notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java:
##########
@@ -183,7 +188,11 @@ public void stop() {
 
     @Override
     public <T> List<NotificationConsumer<T>> createConsumers(NotificationType 
notificationType, int numConsumers) {
-        return createConsumers(notificationType, numConsumers, 
Boolean.parseBoolean(properties.getProperty("enable.auto.commit", 
properties.getProperty("auto.commit.enable", "false"))));
+        boolean enableAutoCommit = 
Boolean.parseBoolean(properties.getProperty("enable.auto.commit", 
properties.getProperty("auto.commit.enable", "false")));
+        if (notificationType.equals(NotificationType.ASYNC_IMPORT)) {
+            enableAutoCommit = true;
+        }
+        return createConsumers(notificationType, numConsumers, 
enableAutoCommit);

Review Comment:
   If `autoCommit` should always be enabled for ASYNC_IMPORT notifications, I 
suggest enforcing it within the `createConsumers(notificationType, 
numConsumers, enableAutoCommit)` method, so that it is also enforced for calls 
to that method from elsewhere.
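   
   A minimal sketch of that arrangement, assuming a trimmed-down 
`NotificationType` enum and a hypothetical `AutoCommitSketch` class. To keep 
the example runnable without Kafka, the methods return the effective 
auto-commit flag rather than a list of consumers.
   
   ```java
   import java.util.Properties;
   
   class AutoCommitSketch {
       enum NotificationType { HOOK, ASYNC_IMPORT } // trimmed stand-in for the real enum
   
       // Two-argument style entry point: only reads the configured value and delegates.
       static boolean createConsumers(Properties properties, NotificationType type, int numConsumers) {
           boolean enableAutoCommit = Boolean.parseBoolean(properties.getProperty("enable.auto.commit",
                   properties.getProperty("auto.commit.enable", "false")));
   
           return createConsumers(type, numConsumers, enableAutoCommit);
       }
   
       // The ASYNC_IMPORT override lives here, so every caller of this overload gets it.
       static boolean createConsumers(NotificationType type, int numConsumers, boolean enableAutoCommit) {
           return enableAutoCommit || type == NotificationType.ASYNC_IMPORT;
       }
   }
   ```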



##########
notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java:
##########
@@ -281,9 +300,7 @@ public void sendInternal(String topic, List<String> 
messages, boolean isSortNeed
     }
 
     public void sendInternal(String topic, List<String> messages) throws 
NotificationException {
-        KafkaProducer producer = getOrCreateProducer(topic);
-
-        sendInternalToProducer(producer, topic, messages);
+        sendInternal(topic, messages, SORT_NOT_NEEDED);

Review Comment:
   Is the update to this method, `sendInternal(topic, messages)`, needed? The 
earlier implementation seems to be effectively the same as the updated one. If 
this update is not necessary, consider reverting to the earlier version and 
removing the static member `SORT_NOT_NEEDED`.



##########
client/client-v2/src/main/java/org/apache/atlas/AtlasClientV2.java:
##########
@@ -1039,6 +1052,41 @@ public API formatPathWithParameter(API api, String... 
params) {
         return formatPathParameters(api, params);
     }
 
+    private FormDataBodyPart getImportRequestBodyPart(AtlasImportRequest 
request) {

Review Comment:
   To be consistent with the rest of the Atlas code base, keep `private` 
methods after all `public`, `protected` and package-private methods.



##########
notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java:
##########
@@ -301,7 +318,11 @@ public Properties getConsumerProperties(NotificationType 
notificationType) {
         String groupId = 
properties.getProperty(notificationType.toString().toLowerCase() + "." + 
CONSUMER_GROUP_ID_PROPERTY);
 
         if (StringUtils.isEmpty(groupId)) {
-            groupId = "atlas";
+            if (!notificationType.equals(NotificationType.ASYNC_IMPORT)) {

Review Comment:
   Consider simplifying with:
   ```
   groupId = notificationType.equals(NotificationType.ASYNC_IMPORT) ? 
"atlas-import" : "atlas";
   ```



##########
notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java:
##########
@@ -77,8 +80,10 @@ public class KafkaNotification extends AbstractNotification 
implements Service {
     private static final String[] ATLAS_ENTITIES_CONSUMER_TOPICS        = 
AtlasConfiguration.NOTIFICATION_ENTITIES_CONSUMER_TOPIC_NAMES.getStringArray(ATLAS_ENTITIES_TOPIC);
     private static final String   DEFAULT_CONSUMER_CLOSED_ERROR_MESSAGE = 
"This consumer has already been closed.";
 
+    private static final boolean SORT_NOT_NEEDED = false;
+
     private static final Map<NotificationType, String> PRODUCER_TOPIC_MAP    = 
new HashMap<>();
-    private static final Map<NotificationType, String[]> CONSUMER_TOPICS_MAP = 
new HashMap<>();
+    private static Map<NotificationType, String[]> CONSUMER_TOPICS_MAP = new 
HashMap<>();

Review Comment:
   `CONSUMER_TOPICS_MAP` is not reassigned; is it necessary to remove `final`?
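   
   For reference, a small sketch of why `final` can stay: it fixes the 
reference, not the contents, so entries can still be added in the way 
`addTopicToNotificationType()` needs. `FinalMapSketch` and its `String` keys 
are stand-ins used only to keep the example self-contained.
   
   ```java
   import java.util.Arrays;
   import java.util.HashMap;
   import java.util.Map;
   
   class FinalMapSketch {
       // `final` prevents reassigning the field; put() on the map is still allowed.
       private static final Map<String, String[]> CONSUMER_TOPICS = new HashMap<>();
   
       // Appends a topic to the array stored for the given notification type.
       static void addTopic(String notificationType, String topic) {
           String[] topics = CONSUMER_TOPICS.get(notificationType);
           String[] updated;
   
           if (topics == null) {
               updated = new String[] {topic};
           } else {
               updated = Arrays.copyOf(topics, topics.length + 1);
               updated[topics.length] = topic;
           }
   
           CONSUMER_TOPICS.put(notificationType, updated);
       }
   
       static String[] topicsFor(String notificationType) {
           return CONSUMER_TOPICS.get(notificationType);
       }
   }
   ```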



##########
notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java:
##########
@@ -118,6 +118,10 @@ public void sendInternal(NotificationType 
notificationType, List<String> notific
             messages = notificationMessages;
         }
 
+        @Override
+        public void sendInternal(String topic, List<String> messages) throws 
NotificationException {
+        }

Review Comment:
   Add test coverage for the `sendInternal(String, List<String>)` method.



##########
notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java:
##########
@@ -224,6 +234,15 @@ public <T> void send(NotificationType type, List<T> 
messages, MessageSource sour
         sendInternal(type, strMessages);
     }
 
+    @Override
+    public <T> void send(String topic, List<T> messages, MessageSource source) 
throws NotificationException {
+        List<String> strMessages = new ArrayList<>(messages.size());
+        for (int index = 0; index < messages.size(); index++) {

Review Comment:
   Consider simplifying with an enhanced `for` loop:
   ```
   List<String> strMessages = new ArrayList<>(messages.size());
   
   for (T message : messages) {
     createNotificationMessages(message, strMessages, source);
   }
   
   sendInternal(topic, strMessages);
   ```



##########
notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java:
##########
@@ -429,6 +450,43 @@ private KafkaProducer getOrCreateProducerByCriteria(Object 
producerCriteria, Map
         return ret;
     }
 
+    @Override
+    public void addTopicToNotificationType(NotificationType notificationType, 
String topic) {
+        String[] topics = CONSUMER_TOPICS_MAP.get(notificationType);
+        String[] updatedTopics;
+        if (topics == null) {
+            updatedTopics = new String[] {topic};
+        } else {
+            updatedTopics = Stream.concat(Arrays.stream(topics), 
Stream.of(topic)).toArray(String[]::new);
+        }
+        CONSUMER_TOPICS_MAP.put(notificationType, updatedTopics);
+    }
+
+    @Override
+    public void closeProducer(NotificationType notificationType, String topic) 
{
+        KafkaProducer producerToClose = producersByTopic.get(topic);

Review Comment:
   Consider replacing `get()` with `remove()`:
   
   ```
   KafkaProducer producerToClose = producersByTopic.remove(topic);
   
   if (producerToClose != null) {
     producerToClose.close();
   }
   
   PRODUCER_TOPIC_MAP.remove(notificationType, topic);
   ```



##########
notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java:
##########
@@ -111,6 +150,9 @@ enum NotificationType {
         // Notifications from the Atlas integration hooks - unsorted.
         HOOK_UNSORTED(new HookMessageDeserializer()),
 
+        // Notifications from Atlas async importer

Review Comment:
   Move new members to the end of the list.



##########
notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java:
##########
@@ -101,6 +101,45 @@ public interface NotificationInterface {
      */
     boolean isReady(NotificationType type);
 
+    /**
+     * Abstract notification wiring for async import messages
+     * @param topic async import topic to publish
+     * @param messages messages to send
+     * @param source source of the message
+     */
+    default <T> void send(String topic, List<T> messages, MessageSource 
source) throws NotificationException {}
+
+    /**
+     * Associates the specified topic with the given notification type.
+     *
+     * @param notificationType The type of notification to which the topic 
should be added.
+     * @param topic The name of the topic to be associated with the 
notification type.
+     */
+    default void addTopicToNotificationType(NotificationType notificationType, 
String topic) {}
+
+    /**
+     * Closes the producer associated with the specified notification type and 
topic.
+     *
+     * @param notificationType The type of notification for which the producer 
is to be closed.
+     * @param topic The name of the topic associated with the producer.
+     */
+    default void closeProducer(NotificationType notificationType, String 
topic) {}
+
+    /**
+     * Deletes the specified topic associated with the given notification type.
+     *
+     * @param notificationType The type of notification related to the topic.
+     * @param topicName The name of the topic to be deleted.
+     */
+    default void deleteTopics(NotificationType notificationType, String 
topicName) {}

Review Comment:
   `deleteTopics()` => `deleteTopic()`



##########
repository/src/main/java/org/apache/atlas/repository/impexp/AsyncImportService.java:
##########
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.impexp;
+
+import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.SortOrder;
+import org.apache.atlas.annotation.GraphTransaction;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasAsyncImportRequest;
+import org.apache.atlas.repository.ogm.DataAccess;
+import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.ObjectUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+import javax.inject.Inject;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static 
org.apache.atlas.model.impexp.AtlasAsyncImportRequest.ASYNC_IMPORT_TYPE_NAME;
+import static 
org.apache.atlas.model.impexp.AtlasAsyncImportRequest.ImportStatus.ABORTED;
+import static 
org.apache.atlas.model.impexp.AtlasAsyncImportRequest.ImportStatus.PROCESSING;
+import static 
org.apache.atlas.model.impexp.AtlasAsyncImportRequest.ImportStatus.WAITING;
+
+@Service
+public class AsyncImportService {
+    private static final Logger LOG = 
LoggerFactory.getLogger(AsyncImportService.class);
+
+    private final DataAccess dataAccess;
+
+    @Inject
+    public AsyncImportService(DataAccess dataAccess) {
+        this.dataAccess = dataAccess;
+    }
+
+    public synchronized AtlasAsyncImportRequest 
fetchImportRequestByImportId(String importId) {
+        try {
+            AtlasAsyncImportRequest request = new AtlasAsyncImportRequest();
+            request.setImportId(importId);
+
+            return dataAccess.load(request);
+        } catch (Exception e) {
+            LOG.error("Error fetching request with importId: {}", importId, e);
+            return null;
+        }
+    }
+
+    public void saveImportRequest(AtlasAsyncImportRequest importRequest) 
throws AtlasBaseException {
+        try {
+            dataAccess.save(importRequest);
+            LOG.debug("Save request ID: {} request: {}", 
importRequest.getImportId(), importRequest.toString());
+        } catch (AtlasBaseException e) {
+            throw e;
+        }
+    }
+
+    public void updateImportRequest(AtlasAsyncImportRequest importRequest) {
+        try {
+            saveImportRequest(importRequest);
+        } catch (AtlasBaseException abe) {
+            LOG.error("Failed to update import: {} with request: {}", 
importRequest.getImportId(), importRequest.toString());

Review Comment:
   `.toString()` is not necessary in lines 74 and 84; SLF4J converts `{}` 
arguments to strings itself, and only when the message is actually logged. 
Please remove it.



##########
notification/src/main/java/org/apache/atlas/notification/rest/RestNotification.java:
##########
@@ -84,6 +84,10 @@ public void sendInternal(NotificationType type, List<String> 
messages) throws No
         }
     }
 
+    @Override
+    public void sendInternal(String topic, List<String> messages) throws 
NotificationException {
+    }

Review Comment:
   No implementation? I suggest refactoring `sendInternal(notificationType, 
messages)`.



##########
repository/src/main/java/org/apache/atlas/repository/impexp/AsyncImportService.java:
##########
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.impexp;
+
+import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.SortOrder;
+import org.apache.atlas.annotation.GraphTransaction;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasAsyncImportRequest;
+import org.apache.atlas.repository.ogm.DataAccess;
+import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.ObjectUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+import javax.inject.Inject;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static 
org.apache.atlas.model.impexp.AtlasAsyncImportRequest.ASYNC_IMPORT_TYPE_NAME;
+import static 
org.apache.atlas.model.impexp.AtlasAsyncImportRequest.ImportStatus.ABORTED;
+import static 
org.apache.atlas.model.impexp.AtlasAsyncImportRequest.ImportStatus.PROCESSING;
+import static 
org.apache.atlas.model.impexp.AtlasAsyncImportRequest.ImportStatus.WAITING;
+
+@Service
+public class AsyncImportService {
+    private static final Logger LOG = 
LoggerFactory.getLogger(AsyncImportService.class);
+
+    private final DataAccess dataAccess;
+
+    @Inject
+    public AsyncImportService(DataAccess dataAccess) {
+        this.dataAccess = dataAccess;
+    }
+
+    public synchronized AtlasAsyncImportRequest 
fetchImportRequestByImportId(String importId) {

Review Comment:
   Why is `synchronized` needed for the following methods?
   - `fetchImportRequestByImportId(importId)`
   - `fetchInProgressImportIds()`
   - `fetchQueuedImportRequests()`



##########
notification/src/test/java/org/apache/atlas/notification/spool/AtlasFileSpoolTest.java:
##########
@@ -160,6 +160,10 @@ public void sendInternal(NotificationType type, 
List<String> messages) {
             publishedMessages.addAll(messages);
         }
 
+        @Override
+        public void sendInternal(String topic, List<String> messages) throws 
NotificationException {
+        }

Review Comment:
   Add test coverage for the `sendInternal(String, List<String>)` method.



##########
repository/src/main/java/org/apache/atlas/repository/impexp/AsyncImportService.java:
##########
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.impexp;
+
+import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.SortOrder;
+import org.apache.atlas.annotation.GraphTransaction;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasAsyncImportRequest;
+import org.apache.atlas.repository.ogm.DataAccess;
+import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.ObjectUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+import javax.inject.Inject;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static 
org.apache.atlas.model.impexp.AtlasAsyncImportRequest.ASYNC_IMPORT_TYPE_NAME;
+import static 
org.apache.atlas.model.impexp.AtlasAsyncImportRequest.ImportStatus.ABORTED;
+import static 
org.apache.atlas.model.impexp.AtlasAsyncImportRequest.ImportStatus.PROCESSING;
+import static 
org.apache.atlas.model.impexp.AtlasAsyncImportRequest.ImportStatus.WAITING;
+
+@Service
+public class AsyncImportService {
+    private static final Logger LOG = 
LoggerFactory.getLogger(AsyncImportService.class);
+
+    private final DataAccess dataAccess;
+
+    @Inject
+    public AsyncImportService(DataAccess dataAccess) {
+        this.dataAccess = dataAccess;
+    }
+
+    public synchronized AtlasAsyncImportRequest 
fetchImportRequestByImportId(String importId) {
+        try {
+            AtlasAsyncImportRequest request = new AtlasAsyncImportRequest();
+            request.setImportId(importId);
+
+            return dataAccess.load(request);
+        } catch (Exception e) {
+            LOG.error("Error fetching request with importId: {}", importId, e);
+            return null;
+        }
+    }
+
+    public void saveImportRequest(AtlasAsyncImportRequest importRequest) 
throws AtlasBaseException {
+        try {
+            dataAccess.save(importRequest);
+            LOG.debug("Save request ID: {} request: {}", 
importRequest.getImportId(), importRequest.toString());
+        } catch (AtlasBaseException e) {
+            throw e;
+        }
+    }
+
+    public void updateImportRequest(AtlasAsyncImportRequest importRequest) {
+        try {
+            saveImportRequest(importRequest);
+        } catch (AtlasBaseException abe) {
+            LOG.error("Failed to update import: {} with request: {}", 
importRequest.getImportId(), importRequest.toString());
+        }
+    }
+
+    public synchronized List<String> fetchInProgressImportIds() {
+        List<String> guids = 
AtlasGraphUtilsV2.findEntityGUIDsByType(ASYNC_IMPORT_TYPE_NAME, 
SortOrder.ASCENDING);
+
+        List<String> inProgressImportIds = new ArrayList<>();
+
+        List<AtlasAsyncImportRequest> importsToLoad = guids.stream()
+                .map(AtlasAsyncImportRequest::new)
+                .collect(Collectors.toList());
+
+        Iterable<AtlasAsyncImportRequest> allRequests = null;
+        try {
+            allRequests = dataAccess.load(importsToLoad);
+        } catch (AtlasBaseException e) {
+            LOG.error("Could not get in progress import requests.", e);
+        }
+
+        if (allRequests != null) {

Review Comment:
   The block at lines 104 to 112 can be moved inside the `try {` block at 
line 99.
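   
   A rough sketch of the resulting shape, with hypothetical stand-ins 
(`Request`, `Status`, `Loader`) in place of `AtlasAsyncImportRequest`, 
`ImportStatus` and `DataAccess`, and `System.err` in place of the logger, so 
that it compiles on its own:
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   
   class InProgressSketch {
       enum Status { WAITING, PROCESSING }
   
       // Stand-in for AtlasAsyncImportRequest.
       static final class Request {
           final String importId;
           final Status status;
   
           Request(String importId, Status status) {
               this.importId = importId;
               this.status   = status;
           }
       }
   
       // Stand-in for DataAccess.load(), which may throw.
       interface Loader {
           Iterable<Request> load(List<Request> toLoad) throws Exception;
       }
   
       static List<String> fetchInProgressImportIds(Loader loader, List<Request> toLoad) {
           List<String> ret = new ArrayList<>();
   
           try {
               // Load and filter in one place; no nullable variable declared outside the try.
               Iterable<Request> allRequests = loader.load(toLoad);
   
               if (allRequests != null) {
                   for (Request request : allRequests) {
                       if (request != null && request.status == Status.PROCESSING) {
                           ret.add(request.importId);
                       }
                   }
               }
           } catch (Exception e) {
               System.err.println("Could not get in progress import requests: " + e);
           }
   
           return ret;
       }
   }
   ```
   
   The same restructuring would apply to `fetchQueuedImportRequests()`, which 
follows the same load-then-filter pattern.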



##########
repository/src/main/java/org/apache/atlas/repository/impexp/AsyncImportService.java:
##########
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.impexp;
+
+import org.apache.atlas.AtlasErrorCode;
+import org.apache.atlas.SortOrder;
+import org.apache.atlas.annotation.GraphTransaction;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasAsyncImportRequest;
+import org.apache.atlas.repository.ogm.DataAccess;
+import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.ObjectUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+import javax.inject.Inject;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static 
org.apache.atlas.model.impexp.AtlasAsyncImportRequest.ASYNC_IMPORT_TYPE_NAME;
+import static 
org.apache.atlas.model.impexp.AtlasAsyncImportRequest.ImportStatus.ABORTED;
+import static 
org.apache.atlas.model.impexp.AtlasAsyncImportRequest.ImportStatus.PROCESSING;
+import static 
org.apache.atlas.model.impexp.AtlasAsyncImportRequest.ImportStatus.WAITING;
+
+@Service
+public class AsyncImportService {
+    private static final Logger LOG = 
LoggerFactory.getLogger(AsyncImportService.class);
+
+    private final DataAccess dataAccess;
+
+    @Inject
+    public AsyncImportService(DataAccess dataAccess) {
+        this.dataAccess = dataAccess;
+    }
+
+    public synchronized AtlasAsyncImportRequest 
fetchImportRequestByImportId(String importId) {
+        try {
+            AtlasAsyncImportRequest request = new AtlasAsyncImportRequest();
+            request.setImportId(importId);
+
+            return dataAccess.load(request);
+        } catch (Exception e) {
+            LOG.error("Error fetching request with importId: {}", importId, e);
+            return null;
+        }
+    }
+
+    public void saveImportRequest(AtlasAsyncImportRequest importRequest) 
throws AtlasBaseException {
+        try {
+            dataAccess.save(importRequest);
+            LOG.debug("Save request ID: {} request: {}", 
importRequest.getImportId(), importRequest.toString());
+        } catch (AtlasBaseException e) {
+            throw e;
+        }
+    }
+
+    public void updateImportRequest(AtlasAsyncImportRequest importRequest) {
+        try {
+            saveImportRequest(importRequest);
+        } catch (AtlasBaseException abe) {
+            LOG.error("Failed to update import: {} with request: {}", 
importRequest.getImportId(), importRequest.toString());
+        }
+    }
+
+    public synchronized List<String> fetchInProgressImportIds() {
+        List<String> guids = 
AtlasGraphUtilsV2.findEntityGUIDsByType(ASYNC_IMPORT_TYPE_NAME, 
SortOrder.ASCENDING);
+
+        List<String> inProgressImportIds = new ArrayList<>();
+
+        List<AtlasAsyncImportRequest> importsToLoad = guids.stream()
+                .map(AtlasAsyncImportRequest::new)
+                .collect(Collectors.toList());
+
+        Iterable<AtlasAsyncImportRequest> allRequests = null;
+        try {
+            allRequests = dataAccess.load(importsToLoad);
+        } catch (AtlasBaseException e) {
+            LOG.error("Could not get in progress import requests.", e);
+        }
+
+        if (allRequests != null) {
+            for (AtlasAsyncImportRequest request : allRequests) {
+                if (request != null) {
+                    if (ObjectUtils.equals(request.getStatus(), PROCESSING)) {
+                        inProgressImportIds.add(request.getImportId());
+                    }
+                }
+            }
+        }
+
+        return inProgressImportIds;
+    }
+
+    public synchronized List<String> fetchQueuedImportRequests() {
+        List<String> guids = 
AtlasGraphUtilsV2.findEntityGUIDsByType(ASYNC_IMPORT_TYPE_NAME, 
SortOrder.ASCENDING);
+
+        List<String> queuedImportIds = new ArrayList<>();
+
+        List<AtlasAsyncImportRequest> importsToLoad = guids.stream()
+                .map(AtlasAsyncImportRequest::new)
+                .collect(Collectors.toList());
+
+        Iterable<AtlasAsyncImportRequest> allRequests = null;
+        try {
+            allRequests = dataAccess.load(importsToLoad);
+        } catch (AtlasBaseException e) {
+            LOG.error("Could not get all import request to be queued.", e);
+        }
+
+        if (allRequests != null) {
+            for (AtlasAsyncImportRequest request : allRequests) {
+                if (request != null) {
+                    if (ObjectUtils.equals(request.getStatus(), WAITING)) {
+                        queuedImportIds.add(request.getImportId());
+                    }
+                }
+            }
+        }
+
+        return queuedImportIds;
+    }
+
+    public void deleteRequests() {
+        try {
+            dataAccess.delete(AtlasGraphUtilsV2.findEntityGUIDsByType(ASYNC_IMPORT_TYPE_NAME, SortOrder.ASCENDING));
+        } catch (Exception e) {
+            LOG.error("Error deleting import requests");
+        }
+    }
+
+    public AtlasAsyncImportRequest abortImport(String importId) throws 
AtlasBaseException {
+        AtlasAsyncImportRequest importRequestToKill = 
fetchImportRequestByImportId(importId);
+        try {
+            if (importRequestToKill == null) {
+                throw new AtlasBaseException(AtlasErrorCode.IMPORT_NOT_FOUND, 
importId);
+            }
+            if (importRequestToKill.getStatus().equals(WAITING)) {
+                importRequestToKill.setStatus(ABORTED);
+                saveImportRequest(importRequestToKill);
+                LOG.info("Successfully stopped import request: {}", importId);
+            } else {
+                LOG.error("Cannot abort import request {}: request already is 
in status: {}", importId, importRequestToKill.getStatus());
+                throw new 
AtlasBaseException(AtlasErrorCode.IMPORT_ALREADY_IN_PROGRESS, importId);

Review Comment:
   `IMPORT_ALREADY_IN_PROGRESS` may not be the correct error code here. The error
   message should be something like:
   
   `request id={0} it is currently in state {1}, cannot be aborted`
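
   A minimal sketch of how such a message template could be formatted, assuming the placeholders are expanded with `java.text.MessageFormat`. The enum and constant names below are hypothetical and are not part of `AtlasErrorCode`:

```java
import java.text.MessageFormat;

// Hypothetical stand-in for an error-code enum; the constant name is an assumption.
enum SketchErrorCode {
    IMPORT_ABORT_NOT_ALLOWED("request id={0} it is currently in state {1}, cannot be aborted");

    private final String template;

    SketchErrorCode(String template) {
        this.template = template;
    }

    // Expands {0}, {1}, ... with the given parameters.
    String format(Object... params) {
        return MessageFormat.format(template, params);
    }
}

class AbortMessageSketch {
    public static void main(String[] args) {
        // "import-123" and "PROCESSING" are illustrative values only.
        System.out.println(SketchErrorCode.IMPORT_ABORT_NOT_ALLOWED.format("import-123", "PROCESSING"));
    }
}
```

   With a dedicated code like this, the caller sees both the request id and the state that blocked the abort.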



##########
webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java:
##########
@@ -679,6 +681,105 @@ public AtlasImportResult importData(@DefaultValue("{}") 
@FormDataParam("request"
         return result;
     }
 
+    @POST
+    @Path("/asyncImport")
+    @Produces(Servlets.JSON_MEDIA_TYPE)
+    @Consumes(MediaType.MULTIPART_FORM_DATA)
+    public AtlasAsyncImportRequest importAsync(@DefaultValue("{}") 
@FormDataParam("request") String jsonData,
+                                               @FormDataParam("data") 
InputStream inputStream) throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> AdminResource.importAsync(jsonData={}, 
inputStream={})", jsonData, (inputStream != null));
+        }
+        AtlasAuthorizationUtils.verifyAccess(new 
AtlasAdminAccessRequest(AtlasPrivilege.ADMIN_IMPORT), "asyncImportData");
+        AtlasAsyncImportRequest asyncImportRequest = null;
+        boolean preventMultipleRequests = true;
+        try {
+            AtlasImportRequest request = AtlasType.fromJson(jsonData, 
AtlasImportRequest.class);
+            preventMultipleRequests = request != null && request.getOptions() 
!= null
+                    && 
!request.getOptions().containsKey(AtlasImportRequest.OPTION_KEY_REPLICATED_FROM);
+
+            if (preventMultipleRequests) {
+                acquireExportImportLock("import");
+            }
+            asyncImportRequest = importService.run(request, inputStream, 
Servlets.getUserName(httpServletRequest),
+                    Servlets.getHostName(httpServletRequest), 
AtlasAuthorizationUtils.getRequestIpAddress(httpServletRequest));
+        } catch (AtlasBaseException excp) {
+            if 
(excp.getAtlasErrorCode().equals(AtlasErrorCode.IMPORT_ATTEMPTING_EMPTY_ZIP)) {
+                LOG.info(excp.getMessage());
+                return new AtlasAsyncImportRequest();
+            } else {
+                LOG.error("importData(binary) failed", excp);
+                throw excp;
+            }
+        } catch (Exception excp) {
+            LOG.error("importData(binary) failed", excp);
+            throw new AtlasBaseException(excp);
+        } finally {
+            if (preventMultipleRequests) {
+                releaseExportImportLock();
+            }
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("<== AdminResource.asyncImportData(binary)");
+            }
+        }
+
+        return asyncImportRequest;
+    }
+
+    @DELETE
+    @Path("/asyncImport/{importId}")
+    @Produces(Servlets.JSON_MEDIA_TYPE)
+    @Consumes(MediaType.APPLICATION_JSON)
+    public void deleteAsyncImportById(@PathParam("importId") String importId) 
throws AtlasBaseException {
+        importService.abortAsyncImport(importId);
+    }
+
+    @GET
+    @Path("/asyncImport/status")
+    @Produces(Servlets.JSON_MEDIA_TYPE)
+    public List<Map<String, Object>> getAsyncImportStatus() throws 
AtlasBaseException {
+        AtlasPerfTracer perf = null;
+
+        try {
+            if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
+                perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, 
"AdminResource.getAsyncImportStatus()");
+            }
+
+            List<Map<String, Object>> importRequests = 
importService.getAllAsyncImports();
+
+            if (importRequests.isEmpty()) {

Review Comment:
   Is this `if` block necessary? I suggest replacing lines 748 - 754 with:
   ```
     return importService.getAllAsyncImports();
   ```



##########
webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java:
##########
@@ -679,6 +681,105 @@ public AtlasImportResult importData(@DefaultValue("{}") 
@FormDataParam("request"
         return result;
     }
 
+    @POST
+    @Path("/asyncImport")

Review Comment:
   `/asyncImport` => `/async/import` for all methods introduced here:
   - importAsync()
   - deleteAsyncImportById()
   - getAsyncImportStatus()
   - getAsyncImportStatusById()



##########
webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java:
##########
@@ -454,63 +485,85 @@ void startInternal(Configuration configuration, 
ExecutorService executorService)
 
         if (!HAConfiguration.isHAEnabled(configuration)) {
             LOG.info("HA is disabled, starting consumers inline.");
+            startHookConsumers(executorService);
+        }
+    }
 
-            startConsumers(executorService);
+    public void startImportNotificationConsumer(NotificationType 
notificationType, String importId, String topic) {

Review Comment:
   `startImportNotificationConsumer(notificationType, importId, topic)` ==> 
`startAsyncImportConsumer(importId, topic)`



##########
webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java:
##########
@@ -679,6 +681,105 @@ public AtlasImportResult importData(@DefaultValue("{}") 
@FormDataParam("request"
         return result;
     }
 
+    @POST
+    @Path("/asyncImport")
+    @Produces(Servlets.JSON_MEDIA_TYPE)
+    @Consumes(MediaType.MULTIPART_FORM_DATA)
+    public AtlasAsyncImportRequest importAsync(@DefaultValue("{}") 
@FormDataParam("request") String jsonData,
+                                               @FormDataParam("data") 
InputStream inputStream) throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> AdminResource.importAsync(jsonData={}, 
inputStream={})", jsonData, (inputStream != null));
+        }
+        AtlasAuthorizationUtils.verifyAccess(new 
AtlasAdminAccessRequest(AtlasPrivilege.ADMIN_IMPORT), "asyncImportData");
+        AtlasAsyncImportRequest asyncImportRequest = null;
+        boolean preventMultipleRequests = true;

Review Comment:
   `preventMultipleRequests` should be initialized to `false`, so that any
   error at line 697 will not result in a call to `releaseExportImportLock()` at
   line 719.
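
   A minimal sketch of the pattern, assuming a simplified handler. The class, the counters and the `replicatedFrom` check are hypothetical stand-ins for `acquireExportImportLock()`, `releaseExportImportLock()` and the real option lookup:

```java
class LockFlagSketch {
    // Counters stand in for the real export/import lock.
    int acquired = 0;
    int released = 0;

    void acquireLock() { acquired++; }
    void releaseLock() { released++; }

    // A null json simulates a failure while parsing the request.
    void handle(String json) {
        boolean preventMultipleRequests = false; // false until we know a lock is needed
        try {
            if (json == null) {
                throw new IllegalArgumentException("request could not be parsed");
            }
            preventMultipleRequests = !json.contains("replicatedFrom");
            if (preventMultipleRequests) {
                acquireLock();
            }
        } catch (IllegalArgumentException e) {
            // swallowed for the sketch; real code would log and rethrow
        } finally {
            if (preventMultipleRequests) {
                releaseLock(); // skipped when parsing failed before the flag was set
            }
        }
    }

    public static void main(String[] args) {
        LockFlagSketch s = new LockFlagSketch();
        s.handle(null);
        s.handle("{}");
        System.out.println(s.acquired + " acquired, " + s.released + " released");
    }
}
```

   With the flag starting as `true`, the first call above would release a lock that was never acquired.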



##########
webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java:
##########
@@ -679,6 +681,105 @@ public AtlasImportResult importData(@DefaultValue("{}") 
@FormDataParam("request"
         return result;
     }
 
+    @POST
+    @Path("/asyncImport")
+    @Produces(Servlets.JSON_MEDIA_TYPE)
+    @Consumes(MediaType.MULTIPART_FORM_DATA)
+    public AtlasAsyncImportRequest importAsync(@DefaultValue("{}") 
@FormDataParam("request") String jsonData,
+                                               @FormDataParam("data") 
InputStream inputStream) throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {

Review Comment:
   With the use of parameterized log messages, `if (LOG.isDebugEnabled())` is no
   longer necessary. Please remove all such occurrences.
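
   A minimal sketch of why the guard is redundant. To stay self-contained it uses `java.util.logging` as a stand-in for SLF4J (an assumption for illustration only); its parameterized `log(Level, String, Object)` likewise skips rendering the argument when the level is disabled. The class names are hypothetical:

```java
import java.util.logging.Level;
import java.util.logging.Logger;

class LazyLogSketch {
    // Counts how often toString() runs, i.e. how often the argument is rendered.
    static class Expensive {
        int renders = 0;

        @Override
        public String toString() {
            renders++;
            return "expensive";
        }
    }

    // Parameterized call: the argument is passed as an object and never rendered
    // because FINE (debug) is disabled.
    static int rendersWithParameter() {
        Logger log = Logger.getLogger("sketch.lazy");
        log.setLevel(Level.INFO);
        Expensive arg = new Expensive();
        log.log(Level.FINE, "==> importAsync(jsonData={0})", arg);
        return arg.renders;
    }

    // String concatenation: the argument is rendered before the logger is even called.
    static int rendersWithConcatenation() {
        Logger log = Logger.getLogger("sketch.lazy");
        log.setLevel(Level.INFO);
        Expensive arg = new Expensive();
        log.fine("==> importAsync(jsonData=" + arg + ")");
        return arg.renders;
    }

    public static void main(String[] args) {
        System.out.println("parameterized renders: " + rendersWithParameter());
        System.out.println("concatenation renders: " + rendersWithConcatenation());
    }
}
```

   Only the concatenating form pays the formatting cost when debug is off, which is the case the explicit guard was meant to protect.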



##########
webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java:
##########
@@ -679,6 +681,105 @@ public AtlasImportResult importData(@DefaultValue("{}") 
@FormDataParam("request"
         return result;
     }
 
+    @POST
+    @Path("/asyncImport")
+    @Produces(Servlets.JSON_MEDIA_TYPE)
+    @Consumes(MediaType.MULTIPART_FORM_DATA)
+    public AtlasAsyncImportRequest importAsync(@DefaultValue("{}") 
@FormDataParam("request") String jsonData,
+                                               @FormDataParam("data") 
InputStream inputStream) throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> AdminResource.importAsync(jsonData={}, 
inputStream={})", jsonData, (inputStream != null));
+        }
+        AtlasAuthorizationUtils.verifyAccess(new 
AtlasAdminAccessRequest(AtlasPrivilege.ADMIN_IMPORT), "asyncImportData");
+        AtlasAsyncImportRequest asyncImportRequest = null;
+        boolean preventMultipleRequests = true;
+        try {
+            AtlasImportRequest request = AtlasType.fromJson(jsonData, 
AtlasImportRequest.class);
+            preventMultipleRequests = request != null && request.getOptions() 
!= null
+                    && 
!request.getOptions().containsKey(AtlasImportRequest.OPTION_KEY_REPLICATED_FROM);
+
+            if (preventMultipleRequests) {
+                acquireExportImportLock("import");
+            }
+            asyncImportRequest = importService.run(request, inputStream, 
Servlets.getUserName(httpServletRequest),
+                    Servlets.getHostName(httpServletRequest), 
AtlasAuthorizationUtils.getRequestIpAddress(httpServletRequest));
+        } catch (AtlasBaseException excp) {
+            if 
(excp.getAtlasErrorCode().equals(AtlasErrorCode.IMPORT_ATTEMPTING_EMPTY_ZIP)) {
+                LOG.info(excp.getMessage());
+                return new AtlasAsyncImportRequest();
+            } else {
+                LOG.error("importData(binary) failed", excp);
+                throw excp;
+            }
+        } catch (Exception excp) {
+            LOG.error("importData(binary) failed", excp);
+            throw new AtlasBaseException(excp);
+        } finally {
+            if (preventMultipleRequests) {
+                releaseExportImportLock();
+            }
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("<== AdminResource.asyncImportData(binary)");
+            }
+        }
+
+        return asyncImportRequest;
+    }
+
+    @DELETE
+    @Path("/asyncImport/{importId}")
+    @Produces(Servlets.JSON_MEDIA_TYPE)
+    @Consumes(MediaType.APPLICATION_JSON)
+    public void deleteAsyncImportById(@PathParam("importId") String importId) 
throws AtlasBaseException {
+        importService.abortAsyncImport(importId);
+    }
+
+    @GET
+    @Path("/asyncImport/status")
+    @Produces(Servlets.JSON_MEDIA_TYPE)
+    public List<Map<String, Object>> getAsyncImportStatus() throws 
AtlasBaseException {
+        AtlasPerfTracer perf = null;
+
+        try {
+            if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
+                perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, 
"AdminResource.getAsyncImportStatus()");
+            }
+
+            List<Map<String, Object>> importRequests = 
importService.getAllAsyncImports();
+
+            if (importRequests.isEmpty()) {
+                return Collections.emptyList();
+            }
+
+            return importRequests;
+        } finally {
+            AtlasPerfTracer.log(perf);
+        }
+    }
+
+    @GET
+    @Path("/asyncImport/status/{importId}")
+    @Produces(Servlets.JSON_MEDIA_TYPE)
+    public AtlasAsyncImportRequest 
getAsyncImportStatusById(@PathParam("importId") String importId) throws 
AtlasBaseException {
+        AtlasPerfTracer perf = null;
+
+        try {
+            if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
+                perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, 
"AdminResource.getAsyncImportStatusById(importId=" + importId + ")");
+            }
+
+            AtlasAsyncImportRequest importRequest = 
importService.getAsyncImportStatus(importId);

Review Comment:
   `getAsyncImportStatus()` will not return null, so the following `if` block is
   not necessary. I suggest replacing lines 771 - 777 with:
   ```
     return importService.getAsyncImportStatus(importId);
   ```



##########
webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java:
##########
@@ -679,6 +681,105 @@ public AtlasImportResult importData(@DefaultValue("{}") 
@FormDataParam("request"
         return result;
     }
 
+    @POST
+    @Path("/asyncImport")
+    @Produces(Servlets.JSON_MEDIA_TYPE)
+    @Consumes(MediaType.MULTIPART_FORM_DATA)
+    public AtlasAsyncImportRequest importAsync(@DefaultValue("{}") 
@FormDataParam("request") String jsonData,
+                                               @FormDataParam("data") 
InputStream inputStream) throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> AdminResource.importAsync(jsonData={}, 
inputStream={})", jsonData, (inputStream != null));
+        }
+        AtlasAuthorizationUtils.verifyAccess(new 
AtlasAdminAccessRequest(AtlasPrivilege.ADMIN_IMPORT), "asyncImportData");
+        AtlasAsyncImportRequest asyncImportRequest = null;
+        boolean preventMultipleRequests = true;
+        try {
+            AtlasImportRequest request = AtlasType.fromJson(jsonData, 
AtlasImportRequest.class);
+            preventMultipleRequests = request != null && request.getOptions() 
!= null
+                    && 
!request.getOptions().containsKey(AtlasImportRequest.OPTION_KEY_REPLICATED_FROM);
+
+            if (preventMultipleRequests) {
+                acquireExportImportLock("import");
+            }
+            asyncImportRequest = importService.run(request, inputStream, 
Servlets.getUserName(httpServletRequest),
+                    Servlets.getHostName(httpServletRequest), 
AtlasAuthorizationUtils.getRequestIpAddress(httpServletRequest));
+        } catch (AtlasBaseException excp) {
+            if 
(excp.getAtlasErrorCode().equals(AtlasErrorCode.IMPORT_ATTEMPTING_EMPTY_ZIP)) {

Review Comment:
   Why the special treatment for failure with `IMPORT_ATTEMPTING_EMPTY_ZIP`? Is 
this failure treated as success? What is expected from the REST API caller?



##########
webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java:
##########
@@ -679,6 +681,105 @@ public AtlasImportResult importData(@DefaultValue("{}") 
@FormDataParam("request"
         return result;
     }
 
+    @POST
+    @Path("/asyncImport")
+    @Produces(Servlets.JSON_MEDIA_TYPE)
+    @Consumes(MediaType.MULTIPART_FORM_DATA)
+    public AtlasAsyncImportRequest importAsync(@DefaultValue("{}") 
@FormDataParam("request") String jsonData,
+                                               @FormDataParam("data") 
InputStream inputStream) throws AtlasBaseException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> AdminResource.importAsync(jsonData={}, 
inputStream={})", jsonData, (inputStream != null));
+        }
+        AtlasAuthorizationUtils.verifyAccess(new 
AtlasAdminAccessRequest(AtlasPrivilege.ADMIN_IMPORT), "asyncImportData");
+        AtlasAsyncImportRequest asyncImportRequest = null;
+        boolean preventMultipleRequests = true;
+        try {
+            AtlasImportRequest request = AtlasType.fromJson(jsonData, 
AtlasImportRequest.class);
+            preventMultipleRequests = request != null && request.getOptions() 
!= null
+                    && 
!request.getOptions().containsKey(AtlasImportRequest.OPTION_KEY_REPLICATED_FROM);
+
+            if (preventMultipleRequests) {
+                acquireExportImportLock("import");
+            }
+            asyncImportRequest = importService.run(request, inputStream, 
Servlets.getUserName(httpServletRequest),
+                    Servlets.getHostName(httpServletRequest), 
AtlasAuthorizationUtils.getRequestIpAddress(httpServletRequest));
+        } catch (AtlasBaseException excp) {
+            if 
(excp.getAtlasErrorCode().equals(AtlasErrorCode.IMPORT_ATTEMPTING_EMPTY_ZIP)) {
+                LOG.info(excp.getMessage());
+                return new AtlasAsyncImportRequest();
+            } else {
+                LOG.error("importData(binary) failed", excp);

Review Comment:
   `importData()` => `importAsync()`.
   
   Please review and update other messages as well.



##########
webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java:
##########
@@ -443,6 +453,27 @@ public int getHandlerOrder() {
         return HandlerOrder.NOTIFICATION_HOOK_CONSUMER.getOrder();
     }
 
+    public void closeImportConsumer(String importId, String topic) {
+        try {
+            LOG.info("==> closeImportConsumer(importId={}, topic={})", 
importId, topic);
+            String consumerName = ATLAS_IMPORT_CONSUMER_THREAD_PREFIX + 
importId;
+            ListIterator<HookConsumer> consumersIterator = 
consumers.listIterator();
+            while (consumersIterator.hasNext()) {
+                HookConsumer consumer = consumersIterator.next();
+                if (consumer.getName().contains(consumerName)) {
+                    consumer.shutdown();
+                    consumersIterator.remove();
+                }
+            }
+            notificationInterface.closeConsumer(ASYNC_IMPORT);

Review Comment:
   This would close all consumers of type `ASYNC_IMPORT`. Shouldn't only the
   consumers of `topic` be closed instead? Perhaps only one consumer will be
   active for type `ASYNC_IMPORT` in the current implementation, but this
   code should handle multiple simultaneous imports.
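
   A minimal sketch of per-topic bookkeeping, assuming consumers are tracked in a map keyed by topic. The registry class, the `Consumer` type and the topic names are hypothetical and not part of `NotificationInterface`:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class TopicConsumerRegistrySketch {
    // Hypothetical minimal consumer; only tracks whether it was closed.
    static class Consumer {
        final String topic;
        boolean closed = false;

        Consumer(String topic) {
            this.topic = topic;
        }

        void close() {
            closed = true;
        }
    }

    private final Map<String, Consumer> consumersByTopic = new ConcurrentHashMap<>();

    // Registers (or returns the existing) consumer for a topic.
    Consumer open(String topic) {
        return consumersByTopic.computeIfAbsent(topic, Consumer::new);
    }

    // Closes only the consumer for the given topic; returns false if none was registered.
    boolean close(String topic) {
        Consumer consumer = consumersByTopic.remove(topic);
        if (consumer == null) {
            return false;
        }
        consumer.close();
        return true;
    }

    int openCount() {
        return consumersByTopic.size();
    }

    public static void main(String[] args) {
        TopicConsumerRegistrySketch registry = new TopicConsumerRegistrySketch();
        Consumer first = registry.open("import-topic-a");
        Consumer second = registry.open("import-topic-b");
        registry.close("import-topic-a");
        System.out.println(first.closed + " " + second.closed + " " + registry.openCount());
    }
}
```

   Closing by topic leaves the consumer of any other in-flight import untouched.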



##########
webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java:
##########
@@ -170,6 +177,7 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
     private static final int    KAFKA_CONSUMER_SHUTDOWN_WAIT                   
 = 30000;
     private static final String ATLAS_HOOK_CONSUMER_THREAD_NAME                
 = "atlas-hook-consumer-thread";
     private static final String ATLAS_HOOK_UNSORTED_CONSUMER_THREAD_NAME       
 = "atlas-hook-unsorted-consumer-thread";
+    private static final String ATLAS_IMPORT_CONSUMER_THREAD_PREFIX            
 = "atlas-import-hook-consumer-thread-";

Review Comment:
   `atlas-import-hook-consumer-thread-` => `atlas-import-consumer-thread-`


