guptashailesh92 commented on code in PR #47: URL: https://github.com/apache/flink-connector-aws/pull/47#discussion_r1092312544
########## flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/GlueOperator.java: ########## @@ -0,0 +1,1498 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.glue.util; + +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.CatalogDatabaseImpl; +import org.apache.flink.table.catalog.CatalogFunction; +import org.apache.flink.table.catalog.CatalogFunctionImpl; +import org.apache.flink.table.catalog.CatalogPartition; +import org.apache.flink.table.catalog.CatalogPartitionSpec; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.CatalogView; +import org.apache.flink.table.catalog.FunctionLanguage; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; +import 
org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.PartitionNotExistException; +import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException; +import org.apache.flink.table.catalog.glue.GlueCatalogConfig; +import org.apache.flink.table.catalog.glue.GlueCatalogOptions; +import org.apache.flink.table.catalog.mapper.DatatypeMapper; +import org.apache.flink.table.catalog.mapper.GlueDatatypeMapper; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.factories.ManagedTableFactory; +import org.apache.flink.table.resource.ResourceType; +import org.apache.flink.table.types.DataType; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.glue.GlueClient; +import software.amazon.awssdk.services.glue.model.AlreadyExistsException; +import software.amazon.awssdk.services.glue.model.BatchDeleteTableRequest; +import software.amazon.awssdk.services.glue.model.BatchDeleteTableResponse; +import software.amazon.awssdk.services.glue.model.Column; +import software.amazon.awssdk.services.glue.model.CreateDatabaseRequest; +import software.amazon.awssdk.services.glue.model.CreateDatabaseResponse; +import software.amazon.awssdk.services.glue.model.CreatePartitionRequest; +import software.amazon.awssdk.services.glue.model.CreatePartitionResponse; +import software.amazon.awssdk.services.glue.model.CreateTableRequest; +import software.amazon.awssdk.services.glue.model.CreateTableResponse; +import software.amazon.awssdk.services.glue.model.CreateUserDefinedFunctionRequest; +import software.amazon.awssdk.services.glue.model.CreateUserDefinedFunctionResponse; +import 
software.amazon.awssdk.services.glue.model.Database; +import software.amazon.awssdk.services.glue.model.DatabaseInput; +import software.amazon.awssdk.services.glue.model.DeleteDatabaseRequest; +import software.amazon.awssdk.services.glue.model.DeleteDatabaseResponse; +import software.amazon.awssdk.services.glue.model.DeletePartitionRequest; +import software.amazon.awssdk.services.glue.model.DeletePartitionResponse; +import software.amazon.awssdk.services.glue.model.DeleteTableRequest; +import software.amazon.awssdk.services.glue.model.DeleteTableResponse; +import software.amazon.awssdk.services.glue.model.DeleteUserDefinedFunctionRequest; +import software.amazon.awssdk.services.glue.model.DeleteUserDefinedFunctionResponse; +import software.amazon.awssdk.services.glue.model.EntityNotFoundException; +import software.amazon.awssdk.services.glue.model.GetDatabaseRequest; +import software.amazon.awssdk.services.glue.model.GetDatabaseResponse; +import software.amazon.awssdk.services.glue.model.GetDatabasesRequest; +import software.amazon.awssdk.services.glue.model.GetDatabasesResponse; +import software.amazon.awssdk.services.glue.model.GetPartitionRequest; +import software.amazon.awssdk.services.glue.model.GetPartitionResponse; +import software.amazon.awssdk.services.glue.model.GetPartitionsRequest; +import software.amazon.awssdk.services.glue.model.GetPartitionsResponse; +import software.amazon.awssdk.services.glue.model.GetTableRequest; +import software.amazon.awssdk.services.glue.model.GetTableResponse; +import software.amazon.awssdk.services.glue.model.GetTablesRequest; +import software.amazon.awssdk.services.glue.model.GetTablesResponse; +import software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionRequest; +import software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionResponse; +import software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionsRequest; +import software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionsResponse; 
+import software.amazon.awssdk.services.glue.model.GlueException; +import software.amazon.awssdk.services.glue.model.GlueResponse; +import software.amazon.awssdk.services.glue.model.Partition; +import software.amazon.awssdk.services.glue.model.PartitionInput; +import software.amazon.awssdk.services.glue.model.PrincipalType; +import software.amazon.awssdk.services.glue.model.ResourceUri; +import software.amazon.awssdk.services.glue.model.StorageDescriptor; +import software.amazon.awssdk.services.glue.model.Table; +import software.amazon.awssdk.services.glue.model.TableInput; +import software.amazon.awssdk.services.glue.model.UpdateDatabaseRequest; +import software.amazon.awssdk.services.glue.model.UpdateDatabaseResponse; +import software.amazon.awssdk.services.glue.model.UpdateTableRequest; +import software.amazon.awssdk.services.glue.model.UpdateTableResponse; +import software.amazon.awssdk.services.glue.model.UpdateUserDefinedFunctionRequest; +import software.amazon.awssdk.services.glue.model.UpdateUserDefinedFunctionResponse; +import software.amazon.awssdk.services.glue.model.UserDefinedFunction; +import software.amazon.awssdk.services.glue.model.UserDefinedFunctionInput; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR; +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly; + +/** + * Utilities for Glue catalog operations. 
+ * Important Note : https://aws.amazon.com/premiumsupport/knowledge-center/glue-crawler-internal-service-exception/ + */ +public class GlueOperator { + + /** + * Defines the location URI for database. + * This locationUri is high level path for catalog + */ + public final String locationUri; + + /** + * Instance of AwsProperties which holds the configs related to configure glue and aws setup. + */ + private final AwsProperties awsProperties; + + /** + * http client for glue client. + * Current implementation for client is sync type. + */ + private final GlueClient glueClient; + + private final DatatypeMapper<DataType> datatypeMapper; + + private final String catalogName; + + private static final Logger LOG = LoggerFactory.getLogger(GlueOperator.class); + + public GlueOperator(String locationUri, String catalogName, AwsProperties awsProperties, GlueClient glueClient) { + this.locationUri = locationUri; + this.awsProperties = awsProperties; + this.glueClient = glueClient; + this.datatypeMapper = new GlueDatatypeMapper<>(); + this.catalogName = catalogName; + } + + public void closeClient() { + glueClient.close(); + } + + // -------------- Database related operations. + + /** + * List all databases present. 
+ * + * @return List of fully qualified database names + */ + public List<String> listGlueDatabases() throws CatalogException { + try { + GetDatabasesRequest.Builder databasesRequestBuilder = + GetDatabasesRequest.builder() + .catalogId(getGlueCatalogId()); + + GetDatabasesResponse response = glueClient.getDatabases(databasesRequestBuilder.build()); + List<String> glueDatabases = + response.databaseList().stream() + .map(Database::name) + .collect(Collectors.toList()); + String dbResultNextToken = response.nextToken(); + if (Optional.ofNullable(dbResultNextToken).isPresent()) { + do { + databasesRequestBuilder.nextToken(dbResultNextToken); + response = glueClient.getDatabases(databasesRequestBuilder.build()); + glueDatabases.addAll( + response.databaseList().stream() + .map(Database::name) + .collect(Collectors.toList())); + dbResultNextToken = response.nextToken(); + } while (Optional.ofNullable(dbResultNextToken).isPresent()); + } + return glueDatabases; + } catch (GlueException e) { + throw new CatalogException(GlueCatalogConfig.GLUE_EXCEPTION_MSG_IDENTIFIER, e.getCause()); + } + } + + /** + * Create database in glue data catalog. + * + * @param databaseName fully qualified name of database. + * @param database Instance of {@link CatalogDatabase}. + * @throws CatalogException when unknown error from glue servers. + * @throws DatabaseAlreadyExistException when database exists already in glue data catalog. + */ + public void createGlueDatabase(String databaseName, CatalogDatabase database) + throws CatalogException, DatabaseAlreadyExistException { + + validateName(databaseName); + Map<String, String> properties = database.getProperties(); + DatabaseInput.Builder databaseInputBuilder = DatabaseInput.builder().name(databaseName) + .description(database.getComment()) + // update location and remove location from properties. 
+ .locationUri(extractDatabaseLocation(properties, databaseName)) + .parameters(properties); + CreateDatabaseRequest.Builder requestBuilder = CreateDatabaseRequest.builder() + .databaseInput(databaseInputBuilder.build()) + .catalogId(getGlueCatalogId()).tags(getFlinkCatalogTag()); + LOG.info(String.format("Database Properties Listing :- %s", + properties.entrySet().stream().map(e -> e.getKey() + e.getValue()).collect(Collectors.joining(",")))); + try { + CreateDatabaseResponse response = glueClient.createDatabase(requestBuilder.build()); + LOG.debug(getDebugLog(response)); + LOG.info(String.format("%s Database created.", databaseName)); + } catch (EntityNotFoundException e) { + throw new DatabaseAlreadyExistException(catalogName, databaseName, e); + } catch (GlueException e) { + throw new CatalogException(GlueCatalogConfig.GLUE_EXCEPTION_MSG_IDENTIFIER, e); + } + } + + /** + * Extract location from database properties if present and remove location from properties. + * fallback to create default location if not present + * + * @param databaseProperties database properties. + * @param databaseName fully qualified name for database. + * @return location for database. + */ + private String extractDatabaseLocation(Map<String, String> databaseProperties, String databaseName) { + return databaseProperties.containsKey(GlueCatalogConfig.LOCATION_URI) ? + databaseProperties.remove(GlueCatalogConfig.LOCATION_URI) : + locationUri + GlueCatalogConfig.LOCATION_SEPARATOR + databaseName; + } + + /** + * Create tag for flink in glue catalog for identification. + * + * @return Key/Value pair for tags + */ + private Map<String, String> getFlinkCatalogTag() { + Map<String, String> tags = new HashMap<>(); + tags.put("source", "flink_catalog"); + return tags; + } + + /** + * Delete a database from Glue data catalog only when database is empty. 
+ * + * @param databaseName fully qualified name of database + * @throws CatalogException Any Exception thrown due to glue error + * @throws DatabaseNotExistException when database doesn't exists in glue catalog. + */ + public void dropGlueDatabase(String databaseName) throws CatalogException, DatabaseNotExistException { + + validateName(databaseName); + + DeleteDatabaseRequest deleteDatabaseRequest = DeleteDatabaseRequest.builder() + .name(databaseName).catalogId(getGlueCatalogId()).build(); + try { + DeleteDatabaseResponse response = glueClient.deleteDatabase(deleteDatabaseRequest); + LOG.debug(getDebugLog(response)); + LOG.info(String.format("Database Dropped %s", databaseName)); + } catch (EntityNotFoundException e) { + throw new DatabaseNotExistException(catalogName, databaseName); + } catch (GlueException e) { + throw new CatalogException(catalogName, e); + } + } + + /** + * Drops list of table in database from glue data catalog. + * + * @param databaseName fully qualified name of database + * @param tables List of tables to remove from database. 
+ * @throws CatalogException Any Exception thrown due to glue error + */ + public void deleteTablesFromDatabase(String databaseName, Collection<String> tables) throws CatalogException { + validateName(databaseName); + BatchDeleteTableRequest batchTableRequest = + BatchDeleteTableRequest.builder() + .databaseName(databaseName) + .catalogId(getGlueCatalogId()) + .tablesToDelete(tables) + .build(); + + try { + BatchDeleteTableResponse response = glueClient.batchDeleteTable(batchTableRequest); + if (response.hasErrors()) { + String errorMsg = String.format( + "Glue Table errors:- %s", + response.errors().stream() + .map( + e -> + "Table: " + + e.tableName() + + "\nErrorDetail: " + + e.errorDetail() + .errorMessage()) + .collect(Collectors.joining("\n"))); + LOG.error(errorMsg); + } + } catch (GlueException e) { + throw new CatalogException(GlueCatalogConfig.GLUE_EXCEPTION_MSG_IDENTIFIER, e); + } + } + + /** + * Drops list of user defined function in database from glue data catalog. + * + * @param databaseName fully qualified name of database + * @param functions List of tables to remove from database. 
+ * @throws CatalogException Any Exception thrown due to glue error + */ + public void deleteFunctionsFromDatabase(String databaseName, Collection<String> functions) + throws CatalogException { + validateName(databaseName); + try { + DeleteUserDefinedFunctionRequest.Builder requestBuilder = DeleteUserDefinedFunctionRequest.builder() + .databaseName(databaseName).catalogId(getGlueCatalogId()); + for (String functionName : functions) { + requestBuilder.functionName(functionName); + DeleteUserDefinedFunctionResponse response = glueClient.deleteUserDefinedFunction(requestBuilder.build()); + LOG.debug(getDebugLog(response)); + validateGlueResponse(response); + LOG.info(String.format("Dropped Function %s", functionName)); + } + + } catch (GlueException e) { + LOG.error(String.format("Error deleting functions in database: %s", databaseName)); + throw new CatalogException(GlueCatalogConfig.GLUE_EXCEPTION_MSG_IDENTIFIER, e); + } + } + + + /** + * Check if database is empty. + * i.e. it should not contain + * 1. table + * 2. functions + * + * @param databaseName name of database. + * @return boolean True/False based on the content of database. + * @throws CatalogException Any Exception thrown due to glue error + */ + public boolean isDatabaseEmpty(String databaseName) throws CatalogException { + checkArgument(!isNullOrWhitespaceOnly(databaseName)); + validateName(databaseName); + GetTablesRequest tablesRequest = + GetTablesRequest.builder() + .catalogId(getGlueCatalogId()) + .databaseName(databaseName).maxResults(1) + .build(); + try { + GetTablesResponse response = glueClient.getTables(tablesRequest); + if (response.sdkHttpResponse().isSuccessful() && response.tableList().size() == 0) { + return listGlueFunctions(databaseName).size() == 0; + } + return false; + } catch (GlueException e) { + throw new CatalogException(GlueCatalogConfig.GLUE_EXCEPTION_MSG_IDENTIFIER, e); + } + + } + + /** + * Get a database from this glue data catalog. 
+ * + * @param databaseName fully qualified name of database. + * @return Instance of {@link CatalogDatabase } . + * @throws DatabaseNotExistException when database doesn't exists in Glue data catalog. + * @throws CatalogException when any unknown error occurs in glue. + */ + public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException { + + validateName(databaseName); + GetDatabaseRequest getDatabaseRequest = + GetDatabaseRequest.builder() + .name(databaseName) + .catalogId(getGlueCatalogId()) + .build(); + try { + GetDatabaseResponse response = glueClient.getDatabase(getDatabaseRequest); + LOG.debug( + GlueCatalogConfig.GLUE_EXCEPTION_MSG_IDENTIFIER + ": existing database. Client call response :- " + + response.sdkHttpResponse().statusText()); + validateGlueResponse(response); + return getCatalogDatabase(response.database()); + } catch (EntityNotFoundException e) { + throw new DatabaseNotExistException(catalogName, databaseName); + } catch (GlueException e) { + throw new CatalogException(GlueCatalogConfig.GLUE_EXCEPTION_MSG_IDENTIFIER, e); + } + } + + /** + * Build CatalogDatabase instance using information from glue Database. + * + * @param glueDatabase {@link Database } + * @return {@link CatalogDatabase } instance. 
+ */ + private CatalogDatabase getCatalogDatabase(Database glueDatabase) { + Map<String, String> properties = new HashMap<>(glueDatabase.parameters()); + + // retrieve location uri into properties + properties.put(GlueCatalogConfig.LOCATION_URI, glueDatabase.locationUri()); + String comment = glueDatabase.description(); + return new CatalogDatabaseImpl(properties, comment); + } + + public void updateGlueDatabase(String databaseName, CatalogDatabase newDatabase) + throws CatalogException { + + validateName(databaseName); + Map<String, String> properties = newDatabase.getProperties(); + DatabaseInput.Builder databaseInputBuilder = DatabaseInput.builder() + .parameters(properties) + .description(newDatabase.getComment()) + .name(databaseName) + .locationUri(extractDatabaseLocation(properties, databaseName)); + + UpdateDatabaseRequest updateRequest = + UpdateDatabaseRequest.builder() + .databaseInput(databaseInputBuilder.build()) + .name(databaseName) + .catalogId(getGlueCatalogId()) + .build(); + UpdateDatabaseResponse response = glueClient.updateDatabase(updateRequest); + LOG.debug(getDebugLog(response)); + LOG.info(String.format("Database Updated. %s", databaseName)); + validateGlueResponse(response); + } + + // -------------- Table related operations. + + /** + * Create table in glue data catalog. + * + * @param tablePath Fully qualified name of table. {@link ObjectPath} + * @param table instance of {@link CatalogBaseTable} containing table related information. + * @param managedTable identifier if managed table. 
+ * @throws CatalogException Any Exception thrown due to glue error + */ + public void createGlueTable(final ObjectPath tablePath, final CatalogBaseTable table, + final boolean managedTable) throws CatalogException { + + checkNotNull(table); + checkNotNull(tablePath); + Map<String, String> properties = new HashMap<>(table.getOptions()); + String tableOwner = extractTableOwner(properties); + if (managedTable) { + properties.put(CONNECTOR.key(), ManagedTableFactory.DEFAULT_IDENTIFIER); + } + Set<Column> glueColumns = getGlueColumnsFromCatalogTable(table); + + // create StorageDescriptor for table + StorageDescriptor.Builder storageDescriptorBuilder = StorageDescriptor.builder() + .inputFormat(extractInputFormat(properties)) + .outputFormat(extractOutputFormat(properties)) + .location(extractTableLocation(properties, tablePath)); + + // create TableInput Builder with available information. + TableInput.Builder tableInputBuilder = TableInput.builder() + .name(tablePath.getObjectName()) + .description(table.getComment()) + .tableType(table.getTableKind().name()) + .lastAccessTime(Instant.now()) + .owner(tableOwner); + + if (table.getTableKind().name().equalsIgnoreCase(CatalogBaseTable.TableKind.VIEW.name())) { + tableInputBuilder.viewExpandedText(getExpandedQuery(table)); + tableInputBuilder.viewOriginalText(getOriginalQuery(table)); + } + + CreateTableRequest.Builder requestBuilder = CreateTableRequest.builder() + .catalogId(getGlueCatalogId()) + .databaseName(tablePath.getDatabaseName()); + + if (table instanceof CatalogTable) { + CatalogTable catalogTable = (CatalogTable) table; + if (catalogTable.isPartitioned()) { + LOG.info("Catalog table is partitioned"); + Collection<Column> partitionKeys = getPartitionKeys(catalogTable, glueColumns); + LOG.info("Partition columns are -> " + + partitionKeys.stream().map(Column::name).collect(Collectors.joining(","))); + tableInputBuilder.partitionKeys(partitionKeys); + } + } + + try { + // apply storage descriptor and tableInput 
for request + storageDescriptorBuilder.columns(glueColumns); + tableInputBuilder.storageDescriptor(storageDescriptorBuilder.build()); + tableInputBuilder.parameters(properties); + requestBuilder.tableInput(tableInputBuilder.build()); + CreateTableResponse response = glueClient.createTable(requestBuilder.build()); + LOG.debug(getDebugLog(response)); + validateGlueResponse(response); + LOG.info(String.format("Table created. %s", tablePath.getFullName())); + + } catch (GlueException e) { + throw new CatalogException(GlueCatalogConfig.GLUE_EXCEPTION_MSG_IDENTIFIER, e); + } + } + + private String getExpandedQuery(CatalogBaseTable table) { + // todo complete it. + return ""; + } + + private String getOriginalQuery(CatalogBaseTable table) { + // todo complete it. + return ""; + } + + /** + * Extract table owner name and remove from properties. + * + * @param properties Map of properties. + * @return fully qualified owner name. + */ + private String extractTableOwner(Map<String, String> properties) { + return properties.containsKey(GlueCatalogConfig.TABLE_OWNER) ? + properties.remove(GlueCatalogConfig.TABLE_OWNER) : null; + } + + /** + * Build set of {@link Column} associated with table. + * + * @param catalogBaseTable instance of {@link CatalogBaseTable}. + * @return Set of Column + */ + private Set<Column> getGlueColumnsFromCatalogTable(CatalogBaseTable catalogBaseTable) { + checkNotNull(catalogBaseTable); + TableSchema tableSchema = catalogBaseTable.getSchema(); + return Arrays.stream(tableSchema.getFieldNames()) + .map(fieldName -> getGlueColumn(catalogBaseTable, tableSchema, fieldName)) + .collect(Collectors.toSet()); + } + + /** + * Extract location from database properties if present and remove location from properties. + * fallback to create default location if not present + * + * @param tableProperties table properties. + * @param tablePath fully qualified object for table. + * @return location for table. 
+ */ + private String extractTableLocation(Map<String, String> tableProperties, ObjectPath tablePath) { + return tableProperties.containsKey(GlueCatalogConfig.LOCATION_URI) ? + tableProperties.remove(GlueCatalogConfig.LOCATION_URI) : locationUri + GlueCatalogConfig.LOCATION_SEPARATOR + + tablePath.getDatabaseName() + GlueCatalogConfig.LOCATION_SEPARATOR + tablePath.getObjectName(); + + } + + /** + * Extract OutputFormat from properties if present and remove outputFormat from properties. + * fallback to default format if not present + * + * @param tableProperties Key/Value properties + * @return output Format. + */ + private String extractOutputFormat(Map<String, String> tableProperties) { + return tableProperties.containsKey(GlueCatalogConfig.TABLE_OUTPUT_FORMAT) ? tableProperties.remove(GlueCatalogConfig.TABLE_OUTPUT_FORMAT) + : GlueCatalogOptions.OUTPUT_FORMAT.defaultValue(); + } + + /** + * Extract InputFormat from properties if present and remove inputFormat from properties. + * fallback to default format if not present + * + * @param tableProperties Key/Value properties + * @return input Format. + */ + private String extractInputFormat(Map<String, String> tableProperties) { + return tableProperties.containsKey(GlueCatalogConfig.TABLE_INPUT_FORMAT) ? + tableProperties.remove(GlueCatalogConfig.TABLE_INPUT_FORMAT) : GlueCatalogOptions.INPUT_FORMAT.defaultValue(); + + } + + /** + * Get list of filtered columns which are partition columns. + * + * @param catalogTable {@link CatalogTable} instance. + * @param columns List of all column in table. + * @return List of column marked as partition key. + */ + private Collection<Column> getPartitionKeys(CatalogTable catalogTable, Collection<Column> columns) { + Set<String> partitionKeys = new HashSet<>(catalogTable.getPartitionKeys()); + return columns.stream().filter(column -> partitionKeys.contains(column.name())).collect(Collectors.toList()); + } + + /** + * @param tablePath fully Qualified table path. 
+ * @param newTable instance of {@link CatalogBaseTable} containing information for table. + * @param managedTable identifier for managed table. + * @throws CatalogException Glue related exception. + */ + public void alterGlueTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean managedTable) + throws CatalogException { + + Map<String, String> properties = new HashMap<>(newTable.getOptions()); + String tableOwner = extractTableOwner(properties); + + if (managedTable) { + properties.put(CONNECTOR.key(), ManagedTableFactory.DEFAULT_IDENTIFIER); + } + + Set<Column> glueColumns = getGlueColumnsFromCatalogTable(newTable); + + // create StorageDescriptor for table + StorageDescriptor.Builder storageDescriptorBuilder = StorageDescriptor.builder() + .inputFormat(extractInputFormat(properties)) + .outputFormat(extractOutputFormat(properties)) + .location(extractTableLocation(properties, tablePath)) + .parameters(properties) + .columns(glueColumns); + + // create TableInput Builder with available information. 
+ TableInput.Builder tableInputBuilder = TableInput.builder() + .name(tablePath.getObjectName()) + .description(newTable.getComment()) + .tableType(newTable.getTableKind().name()) + .lastAccessTime(Instant.now()) + .owner(tableOwner); + + UpdateTableRequest.Builder requestBuilder = UpdateTableRequest.builder().tableInput(tableInputBuilder.build()) + .catalogId(getGlueCatalogId()) + .databaseName(tablePath.getDatabaseName()); + + if (newTable instanceof CatalogTable) { + CatalogTable catalogTable = (CatalogTable) newTable; + if (catalogTable.isPartitioned()) { + tableInputBuilder.partitionKeys(getPartitionKeys(catalogTable, glueColumns)); + } + } + + // apply storage descriptor and tableInput for request + tableInputBuilder.storageDescriptor(storageDescriptorBuilder.build()); + requestBuilder.tableInput(tableInputBuilder.build()); + + try { + UpdateTableResponse response = glueClient.updateTable(requestBuilder.build()); + LOG.debug(getDebugLog(response)); + validateGlueResponse(response); + LOG.info(String.format("Table updated. %s", tablePath.getFullName())); + } catch (GlueException e) { + throw new CatalogException(GlueCatalogConfig.GLUE_EXCEPTION_MSG_IDENTIFIER, e); + } + } + + private String getDebugLog(GlueResponse response) { + return String.format("Glue response : status = %s \n " + + "Details = %s \nMetadataResponse = %s", + response.sdkHttpResponse().isSuccessful(), response.sdkHttpResponse().toString(), response.responseMetadata()); + } + + /** + * Get names of all tables or views under this database based on type identifier. + * An empty list is returned if none exists. + * + * @param databaseName fully qualified database name. + * @return a list of the names of all tables or views in this database based on type identifier. 
+ * @throws CatalogException in case of any runtime exception + */ + public List<String> getGlueTableList(String databaseName, String type) throws CatalogException { + GetTablesRequest.Builder tablesRequestBuilder = + GetTablesRequest.builder() + .databaseName(databaseName) + .catalogId(getGlueCatalogId()); + GetTablesResponse response = glueClient.getTables(tablesRequestBuilder.build()); + validateGlueResponse(response); + List<String> finalTableList = response.tableList().stream() + .filter(table -> table.tableType().equalsIgnoreCase(type)) + .map(Table::name).collect(Collectors.toList()); + String tableResultNextToken = response.nextToken(); + + if (Optional.ofNullable(tableResultNextToken).isPresent()) { + do { + // update token in requestBuilder to fetch next batch + tablesRequestBuilder.nextToken(tableResultNextToken); + response = glueClient.getTables(tablesRequestBuilder.build()); + validateGlueResponse(response); + finalTableList.addAll(response.tableList().stream() + .filter(table -> table.tableType().equalsIgnoreCase(type)) + .map(Table::name).collect(Collectors.toList())); + tableResultNextToken = response.nextToken(); + } while (Optional.ofNullable(tableResultNextToken).isPresent()); + } + return finalTableList; + } + + /** + * Returns a {@link Table} identified by the given table Path. {@link ObjectPath}. + * + * @param tablePath Path of the table or view + * @return The requested table. Glue encapsulates whether table or view in its attribute called type. 
+ * @throws TableNotExistException if the target does not exist + * @throws CatalogException in case of any runtime exception + */ + public Table getGlueTable(ObjectPath tablePath) throws TableNotExistException, CatalogException { + GetTableRequest tablesRequest = + GetTableRequest.builder() + .databaseName(tablePath.getDatabaseName()) + .name(tablePath.getObjectName()) + .catalogId(getGlueCatalogId()) + .build(); + try { + GetTableResponse response = glueClient.getTable(tablesRequest); + LOG.info(String.format("Glue table Found %s", response.table().name())); + validateGlueResponse(response); + return response.table(); + } catch (EntityNotFoundException e) { + throw new TableNotExistException(catalogName, tablePath, e); + } catch (GlueException e) { + throw new CatalogException(GlueCatalogConfig.GLUE_EXCEPTION_MSG_IDENTIFIER, e); + } + + } + + /** + * Check if a table or view exists in glue data catalog. + * + * @param tablePath Path of the table or view + * @return true if the given table exists in the catalog false otherwise + * @throws CatalogException in case of any runtime exception + */ + public boolean glueTableExists(ObjectPath tablePath) throws CatalogException { + try { + Table glueTable = getGlueTable(tablePath); + return glueTable != null && glueTable.name().equals(tablePath.getObjectName()); + } catch (TableNotExistException e) { + LOG.warn(String.format("%s\nDatabase: %s Table: %s", GlueCatalogConfig.TABLE_NOT_EXISTS_IDENTIFIER, + tablePath.getDatabaseName(), tablePath.getObjectName())); + return false; + } catch (CatalogException e) { + LOG.error(GlueCatalogConfig.GLUE_EXCEPTION_MSG_IDENTIFIER, e.getCause()); + throw new CatalogException(GlueCatalogConfig.GLUE_EXCEPTION_MSG_IDENTIFIER, e); + } + + } + + /** + * Modify an existing function. Function name should be handled in a case-insensitive way. + * + * @param functionPath path of function. + * @param newFunction modified function. + * @throws CatalogException on runtime errors. 
+ */ + public void alterGlueFunction(ObjectPath functionPath, CatalogFunction newFunction) throws CatalogException { + UserDefinedFunctionInput functionInput = createFunctionInput(functionPath, newFunction); + UpdateUserDefinedFunctionRequest updateUserDefinedFunctionRequest = UpdateUserDefinedFunctionRequest.builder() + .functionName(functionPath.getObjectName()) + .databaseName(functionPath.getDatabaseName()) + .catalogId(getGlueCatalogId()) + .functionInput(functionInput).build(); + UpdateUserDefinedFunctionResponse response = glueClient.updateUserDefinedFunction(updateUserDefinedFunctionRequest); + validateGlueResponse(response); + LOG.info(String.format("Function altered. %s", functionPath.getFullName())); + } + + /** + * Drop a table or view from glue data catalog. + * + * @param tablePath fully qualified table path + * @throws CatalogException on runtime errors. + */ + public void dropGlueTable(ObjectPath tablePath) throws CatalogException { + DeleteTableRequest.Builder tableRequestBuilder = + DeleteTableRequest.builder() + .databaseName(tablePath.getDatabaseName()) + .name(tablePath.getObjectName()) + .catalogId(getGlueCatalogId()); + try { + DeleteTableResponse response = glueClient.deleteTable(tableRequestBuilder.build()); + validateGlueResponse(response); + LOG.info(String.format("Dropped Table %s.", tablePath.getObjectName())); + } catch (GlueException e) { + throw new CatalogException(GlueCatalogConfig.GLUE_EXCEPTION_MSG_IDENTIFIER, e); + } + + } + + /** + * validate response from client call. 
+ */ + private void validateGlueResponse(GlueResponse response) { + if (!response.sdkHttpResponse().isSuccessful()) { + throw new CatalogException(GlueCatalogConfig.GLUE_EXCEPTION_MSG_IDENTIFIER); + } + } + + public Schema getSchemaFromGlueTable(Table glueTable) { + Schema.Builder schemaBuilder = Schema.newBuilder(); + for (Column col : glueTable.storageDescriptor().columns()) { + schemaBuilder.column(col.name(), col.type()); + } + return schemaBuilder.build(); + } + + // -------------- Function related operations. + + /** + * Create a function. Function name should be handled in a case-insensitive way. + * + * @param functionPath path of the function + * @param function Flink function to be created + * @throws CatalogException in case of any runtime exception + */ + public void createGlueFunction(ObjectPath functionPath, CatalogFunction function) + throws CatalogException, FunctionAlreadyExistException { + + UserDefinedFunctionInput functionInput = createFunctionInput(functionPath, function); + CreateUserDefinedFunctionRequest.Builder requestBuilder = CreateUserDefinedFunctionRequest.builder() + .databaseName(functionPath.getDatabaseName()) + .catalogId(getGlueCatalogId()) + .functionInput(functionInput); + try { + CreateUserDefinedFunctionResponse response = glueClient.createUserDefinedFunction(requestBuilder.build()); + validateGlueResponse(response); + LOG.info(String.format("Function created. %s", functionPath.getFullName())); + } catch (AlreadyExistsException e) { + LOG.error(String.format("%s.%s already Exists. 
Function language of type: %s", + functionPath.getDatabaseName(), functionPath.getObjectName(), function.getFunctionLanguage())); + throw new FunctionAlreadyExistException(catalogName, functionPath); + } catch (GlueException e) { + LOG.error("Error creating glue function.", e.getCause()); + throw new CatalogException(GlueCatalogConfig.GLUE_EXCEPTION_MSG_IDENTIFIER, e); + } + } + + private UserDefinedFunctionInput createFunctionInput(ObjectPath functionPath, CatalogFunction function) + throws UnsupportedOperationException { + Collection<software.amazon.awssdk.services.glue.model.ResourceUri> resourceUris = new LinkedList<>(); + for (org.apache.flink.table.resource.ResourceUri resourceUri : function.getFunctionResources()) { + switch (resourceUri.getResourceType()) { + case JAR: + case FILE: + case ARCHIVE: + resourceUris.add(ResourceUri.builder().resourceType(resourceUri.getResourceType().name()) + .uri(resourceUri.getUri()).build()); + break; + default: + throw new UnsupportedOperationException("GlueCatalog supports only creating resources JAR/FILE or ARCHIVE."); + } + + } + return UserDefinedFunctionInput.builder().functionName(functionPath.getObjectName()) + .className(getGlueFunctionClassName(function)) + .ownerType(PrincipalType.USER) + .ownerName(GlueCatalogConfig.FLINK_CATALOG) + .resourceUris(resourceUris) + .build(); + } + + /** + * Get the user defined function from glue Catalog. Function name should be handled in a case-insensitive way. 
+ * + * @param functionPath path of the function + * @return the requested function + * @throws CatalogException in case of any runtime exception + */ + public CatalogFunction getGlueFunction(ObjectPath functionPath) { + GetUserDefinedFunctionRequest request = GetUserDefinedFunctionRequest.builder() + .catalogId(getGlueCatalogId()) + .databaseName(functionPath.getDatabaseName()) + .functionName(functionPath.getObjectName()) + .build(); + GetUserDefinedFunctionResponse response = glueClient.getUserDefinedFunction(request); + validateGlueResponse(response); + UserDefinedFunction udf = response.userDefinedFunction(); + + List<org.apache.flink.table.resource.ResourceUri> resourceUris = new LinkedList<>(); + for (ResourceUri resourceUri : udf.resourceUris()) { + resourceUris.add(new org.apache.flink.table.resource.ResourceUri( + ResourceType.valueOf(resourceUri.resourceType().name()), resourceUri.uri())); + } + + return new CatalogFunctionImpl(getCatalogFunctionClassName(udf), getFunctionalLanguage(udf), resourceUris); + } + + public List<String> listGlueFunctions(String databaseName) { + GetUserDefinedFunctionsRequest.Builder functionsRequest = GetUserDefinedFunctionsRequest.builder() + .databaseName(databaseName).catalogId(getGlueCatalogId()); + + List<String> glueFunctions; + try { + GetUserDefinedFunctionsResponse functionsResponse = glueClient.getUserDefinedFunctions(functionsRequest.build()); + String token = functionsResponse.nextToken(); + glueFunctions = functionsResponse + .userDefinedFunctions() + .stream() + .map(UserDefinedFunction::functionName) + .collect(Collectors.toCollection(LinkedList::new)); + if (Optional.ofNullable(token).isPresent()) { + do { + functionsRequest.nextToken(token); + functionsResponse = glueClient.getUserDefinedFunctions(functionsRequest.build()); + glueFunctions.addAll(functionsResponse + .userDefinedFunctions() + .stream() + .map(UserDefinedFunction::functionName) + .collect(Collectors.toCollection(LinkedList::new))); + token = 
functionsResponse.nextToken(); + } while (Optional.ofNullable(token).isPresent()); + } + + } catch (GlueException e) { + throw new CatalogException(GlueCatalogConfig.GLUE_EXCEPTION_MSG_IDENTIFIER, e); + } + return glueFunctions; + } + + public boolean glueFunctionExists(ObjectPath functionPath) { + GetUserDefinedFunctionRequest request = GetUserDefinedFunctionRequest.builder() + .functionName(functionPath.getObjectName()).databaseName(functionPath.getDatabaseName()) + .catalogId(getGlueCatalogId()) + .build(); + + try { + GetUserDefinedFunctionResponse response = glueClient.getUserDefinedFunction(request); + validateGlueResponse(response); + return response.userDefinedFunction().functionName().equalsIgnoreCase(functionPath.getObjectName()); + } catch (EntityNotFoundException e) { + LOG.warn(String.format("Entry not found for function %s.%s", functionPath.getObjectName(), functionPath.getDatabaseName())); + return false; + } catch (GlueException e) { + LOG.error(GlueCatalogConfig.GLUE_EXCEPTION_MSG_IDENTIFIER, e); + throw new CatalogException(GlueCatalogConfig.GLUE_EXCEPTION_MSG_IDENTIFIER, e); + } + } + + public void dropGlueFunction(ObjectPath functionPath) throws CatalogException { + DeleteUserDefinedFunctionRequest request = DeleteUserDefinedFunctionRequest.builder() + .catalogId(getGlueCatalogId()) + .functionName(functionPath.getObjectName()) + .databaseName(functionPath.getDatabaseName()) + .build(); + DeleteUserDefinedFunctionResponse response = glueClient.deleteUserDefinedFunction(request); + validateGlueResponse(response); + LOG.info(String.format("Dropped Function. %s", functionPath.getFullName())); + } + + // -------------- Partition related operations. + + public void ensurePartitionedTable(ObjectPath tablePath, Table glueTable) + throws TableNotPartitionedException { + if (!glueTable.hasPartitionKeys()) { + throw new TableNotPartitionedException(catalogName, tablePath); + } + } + + /** + * create partition in glue data catalog. 
+ * + * @param glueTable glue table + * @param partitionSpec partition spec + * @param catalogPartition partition to add + */ + public void createGluePartition(Table glueTable, CatalogPartitionSpec partitionSpec, + CatalogPartition catalogPartition) throws CatalogException, PartitionSpecInvalidException { + + List<String> partCols = getColumnNames(glueTable.partitionKeys()); + LOG.info(String.format("Partition Columns are : %s", String.join(", ", partCols))); + List<String> partitionValues = + getOrderedFullPartitionValues( + partitionSpec, + partCols, + new ObjectPath(glueTable.databaseName(), glueTable.name())); + + // validate partition values + for (int i = 0; i < partCols.size(); i++) { + if (isNullOrWhitespaceOnly(partitionValues.get(i))) { + throw new PartitionSpecInvalidException( + catalogName, + partCols, + new ObjectPath(glueTable.databaseName(), glueTable.name()), + partitionSpec); + } + } + StorageDescriptor.Builder sdBuilder = glueTable.storageDescriptor().toBuilder(); + Map<String, String> partitionProperties = catalogPartition.getProperties(); + + sdBuilder.location(extractPartitionLocation(partitionProperties)); + sdBuilder.parameters(partitionSpec.getPartitionSpec()); + String comment = catalogPartition.getComment(); + if (comment != null) { + partitionProperties.put(GlueCatalogConfig.COMMENT, comment); + } + PartitionInput.Builder partitionInput = PartitionInput.builder() + .parameters(partitionProperties) + .lastAccessTime(Instant.now()) + .storageDescriptor(sdBuilder.build()) + .values(partitionValues); + CreatePartitionRequest createPartitionRequest = CreatePartitionRequest.builder() + .partitionInput(partitionInput.build()) + .catalogId(getGlueCatalogId()) + .databaseName(glueTable.databaseName()) + .tableName(glueTable.name()) + .build(); + + try { + CreatePartitionResponse response = glueClient.createPartition(createPartitionRequest); + validateGlueResponse(response); + LOG.info("Partition Created."); + } catch (GlueException e) { + throw 
new CatalogException(GlueCatalogConfig.GLUE_EXCEPTION_MSG_IDENTIFIER, e); + } + } + + private String extractPartitionLocation(Map<String, String> properties) { + return properties.containsKey(GlueCatalogConfig.LOCATION_URI) ? properties.remove(GlueCatalogConfig.LOCATION_URI) : null; + } + + /** + * Get column names from List of column. + */ + private static List<String> getColumnNames(final List<Column> columns) { + return columns.stream().map(Column::name).collect(Collectors.toList()); + } + + /** + * Get a list of ordered partition values by re-arranging them based on the given list of + * partition keys. If the partition value is null, it'll be converted into default partition + * name. + * + * @param partitionSpec a partition spec. + * @param partitionKeys a list of partition keys. + * @param tablePath path of the table to which the partition belongs. + * @return A list of partition values ordered according to partitionKeys. + * @throws PartitionSpecInvalidException thrown if partitionSpec and partitionKeys have + * different sizes, or any key in partitionKeys doesn't exist in partitionSpec. + */ + private List<String> getOrderedFullPartitionValues( + CatalogPartitionSpec partitionSpec, List<String> partitionKeys, ObjectPath tablePath) + throws PartitionSpecInvalidException { + Map<String, String> spec = partitionSpec.getPartitionSpec(); + if (spec.size() != partitionKeys.size()) { + throw new PartitionSpecInvalidException( + catalogName, partitionKeys, tablePath, partitionSpec); + } + + List<String> values = new ArrayList<>(spec.size()); + for (String key : partitionKeys) { + if (!spec.containsKey(key)) { + throw new PartitionSpecInvalidException( + catalogName, partitionKeys, tablePath, partitionSpec); + } else { + String value = spec.get(key); + if (value == null) { + value = GlueCatalogConfig.DEFAULT_PARTITION_NAME; + } + values.add(value); + } + } + + return values; + } + + /** + * Update glue table. 
+ * + * @param tablePath contains database name and table name. + * @param partitionSpec Existing partition information. + * @param newPartition Partition information with new changes. + * @throws CatalogException Exception in failure. + */ + public void alterGluePartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, + CatalogPartition newPartition) throws CatalogException { + // todo has to implement + } + + /** + * Get CatalogPartitionSpec of all partitions from glue data catalog. + * + * @param tablePath fully qualified table path. + * @return List of PartitionSpec + */ + public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath) { + + GetPartitionsRequest.Builder request = + GetPartitionsRequest.builder().catalogId(getGlueCatalogId()) + .databaseName(tablePath.getDatabaseName()) + .tableName(tablePath.getObjectName()); + try { + GetPartitionsResponse response = glueClient.getPartitions(request.build()); + validateGlueResponse(response); + List<CatalogPartitionSpec> finalPartitionsList = response.partitions().stream() + .map(this::getCatalogPartitionSpec).collect(Collectors.toList()); + String partitionsResultNextToken = response.nextToken(); + if (Optional.ofNullable(partitionsResultNextToken).isPresent()) { + do { + // creating a new GetPartitionsResult using next token. 
+ request.nextToken(partitionsResultNextToken); + response = glueClient.getPartitions(request.build()); + finalPartitionsList.addAll(response.partitions().stream() + .map(this::getCatalogPartitionSpec).collect(Collectors.toList())); + partitionsResultNextToken = response.nextToken(); + } while (Optional.ofNullable(partitionsResultNextToken).isPresent()); + } + + return finalPartitionsList; + + } catch (GlueException e) { + throw new CatalogException(GlueCatalogConfig.GLUE_EXCEPTION_MSG_IDENTIFIER, e); + } + + } + + /** + * @param tablePath + * @param partitionSpec + * @throws CatalogException + * @throws TableNotExistException + * @throws PartitionSpecInvalidException + */ + public void dropGluePartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) + throws CatalogException { + + try { + Table glueTable = getGlueTable(tablePath); + List<String> partCols = getColumnNames(glueTable.partitionKeys()); + DeletePartitionRequest deletePartitionRequest = DeletePartitionRequest.builder() + .catalogId(getGlueCatalogId()) + .databaseName(tablePath.getDatabaseName()) + .tableName(tablePath.getObjectName()) + .partitionValues(getOrderedFullPartitionValues(partitionSpec, partCols, tablePath)) + .build(); + DeletePartitionResponse response = glueClient.deletePartition(deletePartitionRequest); + validateGlueResponse(response); + LOG.info("Partition Dropped."); + } catch (TableNotExistException | PartitionSpecInvalidException e) { + e.printStackTrace(); + } catch (GlueException e) { + throw new CatalogException(GlueCatalogConfig.GLUE_EXCEPTION_MSG_IDENTIFIER, e); + } + } + + public List<CatalogPartitionSpec> listGluePartitionsByFilter(ObjectPath tablePath, List<Expression> filters) { + String expressionString = filters.stream().map(x -> getExpressionString(x, new StringBuilder())) + .collect(Collectors.joining(GlueCatalogConfig.SPACE + GlueCatalogConfig.AND)); + try { + GetPartitionsRequest request = 
GetPartitionsRequest.builder().databaseName(tablePath.getDatabaseName()) + .tableName(tablePath.getObjectName()) + .catalogId(getGlueCatalogId()) + .expression(expressionString) + .build(); + GetPartitionsResponse response = glueClient.getPartitions(request); + List<CatalogPartitionSpec> catalogPartitionSpecList = response.partitions().stream() + .map(this::getCatalogPartitionSpec).collect(Collectors.toList()); + // GlueOperator.validateGlueResponse(response); + String nextToken = response.nextToken(); + if (Optional.ofNullable(nextToken).isPresent()) { + do { + // creating a new GetPartitionsResult using next token. + request = + GetPartitionsRequest.builder().databaseName(tablePath.getDatabaseName()) + .tableName(tablePath.getObjectName()) + .catalogId(getGlueCatalogId()) + .expression(expressionString) + .nextToken(nextToken) + .build(); + response = glueClient.getPartitions(request); + catalogPartitionSpecList.addAll(response.partitions().stream() + .map(this::getCatalogPartitionSpec).collect(Collectors.toList())); + nextToken = response.nextToken(); + } while (Optional.ofNullable(nextToken).isPresent()); + } + return catalogPartitionSpecList; + } catch (GlueException e) { + throw new CatalogException(GlueCatalogConfig.GLUE_EXCEPTION_MSG_IDENTIFIER, e); + } + + } + + private String getExpressionString(Expression expression, StringBuilder sb) { + + for (Expression childExpression : expression.getChildren()) { + if (childExpression.getChildren() != null && childExpression.getChildren().size() > 0) { + getExpressionString(childExpression, sb); + } + } + return sb.insert(0, expression.asSummaryString() + GlueCatalogConfig.SPACE + + GlueCatalogConfig.AND).toString(); + + } + + public Partition getGluePartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) + throws PartitionNotExistException { + try { + GetPartitionRequest request = GetPartitionRequest.builder().catalogId(getGlueCatalogId()) + 
.databaseName(tablePath.getDatabaseName()).tableName(tablePath.getObjectName()) + .build(); + GetPartitionResponse response = glueClient.getPartition(request); + validateGlueResponse(response); + Partition partition = response.partition(); + if (partition.hasValues() && + specSubset(partitionSpec.getPartitionSpec(), partition.storageDescriptor().parameters())) { + return partition; + } + } catch (EntityNotFoundException e) { + throw new PartitionNotExistException(catalogName, tablePath, partitionSpec); + } + return null; + } + + public boolean gluePartitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) + throws CatalogException { + GetPartitionRequest request = GetPartitionRequest.builder().catalogId(getGlueCatalogId()) + .databaseName(tablePath.getDatabaseName()).tableName(tablePath.getObjectName()) + .build(); + try { + GetPartitionResponse response = glueClient.getPartition(request); + validateGlueResponse(response); + return response.partition().parameters().keySet().containsAll(partitionSpec.getPartitionSpec().keySet()); + } catch (EntityNotFoundException e) { + LOG.warn(String.format("%s is not found", partitionSpec.getPartitionSpec())); + } catch (GlueException e) { + throw new CatalogException(catalogName, e); + } + return false; + } + + private Column getGlueColumn(CatalogBaseTable catalogBaseTable, TableSchema tableSchema, String fieldName) throws CatalogException { Review Comment: `TableSchema` is flagged as deprecated here. Its deprecation note reads: ``` This class has been deprecated as part of FLIP-164. It has been replaced by two more dedicated classes Schema and ResolvedSchema. Use Schema for declaration in APIs. ResolvedSchema is offered by the framework after resolution and validation. ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
