mneethiraj commented on code in PR #336: URL: https://github.com/apache/atlas/pull/336#discussion_r2279269189
########## addons/trino-extractor/src/main/java/org/apache/atlas/trino/cli/ExtractorService.java: ########## @@ -0,0 +1,356 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.trino.cli; + +import org.apache.atlas.AtlasServiceException; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasEntityHeader; +import org.apache.atlas.trino.client.AtlasClientHelper; +import org.apache.atlas.trino.client.TrinoClientHelper; +import org.apache.atlas.trino.model.Catalog; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.MapUtils; +import org.apache.commons.configuration.Configuration; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +public class 
ExtractorService { + private static final Logger LOG = LoggerFactory.getLogger(ExtractorService.class); + + public static final int THREAD_POOL_SIZE = 5; + public static final int CATALOG_EXECUTION_TIMEOUT = 60; + public static final String TRINO_NAME_ATTRIBUTE = "name"; + private static final String TRINO_CATALOG_REGISTERED = "atlas.trino.catalogs.registered"; + private static final String TRINO_CATALOG_HOOK_ENABLED_PREFIX = "atlas.trino.catalog.hook.enabled."; + private static final String TRINO_CATALOG_HOOK_ENABLED_SUFFIX = ".namespace"; + private static Configuration atlasProperties; + private static TrinoClientHelper trinoClientHelper; + private static String trinoNamespace; + private ExtractorContext context; + + public boolean execute(ExtractorContext context) throws Exception { + this.context = context; + atlasProperties = context.getAtlasConf(); + trinoClientHelper = context.getTrinoConnector(); + trinoNamespace = context.getNamespace(); + + Map<String, String> catalogs = trinoClientHelper.getAllTrinoCatalogs(); + LOG.info("Found {} catalogs in Trino: {}", catalogs.size(), catalogs.keySet()); + + try { + processCatalogs(context, catalogs); + deleteCatalogs(context, catalogs); + } catch (AtlasServiceException e) { + throw new AtlasServiceException(e); + } + return true; + } + + public void processCatalogs(ExtractorContext context, Map<String, String> catalogInTrino) throws AtlasServiceException { Review Comment: - make `processCatalogs()` `private`, as this method is not called outside this class ########## addons/trino-extractor/src/main/java/org/apache/atlas/trino/cli/ExtractorService.java: ########## @@ -0,0 +1,356 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.trino.cli; + +import org.apache.atlas.AtlasServiceException; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasEntityHeader; +import org.apache.atlas.trino.client.AtlasClientHelper; +import org.apache.atlas.trino.client.TrinoClientHelper; +import org.apache.atlas.trino.model.Catalog; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.MapUtils; +import org.apache.commons.configuration.Configuration; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +public class ExtractorService { + private static final Logger LOG = LoggerFactory.getLogger(ExtractorService.class); + + public static final int THREAD_POOL_SIZE = 5; + public static final int CATALOG_EXECUTION_TIMEOUT = 60; + public static final String TRINO_NAME_ATTRIBUTE = "name"; + private static final String TRINO_CATALOG_REGISTERED = 
"atlas.trino.catalogs.registered"; + private static final String TRINO_CATALOG_HOOK_ENABLED_PREFIX = "atlas.trino.catalog.hook.enabled."; + private static final String TRINO_CATALOG_HOOK_ENABLED_SUFFIX = ".namespace"; + private static Configuration atlasProperties; + private static TrinoClientHelper trinoClientHelper; + private static String trinoNamespace; + private ExtractorContext context; + + public boolean execute(ExtractorContext context) throws Exception { + this.context = context; + atlasProperties = context.getAtlasConf(); + trinoClientHelper = context.getTrinoConnector(); + trinoNamespace = context.getNamespace(); + + Map<String, String> catalogs = trinoClientHelper.getAllTrinoCatalogs(); + LOG.info("Found {} catalogs in Trino: {}", catalogs.size(), catalogs.keySet()); + + try { + processCatalogs(context, catalogs); + deleteCatalogs(context, catalogs); + } catch (AtlasServiceException e) { + throw new AtlasServiceException(e); + } + return true; + } + + public void processCatalogs(ExtractorContext context, Map<String, String> catalogInTrino) throws AtlasServiceException { + if (MapUtils.isEmpty(catalogInTrino)) { + LOG.debug("No catalogs found under Trino"); + return; + } + + List<Catalog> catalogsToProcess = new ArrayList<>(); + + if (StringUtils.isEmpty(context.getCatalog())) { + String[] registeredCatalogs = atlasProperties.getStringArray(TRINO_CATALOG_REGISTERED); + + if (registeredCatalogs != null) { + for (String registeredCatalog : registeredCatalogs) { + if (catalogInTrino.containsKey(registeredCatalog)) { + catalogsToProcess.add(getCatalogInstance(registeredCatalog, catalogInTrino.get(registeredCatalog))); + } + } + } + } else { + if (catalogInTrino.containsKey(context.getCatalog())) { + Catalog catalog = getCatalogInstance(context.getCatalog(), catalogInTrino.get(context.getCatalog())); + catalog.setSchemaToImport(context.getSchema()); + catalog.setTableToImport(context.getTable()); + catalogsToProcess.add(catalog); + } + } + + if 
(CollectionUtils.isEmpty(catalogsToProcess)) { + LOG.warn("No catalogs found to process"); + return; + } else { + LOG.info("{} catalogs to be extracted", catalogsToProcess.stream().map(Catalog::getName).collect(Collectors.toList())); + } + + AtlasEntity.AtlasEntityWithExtInfo trinoInstanceEntity = AtlasClientHelper.createOrUpdateInstanceEntity(trinoNamespace); + + ExecutorService catalogExecutor = Executors.newFixedThreadPool(Math.min(catalogsToProcess.size(), THREAD_POOL_SIZE)); + List<Future<?>> futures = new ArrayList<>(); + + for (Catalog currentCatalog : catalogsToProcess) { + futures.add(catalogExecutor.submit(() -> { + try { + currentCatalog.setTrinoInstanceEntity(trinoInstanceEntity); + processCatalog(currentCatalog); + } catch (Exception e) { + LOG.error("Error processing catalog: {}", currentCatalog, e); + } + })); + } + catalogExecutor.shutdown(); + + try { + if (!catalogExecutor.awaitTermination(CATALOG_EXECUTION_TIMEOUT, TimeUnit.MINUTES)) { + LOG.warn("Catalog processing did not complete within the timeout. {} minutes", CATALOG_EXECUTION_TIMEOUT); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.error("Catalog processing was interrupted", e); + } + + LOG.info("Catalogs scanned for creation/updation completed"); + } + + public void processCatalog(Catalog catalog) throws AtlasServiceException, SQLException { Review Comment: - make `processCatalog()` as `private` as this method is not called outside this class ########## addons/trino-extractor/src/main/java/org/apache/atlas/trino/cli/ExtractorService.java: ########## @@ -0,0 +1,356 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.trino.cli; + +import org.apache.atlas.AtlasServiceException; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasEntityHeader; +import org.apache.atlas.trino.client.AtlasClientHelper; +import org.apache.atlas.trino.client.TrinoClientHelper; +import org.apache.atlas.trino.model.Catalog; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.MapUtils; +import org.apache.commons.configuration.Configuration; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +public class ExtractorService { + private static final Logger LOG = LoggerFactory.getLogger(ExtractorService.class); + + public static final int THREAD_POOL_SIZE = 5; + public static final int CATALOG_EXECUTION_TIMEOUT = 60; + public static final String TRINO_NAME_ATTRIBUTE = "name"; + private static final String TRINO_CATALOG_REGISTERED = "atlas.trino.catalogs.registered"; + private static final String TRINO_CATALOG_HOOK_ENABLED_PREFIX = "atlas.trino.catalog.hook.enabled."; + private static final String 
TRINO_CATALOG_HOOK_ENABLED_SUFFIX = ".namespace"; + private static Configuration atlasProperties; Review Comment: - Following static/instance members derive value from the parameter passed to `execute()` method, hence their state shouldn't be preserved beyond a call to `execute()`. Please replace these members with appropriate methods on `context` passed to `execute()` method: - `atlasProperties` - `trinoClientHelper` - `trinoNamespace` - `context` ########## addons/trino-extractor/src/main/java/org/apache/atlas/trino/cli/ExtractorService.java: ########## @@ -0,0 +1,356 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas.trino.cli; + +import org.apache.atlas.AtlasServiceException; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasEntityHeader; +import org.apache.atlas.trino.client.AtlasClientHelper; +import org.apache.atlas.trino.client.TrinoClientHelper; +import org.apache.atlas.trino.model.Catalog; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.MapUtils; +import org.apache.commons.configuration.Configuration; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +public class ExtractorService { + private static final Logger LOG = LoggerFactory.getLogger(ExtractorService.class); + + public static final int THREAD_POOL_SIZE = 5; + public static final int CATALOG_EXECUTION_TIMEOUT = 60; + public static final String TRINO_NAME_ATTRIBUTE = "name"; + private static final String TRINO_CATALOG_REGISTERED = "atlas.trino.catalogs.registered"; + private static final String TRINO_CATALOG_HOOK_ENABLED_PREFIX = "atlas.trino.catalog.hook.enabled."; + private static final String TRINO_CATALOG_HOOK_ENABLED_SUFFIX = ".namespace"; + private static Configuration atlasProperties; + private static TrinoClientHelper trinoClientHelper; + private static String trinoNamespace; + private ExtractorContext context; + + public boolean execute(ExtractorContext context) throws Exception { + this.context = context; + atlasProperties = context.getAtlasConf(); + trinoClientHelper = context.getTrinoConnector(); + trinoNamespace = 
context.getNamespace(); + + Map<String, String> catalogs = trinoClientHelper.getAllTrinoCatalogs(); + LOG.info("Found {} catalogs in Trino: {}", catalogs.size(), catalogs.keySet()); + + try { + processCatalogs(context, catalogs); + deleteCatalogs(context, catalogs); + } catch (AtlasServiceException e) { + throw new AtlasServiceException(e); + } + return true; + } + + public void processCatalogs(ExtractorContext context, Map<String, String> catalogInTrino) throws AtlasServiceException { + if (MapUtils.isEmpty(catalogInTrino)) { + LOG.debug("No catalogs found under Trino"); + return; + } + + List<Catalog> catalogsToProcess = new ArrayList<>(); + + if (StringUtils.isEmpty(context.getCatalog())) { + String[] registeredCatalogs = atlasProperties.getStringArray(TRINO_CATALOG_REGISTERED); + + if (registeredCatalogs != null) { + for (String registeredCatalog : registeredCatalogs) { + if (catalogInTrino.containsKey(registeredCatalog)) { + catalogsToProcess.add(getCatalogInstance(registeredCatalog, catalogInTrino.get(registeredCatalog))); + } + } + } + } else { + if (catalogInTrino.containsKey(context.getCatalog())) { + Catalog catalog = getCatalogInstance(context.getCatalog(), catalogInTrino.get(context.getCatalog())); + catalog.setSchemaToImport(context.getSchema()); + catalog.setTableToImport(context.getTable()); + catalogsToProcess.add(catalog); + } + } + + if (CollectionUtils.isEmpty(catalogsToProcess)) { + LOG.warn("No catalogs found to process"); + return; + } else { + LOG.info("{} catalogs to be extracted", catalogsToProcess.stream().map(Catalog::getName).collect(Collectors.toList())); + } + + AtlasEntity.AtlasEntityWithExtInfo trinoInstanceEntity = AtlasClientHelper.createOrUpdateInstanceEntity(trinoNamespace); + + ExecutorService catalogExecutor = Executors.newFixedThreadPool(Math.min(catalogsToProcess.size(), THREAD_POOL_SIZE)); + List<Future<?>> futures = new ArrayList<>(); + + for (Catalog currentCatalog : catalogsToProcess) { + 
futures.add(catalogExecutor.submit(() -> { + try { + currentCatalog.setTrinoInstanceEntity(trinoInstanceEntity); + processCatalog(currentCatalog); + } catch (Exception e) { + LOG.error("Error processing catalog: {}", currentCatalog, e); + } + })); + } + catalogExecutor.shutdown(); + + try { + if (!catalogExecutor.awaitTermination(CATALOG_EXECUTION_TIMEOUT, TimeUnit.MINUTES)) { + LOG.warn("Catalog processing did not complete within the timeout. {} minutes", CATALOG_EXECUTION_TIMEOUT); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.error("Catalog processing was interrupted", e); + } + + LOG.info("Catalogs scanned for creation/updation completed"); + } + + public void processCatalog(Catalog catalog) throws AtlasServiceException, SQLException { + if (catalog != null) { + LOG.info("Started extracting {} catalog:", catalog.getName()); + String catalogName = catalog.getName(); + + AtlasEntity.AtlasEntityWithExtInfo trinoCatalogEntity = AtlasClientHelper.createOrUpdateCatalogEntity(catalog); + + List<String> schemas = trinoClientHelper.getTrinoSchemas(catalogName, catalog.getSchemaToImport()); + LOG.info("Found {} schema under {} catalog", schemas.size(), catalogName); + + processSchemas(catalog, trinoCatalogEntity.getEntity(), schemas); + + if (StringUtils.isEmpty(context.getSchema())) { + deleteSchemas(schemas, trinoCatalogEntity.getEntity().getGuid()); + } + } + } + + public void processSchemas(Catalog catalog, AtlasEntity trinoCatalogEntity, List<String> schemaToImport) { + for (String schemaName : schemaToImport) { + LOG.info("Started extracting {} schema:", schemaName); + try { + AtlasEntity.AtlasEntityWithExtInfo schemaEntity = AtlasClientHelper.createOrUpdateSchemaEntity(catalog, trinoCatalogEntity, schemaName); + + Map<String, Map<String, Object>> tables = trinoClientHelper.getTrinoTables(catalog.getName(), schemaName, catalog.getTableToImport()); + LOG.info("Found {} tables under {}.{} catalog.schema", tables.size(), 
catalog.getName(), schemaName); + + processTables(catalog, schemaName, schemaEntity.getEntity(), tables); + + if (StringUtils.isEmpty(context.getTable())) { + deleteTables(new ArrayList<>(tables.keySet()), schemaEntity.getEntity().getGuid()); + } + } catch (Exception e) { + LOG.error("Error processing schema: {}", schemaName); + } + } + } + + public void processTables(Catalog catalog, String schemaName, AtlasEntity schemaEntity, Map<String, Map<String, Object>> trinoTables) { + for (String trinoTableName : trinoTables.keySet()) { + LOG.info("Started extracting {} table:", trinoTableName); + + try { + Map<String, Map<String, Object>> trinoColumns = trinoClientHelper.getTrinoColumns(catalog.getName(), schemaName, trinoTableName); + LOG.info("Found {} columns under {}.{}.{} catalog.schema.table", trinoColumns.size(), catalog.getName(), schemaName, trinoTableName); + + AtlasClientHelper.createOrUpdateTableEntity(catalog, schemaName, trinoTableName, trinoTables.get(trinoTableName), trinoColumns, schemaEntity); + } catch (Exception e) { + LOG.error("Error processing table: {}", trinoTableName, e); + } + } + } + + public void deleteCatalogs(ExtractorContext context, Map<String, String> catalogInTrino) throws AtlasServiceException { Review Comment: - make `deleteCatalogs()` as `private` as this method is not called outside this class ########## addons/trino-extractor/src/main/java/org/apache/atlas/trino/cli/ExtractorService.java: ########## @@ -0,0 +1,356 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.trino.cli; + +import org.apache.atlas.AtlasServiceException; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasEntityHeader; +import org.apache.atlas.trino.client.AtlasClientHelper; +import org.apache.atlas.trino.client.TrinoClientHelper; +import org.apache.atlas.trino.model.Catalog; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.MapUtils; +import org.apache.commons.configuration.Configuration; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +public class ExtractorService { + private static final Logger LOG = LoggerFactory.getLogger(ExtractorService.class); + + public static final int THREAD_POOL_SIZE = 5; + public static final int CATALOG_EXECUTION_TIMEOUT = 60; + public static final String TRINO_NAME_ATTRIBUTE = "name"; + private static final String TRINO_CATALOG_REGISTERED = "atlas.trino.catalogs.registered"; + private static final String TRINO_CATALOG_HOOK_ENABLED_PREFIX = "atlas.trino.catalog.hook.enabled."; + private static final String 
TRINO_CATALOG_HOOK_ENABLED_SUFFIX = ".namespace"; + private static Configuration atlasProperties; + private static TrinoClientHelper trinoClientHelper; + private static String trinoNamespace; + private ExtractorContext context; + + public boolean execute(ExtractorContext context) throws Exception { + this.context = context; + atlasProperties = context.getAtlasConf(); + trinoClientHelper = context.getTrinoConnector(); + trinoNamespace = context.getNamespace(); + + Map<String, String> catalogs = trinoClientHelper.getAllTrinoCatalogs(); + LOG.info("Found {} catalogs in Trino: {}", catalogs.size(), catalogs.keySet()); + + try { + processCatalogs(context, catalogs); + deleteCatalogs(context, catalogs); + } catch (AtlasServiceException e) { + throw new AtlasServiceException(e); + } + return true; + } + + public void processCatalogs(ExtractorContext context, Map<String, String> catalogInTrino) throws AtlasServiceException { + if (MapUtils.isEmpty(catalogInTrino)) { + LOG.debug("No catalogs found under Trino"); + return; + } + + List<Catalog> catalogsToProcess = new ArrayList<>(); + + if (StringUtils.isEmpty(context.getCatalog())) { + String[] registeredCatalogs = atlasProperties.getStringArray(TRINO_CATALOG_REGISTERED); + + if (registeredCatalogs != null) { + for (String registeredCatalog : registeredCatalogs) { + if (catalogInTrino.containsKey(registeredCatalog)) { + catalogsToProcess.add(getCatalogInstance(registeredCatalog, catalogInTrino.get(registeredCatalog))); + } + } + } + } else { + if (catalogInTrino.containsKey(context.getCatalog())) { + Catalog catalog = getCatalogInstance(context.getCatalog(), catalogInTrino.get(context.getCatalog())); + catalog.setSchemaToImport(context.getSchema()); + catalog.setTableToImport(context.getTable()); + catalogsToProcess.add(catalog); + } + } + + if (CollectionUtils.isEmpty(catalogsToProcess)) { + LOG.warn("No catalogs found to process"); + return; + } else { + LOG.info("{} catalogs to be extracted", 
catalogsToProcess.stream().map(Catalog::getName).collect(Collectors.toList())); + } + + AtlasEntity.AtlasEntityWithExtInfo trinoInstanceEntity = AtlasClientHelper.createOrUpdateInstanceEntity(trinoNamespace); + + ExecutorService catalogExecutor = Executors.newFixedThreadPool(Math.min(catalogsToProcess.size(), THREAD_POOL_SIZE)); + List<Future<?>> futures = new ArrayList<>(); + + for (Catalog currentCatalog : catalogsToProcess) { + futures.add(catalogExecutor.submit(() -> { + try { + currentCatalog.setTrinoInstanceEntity(trinoInstanceEntity); + processCatalog(currentCatalog); + } catch (Exception e) { + LOG.error("Error processing catalog: {}", currentCatalog, e); + } + })); + } + catalogExecutor.shutdown(); + + try { + if (!catalogExecutor.awaitTermination(CATALOG_EXECUTION_TIMEOUT, TimeUnit.MINUTES)) { + LOG.warn("Catalog processing did not complete within the timeout. {} minutes", CATALOG_EXECUTION_TIMEOUT); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.error("Catalog processing was interrupted", e); + } + + LOG.info("Catalogs scanned for creation/updation completed"); + } + + public void processCatalog(Catalog catalog) throws AtlasServiceException, SQLException { + if (catalog != null) { + LOG.info("Started extracting {} catalog:", catalog.getName()); + String catalogName = catalog.getName(); + + AtlasEntity.AtlasEntityWithExtInfo trinoCatalogEntity = AtlasClientHelper.createOrUpdateCatalogEntity(catalog); + + List<String> schemas = trinoClientHelper.getTrinoSchemas(catalogName, catalog.getSchemaToImport()); + LOG.info("Found {} schema under {} catalog", schemas.size(), catalogName); + + processSchemas(catalog, trinoCatalogEntity.getEntity(), schemas); + + if (StringUtils.isEmpty(context.getSchema())) { + deleteSchemas(schemas, trinoCatalogEntity.getEntity().getGuid()); + } + } + } + + public void processSchemas(Catalog catalog, AtlasEntity trinoCatalogEntity, List<String> schemaToImport) { + for (String schemaName : 
schemaToImport) { + LOG.info("Started extracting {} schema:", schemaName); + try { + AtlasEntity.AtlasEntityWithExtInfo schemaEntity = AtlasClientHelper.createOrUpdateSchemaEntity(catalog, trinoCatalogEntity, schemaName); + + Map<String, Map<String, Object>> tables = trinoClientHelper.getTrinoTables(catalog.getName(), schemaName, catalog.getTableToImport()); + LOG.info("Found {} tables under {}.{} catalog.schema", tables.size(), catalog.getName(), schemaName); + + processTables(catalog, schemaName, schemaEntity.getEntity(), tables); + + if (StringUtils.isEmpty(context.getTable())) { + deleteTables(new ArrayList<>(tables.keySet()), schemaEntity.getEntity().getGuid()); + } + } catch (Exception e) { + LOG.error("Error processing schema: {}", schemaName); + } + } + } + + public void processTables(Catalog catalog, String schemaName, AtlasEntity schemaEntity, Map<String, Map<String, Object>> trinoTables) { Review Comment: - make `processTables()` as `private` as this method is not called outside this class ########## addons/trino-extractor/src/main/java/org/apache/atlas/trino/cli/ExtractorContext.java: ########## @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas.trino.cli; + +import org.apache.atlas.ApplicationProperties; +import org.apache.atlas.AtlasException; +import org.apache.atlas.trino.client.AtlasClientHelper; +import org.apache.atlas.trino.client.TrinoClientHelper; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.configuration.Configuration; + +import java.io.IOException; + +public class ExtractorContext { + + static final String TRINO_NAMESPACE_CONF = "atlas.trino.namespace"; + static final String DEFAULT_TRINO_NAMESPACE = "cm"; + static final String OPTION_CATALOG_SHORT = "c"; + static final String OPTION_CATALOG_LONG = "catalog"; + static final String OPTION_SCHEMA_SHORT = "s"; + static final String OPTION_SCHEMA_LONG = "schema"; + static final String OPTION_TABLE_SHORT = "t"; + static final String OPTION_TABLE_LONG = "table"; + static final String OPTION_CRON_EXPRESSION_SHORT = "cx"; + static final String OPTION_CRON_EXPRESSION_LONG = "cronExpression"; + static final String OPTION_HELP_SHORT = "h"; + static final String OPTION_HELP_LONG = "help"; + private final Configuration atlasConf; + private String namespace; Review Comment: All members can be made `final`. ########## addons/trino-extractor/src/main/java/org/apache/atlas/trino/cli/ExtractorService.java: ########## @@ -0,0 +1,356 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.trino.cli; + +import org.apache.atlas.AtlasServiceException; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasEntityHeader; +import org.apache.atlas.trino.client.AtlasClientHelper; +import org.apache.atlas.trino.client.TrinoClientHelper; +import org.apache.atlas.trino.model.Catalog; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.MapUtils; +import org.apache.commons.configuration.Configuration; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +public class ExtractorService { + private static final Logger LOG = LoggerFactory.getLogger(ExtractorService.class); + + public static final int THREAD_POOL_SIZE = 5; + public static final int CATALOG_EXECUTION_TIMEOUT = 60; + public static final String TRINO_NAME_ATTRIBUTE = "name"; + private static final String TRINO_CATALOG_REGISTERED = "atlas.trino.catalogs.registered"; + private static final String TRINO_CATALOG_HOOK_ENABLED_PREFIX = "atlas.trino.catalog.hook.enabled."; + private static final String 
TRINO_CATALOG_HOOK_ENABLED_SUFFIX = ".namespace"; + private static Configuration atlasProperties; + private static TrinoClientHelper trinoClientHelper; + private static String trinoNamespace; + private ExtractorContext context; + + public boolean execute(ExtractorContext context) throws Exception { + this.context = context; + atlasProperties = context.getAtlasConf(); + trinoClientHelper = context.getTrinoConnector(); + trinoNamespace = context.getNamespace(); + + Map<String, String> catalogs = trinoClientHelper.getAllTrinoCatalogs(); + LOG.info("Found {} catalogs in Trino: {}", catalogs.size(), catalogs.keySet()); + + try { + processCatalogs(context, catalogs); + deleteCatalogs(context, catalogs); + } catch (AtlasServiceException e) { + throw new AtlasServiceException(e); + } + return true; + } + + public void processCatalogs(ExtractorContext context, Map<String, String> catalogInTrino) throws AtlasServiceException { + if (MapUtils.isEmpty(catalogInTrino)) { + LOG.debug("No catalogs found under Trino"); + return; + } + + List<Catalog> catalogsToProcess = new ArrayList<>(); + + if (StringUtils.isEmpty(context.getCatalog())) { + String[] registeredCatalogs = atlasProperties.getStringArray(TRINO_CATALOG_REGISTERED); + + if (registeredCatalogs != null) { + for (String registeredCatalog : registeredCatalogs) { + if (catalogInTrino.containsKey(registeredCatalog)) { + catalogsToProcess.add(getCatalogInstance(registeredCatalog, catalogInTrino.get(registeredCatalog))); + } + } + } + } else { + if (catalogInTrino.containsKey(context.getCatalog())) { + Catalog catalog = getCatalogInstance(context.getCatalog(), catalogInTrino.get(context.getCatalog())); + catalog.setSchemaToImport(context.getSchema()); + catalog.setTableToImport(context.getTable()); + catalogsToProcess.add(catalog); + } + } + + if (CollectionUtils.isEmpty(catalogsToProcess)) { + LOG.warn("No catalogs found to process"); + return; + } else { + LOG.info("{} catalogs to be extracted", 
catalogsToProcess.stream().map(Catalog::getName).collect(Collectors.toList())); + } + + AtlasEntity.AtlasEntityWithExtInfo trinoInstanceEntity = AtlasClientHelper.createOrUpdateInstanceEntity(trinoNamespace); + + ExecutorService catalogExecutor = Executors.newFixedThreadPool(Math.min(catalogsToProcess.size(), THREAD_POOL_SIZE)); + List<Future<?>> futures = new ArrayList<>(); + + for (Catalog currentCatalog : catalogsToProcess) { + futures.add(catalogExecutor.submit(() -> { + try { + currentCatalog.setTrinoInstanceEntity(trinoInstanceEntity); + processCatalog(currentCatalog); + } catch (Exception e) { + LOG.error("Error processing catalog: {}", currentCatalog, e); + } + })); + } + catalogExecutor.shutdown(); + + try { + if (!catalogExecutor.awaitTermination(CATALOG_EXECUTION_TIMEOUT, TimeUnit.MINUTES)) { + LOG.warn("Catalog processing did not complete within the timeout. {} minutes", CATALOG_EXECUTION_TIMEOUT); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.error("Catalog processing was interrupted", e); + } + + LOG.info("Catalogs scanned for creation/updation completed"); + } + + public void processCatalog(Catalog catalog) throws AtlasServiceException, SQLException { + if (catalog != null) { + LOG.info("Started extracting {} catalog:", catalog.getName()); + String catalogName = catalog.getName(); + + AtlasEntity.AtlasEntityWithExtInfo trinoCatalogEntity = AtlasClientHelper.createOrUpdateCatalogEntity(catalog); + + List<String> schemas = trinoClientHelper.getTrinoSchemas(catalogName, catalog.getSchemaToImport()); + LOG.info("Found {} schema under {} catalog", schemas.size(), catalogName); + + processSchemas(catalog, trinoCatalogEntity.getEntity(), schemas); + + if (StringUtils.isEmpty(context.getSchema())) { + deleteSchemas(schemas, trinoCatalogEntity.getEntity().getGuid()); + } + } + } + + public void processSchemas(Catalog catalog, AtlasEntity trinoCatalogEntity, List<String> schemaToImport) { Review Comment: - make 
`processSchemas()` `private`, as this method is not called outside this class ########## addons/trino-extractor/src/main/java/org/apache/atlas/trino/client/AtlasClientHelper.java: ########## @@ -0,0 +1,430 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas.trino.client; + +import com.sun.jersey.api.client.ClientResponse; +import org.apache.atlas.AtlasClientV2; +import org.apache.atlas.AtlasServiceException; +import org.apache.atlas.model.discovery.AtlasSearchResult; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasEntityHeader; +import org.apache.atlas.model.instance.EntityMutationResponse; +import org.apache.atlas.model.instance.EntityMutations; +import org.apache.atlas.trino.model.Catalog; +import org.apache.atlas.type.AtlasTypeUtil; +import org.apache.atlas.utils.AuthenticationUtil; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.MapUtils; +import org.apache.commons.configuration.Configuration; +import org.apache.commons.lang.ArrayUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.atlas.type.AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME; + +public class AtlasClientHelper { + private static final Logger LOG = LoggerFactory.getLogger(AtlasClientHelper.class); + + public static final String TRINO_INSTANCE = "trino_instance"; + public static final String TRINO_CATALOG = "trino_catalog"; + public static final String TRINO_SCHEMA = "trino_schema"; + public static final String TRINO_TABLE = "trino_table"; + public static final String TRINO_COLUMN = "trino_column"; + public static final String TRINO_INSTANCE_CATALOG_ATTRIBUTE = "catalogs"; + public static final String TRINO_CATALOG_SCHEMA_ATTRIBUTE = "schemas"; + public static final String TRINO_SCHEMA_TABLE_ATTRIBUTE = "tables"; + public static final String QUALIFIED_NAME_ATTRIBUTE = "qualifiedName"; + public static final String NAME_ATTRIBUTE = "name"; + public 
static final int DEFAULT_PAGE_LIMIT = 10000; + private static final String DEFAULT_ATLAS_URL = "http://localhost:21000/"; + private static final String APPLICATION_PROPERTY_ATLAS_ENDPOINT = "atlas.rest.address"; + private static final String TRINO_CATALOG_CONNECTOR_TYPE_ATTRIBUTE = "connectorType"; + private static final String TRINO_CATALOG_INSTANCE_ATTRIBUTE = "instance"; + private static final String TRINO_CATALOG_INSTANCE_RELATIONSHIP = "trino_instance_catalog"; + private static final String TRINO_SCHEMA_CATALOG_ATTRIBUTE = "catalog"; + private static final String TRINO_SCHEMA_CATALOG_RELATIONSHIP = "trino_schema_catalog"; + private static final String TRINO_COLUMN_DATA_TYPE_ATTRIBUTE = "data_type"; + private static final String TRINO_COLUMN_ORIDINAL_POSITION_ATTRIBUTE = "ordinal_position"; + private static final String TRINO_COLUMN_COLUMN_DEFAULT_ATTRIBUTE = "column_default"; + private static final String TRINO_COLUMN_IS_NULLABLE_ATTRIBUTE = "is_nullable"; + private static final String TRINO_COLUMN_TABLE_ATTRIBUTE = "table"; + private static final String TRINO_TABLE_TYPE = "table_type"; + private static final String TRINO_TABLE_COLUMN_RELATIONSHIP = "trino_table_columns"; + private static final String TRINO_TABLE_SCHEMA_RELATIONSHIP = "trino_table_schema"; + private static final String TRINO_TABLE_SCHEMA_ATTRIBUTE = "trinoschema"; + private static final String TRINO_TABLE_COLUMN_ATTRIBUTE = "columns"; + + private static AtlasClientV2 atlasClientV2; + + public AtlasClientHelper(Configuration atlasConf) throws IOException { + atlasClientV2 = getAtlasClientV2Instance(atlasConf); + } + + public static synchronized AtlasClientV2 getAtlasClientV2Instance(Configuration atlasConf) throws IOException { Review Comment: `getAtlasClientV2Instance()` is not called from outside this class. Consider marking this as `private`. 
########## addons/trino-extractor/src/main/java/org/apache/atlas/trino/cli/ExtractorService.java: ########## @@ -0,0 +1,356 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.trino.cli; + +import org.apache.atlas.AtlasServiceException; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasEntityHeader; +import org.apache.atlas.trino.client.AtlasClientHelper; +import org.apache.atlas.trino.client.TrinoClientHelper; +import org.apache.atlas.trino.model.Catalog; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.MapUtils; +import org.apache.commons.configuration.Configuration; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +public class 
ExtractorService { + private static final Logger LOG = LoggerFactory.getLogger(ExtractorService.class); + + public static final int THREAD_POOL_SIZE = 5; + public static final int CATALOG_EXECUTION_TIMEOUT = 60; + public static final String TRINO_NAME_ATTRIBUTE = "name"; + private static final String TRINO_CATALOG_REGISTERED = "atlas.trino.catalogs.registered"; + private static final String TRINO_CATALOG_HOOK_ENABLED_PREFIX = "atlas.trino.catalog.hook.enabled."; + private static final String TRINO_CATALOG_HOOK_ENABLED_SUFFIX = ".namespace"; + private static Configuration atlasProperties; + private static TrinoClientHelper trinoClientHelper; + private static String trinoNamespace; + private ExtractorContext context; + + public boolean execute(ExtractorContext context) throws Exception { + this.context = context; + atlasProperties = context.getAtlasConf(); + trinoClientHelper = context.getTrinoConnector(); + trinoNamespace = context.getNamespace(); + + Map<String, String> catalogs = trinoClientHelper.getAllTrinoCatalogs(); + LOG.info("Found {} catalogs in Trino: {}", catalogs.size(), catalogs.keySet()); + + try { + processCatalogs(context, catalogs); + deleteCatalogs(context, catalogs); + } catch (AtlasServiceException e) { + throw new AtlasServiceException(e); + } + return true; + } + + public void processCatalogs(ExtractorContext context, Map<String, String> catalogInTrino) throws AtlasServiceException { + if (MapUtils.isEmpty(catalogInTrino)) { + LOG.debug("No catalogs found under Trino"); + return; + } + + List<Catalog> catalogsToProcess = new ArrayList<>(); + + if (StringUtils.isEmpty(context.getCatalog())) { + String[] registeredCatalogs = atlasProperties.getStringArray(TRINO_CATALOG_REGISTERED); + + if (registeredCatalogs != null) { + for (String registeredCatalog : registeredCatalogs) { + if (catalogInTrino.containsKey(registeredCatalog)) { + catalogsToProcess.add(getCatalogInstance(registeredCatalog, catalogInTrino.get(registeredCatalog))); + } + } + } + } 
else { + if (catalogInTrino.containsKey(context.getCatalog())) { + Catalog catalog = getCatalogInstance(context.getCatalog(), catalogInTrino.get(context.getCatalog())); + catalog.setSchemaToImport(context.getSchema()); + catalog.setTableToImport(context.getTable()); + catalogsToProcess.add(catalog); + } + } + + if (CollectionUtils.isEmpty(catalogsToProcess)) { + LOG.warn("No catalogs found to process"); + return; + } else { + LOG.info("{} catalogs to be extracted", catalogsToProcess.stream().map(Catalog::getName).collect(Collectors.toList())); + } + + AtlasEntity.AtlasEntityWithExtInfo trinoInstanceEntity = AtlasClientHelper.createOrUpdateInstanceEntity(trinoNamespace); + + ExecutorService catalogExecutor = Executors.newFixedThreadPool(Math.min(catalogsToProcess.size(), THREAD_POOL_SIZE)); + List<Future<?>> futures = new ArrayList<>(); + + for (Catalog currentCatalog : catalogsToProcess) { + futures.add(catalogExecutor.submit(() -> { + try { + currentCatalog.setTrinoInstanceEntity(trinoInstanceEntity); + processCatalog(currentCatalog); + } catch (Exception e) { + LOG.error("Error processing catalog: {}", currentCatalog, e); + } + })); + } + catalogExecutor.shutdown(); + + try { + if (!catalogExecutor.awaitTermination(CATALOG_EXECUTION_TIMEOUT, TimeUnit.MINUTES)) { + LOG.warn("Catalog processing did not complete within the timeout. 
{} minutes", CATALOG_EXECUTION_TIMEOUT); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.error("Catalog processing was interrupted", e); + } + + LOG.info("Catalogs scanned for creation/updation completed"); + } + + public void processCatalog(Catalog catalog) throws AtlasServiceException, SQLException { + if (catalog != null) { + LOG.info("Started extracting {} catalog:", catalog.getName()); + String catalogName = catalog.getName(); + + AtlasEntity.AtlasEntityWithExtInfo trinoCatalogEntity = AtlasClientHelper.createOrUpdateCatalogEntity(catalog); + + List<String> schemas = trinoClientHelper.getTrinoSchemas(catalogName, catalog.getSchemaToImport()); + LOG.info("Found {} schema under {} catalog", schemas.size(), catalogName); + + processSchemas(catalog, trinoCatalogEntity.getEntity(), schemas); + + if (StringUtils.isEmpty(context.getSchema())) { + deleteSchemas(schemas, trinoCatalogEntity.getEntity().getGuid()); + } + } + } + + public void processSchemas(Catalog catalog, AtlasEntity trinoCatalogEntity, List<String> schemaToImport) { + for (String schemaName : schemaToImport) { + LOG.info("Started extracting {} schema:", schemaName); + try { + AtlasEntity.AtlasEntityWithExtInfo schemaEntity = AtlasClientHelper.createOrUpdateSchemaEntity(catalog, trinoCatalogEntity, schemaName); + + Map<String, Map<String, Object>> tables = trinoClientHelper.getTrinoTables(catalog.getName(), schemaName, catalog.getTableToImport()); + LOG.info("Found {} tables under {}.{} catalog.schema", tables.size(), catalog.getName(), schemaName); + + processTables(catalog, schemaName, schemaEntity.getEntity(), tables); + + if (StringUtils.isEmpty(context.getTable())) { + deleteTables(new ArrayList<>(tables.keySet()), schemaEntity.getEntity().getGuid()); + } + } catch (Exception e) { + LOG.error("Error processing schema: {}", schemaName); + } + } + } + + public void processTables(Catalog catalog, String schemaName, AtlasEntity schemaEntity, Map<String, Map<String, 
Object>> trinoTables) { + for (String trinoTableName : trinoTables.keySet()) { + LOG.info("Started extracting {} table:", trinoTableName); + + try { + Map<String, Map<String, Object>> trinoColumns = trinoClientHelper.getTrinoColumns(catalog.getName(), schemaName, trinoTableName); + LOG.info("Found {} columns under {}.{}.{} catalog.schema.table", trinoColumns.size(), catalog.getName(), schemaName, trinoTableName); + + AtlasClientHelper.createOrUpdateTableEntity(catalog, schemaName, trinoTableName, trinoTables.get(trinoTableName), trinoColumns, schemaEntity); + } catch (Exception e) { + LOG.error("Error processing table: {}", trinoTableName, e); + } + } + } + + public void deleteCatalogs(ExtractorContext context, Map<String, String> catalogInTrino) throws AtlasServiceException { + if (StringUtils.isNotEmpty(context.getCatalog())) { + return; + } + + AtlasEntityHeader trinoInstance = AtlasClientHelper.getTrinoInstance(trinoNamespace); + if (trinoInstance != null) { + Set<String> catalogsToDelete = getCatalogsToDelete(catalogInTrino, trinoInstance.getGuid()); + + if (CollectionUtils.isNotEmpty(catalogsToDelete)) { + LOG.info("{} non existing catalogs to be deleted", catalogsToDelete, trinoInstance.getGuid()); + + for (String catalogGuid : catalogsToDelete) { + try { + deleteSchemas(null, catalogGuid); + AtlasClientHelper.deleteByGuid(Collections.singleton(catalogGuid)); + } catch (AtlasServiceException e) { + LOG.error("Error deleting catalog: {}", catalogGuid, e); + } + } + } else { + LOG.info("No catalogs found to delete"); + } + } + + LOG.info("Catalogs scanned for deletion completed"); + } + + public void deleteSchemas(List<String> schemasInTrino, String catalogGuid) { Review Comment: - make `deleteSchemas()` `private`, as this method is not called outside this class ########## addons/trino-extractor/src/main/java/org/apache/atlas/trino/client/AtlasClientHelper.java: ########## @@ -0,0 +1,430 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one 
+ * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.trino.client; + +import com.sun.jersey.api.client.ClientResponse; +import org.apache.atlas.AtlasClientV2; +import org.apache.atlas.AtlasServiceException; +import org.apache.atlas.model.discovery.AtlasSearchResult; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasEntityHeader; +import org.apache.atlas.model.instance.EntityMutationResponse; +import org.apache.atlas.model.instance.EntityMutations; +import org.apache.atlas.trino.model.Catalog; +import org.apache.atlas.type.AtlasTypeUtil; +import org.apache.atlas.utils.AuthenticationUtil; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.MapUtils; +import org.apache.commons.configuration.Configuration; +import org.apache.commons.lang.ArrayUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.atlas.type.AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME; + +public class AtlasClientHelper { + 
private static final Logger LOG = LoggerFactory.getLogger(AtlasClientHelper.class); + + public static final String TRINO_INSTANCE = "trino_instance"; + public static final String TRINO_CATALOG = "trino_catalog"; + public static final String TRINO_SCHEMA = "trino_schema"; + public static final String TRINO_TABLE = "trino_table"; + public static final String TRINO_COLUMN = "trino_column"; + public static final String TRINO_INSTANCE_CATALOG_ATTRIBUTE = "catalogs"; + public static final String TRINO_CATALOG_SCHEMA_ATTRIBUTE = "schemas"; + public static final String TRINO_SCHEMA_TABLE_ATTRIBUTE = "tables"; + public static final String QUALIFIED_NAME_ATTRIBUTE = "qualifiedName"; + public static final String NAME_ATTRIBUTE = "name"; + public static final int DEFAULT_PAGE_LIMIT = 10000; + private static final String DEFAULT_ATLAS_URL = "http://localhost:21000/"; + private static final String APPLICATION_PROPERTY_ATLAS_ENDPOINT = "atlas.rest.address"; + private static final String TRINO_CATALOG_CONNECTOR_TYPE_ATTRIBUTE = "connectorType"; + private static final String TRINO_CATALOG_INSTANCE_ATTRIBUTE = "instance"; + private static final String TRINO_CATALOG_INSTANCE_RELATIONSHIP = "trino_instance_catalog"; + private static final String TRINO_SCHEMA_CATALOG_ATTRIBUTE = "catalog"; + private static final String TRINO_SCHEMA_CATALOG_RELATIONSHIP = "trino_schema_catalog"; + private static final String TRINO_COLUMN_DATA_TYPE_ATTRIBUTE = "data_type"; + private static final String TRINO_COLUMN_ORIDINAL_POSITION_ATTRIBUTE = "ordinal_position"; + private static final String TRINO_COLUMN_COLUMN_DEFAULT_ATTRIBUTE = "column_default"; + private static final String TRINO_COLUMN_IS_NULLABLE_ATTRIBUTE = "is_nullable"; + private static final String TRINO_COLUMN_TABLE_ATTRIBUTE = "table"; + private static final String TRINO_TABLE_TYPE = "table_type"; + private static final String TRINO_TABLE_COLUMN_RELATIONSHIP = "trino_table_columns"; + private static final String 
TRINO_TABLE_SCHEMA_RELATIONSHIP = "trino_table_schema"; + private static final String TRINO_TABLE_SCHEMA_ATTRIBUTE = "trinoschema"; + private static final String TRINO_TABLE_COLUMN_ATTRIBUTE = "columns"; + + private static AtlasClientV2 atlasClientV2; + + public AtlasClientHelper(Configuration atlasConf) throws IOException { + atlasClientV2 = getAtlasClientV2Instance(atlasConf); + } + + public static synchronized AtlasClientV2 getAtlasClientV2Instance(Configuration atlasConf) throws IOException { + if (atlasClientV2 == null) { + String[] atlasEndpoint = new String[] {DEFAULT_ATLAS_URL}; + + if (atlasConf != null && ArrayUtils.isNotEmpty(atlasConf.getStringArray(APPLICATION_PROPERTY_ATLAS_ENDPOINT))) { + atlasEndpoint = atlasConf.getStringArray(APPLICATION_PROPERTY_ATLAS_ENDPOINT); + } + + if (!AuthenticationUtil.isKerberosAuthenticationEnabled()) { + String[] basicAuthUsernamePassword = AuthenticationUtil.getBasicAuthenticationInput(); + atlasClientV2 = new AtlasClientV2(atlasEndpoint, basicAuthUsernamePassword); + } else { + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + atlasClientV2 = new AtlasClientV2(ugi, ugi.getShortUserName(), atlasEndpoint); + } + } + return atlasClientV2; + } + + public static List<AtlasEntityHeader> getAllCatalogsInInstance(String instanceGuid) throws AtlasServiceException { Review Comment: Most static methods in this class can be made non-static by making an AtlasClientHelper instance available to the callers. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: dev-unsubscr...@atlas.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org