mneethiraj commented on code in PR #336:
URL: https://github.com/apache/atlas/pull/336#discussion_r2279269189


##########
addons/trino-extractor/src/main/java/org/apache/atlas/trino/cli/ExtractorService.java:
##########
@@ -0,0 +1,356 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.trino.cli;
+
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntityHeader;
+import org.apache.atlas.trino.client.AtlasClientHelper;
+import org.apache.atlas.trino.client.TrinoClientHelper;
+import org.apache.atlas.trino.model.Catalog;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+public class ExtractorService {
+    private static final Logger LOG = 
LoggerFactory.getLogger(ExtractorService.class);
+
+    public static final  int               THREAD_POOL_SIZE         = 5;
+    public static final  int               CATALOG_EXECUTION_TIMEOUT = 60;
+    public static final  String            TRINO_NAME_ATTRIBUTE = "name";
+    private static final String            TRINO_CATALOG_REGISTERED = 
"atlas.trino.catalogs.registered";
+    private static final String            TRINO_CATALOG_HOOK_ENABLED_PREFIX = 
"atlas.trino.catalog.hook.enabled.";
+    private static final String            TRINO_CATALOG_HOOK_ENABLED_SUFFIX = 
".namespace";
+    private static       Configuration     atlasProperties;
+    private static       TrinoClientHelper trinoClientHelper;
+    private static       String            trinoNamespace;
+    private              ExtractorContext  context;
+
+    public boolean execute(ExtractorContext context) throws Exception {
+        this.context      = context;
+        atlasProperties   = context.getAtlasConf();
+        trinoClientHelper = context.getTrinoConnector();
+        trinoNamespace    = context.getNamespace();
+
+        Map<String, String> catalogs = trinoClientHelper.getAllTrinoCatalogs();
+        LOG.info("Found {} catalogs in Trino: {}", catalogs.size(), 
catalogs.keySet());
+
+        try {
+            processCatalogs(context, catalogs);
+            deleteCatalogs(context, catalogs);
+        } catch (AtlasServiceException e) {
+            throw new AtlasServiceException(e);
+        }
+        return true;
+    }
+
+    public void processCatalogs(ExtractorContext context, Map<String, String> 
catalogInTrino) throws AtlasServiceException {

Review Comment:
   - make `processCatalogs()` `private`, as this method is not called outside this class



##########
addons/trino-extractor/src/main/java/org/apache/atlas/trino/cli/ExtractorService.java:
##########
@@ -0,0 +1,356 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.trino.cli;
+
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntityHeader;
+import org.apache.atlas.trino.client.AtlasClientHelper;
+import org.apache.atlas.trino.client.TrinoClientHelper;
+import org.apache.atlas.trino.model.Catalog;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+public class ExtractorService {
+    private static final Logger LOG = 
LoggerFactory.getLogger(ExtractorService.class);
+
+    public static final  int               THREAD_POOL_SIZE         = 5;
+    public static final  int               CATALOG_EXECUTION_TIMEOUT = 60;
+    public static final  String            TRINO_NAME_ATTRIBUTE = "name";
+    private static final String            TRINO_CATALOG_REGISTERED = 
"atlas.trino.catalogs.registered";
+    private static final String            TRINO_CATALOG_HOOK_ENABLED_PREFIX = 
"atlas.trino.catalog.hook.enabled.";
+    private static final String            TRINO_CATALOG_HOOK_ENABLED_SUFFIX = 
".namespace";
+    private static       Configuration     atlasProperties;
+    private static       TrinoClientHelper trinoClientHelper;
+    private static       String            trinoNamespace;
+    private              ExtractorContext  context;
+
+    public boolean execute(ExtractorContext context) throws Exception {
+        this.context      = context;
+        atlasProperties   = context.getAtlasConf();
+        trinoClientHelper = context.getTrinoConnector();
+        trinoNamespace    = context.getNamespace();
+
+        Map<String, String> catalogs = trinoClientHelper.getAllTrinoCatalogs();
+        LOG.info("Found {} catalogs in Trino: {}", catalogs.size(), 
catalogs.keySet());
+
+        try {
+            processCatalogs(context, catalogs);
+            deleteCatalogs(context, catalogs);
+        } catch (AtlasServiceException e) {
+            throw new AtlasServiceException(e);
+        }
+        return true;
+    }
+
+    public void processCatalogs(ExtractorContext context, Map<String, String> 
catalogInTrino) throws AtlasServiceException {
+        if (MapUtils.isEmpty(catalogInTrino)) {
+            LOG.debug("No catalogs found under Trino");
+            return;
+        }
+
+        List<Catalog> catalogsToProcess = new ArrayList<>();
+
+        if (StringUtils.isEmpty(context.getCatalog())) {
+            String[] registeredCatalogs = 
atlasProperties.getStringArray(TRINO_CATALOG_REGISTERED);
+
+            if (registeredCatalogs != null) {
+                for (String registeredCatalog : registeredCatalogs) {
+                    if (catalogInTrino.containsKey(registeredCatalog)) {
+                        
catalogsToProcess.add(getCatalogInstance(registeredCatalog, 
catalogInTrino.get(registeredCatalog)));
+                    }
+                }
+            }
+        } else {
+            if (catalogInTrino.containsKey(context.getCatalog())) {
+                Catalog catalog = getCatalogInstance(context.getCatalog(), 
catalogInTrino.get(context.getCatalog()));
+                catalog.setSchemaToImport(context.getSchema());
+                catalog.setTableToImport(context.getTable());
+                catalogsToProcess.add(catalog);
+            }
+        }
+
+        if (CollectionUtils.isEmpty(catalogsToProcess)) {
+            LOG.warn("No catalogs found to process");
+            return;
+        } else {
+            LOG.info("{} catalogs to be extracted", 
catalogsToProcess.stream().map(Catalog::getName).collect(Collectors.toList()));
+        }
+
+        AtlasEntity.AtlasEntityWithExtInfo trinoInstanceEntity = 
AtlasClientHelper.createOrUpdateInstanceEntity(trinoNamespace);
+
+        ExecutorService catalogExecutor = 
Executors.newFixedThreadPool(Math.min(catalogsToProcess.size(), 
THREAD_POOL_SIZE));
+        List<Future<?>> futures         = new ArrayList<>();
+
+        for (Catalog currentCatalog : catalogsToProcess) {
+            futures.add(catalogExecutor.submit(() -> {
+                try {
+                    currentCatalog.setTrinoInstanceEntity(trinoInstanceEntity);
+                    processCatalog(currentCatalog);
+                } catch (Exception e) {
+                    LOG.error("Error processing catalog: {}", currentCatalog, 
e);
+                }
+            }));
+        }
+        catalogExecutor.shutdown();
+
+        try {
+            if (!catalogExecutor.awaitTermination(CATALOG_EXECUTION_TIMEOUT, 
TimeUnit.MINUTES)) {
+                LOG.warn("Catalog processing did not complete within the 
timeout. {} minutes", CATALOG_EXECUTION_TIMEOUT);
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            LOG.error("Catalog processing was interrupted", e);
+        }
+
+        LOG.info("Catalogs scanned for creation/updation completed");
+    }
+
+    public void processCatalog(Catalog catalog) throws AtlasServiceException, 
SQLException {

Review Comment:
   - make `processCatalog()` `private`, as this method is not called outside this class



##########
addons/trino-extractor/src/main/java/org/apache/atlas/trino/cli/ExtractorService.java:
##########
@@ -0,0 +1,356 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.trino.cli;
+
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntityHeader;
+import org.apache.atlas.trino.client.AtlasClientHelper;
+import org.apache.atlas.trino.client.TrinoClientHelper;
+import org.apache.atlas.trino.model.Catalog;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+public class ExtractorService {
+    private static final Logger LOG = 
LoggerFactory.getLogger(ExtractorService.class);
+
+    public static final  int               THREAD_POOL_SIZE         = 5;
+    public static final  int               CATALOG_EXECUTION_TIMEOUT = 60;
+    public static final  String            TRINO_NAME_ATTRIBUTE = "name";
+    private static final String            TRINO_CATALOG_REGISTERED = 
"atlas.trino.catalogs.registered";
+    private static final String            TRINO_CATALOG_HOOK_ENABLED_PREFIX = 
"atlas.trino.catalog.hook.enabled.";
+    private static final String            TRINO_CATALOG_HOOK_ENABLED_SUFFIX = 
".namespace";
+    private static       Configuration     atlasProperties;

Review Comment:
   - The following static/instance members derive their values from the parameter passed to the `execute()` method, so their state should not be preserved beyond a call to `execute()`. Please replace these members with the appropriate accessor methods on the `context` passed to `execute()`:
     - `atlasProperties`
     - `trinoClientHelper`
     - `trinoNamespace`
     - `context`



##########
addons/trino-extractor/src/main/java/org/apache/atlas/trino/cli/ExtractorService.java:
##########
@@ -0,0 +1,356 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.trino.cli;
+
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntityHeader;
+import org.apache.atlas.trino.client.AtlasClientHelper;
+import org.apache.atlas.trino.client.TrinoClientHelper;
+import org.apache.atlas.trino.model.Catalog;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+public class ExtractorService {
+    private static final Logger LOG = 
LoggerFactory.getLogger(ExtractorService.class);
+
+    public static final  int               THREAD_POOL_SIZE         = 5;
+    public static final  int               CATALOG_EXECUTION_TIMEOUT = 60;
+    public static final  String            TRINO_NAME_ATTRIBUTE = "name";
+    private static final String            TRINO_CATALOG_REGISTERED = 
"atlas.trino.catalogs.registered";
+    private static final String            TRINO_CATALOG_HOOK_ENABLED_PREFIX = 
"atlas.trino.catalog.hook.enabled.";
+    private static final String            TRINO_CATALOG_HOOK_ENABLED_SUFFIX = 
".namespace";
+    private static       Configuration     atlasProperties;
+    private static       TrinoClientHelper trinoClientHelper;
+    private static       String            trinoNamespace;
+    private              ExtractorContext  context;
+
+    public boolean execute(ExtractorContext context) throws Exception {
+        this.context      = context;
+        atlasProperties   = context.getAtlasConf();
+        trinoClientHelper = context.getTrinoConnector();
+        trinoNamespace    = context.getNamespace();
+
+        Map<String, String> catalogs = trinoClientHelper.getAllTrinoCatalogs();
+        LOG.info("Found {} catalogs in Trino: {}", catalogs.size(), 
catalogs.keySet());
+
+        try {
+            processCatalogs(context, catalogs);
+            deleteCatalogs(context, catalogs);
+        } catch (AtlasServiceException e) {
+            throw new AtlasServiceException(e);
+        }
+        return true;
+    }
+
+    public void processCatalogs(ExtractorContext context, Map<String, String> 
catalogInTrino) throws AtlasServiceException {
+        if (MapUtils.isEmpty(catalogInTrino)) {
+            LOG.debug("No catalogs found under Trino");
+            return;
+        }
+
+        List<Catalog> catalogsToProcess = new ArrayList<>();
+
+        if (StringUtils.isEmpty(context.getCatalog())) {
+            String[] registeredCatalogs = 
atlasProperties.getStringArray(TRINO_CATALOG_REGISTERED);
+
+            if (registeredCatalogs != null) {
+                for (String registeredCatalog : registeredCatalogs) {
+                    if (catalogInTrino.containsKey(registeredCatalog)) {
+                        
catalogsToProcess.add(getCatalogInstance(registeredCatalog, 
catalogInTrino.get(registeredCatalog)));
+                    }
+                }
+            }
+        } else {
+            if (catalogInTrino.containsKey(context.getCatalog())) {
+                Catalog catalog = getCatalogInstance(context.getCatalog(), 
catalogInTrino.get(context.getCatalog()));
+                catalog.setSchemaToImport(context.getSchema());
+                catalog.setTableToImport(context.getTable());
+                catalogsToProcess.add(catalog);
+            }
+        }
+
+        if (CollectionUtils.isEmpty(catalogsToProcess)) {
+            LOG.warn("No catalogs found to process");
+            return;
+        } else {
+            LOG.info("{} catalogs to be extracted", 
catalogsToProcess.stream().map(Catalog::getName).collect(Collectors.toList()));
+        }
+
+        AtlasEntity.AtlasEntityWithExtInfo trinoInstanceEntity = 
AtlasClientHelper.createOrUpdateInstanceEntity(trinoNamespace);
+
+        ExecutorService catalogExecutor = 
Executors.newFixedThreadPool(Math.min(catalogsToProcess.size(), 
THREAD_POOL_SIZE));
+        List<Future<?>> futures         = new ArrayList<>();
+
+        for (Catalog currentCatalog : catalogsToProcess) {
+            futures.add(catalogExecutor.submit(() -> {
+                try {
+                    currentCatalog.setTrinoInstanceEntity(trinoInstanceEntity);
+                    processCatalog(currentCatalog);
+                } catch (Exception e) {
+                    LOG.error("Error processing catalog: {}", currentCatalog, 
e);
+                }
+            }));
+        }
+        catalogExecutor.shutdown();
+
+        try {
+            if (!catalogExecutor.awaitTermination(CATALOG_EXECUTION_TIMEOUT, 
TimeUnit.MINUTES)) {
+                LOG.warn("Catalog processing did not complete within the 
timeout. {} minutes", CATALOG_EXECUTION_TIMEOUT);
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            LOG.error("Catalog processing was interrupted", e);
+        }
+
+        LOG.info("Catalogs scanned for creation/updation completed");
+    }
+
+    public void processCatalog(Catalog catalog) throws AtlasServiceException, 
SQLException {
+        if (catalog != null) {
+            LOG.info("Started extracting {} catalog:", catalog.getName());
+            String catalogName = catalog.getName();
+
+            AtlasEntity.AtlasEntityWithExtInfo trinoCatalogEntity = 
AtlasClientHelper.createOrUpdateCatalogEntity(catalog);
+
+            List<String> schemas = 
trinoClientHelper.getTrinoSchemas(catalogName, catalog.getSchemaToImport());
+            LOG.info("Found {} schema under {} catalog", schemas.size(), 
catalogName);
+
+            processSchemas(catalog, trinoCatalogEntity.getEntity(), schemas);
+
+            if (StringUtils.isEmpty(context.getSchema())) {
+                deleteSchemas(schemas, 
trinoCatalogEntity.getEntity().getGuid());
+            }
+        }
+    }
+
+    public void processSchemas(Catalog catalog, AtlasEntity 
trinoCatalogEntity, List<String> schemaToImport) {
+        for (String schemaName : schemaToImport) {
+            LOG.info("Started extracting {} schema:", schemaName);
+            try {
+                AtlasEntity.AtlasEntityWithExtInfo schemaEntity = 
AtlasClientHelper.createOrUpdateSchemaEntity(catalog, trinoCatalogEntity, 
schemaName);
+
+                Map<String, Map<String, Object>> tables = 
trinoClientHelper.getTrinoTables(catalog.getName(), schemaName, 
catalog.getTableToImport());
+                LOG.info("Found {} tables under {}.{} catalog.schema", 
tables.size(), catalog.getName(), schemaName);
+
+                processTables(catalog, schemaName, schemaEntity.getEntity(), 
tables);
+
+                if (StringUtils.isEmpty(context.getTable())) {
+                    deleteTables(new ArrayList<>(tables.keySet()), 
schemaEntity.getEntity().getGuid());
+                }
+            } catch (Exception e) {
+                LOG.error("Error processing schema: {}", schemaName);
+            }
+        }
+    }
+
+    public void processTables(Catalog catalog, String schemaName, AtlasEntity 
schemaEntity,  Map<String, Map<String, Object>> trinoTables) {
+        for (String trinoTableName : trinoTables.keySet()) {
+            LOG.info("Started extracting {} table:", trinoTableName);
+
+            try {
+                Map<String, Map<String, Object>> trinoColumns = 
trinoClientHelper.getTrinoColumns(catalog.getName(), schemaName, 
trinoTableName);
+                LOG.info("Found {} columns under {}.{}.{} 
catalog.schema.table", trinoColumns.size(), catalog.getName(), schemaName, 
trinoTableName);
+
+                AtlasClientHelper.createOrUpdateTableEntity(catalog, 
schemaName, trinoTableName, trinoTables.get(trinoTableName), trinoColumns, 
schemaEntity);
+            } catch (Exception e) {
+                LOG.error("Error processing table: {}", trinoTableName, e);
+            }
+        }
+    }
+
+    public void deleteCatalogs(ExtractorContext context, Map<String, String> 
catalogInTrino) throws AtlasServiceException {

Review Comment:
   - make `deleteCatalogs()` `private`, as this method is not called outside this class



##########
addons/trino-extractor/src/main/java/org/apache/atlas/trino/cli/ExtractorService.java:
##########
@@ -0,0 +1,356 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.trino.cli;
+
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntityHeader;
+import org.apache.atlas.trino.client.AtlasClientHelper;
+import org.apache.atlas.trino.client.TrinoClientHelper;
+import org.apache.atlas.trino.model.Catalog;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+public class ExtractorService {
+    private static final Logger LOG = 
LoggerFactory.getLogger(ExtractorService.class);
+
+    public static final  int               THREAD_POOL_SIZE         = 5;
+    public static final  int               CATALOG_EXECUTION_TIMEOUT = 60;
+    public static final  String            TRINO_NAME_ATTRIBUTE = "name";
+    private static final String            TRINO_CATALOG_REGISTERED = 
"atlas.trino.catalogs.registered";
+    private static final String            TRINO_CATALOG_HOOK_ENABLED_PREFIX = 
"atlas.trino.catalog.hook.enabled.";
+    private static final String            TRINO_CATALOG_HOOK_ENABLED_SUFFIX = 
".namespace";
+    private static       Configuration     atlasProperties;
+    private static       TrinoClientHelper trinoClientHelper;
+    private static       String            trinoNamespace;
+    private              ExtractorContext  context;
+
+    public boolean execute(ExtractorContext context) throws Exception {
+        this.context      = context;
+        atlasProperties   = context.getAtlasConf();
+        trinoClientHelper = context.getTrinoConnector();
+        trinoNamespace    = context.getNamespace();
+
+        Map<String, String> catalogs = trinoClientHelper.getAllTrinoCatalogs();
+        LOG.info("Found {} catalogs in Trino: {}", catalogs.size(), 
catalogs.keySet());
+
+        try {
+            processCatalogs(context, catalogs);
+            deleteCatalogs(context, catalogs);
+        } catch (AtlasServiceException e) {
+            throw new AtlasServiceException(e);
+        }
+        return true;
+    }
+
+    public void processCatalogs(ExtractorContext context, Map<String, String> 
catalogInTrino) throws AtlasServiceException {
+        if (MapUtils.isEmpty(catalogInTrino)) {
+            LOG.debug("No catalogs found under Trino");
+            return;
+        }
+
+        List<Catalog> catalogsToProcess = new ArrayList<>();
+
+        if (StringUtils.isEmpty(context.getCatalog())) {
+            String[] registeredCatalogs = 
atlasProperties.getStringArray(TRINO_CATALOG_REGISTERED);
+
+            if (registeredCatalogs != null) {
+                for (String registeredCatalog : registeredCatalogs) {
+                    if (catalogInTrino.containsKey(registeredCatalog)) {
+                        
catalogsToProcess.add(getCatalogInstance(registeredCatalog, 
catalogInTrino.get(registeredCatalog)));
+                    }
+                }
+            }
+        } else {
+            if (catalogInTrino.containsKey(context.getCatalog())) {
+                Catalog catalog = getCatalogInstance(context.getCatalog(), 
catalogInTrino.get(context.getCatalog()));
+                catalog.setSchemaToImport(context.getSchema());
+                catalog.setTableToImport(context.getTable());
+                catalogsToProcess.add(catalog);
+            }
+        }
+
+        if (CollectionUtils.isEmpty(catalogsToProcess)) {
+            LOG.warn("No catalogs found to process");
+            return;
+        } else {
+            LOG.info("{} catalogs to be extracted", 
catalogsToProcess.stream().map(Catalog::getName).collect(Collectors.toList()));
+        }
+
+        AtlasEntity.AtlasEntityWithExtInfo trinoInstanceEntity = 
AtlasClientHelper.createOrUpdateInstanceEntity(trinoNamespace);
+
+        ExecutorService catalogExecutor = 
Executors.newFixedThreadPool(Math.min(catalogsToProcess.size(), 
THREAD_POOL_SIZE));
+        List<Future<?>> futures         = new ArrayList<>();
+
+        for (Catalog currentCatalog : catalogsToProcess) {
+            futures.add(catalogExecutor.submit(() -> {
+                try {
+                    currentCatalog.setTrinoInstanceEntity(trinoInstanceEntity);
+                    processCatalog(currentCatalog);
+                } catch (Exception e) {
+                    LOG.error("Error processing catalog: {}", currentCatalog, 
e);
+                }
+            }));
+        }
+        catalogExecutor.shutdown();
+
+        try {
+            if (!catalogExecutor.awaitTermination(CATALOG_EXECUTION_TIMEOUT, 
TimeUnit.MINUTES)) {
+                LOG.warn("Catalog processing did not complete within the 
timeout. {} minutes", CATALOG_EXECUTION_TIMEOUT);
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            LOG.error("Catalog processing was interrupted", e);
+        }
+
+        LOG.info("Catalogs scanned for creation/updation completed");
+    }
+
+    public void processCatalog(Catalog catalog) throws AtlasServiceException, 
SQLException {
+        if (catalog != null) {
+            LOG.info("Started extracting {} catalog:", catalog.getName());
+            String catalogName = catalog.getName();
+
+            AtlasEntity.AtlasEntityWithExtInfo trinoCatalogEntity = 
AtlasClientHelper.createOrUpdateCatalogEntity(catalog);
+
+            List<String> schemas = 
trinoClientHelper.getTrinoSchemas(catalogName, catalog.getSchemaToImport());
+            LOG.info("Found {} schema under {} catalog", schemas.size(), 
catalogName);
+
+            processSchemas(catalog, trinoCatalogEntity.getEntity(), schemas);
+
+            if (StringUtils.isEmpty(context.getSchema())) {
+                deleteSchemas(schemas, 
trinoCatalogEntity.getEntity().getGuid());
+            }
+        }
+    }
+
+    public void processSchemas(Catalog catalog, AtlasEntity 
trinoCatalogEntity, List<String> schemaToImport) {
+        for (String schemaName : schemaToImport) {
+            LOG.info("Started extracting {} schema:", schemaName);
+            try {
+                AtlasEntity.AtlasEntityWithExtInfo schemaEntity = 
AtlasClientHelper.createOrUpdateSchemaEntity(catalog, trinoCatalogEntity, 
schemaName);
+
+                Map<String, Map<String, Object>> tables = 
trinoClientHelper.getTrinoTables(catalog.getName(), schemaName, 
catalog.getTableToImport());
+                LOG.info("Found {} tables under {}.{} catalog.schema", 
tables.size(), catalog.getName(), schemaName);
+
+                processTables(catalog, schemaName, schemaEntity.getEntity(), 
tables);
+
+                if (StringUtils.isEmpty(context.getTable())) {
+                    deleteTables(new ArrayList<>(tables.keySet()), 
schemaEntity.getEntity().getGuid());
+                }
+            } catch (Exception e) {
+                LOG.error("Error processing schema: {}", schemaName);
+            }
+        }
+    }
+
+    public void processTables(Catalog catalog, String schemaName, AtlasEntity 
schemaEntity,  Map<String, Map<String, Object>> trinoTables) {

Review Comment:
   - make `processTables()` `private`, as this method is not called outside this class



##########
addons/trino-extractor/src/main/java/org/apache/atlas/trino/cli/ExtractorContext.java:
##########
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.trino.cli;
+
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.trino.client.AtlasClientHelper;
+import org.apache.atlas.trino.client.TrinoClientHelper;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.configuration.Configuration;
+
+import java.io.IOException;
+
/**
 * Holds the runtime configuration for one Trino metadata extraction run: the
 * Atlas configuration plus the selections parsed from the command line
 * (catalog/schema/table filters, cron expression, help).
 */
public class ExtractorContext {
    // Configuration key and default value for the Trino namespace used to qualify entities.
    static final String TRINO_NAMESPACE_CONF         = "atlas.trino.namespace";
    static final String DEFAULT_TRINO_NAMESPACE      = "cm";

    // Short/long command-line option names recognized by the extractor CLI.
    static final String OPTION_CATALOG_SHORT         = "c";
    static final String OPTION_CATALOG_LONG          = "catalog";
    static final String OPTION_SCHEMA_SHORT          = "s";
    static final String OPTION_SCHEMA_LONG           = "schema";
    static final String OPTION_TABLE_SHORT           = "t";
    static final String OPTION_TABLE_LONG            = "table";
    static final String OPTION_CRON_EXPRESSION_SHORT = "cx";
    static final String OPTION_CRON_EXPRESSION_LONG  = "cronExpression";
    static final String OPTION_HELP_SHORT            = "h";
    static final String OPTION_HELP_LONG             = "help";

    private final Configuration     atlasConf;
    // NOTE(review): could be final if assigned exactly once in the constructor —
    // TODO confirm against the rest of the class (not visible in this chunk).
    private       String            namespace;

Review Comment:
   All members can be made `final`.



##########
addons/trino-extractor/src/main/java/org/apache/atlas/trino/cli/ExtractorService.java:
##########
@@ -0,0 +1,356 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.trino.cli;
+
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntityHeader;
+import org.apache.atlas.trino.client.AtlasClientHelper;
+import org.apache.atlas.trino.client.TrinoClientHelper;
+import org.apache.atlas.trino.model.Catalog;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
/**
 * Drives a Trino metadata extraction run: discovers catalogs, schemas and
 * tables through TrinoClientHelper and creates/updates/deletes the
 * corresponding entities in Atlas through AtlasClientHelper.
 */
public class ExtractorService {
    private static final Logger LOG = LoggerFactory.getLogger(ExtractorService.class);

    public static final  int               THREAD_POOL_SIZE         = 5;       // max concurrent catalog extractions
    public static final  int               CATALOG_EXECUTION_TIMEOUT = 60;     // minutes to wait for catalog processing to finish
    public static final  String            TRINO_NAME_ATTRIBUTE = "name";

    // Configuration keys controlling which catalogs are extracted and per-catalog hook settings.
    private static final String            TRINO_CATALOG_REGISTERED = "atlas.trino.catalogs.registered";
    private static final String            TRINO_CATALOG_HOOK_ENABLED_PREFIX = "atlas.trino.catalog.hook.enabled.";
    private static final String            TRINO_CATALOG_HOOK_ENABLED_SUFFIX = ".namespace";

    // NOTE(review): these are static but reassigned per run in execute(); two concurrent
    // runs in one JVM would clobber each other — consider instance fields. TODO confirm.
    private static       Configuration     atlasProperties;
    private static       TrinoClientHelper trinoClientHelper;
    private static       String            trinoNamespace;
    private              ExtractorContext  context;

+    public boolean execute(ExtractorContext context) throws Exception {
+        this.context      = context;
+        atlasProperties   = context.getAtlasConf();
+        trinoClientHelper = context.getTrinoConnector();
+        trinoNamespace    = context.getNamespace();
+
+        Map<String, String> catalogs = trinoClientHelper.getAllTrinoCatalogs();
+        LOG.info("Found {} catalogs in Trino: {}", catalogs.size(), 
catalogs.keySet());
+
+        try {
+            processCatalogs(context, catalogs);
+            deleteCatalogs(context, catalogs);
+        } catch (AtlasServiceException e) {
+            throw new AtlasServiceException(e);
+        }
+        return true;
+    }
+
+    public void processCatalogs(ExtractorContext context, Map<String, String> 
catalogInTrino) throws AtlasServiceException {
+        if (MapUtils.isEmpty(catalogInTrino)) {
+            LOG.debug("No catalogs found under Trino");
+            return;
+        }
+
+        List<Catalog> catalogsToProcess = new ArrayList<>();
+
+        if (StringUtils.isEmpty(context.getCatalog())) {
+            String[] registeredCatalogs = 
atlasProperties.getStringArray(TRINO_CATALOG_REGISTERED);
+
+            if (registeredCatalogs != null) {
+                for (String registeredCatalog : registeredCatalogs) {
+                    if (catalogInTrino.containsKey(registeredCatalog)) {
+                        
catalogsToProcess.add(getCatalogInstance(registeredCatalog, 
catalogInTrino.get(registeredCatalog)));
+                    }
+                }
+            }
+        } else {
+            if (catalogInTrino.containsKey(context.getCatalog())) {
+                Catalog catalog = getCatalogInstance(context.getCatalog(), 
catalogInTrino.get(context.getCatalog()));
+                catalog.setSchemaToImport(context.getSchema());
+                catalog.setTableToImport(context.getTable());
+                catalogsToProcess.add(catalog);
+            }
+        }
+
+        if (CollectionUtils.isEmpty(catalogsToProcess)) {
+            LOG.warn("No catalogs found to process");
+            return;
+        } else {
+            LOG.info("{} catalogs to be extracted", 
catalogsToProcess.stream().map(Catalog::getName).collect(Collectors.toList()));
+        }
+
+        AtlasEntity.AtlasEntityWithExtInfo trinoInstanceEntity = 
AtlasClientHelper.createOrUpdateInstanceEntity(trinoNamespace);
+
+        ExecutorService catalogExecutor = 
Executors.newFixedThreadPool(Math.min(catalogsToProcess.size(), 
THREAD_POOL_SIZE));
+        List<Future<?>> futures         = new ArrayList<>();
+
+        for (Catalog currentCatalog : catalogsToProcess) {
+            futures.add(catalogExecutor.submit(() -> {
+                try {
+                    currentCatalog.setTrinoInstanceEntity(trinoInstanceEntity);
+                    processCatalog(currentCatalog);
+                } catch (Exception e) {
+                    LOG.error("Error processing catalog: {}", currentCatalog, 
e);
+                }
+            }));
+        }
+        catalogExecutor.shutdown();
+
+        try {
+            if (!catalogExecutor.awaitTermination(CATALOG_EXECUTION_TIMEOUT, 
TimeUnit.MINUTES)) {
+                LOG.warn("Catalog processing did not complete within the 
timeout. {} minutes", CATALOG_EXECUTION_TIMEOUT);
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            LOG.error("Catalog processing was interrupted", e);
+        }
+
+        LOG.info("Catalogs scanned for creation/updation completed");
+    }
+
+    public void processCatalog(Catalog catalog) throws AtlasServiceException, 
SQLException {
+        if (catalog != null) {
+            LOG.info("Started extracting {} catalog:", catalog.getName());
+            String catalogName = catalog.getName();
+
+            AtlasEntity.AtlasEntityWithExtInfo trinoCatalogEntity = 
AtlasClientHelper.createOrUpdateCatalogEntity(catalog);
+
+            List<String> schemas = 
trinoClientHelper.getTrinoSchemas(catalogName, catalog.getSchemaToImport());
+            LOG.info("Found {} schema under {} catalog", schemas.size(), 
catalogName);
+
+            processSchemas(catalog, trinoCatalogEntity.getEntity(), schemas);
+
+            if (StringUtils.isEmpty(context.getSchema())) {
+                deleteSchemas(schemas, 
trinoCatalogEntity.getEntity().getGuid());
+            }
+        }
+    }
+
+    public void processSchemas(Catalog catalog, AtlasEntity 
trinoCatalogEntity, List<String> schemaToImport) {

Review Comment:
   - make `processSchemas()` `private`, since this method is not called from outside
this class



##########
addons/trino-extractor/src/main/java/org/apache/atlas/trino/client/AtlasClientHelper.java:
##########
@@ -0,0 +1,430 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.trino.client;
+
+import com.sun.jersey.api.client.ClientResponse;
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.model.discovery.AtlasSearchResult;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntityHeader;
+import org.apache.atlas.model.instance.EntityMutationResponse;
+import org.apache.atlas.model.instance.EntityMutations;
+import org.apache.atlas.trino.model.Catalog;
+import org.apache.atlas.type.AtlasTypeUtil;
+import org.apache.atlas.utils.AuthenticationUtil;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.atlas.type.AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME;
+
/**
 * Helper around AtlasClientV2 for creating, updating, searching and deleting
 * the Trino entity types (instance, catalog, schema, table, column) in Atlas.
 */
public class AtlasClientHelper {
    private static final Logger LOG = LoggerFactory.getLogger(AtlasClientHelper.class);

    // Atlas type names for the Trino model.
    public static final String TRINO_INSTANCE                            = "trino_instance";
    public static final String TRINO_CATALOG                             = "trino_catalog";
    public static final String TRINO_SCHEMA                              = "trino_schema";
    public static final String TRINO_TABLE                               = "trino_table";
    public static final String TRINO_COLUMN                              = "trino_column";

    // Relationship-list attributes on the container types.
    public static final String TRINO_INSTANCE_CATALOG_ATTRIBUTE          = "catalogs";
    public static final String TRINO_CATALOG_SCHEMA_ATTRIBUTE            = "schemas";
    public static final String TRINO_SCHEMA_TABLE_ATTRIBUTE              = "tables";
    public static final String QUALIFIED_NAME_ATTRIBUTE                  = "qualifiedName";
    public static final String  NAME_ATTRIBUTE                           = "name";
    public static final  int    DEFAULT_PAGE_LIMIT                       = 10000;  // page size for Atlas search calls

    // NOTE(review): trailing double ';' below — harmless (empty declaration) but likely a typo; confirm and remove.
    private static final String DEFAULT_ATLAS_URL                        = "http://localhost:21000/";;
    private static final String APPLICATION_PROPERTY_ATLAS_ENDPOINT      = "atlas.rest.address";

    // Attribute and relationship names used when building entities.
    private static final String TRINO_CATALOG_CONNECTOR_TYPE_ATTRIBUTE   = "connectorType";
    private static final String TRINO_CATALOG_INSTANCE_ATTRIBUTE         = "instance";
    private static final String TRINO_CATALOG_INSTANCE_RELATIONSHIP      = "trino_instance_catalog";
    private static final String TRINO_SCHEMA_CATALOG_ATTRIBUTE           = "catalog";
    private static final String TRINO_SCHEMA_CATALOG_RELATIONSHIP        = "trino_schema_catalog";
    private static final String TRINO_COLUMN_DATA_TYPE_ATTRIBUTE         = "data_type";
    private static final String TRINO_COLUMN_ORIDINAL_POSITION_ATTRIBUTE = "ordinal_position";
    private static final String TRINO_COLUMN_COLUMN_DEFAULT_ATTRIBUTE    = "column_default";
    private static final String TRINO_COLUMN_IS_NULLABLE_ATTRIBUTE       = "is_nullable";
    private static final String TRINO_COLUMN_TABLE_ATTRIBUTE             = "table";
    private static final String TRINO_TABLE_TYPE                         = "table_type";
    private static final String TRINO_TABLE_COLUMN_RELATIONSHIP          = "trino_table_columns";
    private static final String TRINO_TABLE_SCHEMA_RELATIONSHIP          = "trino_table_schema";
    private static final String TRINO_TABLE_SCHEMA_ATTRIBUTE             = "trinoschema";
    private static final String TRINO_TABLE_COLUMN_ATTRIBUTE             = "columns";

    // Shared Atlas client; set in the constructor (see NOTE there about static assignment).
    private static AtlasClientV2 atlasClientV2;
+
    /**
     * Creates the helper and initializes the Atlas client from configuration.
     *
     * NOTE(review): assigns a static field from an instance constructor, so every
     * new instance replaces the client used by all existing callers — confirm
     * this single-client behavior is intentional.
     *
     * @param atlasConf Atlas application configuration (endpoint, auth)
     * @throws IOException if the client cannot be created
     */
    public AtlasClientHelper(Configuration atlasConf) throws IOException {
        atlasClientV2 = getAtlasClientV2Instance(atlasConf);
    }
+
+    public static synchronized AtlasClientV2 
getAtlasClientV2Instance(Configuration atlasConf) throws IOException {

Review Comment:
   `getAtlasClientV2Instance()` is not called from outside this class. Consider 
marking this as `private`.



##########
addons/trino-extractor/src/main/java/org/apache/atlas/trino/cli/ExtractorService.java:
##########
@@ -0,0 +1,356 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.trino.cli;
+
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntityHeader;
+import org.apache.atlas.trino.client.AtlasClientHelper;
+import org.apache.atlas.trino.client.TrinoClientHelper;
+import org.apache.atlas.trino.model.Catalog;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
/**
 * Drives a Trino metadata extraction run: discovers catalogs, schemas and
 * tables through TrinoClientHelper and creates/updates/deletes the
 * corresponding entities in Atlas through AtlasClientHelper.
 */
public class ExtractorService {
    private static final Logger LOG = LoggerFactory.getLogger(ExtractorService.class);

    public static final  int               THREAD_POOL_SIZE         = 5;       // max concurrent catalog extractions
    public static final  int               CATALOG_EXECUTION_TIMEOUT = 60;     // minutes to wait for catalog processing to finish
    public static final  String            TRINO_NAME_ATTRIBUTE = "name";

    // Configuration keys controlling which catalogs are extracted and per-catalog hook settings.
    private static final String            TRINO_CATALOG_REGISTERED = "atlas.trino.catalogs.registered";
    private static final String            TRINO_CATALOG_HOOK_ENABLED_PREFIX = "atlas.trino.catalog.hook.enabled.";
    private static final String            TRINO_CATALOG_HOOK_ENABLED_SUFFIX = ".namespace";

    // NOTE(review): these are static but reassigned per run in execute(); two concurrent
    // runs in one JVM would clobber each other — consider instance fields. TODO confirm.
    private static       Configuration     atlasProperties;
    private static       TrinoClientHelper trinoClientHelper;
    private static       String            trinoNamespace;
    private              ExtractorContext  context;
+
+    public boolean execute(ExtractorContext context) throws Exception {
+        this.context      = context;
+        atlasProperties   = context.getAtlasConf();
+        trinoClientHelper = context.getTrinoConnector();
+        trinoNamespace    = context.getNamespace();
+
+        Map<String, String> catalogs = trinoClientHelper.getAllTrinoCatalogs();
+        LOG.info("Found {} catalogs in Trino: {}", catalogs.size(), 
catalogs.keySet());
+
+        try {
+            processCatalogs(context, catalogs);
+            deleteCatalogs(context, catalogs);
+        } catch (AtlasServiceException e) {
+            throw new AtlasServiceException(e);
+        }
+        return true;
+    }
+
+    public void processCatalogs(ExtractorContext context, Map<String, String> 
catalogInTrino) throws AtlasServiceException {
+        if (MapUtils.isEmpty(catalogInTrino)) {
+            LOG.debug("No catalogs found under Trino");
+            return;
+        }
+
+        List<Catalog> catalogsToProcess = new ArrayList<>();
+
+        if (StringUtils.isEmpty(context.getCatalog())) {
+            String[] registeredCatalogs = 
atlasProperties.getStringArray(TRINO_CATALOG_REGISTERED);
+
+            if (registeredCatalogs != null) {
+                for (String registeredCatalog : registeredCatalogs) {
+                    if (catalogInTrino.containsKey(registeredCatalog)) {
+                        
catalogsToProcess.add(getCatalogInstance(registeredCatalog, 
catalogInTrino.get(registeredCatalog)));
+                    }
+                }
+            }
+        } else {
+            if (catalogInTrino.containsKey(context.getCatalog())) {
+                Catalog catalog = getCatalogInstance(context.getCatalog(), 
catalogInTrino.get(context.getCatalog()));
+                catalog.setSchemaToImport(context.getSchema());
+                catalog.setTableToImport(context.getTable());
+                catalogsToProcess.add(catalog);
+            }
+        }
+
+        if (CollectionUtils.isEmpty(catalogsToProcess)) {
+            LOG.warn("No catalogs found to process");
+            return;
+        } else {
+            LOG.info("{} catalogs to be extracted", 
catalogsToProcess.stream().map(Catalog::getName).collect(Collectors.toList()));
+        }
+
+        AtlasEntity.AtlasEntityWithExtInfo trinoInstanceEntity = 
AtlasClientHelper.createOrUpdateInstanceEntity(trinoNamespace);
+
+        ExecutorService catalogExecutor = 
Executors.newFixedThreadPool(Math.min(catalogsToProcess.size(), 
THREAD_POOL_SIZE));
+        List<Future<?>> futures         = new ArrayList<>();
+
+        for (Catalog currentCatalog : catalogsToProcess) {
+            futures.add(catalogExecutor.submit(() -> {
+                try {
+                    currentCatalog.setTrinoInstanceEntity(trinoInstanceEntity);
+                    processCatalog(currentCatalog);
+                } catch (Exception e) {
+                    LOG.error("Error processing catalog: {}", currentCatalog, 
e);
+                }
+            }));
+        }
+        catalogExecutor.shutdown();
+
+        try {
+            if (!catalogExecutor.awaitTermination(CATALOG_EXECUTION_TIMEOUT, 
TimeUnit.MINUTES)) {
+                LOG.warn("Catalog processing did not complete within the 
timeout. {} minutes", CATALOG_EXECUTION_TIMEOUT);
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            LOG.error("Catalog processing was interrupted", e);
+        }
+
+        LOG.info("Catalogs scanned for creation/updation completed");
+    }
+
+    public void processCatalog(Catalog catalog) throws AtlasServiceException, 
SQLException {
+        if (catalog != null) {
+            LOG.info("Started extracting {} catalog:", catalog.getName());
+            String catalogName = catalog.getName();
+
+            AtlasEntity.AtlasEntityWithExtInfo trinoCatalogEntity = 
AtlasClientHelper.createOrUpdateCatalogEntity(catalog);
+
+            List<String> schemas = 
trinoClientHelper.getTrinoSchemas(catalogName, catalog.getSchemaToImport());
+            LOG.info("Found {} schema under {} catalog", schemas.size(), 
catalogName);
+
+            processSchemas(catalog, trinoCatalogEntity.getEntity(), schemas);
+
+            if (StringUtils.isEmpty(context.getSchema())) {
+                deleteSchemas(schemas, 
trinoCatalogEntity.getEntity().getGuid());
+            }
+        }
+    }
+
+    public void processSchemas(Catalog catalog, AtlasEntity 
trinoCatalogEntity, List<String> schemaToImport) {
+        for (String schemaName : schemaToImport) {
+            LOG.info("Started extracting {} schema:", schemaName);
+            try {
+                AtlasEntity.AtlasEntityWithExtInfo schemaEntity = 
AtlasClientHelper.createOrUpdateSchemaEntity(catalog, trinoCatalogEntity, 
schemaName);
+
+                Map<String, Map<String, Object>> tables = 
trinoClientHelper.getTrinoTables(catalog.getName(), schemaName, 
catalog.getTableToImport());
+                LOG.info("Found {} tables under {}.{} catalog.schema", 
tables.size(), catalog.getName(), schemaName);
+
+                processTables(catalog, schemaName, schemaEntity.getEntity(), 
tables);
+
+                if (StringUtils.isEmpty(context.getTable())) {
+                    deleteTables(new ArrayList<>(tables.keySet()), 
schemaEntity.getEntity().getGuid());
+                }
+            } catch (Exception e) {
+                LOG.error("Error processing schema: {}", schemaName);
+            }
+        }
+    }
+
+    public void processTables(Catalog catalog, String schemaName, AtlasEntity 
schemaEntity,  Map<String, Map<String, Object>> trinoTables) {
+        for (String trinoTableName : trinoTables.keySet()) {
+            LOG.info("Started extracting {} table:", trinoTableName);
+
+            try {
+                Map<String, Map<String, Object>> trinoColumns = 
trinoClientHelper.getTrinoColumns(catalog.getName(), schemaName, 
trinoTableName);
+                LOG.info("Found {} columns under {}.{}.{} 
catalog.schema.table", trinoColumns.size(), catalog.getName(), schemaName, 
trinoTableName);
+
+                AtlasClientHelper.createOrUpdateTableEntity(catalog, 
schemaName, trinoTableName, trinoTables.get(trinoTableName), trinoColumns, 
schemaEntity);
+            } catch (Exception e) {
+                LOG.error("Error processing table: {}", trinoTableName, e);
+            }
+        }
+    }
+
+    public void deleteCatalogs(ExtractorContext context, Map<String, String> 
catalogInTrino) throws AtlasServiceException {
+        if (StringUtils.isNotEmpty(context.getCatalog())) {
+            return;
+        }
+
+        AtlasEntityHeader trinoInstance = 
AtlasClientHelper.getTrinoInstance(trinoNamespace);
+        if (trinoInstance != null) {
+            Set<String> catalogsToDelete = getCatalogsToDelete(catalogInTrino, 
trinoInstance.getGuid());
+
+            if (CollectionUtils.isNotEmpty(catalogsToDelete)) {
+                LOG.info("{} non existing catalogs to be deleted", 
catalogsToDelete, trinoInstance.getGuid());
+
+                for (String catalogGuid : catalogsToDelete) {
+                    try {
+                        deleteSchemas(null, catalogGuid);
+                        
AtlasClientHelper.deleteByGuid(Collections.singleton(catalogGuid));
+                    } catch (AtlasServiceException e) {
+                        LOG.error("Error deleting catalog: {}", catalogGuid, 
e);
+                    }
+                }
+            } else {
+                LOG.info("No catalogs found to delete");
+            }
+        }
+
+        LOG.info("Catalogs scanned for deletion completed");
+    }
+
+    public void deleteSchemas(List<String> schemasInTrino, String catalogGuid) 
{

Review Comment:
   - make `deleteSchemas()` `private`, since this method is not called from outside
this class
   



##########
addons/trino-extractor/src/main/java/org/apache/atlas/trino/client/AtlasClientHelper.java:
##########
@@ -0,0 +1,430 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.trino.client;
+
+import com.sun.jersey.api.client.ClientResponse;
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.model.discovery.AtlasSearchResult;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntityHeader;
+import org.apache.atlas.model.instance.EntityMutationResponse;
+import org.apache.atlas.model.instance.EntityMutations;
+import org.apache.atlas.trino.model.Catalog;
+import org.apache.atlas.type.AtlasTypeUtil;
+import org.apache.atlas.utils.AuthenticationUtil;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.atlas.type.AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME;
+
+public class AtlasClientHelper {
+    private static final Logger LOG = 
LoggerFactory.getLogger(AtlasClientHelper.class);
+
+    public static final String TRINO_INSTANCE                            = 
"trino_instance";
+    public static final String TRINO_CATALOG                             = 
"trino_catalog";
+    public static final String TRINO_SCHEMA                              = 
"trino_schema";
+    public static final String TRINO_TABLE                               = 
"trino_table";
+    public static final String TRINO_COLUMN                              = 
"trino_column";
+    public static final String TRINO_INSTANCE_CATALOG_ATTRIBUTE          = 
"catalogs";
+    public static final String TRINO_CATALOG_SCHEMA_ATTRIBUTE            = 
"schemas";
+    public static final String TRINO_SCHEMA_TABLE_ATTRIBUTE              = 
"tables";
+    public static final String QUALIFIED_NAME_ATTRIBUTE                  = 
"qualifiedName";
+    public static final String  NAME_ATTRIBUTE                           = 
"name";
+    public static final  int    DEFAULT_PAGE_LIMIT                       = 
10000;
+    private static final String DEFAULT_ATLAS_URL                        = 
"http://localhost:21000/";;
+    private static final String APPLICATION_PROPERTY_ATLAS_ENDPOINT      = 
"atlas.rest.address";
+    private static final String TRINO_CATALOG_CONNECTOR_TYPE_ATTRIBUTE   = 
"connectorType";
+    private static final String TRINO_CATALOG_INSTANCE_ATTRIBUTE         = 
"instance";
+    private static final String TRINO_CATALOG_INSTANCE_RELATIONSHIP      = 
"trino_instance_catalog";
+    private static final String TRINO_SCHEMA_CATALOG_ATTRIBUTE           = 
"catalog";
+    private static final String TRINO_SCHEMA_CATALOG_RELATIONSHIP        = 
"trino_schema_catalog";
+    private static final String TRINO_COLUMN_DATA_TYPE_ATTRIBUTE         = 
"data_type";
+    private static final String TRINO_COLUMN_ORIDINAL_POSITION_ATTRIBUTE = 
"ordinal_position";
+    private static final String TRINO_COLUMN_COLUMN_DEFAULT_ATTRIBUTE    = 
"column_default";
+    private static final String TRINO_COLUMN_IS_NULLABLE_ATTRIBUTE       = 
"is_nullable";
+    private static final String TRINO_COLUMN_TABLE_ATTRIBUTE             = 
"table";
+    private static final String TRINO_TABLE_TYPE                         = 
"table_type";
+    private static final String TRINO_TABLE_COLUMN_RELATIONSHIP          = 
"trino_table_columns";
+    private static final String TRINO_TABLE_SCHEMA_RELATIONSHIP          = 
"trino_table_schema";
+    private static final String TRINO_TABLE_SCHEMA_ATTRIBUTE             = 
"trinoschema";
+    private static final String TRINO_TABLE_COLUMN_ATTRIBUTE             = 
"columns";
+
+    private static AtlasClientV2 atlasClientV2;
+
+    /**
+     * Initializes the shared {@link AtlasClientV2} used by this helper.
+     *
+     * NOTE(review): this constructor writes a static field, so all instances
+     * share the client created on first construction -- confirm this singleton
+     * behavior is intended before making the class's static methods non-static.
+     *
+     * @param atlasConf Atlas configuration; "atlas.rest.address" supplies the
+     *                  endpoint(s), defaulting to http://localhost:21000/
+     * @throws IOException if client creation fails (e.g. resolving the current
+     *                     Kerberos user)
+     */
+    public AtlasClientHelper(Configuration atlasConf) throws IOException {
+        atlasClientV2 = getAtlasClientV2Instance(atlasConf);
+    }
+
+    public static synchronized AtlasClientV2 
getAtlasClientV2Instance(Configuration atlasConf) throws IOException {
+        if (atlasClientV2 == null) {
+            String[] atlasEndpoint = new String[] {DEFAULT_ATLAS_URL};
+
+            if (atlasConf != null && 
ArrayUtils.isNotEmpty(atlasConf.getStringArray(APPLICATION_PROPERTY_ATLAS_ENDPOINT)))
 {
+                atlasEndpoint = 
atlasConf.getStringArray(APPLICATION_PROPERTY_ATLAS_ENDPOINT);
+            }
+
+            if (!AuthenticationUtil.isKerberosAuthenticationEnabled()) {
+                String[] basicAuthUsernamePassword = 
AuthenticationUtil.getBasicAuthenticationInput();
+                atlasClientV2 = new AtlasClientV2(atlasEndpoint, 
basicAuthUsernamePassword);
+            } else {
+                UserGroupInformation ugi = 
UserGroupInformation.getCurrentUser();
+                atlasClientV2 = new AtlasClientV2(ugi, ugi.getShortUserName(), 
atlasEndpoint);
+            }
+        }
+        return atlasClientV2;
+    }
+
+    public static List<AtlasEntityHeader> getAllCatalogsInInstance(String 
instanceGuid) throws AtlasServiceException {

Review Comment:
   Most static methods in this class can be made non-static by making an
AtlasClientHelper instance available to the callers.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@atlas.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to