Samrat002 commented on code in PR #47: URL: https://github.com/apache/flink-connector-aws/pull/47#discussion_r1719542058
########## flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalog.java: ########## @@ -0,0 +1,1115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.glue; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.connector.aws.util.AWSClientUtil; +import org.apache.flink.connector.aws.util.AWSGeneralUtil; +import org.apache.flink.table.catalog.AbstractCatalog; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.CatalogFunction; +import org.apache.flink.table.catalog.CatalogPartition; +import org.apache.flink.table.catalog.CatalogPartitionImpl; +import org.apache.flink.table.catalog.CatalogPartitionSpec; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.CatalogView; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.ResolvedCatalogTable; +import org.apache.flink.table.catalog.ResolvedCatalogView; +import 
org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.FunctionNotExistException; +import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException; +import org.apache.flink.table.catalog.exceptions.PartitionNotExistException; +import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException; +import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException; +import org.apache.flink.table.catalog.exceptions.TablePartitionedException; +import org.apache.flink.table.catalog.glue.constants.GlueCatalogConstants; +import org.apache.flink.table.catalog.glue.operator.GlueDatabaseOperator; +import org.apache.flink.table.catalog.glue.operator.GlueFunctionOperator; +import org.apache.flink.table.catalog.glue.operator.GluePartitionOperator; +import org.apache.flink.table.catalog.glue.operator.GlueTableOperator; +import org.apache.flink.table.catalog.glue.util.GlueUtils; +import org.apache.flink.table.catalog.stats.CatalogColumnStatistics; +import org.apache.flink.table.catalog.stats.CatalogTableStatistics; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.functions.FunctionIdentifier; +import org.apache.flink.util.StringUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.http.apache.ApacheHttpClient; +import software.amazon.awssdk.services.glue.GlueClient; +import software.amazon.awssdk.services.glue.model.GetTablesRequest; +import 
software.amazon.awssdk.services.glue.model.GetTablesResponse; +import software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionsRequest; +import software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionsResponse; +import software.amazon.awssdk.services.glue.model.GlueException; +import software.amazon.awssdk.services.glue.model.Partition; +import software.amazon.awssdk.services.glue.model.Table; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly; + +/** Glue catalog implementation that uses AWS Glue Data Catalog as persistence at backend. */ +@PublicEvolving +public class GlueCatalog extends AbstractCatalog { + + private static final Logger LOG = LoggerFactory.getLogger(GlueCatalog.class); + + /** instance of GlueOperator to facilitate glue related actions. */ + public GlueDatabaseOperator glueDatabaseOperator; + + public GlueTableOperator glueTableOperator; + public GluePartitionOperator gluePartitionOperator; + public GlueFunctionOperator glueFunctionOperator; + + public GlueClient glueClient; + + /** Default database name if not passed as part of catalog. 
*/ + public static final String DEFAULT_DB = "default"; + + public GlueCatalog( + String catalogName, + String databaseName, + ReadableConfig catalogConfig, + Properties glueClientProperties) { + super(catalogName, databaseName); + checkNotNull(catalogConfig, "Catalog config cannot be null."); + String glueCatalogId = + String.valueOf( + catalogConfig.getOptional(GlueCatalogOptions.GLUE_CATALOG_ID).orElse(null)); + glueClient = createClient(glueClientProperties); + this.glueDatabaseOperator = new GlueDatabaseOperator(getName(), glueClient, glueCatalogId); + this.glueTableOperator = new GlueTableOperator(getName(), glueClient, glueCatalogId); + this.gluePartitionOperator = + new GluePartitionOperator(getName(), glueClient, glueCatalogId); + this.glueFunctionOperator = new GlueFunctionOperator(getName(), glueClient, glueCatalogId); + } + + private static GlueClient createClient(Properties glueClientProperties) { + return AWSClientUtil.createAwsSyncClient( + glueClientProperties, + AWSGeneralUtil.createSyncHttpClient(glueClientProperties, ApacheHttpClient.builder()), + GlueClient.builder(), + GlueCatalogConstants.BASE_GLUE_USER_AGENT_PREFIX_FORMAT, + GlueCatalogConstants.GLUE_CLIENT_USER_AGENT_PREFIX); + } + + @VisibleForTesting + public GlueCatalog( + String catalogName, + String databaseName, + GlueClient glueClient, + GlueDatabaseOperator glueDatabaseOperator, + GlueTableOperator glueTableOperator, + GluePartitionOperator gluePartitionOperator, + GlueFunctionOperator glueFunctionOperator) { + super(catalogName, databaseName); + this.glueClient = glueClient; + this.glueDatabaseOperator = glueDatabaseOperator; + this.glueTableOperator = glueTableOperator; + this.gluePartitionOperator = gluePartitionOperator; + this.glueFunctionOperator = glueFunctionOperator; + } + + /** + * Open the catalog. Used for any required preparation in initialization phase. 
+ * + * @throws CatalogException in case of any runtime exception + */ + @Override + public void open() throws CatalogException {} + + /** + * Close the catalog when it is no longer needed and release any resource that it might be + * holding. + * + * @throws CatalogException in case of any runtime exception + */ + @Override + public void close() throws CatalogException { + try { + glueClient.close(); + } catch (Exception e) { + LOG.warn("Glue Client is not closed properly!", e); + } + } + + // ------ databases ------ + + /** + * Create a database. + * + * @param databaseName Name of the database to be created + * @param database The database definition + * @param ignoreIfExists Flag to specify behavior when a database with the given name already + * exists: if set to false, throw a DatabaseAlreadyExistException, if set to true, do + * nothing. + * @throws DatabaseAlreadyExistException if the given database already exists and ignoreIfExists + * is false + * @throws CatalogException in case of any runtime exception + */ + @Override + public void createDatabase( + String databaseName, CatalogDatabase database, boolean ignoreIfExists) + throws DatabaseAlreadyExistException, CatalogException { + + checkArgument( + !StringUtils.isNullOrWhitespaceOnly(databaseName), + "Database name cannot be null or empty."); + checkNotNull(database, "Database cannot be null."); + + databaseName = GlueUtils.getGlueConventionalName(databaseName); + if (databaseExists(databaseName)) { + if (!ignoreIfExists) { + throw new DatabaseAlreadyExistException(getName(), databaseName); + } + } else { + glueDatabaseOperator.createGlueDatabase(databaseName, database); + LOG.info("Created Database {}.", databaseName); + } + } + + /** + * Drop a database. + * + * @param databaseName Name of the database to be dropped. + * @param ignoreIfNotExists Flag to specify behavior when the database does not exist: if set to + * false, throw an exception, if set to true, do nothing. 
+ * @param cascade Flag to specify behavior when the database contains table or function: if set + * to true, delete all tables and functions in the database and then delete the database, if + * set to false, throw an exception. + * @throws DatabaseNotExistException if the given database does not exist + * @throws DatabaseNotEmptyException if the given database is not empty and isRestrict is true + * @throws CatalogException in case of any runtime exception + */ + @Override + public void dropDatabase(String databaseName, boolean ignoreIfNotExists, boolean cascade) + throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException { + checkArgument( + !StringUtils.isNullOrWhitespaceOnly(databaseName), + "Database name cannot be null or empty."); + databaseName = GlueUtils.getGlueConventionalName(databaseName); + if (databaseExists(databaseName)) { + if (cascade) { + List<String> tables = listTables(databaseName); + if (!tables.isEmpty()) { + glueDatabaseOperator.deleteTablesFromDatabase(databaseName, tables); + LOG.info("All Tables deleted from Database {}.", databaseName); + } + + List<String> functions = listFunctions(databaseName); + if (!functions.isEmpty()) { + glueDatabaseOperator.deleteFunctionsFromDatabase(databaseName, functions); + LOG.info("All Functions deleted from Database {}.", databaseName); + } + } + if (!isDatabaseEmpty(databaseName)) { + throw new DatabaseNotEmptyException(getName(), databaseName); + } + glueDatabaseOperator.dropGlueDatabase(databaseName); + LOG.info("Dropped Database: {}.", databaseName); + } else if (!ignoreIfNotExists) { + throw new DatabaseNotExistException(getName(), databaseName); + } + } + + /** + * Modify existing database. + * + * @param name Name of the database to be modified + * @param newDatabase The new database definition + * @param ignoreIfNotExists Flag to specify behavior when the given database does not exist: if + * set to false, throw an exception, if set to true, do nothing. 
+ * @throws DatabaseNotExistException if the given database does not exist + * @throws CatalogException in case of any runtime exception + */ + @Override + public void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists) + throws DatabaseNotExistException, CatalogException { + + name = GlueUtils.getGlueConventionalName(name); + checkArgument( + !StringUtils.isNullOrWhitespaceOnly(name), + "Database name cannot be null or empty."); + checkNotNull(newDatabase, "Database cannot be Empty"); + try { + CatalogDatabase existingDatabase = glueDatabaseOperator.getDatabase(name); + if (existingDatabase != null) { + if (existingDatabase.getClass() != newDatabase.getClass()) { + throw new CatalogException( + String.format( + "Database types don't match. Existing database is '%s' and new database is '%s'.", + existingDatabase.getClass().getName(), + newDatabase.getClass().getName())); + } + glueDatabaseOperator.updateGlueDatabase(name, newDatabase); + } + } catch (DatabaseNotExistException e) { + if (!ignoreIfNotExists) { + throw new DatabaseNotExistException(getName(), name); + } + } + } + + /** + * Get list of databases in catalog. + * + * @return a list of the names of all databases + * @throws CatalogException in case of any runtime exception + */ + @Override + public List<String> listDatabases() throws CatalogException { + return glueDatabaseOperator.listGlueDatabases(); + } + + /** + * Get a database from this catalog. 
+ * + * @param databaseName Name of the database + * @return The requested database + * @throws DatabaseNotExistException if the database does not exist + * @throws CatalogException in case of any runtime exception + */ + @Override + public CatalogDatabase getDatabase(String databaseName) + throws DatabaseNotExistException, CatalogException { + + databaseName = GlueUtils.getGlueConventionalName(databaseName); + checkArgument( + !StringUtils.isNullOrWhitespaceOnly(databaseName), + "Database name cannot be null or empty."); + return glueDatabaseOperator.getDatabase(databaseName); + } + + /** + * Check if a database exists in this catalog. + * + * @param databaseName Name of the database + * @return true if the given database exists in the catalog false otherwise + * @throws CatalogException in case of any runtime exception + */ + @Override + public boolean databaseExists(String databaseName) throws CatalogException { + checkArgument( + !StringUtils.isNullOrWhitespaceOnly(databaseName), + "Database name cannot be null or empty."); + try { + return getDatabase(databaseName) != null; + } catch (DatabaseNotExistException e) { + return false; + } + } + + /** + * Check if database is empty. i.e. it should not contain 1. table 2. functions + * + * @param databaseName name of database. + * @return boolean True/False based on the content of database. 
+ * @throws CatalogException Any Exception thrown due to glue error + */ + public boolean isDatabaseEmpty(String databaseName) throws CatalogException { + checkArgument( + !isNullOrWhitespaceOnly(databaseName), + "Database name cannot be null or empty spaces."); + GlueUtils.validate(databaseName); + GetTablesRequest tablesRequest = + GetTablesRequest.builder() + .catalogId(glueTableOperator.getGlueCatalogId()) + .databaseName(databaseName) + .maxResults(1) + .build(); + GetUserDefinedFunctionsRequest functionsRequest = + GetUserDefinedFunctionsRequest.builder() + .databaseName(databaseName) + .catalogId(glueFunctionOperator.getGlueCatalogId()) + .maxResults(1) + .build(); + try { + GetTablesResponse tableResponse = glueClient.getTables(tablesRequest); + GetUserDefinedFunctionsResponse functionResponse = + glueClient.getUserDefinedFunctions(functionsRequest); + return (tableResponse.sdkHttpResponse().isSuccessful() && tableResponse.tableList().isEmpty()) && Review Comment: Modified the method and separated out the sdkResponse check, together with the requestId and the content in tableList and functionList. Please review the changes whenever you have time, @foxus. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
