guptashailesh92 commented on code in PR #47:
URL: 
https://github.com/apache/flink-connector-aws/pull/47#discussion_r1092312544


##########
flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/GlueOperator.java:
##########
@@ -0,0 +1,1498 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue.util;
+
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogFunctionImpl;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogView;
+import org.apache.flink.table.catalog.FunctionLanguage;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.glue.GlueCatalogConfig;
+import org.apache.flink.table.catalog.glue.GlueCatalogOptions;
+import org.apache.flink.table.catalog.mapper.DatatypeMapper;
+import org.apache.flink.table.catalog.mapper.GlueDatatypeMapper;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.factories.ManagedTableFactory;
+import org.apache.flink.table.resource.ResourceType;
+import org.apache.flink.table.types.DataType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.glue.model.AlreadyExistsException;
+import software.amazon.awssdk.services.glue.model.BatchDeleteTableRequest;
+import software.amazon.awssdk.services.glue.model.BatchDeleteTableResponse;
+import software.amazon.awssdk.services.glue.model.Column;
+import software.amazon.awssdk.services.glue.model.CreateDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.CreateDatabaseResponse;
+import software.amazon.awssdk.services.glue.model.CreatePartitionRequest;
+import software.amazon.awssdk.services.glue.model.CreatePartitionResponse;
+import software.amazon.awssdk.services.glue.model.CreateTableRequest;
+import software.amazon.awssdk.services.glue.model.CreateTableResponse;
+import 
software.amazon.awssdk.services.glue.model.CreateUserDefinedFunctionRequest;
+import 
software.amazon.awssdk.services.glue.model.CreateUserDefinedFunctionResponse;
+import software.amazon.awssdk.services.glue.model.Database;
+import software.amazon.awssdk.services.glue.model.DatabaseInput;
+import software.amazon.awssdk.services.glue.model.DeleteDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.DeleteDatabaseResponse;
+import software.amazon.awssdk.services.glue.model.DeletePartitionRequest;
+import software.amazon.awssdk.services.glue.model.DeletePartitionResponse;
+import software.amazon.awssdk.services.glue.model.DeleteTableRequest;
+import software.amazon.awssdk.services.glue.model.DeleteTableResponse;
+import 
software.amazon.awssdk.services.glue.model.DeleteUserDefinedFunctionRequest;
+import 
software.amazon.awssdk.services.glue.model.DeleteUserDefinedFunctionResponse;
+import software.amazon.awssdk.services.glue.model.EntityNotFoundException;
+import software.amazon.awssdk.services.glue.model.GetDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.GetDatabaseResponse;
+import software.amazon.awssdk.services.glue.model.GetDatabasesRequest;
+import software.amazon.awssdk.services.glue.model.GetDatabasesResponse;
+import software.amazon.awssdk.services.glue.model.GetPartitionRequest;
+import software.amazon.awssdk.services.glue.model.GetPartitionResponse;
+import software.amazon.awssdk.services.glue.model.GetPartitionsRequest;
+import software.amazon.awssdk.services.glue.model.GetPartitionsResponse;
+import software.amazon.awssdk.services.glue.model.GetTableRequest;
+import software.amazon.awssdk.services.glue.model.GetTableResponse;
+import software.amazon.awssdk.services.glue.model.GetTablesRequest;
+import software.amazon.awssdk.services.glue.model.GetTablesResponse;
+import 
software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionRequest;
+import 
software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionResponse;
+import 
software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionsRequest;
+import 
software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionsResponse;
+import software.amazon.awssdk.services.glue.model.GlueException;
+import software.amazon.awssdk.services.glue.model.GlueResponse;
+import software.amazon.awssdk.services.glue.model.Partition;
+import software.amazon.awssdk.services.glue.model.PartitionInput;
+import software.amazon.awssdk.services.glue.model.PrincipalType;
+import software.amazon.awssdk.services.glue.model.ResourceUri;
+import software.amazon.awssdk.services.glue.model.StorageDescriptor;
+import software.amazon.awssdk.services.glue.model.Table;
+import software.amazon.awssdk.services.glue.model.TableInput;
+import software.amazon.awssdk.services.glue.model.UpdateDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.UpdateDatabaseResponse;
+import software.amazon.awssdk.services.glue.model.UpdateTableRequest;
+import software.amazon.awssdk.services.glue.model.UpdateTableResponse;
+import 
software.amazon.awssdk.services.glue.model.UpdateUserDefinedFunctionRequest;
+import 
software.amazon.awssdk.services.glue.model.UpdateUserDefinedFunctionResponse;
+import software.amazon.awssdk.services.glue.model.UserDefinedFunction;
+import software.amazon.awssdk.services.glue.model.UserDefinedFunctionInput;
+
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;
+
+/**
+ * Utilities for Glue catalog operations.
+ * Important Note : 
https://aws.amazon.com/premiumsupport/knowledge-center/glue-crawler-internal-service-exception/
+ */
+public class GlueOperator {
+
+    /**
+     * Defines the location URI for database.
+     * This locationUri is high level path for catalog
+     */
+    public final String locationUri;
+
+    /**
+     * Instance of AwsProperties which holds the configs related to configure 
glue and aws setup.
+     */
+    private final AwsProperties awsProperties;
+
+    /**
+     * http client for glue client.
+     * Current implementation for client is sync type.
+     */
+    private final GlueClient glueClient;
+
+    private final DatatypeMapper<DataType> datatypeMapper;
+
+    private final String catalogName;
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(GlueOperator.class);
+
+    public GlueOperator(String locationUri, String catalogName, AwsProperties 
awsProperties, GlueClient glueClient) {
+        this.locationUri = locationUri;
+        this.awsProperties = awsProperties;
+        this.glueClient = glueClient;
+        this.datatypeMapper = new GlueDatatypeMapper<>();
+        this.catalogName = catalogName;
+    }
+
+    public void closeClient() {
+        glueClient.close();
+    }
+
+    // -------------- Database related operations.
+
+    /**
+     * List all databases present.
+     *
+     * @return List of fully qualified database names
+     */
+    public List<String> listGlueDatabases() throws CatalogException {
+        try {
+            GetDatabasesRequest.Builder databasesRequestBuilder =
+                    GetDatabasesRequest.builder()
+                            .catalogId(getGlueCatalogId());
+
+            GetDatabasesResponse response = 
glueClient.getDatabases(databasesRequestBuilder.build());
+            List<String> glueDatabases =
+                    response.databaseList().stream()
+                            .map(Database::name)
+                            .collect(Collectors.toList());
+            String dbResultNextToken = response.nextToken();
+            if (Optional.ofNullable(dbResultNextToken).isPresent()) {
+                do {
+                    databasesRequestBuilder.nextToken(dbResultNextToken);
+                    response = 
glueClient.getDatabases(databasesRequestBuilder.build());
+                    glueDatabases.addAll(
+                            response.databaseList().stream()
+                                    .map(Database::name)
+                                    .collect(Collectors.toList()));
+                    dbResultNextToken = response.nextToken();
+                } while (Optional.ofNullable(dbResultNextToken).isPresent());
+            }
+            return glueDatabases;
+        } catch (GlueException e) {
+            throw new 
CatalogException(GlueCatalogConfig.GLUE_EXCEPTION_MSG_IDENTIFIER, e.getCause());
+        }
+    }
+
+    /**
+     * Create database in glue data catalog.
+     *
+     * @param databaseName fully qualified name of database.
+     * @param database     Instance of {@link CatalogDatabase}.
+     * @throws CatalogException              when unknown error from glue 
servers.
+     * @throws DatabaseAlreadyExistException when database exists already in 
glue data catalog.
+     */
+    public void createGlueDatabase(String databaseName, CatalogDatabase 
database)
+            throws CatalogException, DatabaseAlreadyExistException {
+
+        validateName(databaseName);
+        Map<String, String> properties = database.getProperties();
+        DatabaseInput.Builder databaseInputBuilder = 
DatabaseInput.builder().name(databaseName)
+                .description(database.getComment())
+                // update location and remove location from properties.
+                .locationUri(extractDatabaseLocation(properties, databaseName))
+                .parameters(properties);
+        CreateDatabaseRequest.Builder requestBuilder = 
CreateDatabaseRequest.builder()
+                .databaseInput(databaseInputBuilder.build())
+                .catalogId(getGlueCatalogId()).tags(getFlinkCatalogTag());
+        LOG.info(String.format("Database Properties Listing :- %s",
+                properties.entrySet().stream().map(e -> e.getKey() + 
e.getValue()).collect(Collectors.joining(","))));
+        try {
+            CreateDatabaseResponse response = 
glueClient.createDatabase(requestBuilder.build());
+            LOG.debug(getDebugLog(response));
+            LOG.info(String.format("%s Database created.", databaseName));
+        } catch (EntityNotFoundException e) {
+            throw new DatabaseAlreadyExistException(catalogName, databaseName, 
e);
+        } catch (GlueException e) {
+            throw new 
CatalogException(GlueCatalogConfig.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+    }
+
+    /**
+     * Extract location from database properties if present and remove 
location from properties.
+     * fallback to create default location if not present
+     *
+     * @param databaseProperties database properties.
+     * @param databaseName       fully qualified name for database.
+     * @return location for database.
+     */
+    private String extractDatabaseLocation(Map<String, String> 
databaseProperties, String databaseName) {
+        return databaseProperties.containsKey(GlueCatalogConfig.LOCATION_URI) ?
+                databaseProperties.remove(GlueCatalogConfig.LOCATION_URI) :
+                locationUri + GlueCatalogConfig.LOCATION_SEPARATOR + 
databaseName;
+    }
+
+    /**
+     * Create tag for flink in glue catalog for identification.
+     *
+     * @return Key/Value pair for tags
+     */
+    private Map<String, String> getFlinkCatalogTag() {
+        Map<String, String> tags = new HashMap<>();
+        tags.put("source", "flink_catalog");
+        return tags;
+    }
+
+    /**
+     * Delete a database from Glue data catalog only when database is empty.
+     *
+     * @param databaseName fully qualified name of database
+     * @throws CatalogException          Any Exception thrown due to glue error
+     * @throws DatabaseNotExistException when database doesn't exists in glue 
catalog.
+     */
+    public void dropGlueDatabase(String databaseName) throws CatalogException, 
DatabaseNotExistException {
+
+        validateName(databaseName);
+
+        DeleteDatabaseRequest deleteDatabaseRequest = 
DeleteDatabaseRequest.builder()
+                .name(databaseName).catalogId(getGlueCatalogId()).build();
+        try {
+            DeleteDatabaseResponse response = 
glueClient.deleteDatabase(deleteDatabaseRequest);
+            LOG.debug(getDebugLog(response));
+            LOG.info(String.format("Database Dropped %s", databaseName));
+        } catch (EntityNotFoundException e) {
+            throw new DatabaseNotExistException(catalogName, databaseName);
+        } catch (GlueException e) {
+            throw new CatalogException(catalogName, e);
+        }
+    }
+
+    /**
+     * Drops list of table in database from glue data catalog.
+     *
+     * @param databaseName fully qualified name of database
+     * @param tables       List of tables to remove from database.
+     * @throws CatalogException Any Exception thrown due to glue error
+     */
+    public void deleteTablesFromDatabase(String databaseName, 
Collection<String> tables) throws CatalogException {
+        validateName(databaseName);
+        BatchDeleteTableRequest batchTableRequest =
+                BatchDeleteTableRequest.builder()
+                        .databaseName(databaseName)
+                        .catalogId(getGlueCatalogId())
+                        .tablesToDelete(tables)
+                        .build();
+
+        try {
+            BatchDeleteTableResponse response = 
glueClient.batchDeleteTable(batchTableRequest);
+            if (response.hasErrors()) {
+                String errorMsg = String.format(
+                        "Glue Table errors:- %s",
+                        response.errors().stream()
+                                .map(
+                                        e ->
+                                                "Table: "
+                                                        + e.tableName()
+                                                        + "\nErrorDetail:  "
+                                                        + e.errorDetail()
+                                                        .errorMessage())
+                                .collect(Collectors.joining("\n")));
+                LOG.error(errorMsg);
+            }
+        } catch (GlueException e) {
+            throw new 
CatalogException(GlueCatalogConfig.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+    }
+
+    /**
+     * Drops list of user defined function in database from glue data catalog.
+     *
+     * @param databaseName fully qualified name of database
+     * @param functions    List of tables to remove from database.
+     * @throws CatalogException Any Exception thrown due to glue error
+     */
+    public void deleteFunctionsFromDatabase(String databaseName, 
Collection<String> functions)
+            throws CatalogException {
+        validateName(databaseName);
+        try {
+            DeleteUserDefinedFunctionRequest.Builder requestBuilder = 
DeleteUserDefinedFunctionRequest.builder()
+                    .databaseName(databaseName).catalogId(getGlueCatalogId());
+            for (String functionName : functions) {
+                requestBuilder.functionName(functionName);
+                DeleteUserDefinedFunctionResponse response = 
glueClient.deleteUserDefinedFunction(requestBuilder.build());
+                LOG.debug(getDebugLog(response));
+                validateGlueResponse(response);
+                LOG.info(String.format("Dropped Function %s", functionName));
+            }
+
+        } catch (GlueException e) {
+            LOG.error(String.format("Error deleting functions in database: 
%s", databaseName));
+            throw new 
CatalogException(GlueCatalogConfig.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+    }
+
+
+    /**
+     * Check if database is empty.
+     * i.e. it should not contain
+     * 1. table
+     * 2. functions
+     *
+     * @param databaseName name of database.
+     * @return boolean True/False based on the content of database.
+     * @throws CatalogException Any Exception thrown due to glue error
+     */
+    public boolean isDatabaseEmpty(String databaseName) throws 
CatalogException {
+        checkArgument(!isNullOrWhitespaceOnly(databaseName));
+        validateName(databaseName);
+        GetTablesRequest tablesRequest =
+                GetTablesRequest.builder()
+                        .catalogId(getGlueCatalogId())
+                        .databaseName(databaseName).maxResults(1)
+                        .build();
+        try {
+            GetTablesResponse response = glueClient.getTables(tablesRequest);
+            if (response.sdkHttpResponse().isSuccessful() && 
response.tableList().size() == 0) {
+                return listGlueFunctions(databaseName).size() == 0;
+            }
+            return false;
+        } catch (GlueException e) {
+            throw new 
CatalogException(GlueCatalogConfig.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+
+    }
+
+    /**
+     * Get a database from this glue data catalog.
+     *
+     * @param databaseName fully qualified name of database.
+     * @return Instance of {@link CatalogDatabase } .
+     * @throws DatabaseNotExistException when database doesn't exists in Glue 
data catalog.
+     * @throws CatalogException          when any unknown error occurs in glue.
+     */
+    public CatalogDatabase getDatabase(String databaseName) throws 
DatabaseNotExistException, CatalogException {
+
+        validateName(databaseName);
+        GetDatabaseRequest getDatabaseRequest =
+                GetDatabaseRequest.builder()
+                        .name(databaseName)
+                        .catalogId(getGlueCatalogId())
+                        .build();
+        try {
+            GetDatabaseResponse response = 
glueClient.getDatabase(getDatabaseRequest);
+            LOG.debug(
+                    GlueCatalogConfig.GLUE_EXCEPTION_MSG_IDENTIFIER + ": 
existing database. Client call response :- "
+                            + response.sdkHttpResponse().statusText());
+            validateGlueResponse(response);
+            return getCatalogDatabase(response.database());
+        } catch (EntityNotFoundException e) {
+            throw new DatabaseNotExistException(catalogName, databaseName);
+        } catch (GlueException e) {
+            throw new 
CatalogException(GlueCatalogConfig.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+    }
+
+    /**
+     * Build CatalogDatabase instance using information from glue Database.
+     *
+     * @param glueDatabase {@link Database }
+     * @return {@link CatalogDatabase } instance.
+     */
+    private CatalogDatabase getCatalogDatabase(Database glueDatabase) {
+        Map<String, String> properties = new 
HashMap<>(glueDatabase.parameters());
+
+        // retrieve location uri into properties
+        properties.put(GlueCatalogConfig.LOCATION_URI, 
glueDatabase.locationUri());
+        String comment = glueDatabase.description();
+        return new CatalogDatabaseImpl(properties, comment);
+    }
+
+    public void updateGlueDatabase(String databaseName, CatalogDatabase 
newDatabase)
+            throws CatalogException {
+
+        validateName(databaseName);
+        Map<String, String> properties = newDatabase.getProperties();
+        DatabaseInput.Builder databaseInputBuilder = DatabaseInput.builder()
+                .parameters(properties)
+                .description(newDatabase.getComment())
+                .name(databaseName)
+                .locationUri(extractDatabaseLocation(properties, 
databaseName));
+
+        UpdateDatabaseRequest updateRequest =
+                UpdateDatabaseRequest.builder()
+                        .databaseInput(databaseInputBuilder.build())
+                        .name(databaseName)
+                        .catalogId(getGlueCatalogId())
+                        .build();
+        UpdateDatabaseResponse response = 
glueClient.updateDatabase(updateRequest);
+        LOG.debug(getDebugLog(response));
+        LOG.info(String.format("Database Updated. %s", databaseName));
+        validateGlueResponse(response);
+    }
+
+    // -------------- Table related operations.
+
+    /**
+     * Create table in glue data catalog.
+     *
+     * @param tablePath    Fully qualified name of table. {@link ObjectPath}
+     * @param table        instance of {@link CatalogBaseTable} containing 
table related information.
+     * @param managedTable identifier if managed table.
+     * @throws CatalogException Any Exception thrown due to glue error
+     */
+    public void createGlueTable(final ObjectPath tablePath, final 
CatalogBaseTable table,
+                                final boolean managedTable) throws 
CatalogException {
+
+        checkNotNull(table);
+        checkNotNull(tablePath);
+        Map<String, String> properties = new HashMap<>(table.getOptions());
+        String tableOwner = extractTableOwner(properties);
+        if (managedTable) {
+            properties.put(CONNECTOR.key(), 
ManagedTableFactory.DEFAULT_IDENTIFIER);
+        }
+        Set<Column> glueColumns = getGlueColumnsFromCatalogTable(table);
+
+        // create StorageDescriptor for table
+        StorageDescriptor.Builder storageDescriptorBuilder = 
StorageDescriptor.builder()
+                .inputFormat(extractInputFormat(properties))
+                .outputFormat(extractOutputFormat(properties))
+                .location(extractTableLocation(properties, tablePath));
+
+        // create TableInput Builder with available information.
+        TableInput.Builder tableInputBuilder = TableInput.builder()
+                .name(tablePath.getObjectName())
+                .description(table.getComment())
+                .tableType(table.getTableKind().name())
+                .lastAccessTime(Instant.now())
+                .owner(tableOwner);
+
+        if 
(table.getTableKind().name().equalsIgnoreCase(CatalogBaseTable.TableKind.VIEW.name()))
 {
+            tableInputBuilder.viewExpandedText(getExpandedQuery(table));
+            tableInputBuilder.viewOriginalText(getOriginalQuery(table));
+        }
+
+        CreateTableRequest.Builder requestBuilder = 
CreateTableRequest.builder()
+                .catalogId(getGlueCatalogId())
+                .databaseName(tablePath.getDatabaseName());
+
+        if (table instanceof CatalogTable) {
+            CatalogTable catalogTable = (CatalogTable) table;
+            if (catalogTable.isPartitioned()) {
+                LOG.info("Catalog table is partitioned");
+                Collection<Column> partitionKeys = 
getPartitionKeys(catalogTable, glueColumns);
+                LOG.info("Partition columns are -> " +
+                        
partitionKeys.stream().map(Column::name).collect(Collectors.joining(",")));
+                tableInputBuilder.partitionKeys(partitionKeys);
+            }
+        }
+
+        try {
+            // apply storage descriptor and tableInput for request
+            storageDescriptorBuilder.columns(glueColumns);
+            
tableInputBuilder.storageDescriptor(storageDescriptorBuilder.build());
+            tableInputBuilder.parameters(properties);
+            requestBuilder.tableInput(tableInputBuilder.build());
+            CreateTableResponse response = 
glueClient.createTable(requestBuilder.build());
+            LOG.debug(getDebugLog(response));
+            validateGlueResponse(response);
+            LOG.info(String.format("Table created. %s", 
tablePath.getFullName()));
+
+        } catch (GlueException e) {
+            throw new 
CatalogException(GlueCatalogConfig.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+    }
+
+    private String getExpandedQuery(CatalogBaseTable table) {
+        // todo complete it.
+        return "";
+    }
+
+    private String getOriginalQuery(CatalogBaseTable table) {
+        // todo complete it.
+        return "";
+    }
+
+    /**
+     * Extract table owner name and remove from properties.
+     *
+     * @param properties Map of properties.
+     * @return fully qualified owner name.
+     */
+    private String extractTableOwner(Map<String, String> properties) {
+        return properties.containsKey(GlueCatalogConfig.TABLE_OWNER) ?
+                properties.remove(GlueCatalogConfig.TABLE_OWNER) : null;
+    }
+
+    /**
+     * Build set of {@link Column} associated with table.
+     *
+     * @param catalogBaseTable instance of {@link CatalogBaseTable}.
+     * @return Set of Column
+     */
+    private Set<Column> getGlueColumnsFromCatalogTable(CatalogBaseTable 
catalogBaseTable) {
+        checkNotNull(catalogBaseTable);
+        TableSchema tableSchema = catalogBaseTable.getSchema();
+        return Arrays.stream(tableSchema.getFieldNames())
+                .map(fieldName -> getGlueColumn(catalogBaseTable, tableSchema, 
fieldName))
+                .collect(Collectors.toSet());
+    }
+
+    /**
+     * Extract location from database properties if present and remove 
location from properties.
+     * fallback to create default location if not present
+     *
+     * @param tableProperties table properties.
+     * @param tablePath       fully qualified object for table.
+     * @return location for table.
+     */
+    private String extractTableLocation(Map<String, String> tableProperties, 
ObjectPath tablePath) {
+        return tableProperties.containsKey(GlueCatalogConfig.LOCATION_URI) ?
+                tableProperties.remove(GlueCatalogConfig.LOCATION_URI) : 
locationUri + GlueCatalogConfig.LOCATION_SEPARATOR
+                + tablePath.getDatabaseName() + 
GlueCatalogConfig.LOCATION_SEPARATOR + tablePath.getObjectName();
+
+    }
+
+    /**
+     * Extract OutputFormat from properties if present and remove outputFormat 
from properties.
+     * fallback to default format if not present
+     *
+     * @param tableProperties Key/Value properties
+     * @return output Format.
+     */
+    private String extractOutputFormat(Map<String, String> tableProperties) {
+        return 
tableProperties.containsKey(GlueCatalogConfig.TABLE_OUTPUT_FORMAT) ? 
tableProperties.remove(GlueCatalogConfig.TABLE_OUTPUT_FORMAT)
+                : GlueCatalogOptions.OUTPUT_FORMAT.defaultValue();
+    }
+
+    /**
+     * Extract InputFormat from properties if present and remove inputFormat 
from properties.
+     * fallback to default format if not present
+     *
+     * @param tableProperties Key/Value properties
+     * @return input Format.
+     */
+    private String extractInputFormat(Map<String, String> tableProperties) {
+        return 
tableProperties.containsKey(GlueCatalogConfig.TABLE_INPUT_FORMAT) ?
+                tableProperties.remove(GlueCatalogConfig.TABLE_INPUT_FORMAT) : 
GlueCatalogOptions.INPUT_FORMAT.defaultValue();
+
+    }
+
+    /**
+     * Get list of filtered columns which are partition columns.
+     *
+     * @param catalogTable {@link CatalogTable} instance.
+     * @param columns      List of all column in table.
+     * @return List of column marked as partition key.
+     */
+    private Collection<Column> getPartitionKeys(CatalogTable catalogTable, 
Collection<Column> columns) {
+        Set<String> partitionKeys = new 
HashSet<>(catalogTable.getPartitionKeys());
+        return columns.stream().filter(column -> 
partitionKeys.contains(column.name())).collect(Collectors.toList());
+    }
+
+    /**
+     * @param tablePath    fully Qualified table path.
+     * @param newTable     instance of {@link CatalogBaseTable} containing 
information for table.
+     * @param managedTable identifier for managed table.
+     * @throws CatalogException Glue related exception.
+     */
+    public void alterGlueTable(ObjectPath tablePath, CatalogBaseTable 
newTable, boolean managedTable)
+            throws CatalogException {
+
+        Map<String, String> properties = new HashMap<>(newTable.getOptions());
+        String tableOwner = extractTableOwner(properties);
+
+        if (managedTable) {
+            properties.put(CONNECTOR.key(), 
ManagedTableFactory.DEFAULT_IDENTIFIER);
+        }
+
+        Set<Column> glueColumns = getGlueColumnsFromCatalogTable(newTable);
+
+        // create StorageDescriptor for table
+        StorageDescriptor.Builder storageDescriptorBuilder = 
StorageDescriptor.builder()
+                .inputFormat(extractInputFormat(properties))
+                .outputFormat(extractOutputFormat(properties))
+                .location(extractTableLocation(properties, tablePath))
+                .parameters(properties)
+                .columns(glueColumns);
+
+        // create TableInput Builder with available information.
+        TableInput.Builder tableInputBuilder = TableInput.builder()
+                .name(tablePath.getObjectName())
+                .description(newTable.getComment())
+                .tableType(newTable.getTableKind().name())
+                .lastAccessTime(Instant.now())
+                .owner(tableOwner);
+
+        UpdateTableRequest.Builder requestBuilder = 
UpdateTableRequest.builder().tableInput(tableInputBuilder.build())
+                .catalogId(getGlueCatalogId())
+                .databaseName(tablePath.getDatabaseName());
+
+        if (newTable instanceof CatalogTable) {
+            CatalogTable catalogTable = (CatalogTable) newTable;
+            if (catalogTable.isPartitioned()) {
+                tableInputBuilder.partitionKeys(getPartitionKeys(catalogTable, 
glueColumns));
+            }
+        }
+
+        // apply storage descriptor and tableInput for request
+        tableInputBuilder.storageDescriptor(storageDescriptorBuilder.build());
+        requestBuilder.tableInput(tableInputBuilder.build());
+
+        try {
+            UpdateTableResponse response = 
glueClient.updateTable(requestBuilder.build());
+            LOG.debug(getDebugLog(response));
+            validateGlueResponse(response);
+            LOG.info(String.format("Table updated. %s", 
tablePath.getFullName()));
+        } catch (GlueException e) {
+            throw new 
CatalogException(GlueCatalogConfig.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+    }
+
+    private String getDebugLog(GlueResponse response) {
+        return String.format("Glue response : status = %s \n " +
+                        "Details = %s \nMetadataResponse = %s",
+                response.sdkHttpResponse().isSuccessful(), 
response.sdkHttpResponse().toString(), response.responseMetadata());
+    }
+
+    /**
+     * Get names of all tables or views under this database based on type 
identifier.
+     * An empty list is returned if none exists.
+     *
+     * @param databaseName fully qualified database name.
+     * @return a list of the names of all tables or views in this database 
based on type identifier.
+     * @throws CatalogException in case of any runtime exception
+     */
+    public List<String> getGlueTableList(String databaseName, String type) 
throws CatalogException {
+        GetTablesRequest.Builder tablesRequestBuilder =
+                GetTablesRequest.builder()
+                        .databaseName(databaseName)
+                        .catalogId(getGlueCatalogId());
+        GetTablesResponse response = 
glueClient.getTables(tablesRequestBuilder.build());
+        validateGlueResponse(response);
+        List<String> finalTableList = response.tableList().stream()
+                .filter(table -> table.tableType().equalsIgnoreCase(type))
+                .map(Table::name).collect(Collectors.toList());
+        String tableResultNextToken = response.nextToken();
+
+        if (Optional.ofNullable(tableResultNextToken).isPresent()) {
+            do {
+                // update token in requestBuilder to fetch next batch
+                tablesRequestBuilder.nextToken(tableResultNextToken);
+                response = glueClient.getTables(tablesRequestBuilder.build());
+                validateGlueResponse(response);
+                finalTableList.addAll(response.tableList().stream()
+                        .filter(table -> 
table.tableType().equalsIgnoreCase(type))
+                        .map(Table::name).collect(Collectors.toList()));
+                tableResultNextToken = response.nextToken();
+            } while (Optional.ofNullable(tableResultNextToken).isPresent());
+        }
+        return finalTableList;
+    }
+
+    /**
+     * Returns a {@link Table} identified by the given table Path. {@link 
ObjectPath}.
+     *
+     * @param tablePath Path of the table or view
+     * @return The requested table. Glue encapsulates whether table or view in 
its attribute called type.
+     * @throws TableNotExistException if the target does not exist
+     * @throws CatalogException       in case of any runtime exception
+     */
+    public Table getGlueTable(ObjectPath tablePath) throws 
TableNotExistException, CatalogException {
+        GetTableRequest tablesRequest =
+                GetTableRequest.builder()
+                        .databaseName(tablePath.getDatabaseName())
+                        .name(tablePath.getObjectName())
+                        .catalogId(getGlueCatalogId())
+                        .build();
+        try {
+            GetTableResponse response = glueClient.getTable(tablesRequest);
+            LOG.info(String.format("Glue table Found %s", 
response.table().name()));
+            validateGlueResponse(response);
+            return response.table();
+        } catch (EntityNotFoundException e) {
+            throw new TableNotExistException(catalogName, tablePath, e);
+        } catch (GlueException e) {
+            throw new 
CatalogException(GlueCatalogConfig.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+
+    }
+
+    /**
+     * Check if a table or view exists in glue data catalog.
+     *
+     * @param tablePath Path of the table or view
+     * @return true if the given table exists in the catalog false otherwise
+     * @throws CatalogException in case of any runtime exception
+     */
+    public boolean glueTableExists(ObjectPath tablePath) throws 
CatalogException {
+        try {
+            Table glueTable = getGlueTable(tablePath);
+            return glueTable != null && 
glueTable.name().equals(tablePath.getObjectName());
+        } catch (TableNotExistException e) {
+            LOG.warn(String.format("%s\nDatabase: %s Table: %s", 
GlueCatalogConfig.TABLE_NOT_EXISTS_IDENTIFIER,
+                    tablePath.getDatabaseName(), tablePath.getObjectName()));
+            return false;
+        } catch (CatalogException e) {
+            LOG.error(GlueCatalogConfig.GLUE_EXCEPTION_MSG_IDENTIFIER, 
e.getCause());
+            throw new 
CatalogException(GlueCatalogConfig.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+
+    }
+
+    /**
+     * Modify an existing function. Function name should be handled in a 
case-insensitive way.
+     *
+     * @param functionPath path of function.
+     * @param newFunction  modified function.
+     * @throws CatalogException on runtime errors.
+     */
+    public void alterGlueFunction(ObjectPath functionPath, CatalogFunction 
newFunction) throws CatalogException {
+        UserDefinedFunctionInput functionInput = 
createFunctionInput(functionPath, newFunction);
+        UpdateUserDefinedFunctionRequest updateUserDefinedFunctionRequest = 
UpdateUserDefinedFunctionRequest.builder()
+                .functionName(functionPath.getObjectName())
+                .databaseName(functionPath.getDatabaseName())
+                .catalogId(getGlueCatalogId())
+                .functionInput(functionInput).build();
+        UpdateUserDefinedFunctionResponse response = 
glueClient.updateUserDefinedFunction(updateUserDefinedFunctionRequest);
+        validateGlueResponse(response);
+        LOG.info(String.format("Function altered. %s", 
functionPath.getFullName()));
+    }
+
+    /**
+     * Drop a table or view from glue data catalog.
+     *
+     * @param tablePath fully qualified table path
+     * @throws CatalogException on runtime errors.
+     */
+    public void dropGlueTable(ObjectPath tablePath) throws CatalogException {
+        DeleteTableRequest.Builder tableRequestBuilder =
+                DeleteTableRequest.builder()
+                        .databaseName(tablePath.getDatabaseName())
+                        .name(tablePath.getObjectName())
+                        .catalogId(getGlueCatalogId());
+        try {
+            DeleteTableResponse response = 
glueClient.deleteTable(tableRequestBuilder.build());
+            validateGlueResponse(response);
+            LOG.info(String.format("Dropped Table %s.", 
tablePath.getObjectName()));
+        } catch (GlueException e) {
+            throw new 
CatalogException(GlueCatalogConfig.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+
+    }
+
+    /**
+     * validate response from client call.
+     */
+    private void validateGlueResponse(GlueResponse response) {
+        if (!response.sdkHttpResponse().isSuccessful()) {
+            throw new 
CatalogException(GlueCatalogConfig.GLUE_EXCEPTION_MSG_IDENTIFIER);
+        }
+    }
+
+    public Schema getSchemaFromGlueTable(Table glueTable) {
+        Schema.Builder schemaBuilder = Schema.newBuilder();
+        for (Column col : glueTable.storageDescriptor().columns()) {
+            schemaBuilder.column(col.name(), col.type());
+        }
+        return schemaBuilder.build();
+    }
+
+    // -------------- Function related operations.
+
+    /**
+     * Create a function. Function name should be handled in a 
case-insensitive way.
+     *
+     * @param functionPath path of the function
+     * @param function     Flink function to be created
+     * @throws CatalogException in case of any runtime exception
+     */
+    public void createGlueFunction(ObjectPath functionPath, CatalogFunction 
function)
+            throws CatalogException, FunctionAlreadyExistException {
+
+        UserDefinedFunctionInput functionInput = 
createFunctionInput(functionPath, function);
+        CreateUserDefinedFunctionRequest.Builder requestBuilder = 
CreateUserDefinedFunctionRequest.builder()
+                .databaseName(functionPath.getDatabaseName())
+                .catalogId(getGlueCatalogId())
+                .functionInput(functionInput);
+        try {
+            CreateUserDefinedFunctionResponse response = 
glueClient.createUserDefinedFunction(requestBuilder.build());
+            validateGlueResponse(response);
+            LOG.info(String.format("Function created. %s", 
functionPath.getFullName()));
+        } catch (AlreadyExistsException e) {
+            LOG.error(String.format("%s.%s already Exists. Function language 
of type: %s",
+                    functionPath.getDatabaseName(), 
functionPath.getObjectName(), function.getFunctionLanguage()));
+            throw new FunctionAlreadyExistException(catalogName, functionPath);
+        } catch (GlueException e) {
+            LOG.error("Error creating glue function.", e.getCause());
+            throw new 
CatalogException(GlueCatalogConfig.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+    }
+
+    private UserDefinedFunctionInput createFunctionInput(ObjectPath 
functionPath, CatalogFunction function)
+            throws UnsupportedOperationException {
+        Collection<software.amazon.awssdk.services.glue.model.ResourceUri> 
resourceUris = new LinkedList<>();
+        for (org.apache.flink.table.resource.ResourceUri resourceUri : 
function.getFunctionResources()) {
+            switch (resourceUri.getResourceType()) {
+                case JAR:
+                case FILE:
+                case ARCHIVE:
+                    
resourceUris.add(ResourceUri.builder().resourceType(resourceUri.getResourceType().name())
+                            .uri(resourceUri.getUri()).build());
+                    break;
+                default:
+                    throw new UnsupportedOperationException("GlueCatalog 
supports only creating resources JAR/FILE or ARCHIVE.");
+            }
+
+        }
+        return 
UserDefinedFunctionInput.builder().functionName(functionPath.getObjectName())
+                .className(getGlueFunctionClassName(function))
+                .ownerType(PrincipalType.USER)
+                .ownerName(GlueCatalogConfig.FLINK_CATALOG)
+                .resourceUris(resourceUris)
+                .build();
+    }
+
+    /**
+     * Get the user defined function from glue Catalog. Function name should 
be handled in a case-insensitive way.
+     *
+     * @param functionPath path of the function
+     * @return the requested function
+     * @throws CatalogException in case of any runtime exception
+     */
+    public CatalogFunction getGlueFunction(ObjectPath functionPath) {
+        GetUserDefinedFunctionRequest request = 
GetUserDefinedFunctionRequest.builder()
+                .catalogId(getGlueCatalogId())
+                .databaseName(functionPath.getDatabaseName())
+                .functionName(functionPath.getObjectName())
+                .build();
+        GetUserDefinedFunctionResponse response = 
glueClient.getUserDefinedFunction(request);
+        validateGlueResponse(response);
+        UserDefinedFunction udf = response.userDefinedFunction();
+
+        List<org.apache.flink.table.resource.ResourceUri> resourceUris = new 
LinkedList<>();
+        for (ResourceUri resourceUri : udf.resourceUris()) {
+            resourceUris.add(new org.apache.flink.table.resource.ResourceUri(
+                    ResourceType.valueOf(resourceUri.resourceType().name()), 
resourceUri.uri()));
+        }
+
+        return new CatalogFunctionImpl(getCatalogFunctionClassName(udf), 
getFunctionalLanguage(udf), resourceUris);
+    }
+
+    public List<String> listGlueFunctions(String databaseName) {
+        GetUserDefinedFunctionsRequest.Builder functionsRequest = 
GetUserDefinedFunctionsRequest.builder()
+                .databaseName(databaseName).catalogId(getGlueCatalogId());
+
+        List<String> glueFunctions;
+        try {
+            GetUserDefinedFunctionsResponse functionsResponse = 
glueClient.getUserDefinedFunctions(functionsRequest.build());
+            String token = functionsResponse.nextToken();
+            glueFunctions = functionsResponse
+                    .userDefinedFunctions()
+                    .stream()
+                    .map(UserDefinedFunction::functionName)
+                    .collect(Collectors.toCollection(LinkedList::new));
+            if (Optional.ofNullable(token).isPresent()) {
+                do {
+                    functionsRequest.nextToken(token);
+                    functionsResponse = 
glueClient.getUserDefinedFunctions(functionsRequest.build());
+                    glueFunctions.addAll(functionsResponse
+                            .userDefinedFunctions()
+                            .stream()
+                            .map(UserDefinedFunction::functionName)
+                            
.collect(Collectors.toCollection(LinkedList::new)));
+                    token = functionsResponse.nextToken();
+                } while (Optional.ofNullable(token).isPresent());
+            }
+
+        } catch (GlueException e) {
+            throw new 
CatalogException(GlueCatalogConfig.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+        return glueFunctions;
+    }
+
+    public boolean glueFunctionExists(ObjectPath functionPath) {
+        GetUserDefinedFunctionRequest request = 
GetUserDefinedFunctionRequest.builder()
+                
.functionName(functionPath.getObjectName()).databaseName(functionPath.getDatabaseName())
+                .catalogId(getGlueCatalogId())
+                .build();
+
+        try {
+            GetUserDefinedFunctionResponse response = 
glueClient.getUserDefinedFunction(request);
+            validateGlueResponse(response);
+            return 
response.userDefinedFunction().functionName().equalsIgnoreCase(functionPath.getObjectName());
+        } catch (EntityNotFoundException e) {
+            LOG.warn(String.format("Entry not found for function %s.%s", 
functionPath.getObjectName(), functionPath.getDatabaseName()));
+            return false;
+        } catch (GlueException e) {
+            LOG.error(GlueCatalogConfig.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+            throw new 
CatalogException(GlueCatalogConfig.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+    }
+
+    public void dropGlueFunction(ObjectPath functionPath) throws 
CatalogException {
+        DeleteUserDefinedFunctionRequest request = 
DeleteUserDefinedFunctionRequest.builder()
+                .catalogId(getGlueCatalogId())
+                .functionName(functionPath.getObjectName())
+                .databaseName(functionPath.getDatabaseName())
+                .build();
+        DeleteUserDefinedFunctionResponse response = 
glueClient.deleteUserDefinedFunction(request);
+        validateGlueResponse(response);
+        LOG.info(String.format("Dropped Function. %s", 
functionPath.getFullName()));
+    }
+
+    // -------------- Partition related operations.
+
+    public void ensurePartitionedTable(ObjectPath tablePath, Table glueTable)
+            throws TableNotPartitionedException {
+        if (!glueTable.hasPartitionKeys()) {
+            throw new TableNotPartitionedException(catalogName, tablePath);
+        }
+    }
+
+    /**
+     * create partition in glue data catalog.
+     *
+     * @param glueTable        glue table
+     * @param partitionSpec    partition spec
+     * @param catalogPartition partition to add
+     */
+    public void createGluePartition(Table glueTable, CatalogPartitionSpec 
partitionSpec,
+                                    CatalogPartition catalogPartition) throws 
CatalogException, PartitionSpecInvalidException {
+
+        List<String> partCols = getColumnNames(glueTable.partitionKeys());
+        LOG.info(String.format("Partition Columns are : %s", String.join(", ", 
partCols)));
+        List<String> partitionValues =
+                getOrderedFullPartitionValues(
+                        partitionSpec,
+                        partCols,
+                        new ObjectPath(glueTable.databaseName(), 
glueTable.name()));
+
+        // validate partition values
+        for (int i = 0; i < partCols.size(); i++) {
+            if (isNullOrWhitespaceOnly(partitionValues.get(i))) {
+                throw new PartitionSpecInvalidException(
+                        catalogName,
+                        partCols,
+                        new ObjectPath(glueTable.databaseName(), 
glueTable.name()),
+                        partitionSpec);
+            }
+        }
+        StorageDescriptor.Builder sdBuilder = 
glueTable.storageDescriptor().toBuilder();
+        Map<String, String> partitionProperties = 
catalogPartition.getProperties();
+
+        sdBuilder.location(extractPartitionLocation(partitionProperties));
+        sdBuilder.parameters(partitionSpec.getPartitionSpec());
+        String comment = catalogPartition.getComment();
+        if (comment != null) {
+            partitionProperties.put(GlueCatalogConfig.COMMENT, comment);
+        }
+        PartitionInput.Builder partitionInput = PartitionInput.builder()
+                .parameters(partitionProperties)
+                .lastAccessTime(Instant.now())
+                .storageDescriptor(sdBuilder.build())
+                .values(partitionValues);
+        CreatePartitionRequest createPartitionRequest = 
CreatePartitionRequest.builder()
+                .partitionInput(partitionInput.build())
+                .catalogId(getGlueCatalogId())
+                .databaseName(glueTable.databaseName())
+                .tableName(glueTable.name())
+                .build();
+
+        try {
+            CreatePartitionResponse response = 
glueClient.createPartition(createPartitionRequest);
+            validateGlueResponse(response);
+            LOG.info("Partition Created.");
+        } catch (GlueException e) {
+            throw new 
CatalogException(GlueCatalogConfig.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+    }
+
+    private String extractPartitionLocation(Map<String, String> properties) {
+        return properties.containsKey(GlueCatalogConfig.LOCATION_URI) ? 
properties.remove(GlueCatalogConfig.LOCATION_URI) : null;
+    }
+
+    /**
+     * Get column names from List of column.
+     */
+    private static List<String> getColumnNames(final List<Column> columns) {
+        return columns.stream().map(Column::name).collect(Collectors.toList());
+    }
+
+    /**
+     * Get a list of ordered partition values by re-arranging them based on 
the given list of
+     * partition keys. If the partition value is null, it'll be converted into 
default partition
+     * name.
+     *
+     * @param partitionSpec a partition spec.
+     * @param partitionKeys a list of partition keys.
+     * @param tablePath     path of the table to which the partition belongs.
+     * @return A list of partition values ordered according to partitionKeys.
+     * @throws PartitionSpecInvalidException thrown if partitionSpec and 
partitionKeys have
+     *                                       different sizes, or any key in 
partitionKeys doesn't exist in partitionSpec.
+     */
+    private List<String> getOrderedFullPartitionValues(
+            CatalogPartitionSpec partitionSpec, List<String> partitionKeys, 
ObjectPath tablePath)
+            throws PartitionSpecInvalidException {
+        Map<String, String> spec = partitionSpec.getPartitionSpec();
+        if (spec.size() != partitionKeys.size()) {
+            throw new PartitionSpecInvalidException(
+                    catalogName, partitionKeys, tablePath, partitionSpec);
+        }
+
+        List<String> values = new ArrayList<>(spec.size());
+        for (String key : partitionKeys) {
+            if (!spec.containsKey(key)) {
+                throw new PartitionSpecInvalidException(
+                        catalogName, partitionKeys, tablePath, partitionSpec);
+            } else {
+                String value = spec.get(key);
+                if (value == null) {
+                    value = GlueCatalogConfig.DEFAULT_PARTITION_NAME;
+                }
+                values.add(value);
+            }
+        }
+
+        return values;
+    }
+
+    /**
+     * Update glue table.
+     *
+     * @param tablePath     contains database name and table name.
+     * @param partitionSpec Existing partition information.
+     * @param newPartition  Partition information with new changes.
+     * @throws CatalogException Exception in failure.
+     */
+    public void alterGluePartition(ObjectPath tablePath, CatalogPartitionSpec 
partitionSpec,
+                                   CatalogPartition newPartition) throws 
CatalogException {
+        // todo has to implement
+    }
+
+    /**
+     * Get CatalogPartitionSpec of all partitions from glue data catalog.
+     *
+     * @param tablePath fully qualified table path.
+     * @return List of PartitionSpec
+     */
+    public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath) {
+
+        GetPartitionsRequest.Builder request =
+                GetPartitionsRequest.builder().catalogId(getGlueCatalogId())
+                        .databaseName(tablePath.getDatabaseName())
+                        .tableName(tablePath.getObjectName());
+        try {
+            GetPartitionsResponse response = 
glueClient.getPartitions(request.build());
+            validateGlueResponse(response);
+            List<CatalogPartitionSpec> finalPartitionsList = 
response.partitions().stream()
+                    
.map(this::getCatalogPartitionSpec).collect(Collectors.toList());
+            String partitionsResultNextToken = response.nextToken();
+            if (Optional.ofNullable(partitionsResultNextToken).isPresent()) {
+                do {
+                    // creating a new GetPartitionsResult using next token.
+                    request.nextToken(partitionsResultNextToken);
+                    response = glueClient.getPartitions(request.build());
+                    finalPartitionsList.addAll(response.partitions().stream()
+                            
.map(this::getCatalogPartitionSpec).collect(Collectors.toList()));
+                    partitionsResultNextToken = response.nextToken();
+                } while 
(Optional.ofNullable(partitionsResultNextToken).isPresent());
+            }
+
+            return finalPartitionsList;
+
+        } catch (GlueException e) {
+            throw new 
CatalogException(GlueCatalogConfig.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+
+    }
+
+    /**
+     * @param tablePath
+     * @param partitionSpec
+     * @throws CatalogException
+     * @throws TableNotExistException
+     * @throws PartitionSpecInvalidException
+     */
+    public void dropGluePartition(ObjectPath tablePath, CatalogPartitionSpec 
partitionSpec)
+            throws CatalogException {
+
+        try {
+            Table glueTable = getGlueTable(tablePath);
+            List<String> partCols = getColumnNames(glueTable.partitionKeys());
+            DeletePartitionRequest deletePartitionRequest = 
DeletePartitionRequest.builder()
+                    .catalogId(getGlueCatalogId())
+                    .databaseName(tablePath.getDatabaseName())
+                    .tableName(tablePath.getObjectName())
+                    
.partitionValues(getOrderedFullPartitionValues(partitionSpec, partCols, 
tablePath))
+                    .build();
+            DeletePartitionResponse response = 
glueClient.deletePartition(deletePartitionRequest);
+            validateGlueResponse(response);
+            LOG.info("Partition Dropped.");
+        } catch (TableNotExistException | PartitionSpecInvalidException e) {
+            e.printStackTrace();
+        } catch (GlueException e) {
+            throw new 
CatalogException(GlueCatalogConfig.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+    }
+
+    public List<CatalogPartitionSpec> listGluePartitionsByFilter(ObjectPath 
tablePath, List<Expression> filters) {
+        String expressionString = filters.stream().map(x -> 
getExpressionString(x, new StringBuilder()))
+                .collect(Collectors.joining(GlueCatalogConfig.SPACE + 
GlueCatalogConfig.AND));
+        try {
+            GetPartitionsRequest request = 
GetPartitionsRequest.builder().databaseName(tablePath.getDatabaseName())
+                    .tableName(tablePath.getObjectName())
+                    .catalogId(getGlueCatalogId())
+                    .expression(expressionString)
+                    .build();
+            GetPartitionsResponse response = glueClient.getPartitions(request);
+            List<CatalogPartitionSpec> catalogPartitionSpecList = 
response.partitions().stream()
+                    
.map(this::getCatalogPartitionSpec).collect(Collectors.toList());
+            // GlueOperator.validateGlueResponse(response);
+            String nextToken = response.nextToken();
+            if (Optional.ofNullable(nextToken).isPresent()) {
+                do {
+                    // creating a new GetPartitionsResult using next token.
+                    request =
+                            
GetPartitionsRequest.builder().databaseName(tablePath.getDatabaseName())
+                                    .tableName(tablePath.getObjectName())
+                                    .catalogId(getGlueCatalogId())
+                                    .expression(expressionString)
+                                    .nextToken(nextToken)
+                                    .build();
+                    response = glueClient.getPartitions(request);
+                    
catalogPartitionSpecList.addAll(response.partitions().stream()
+                            
.map(this::getCatalogPartitionSpec).collect(Collectors.toList()));
+                    nextToken = response.nextToken();
+                } while (Optional.ofNullable(nextToken).isPresent());
+            }
+            return catalogPartitionSpecList;
+        } catch (GlueException e) {
+            throw new 
CatalogException(GlueCatalogConfig.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+
+    }
+
+    private String getExpressionString(Expression expression, StringBuilder 
sb) {
+
+        for (Expression childExpression : expression.getChildren()) {
+            if (childExpression.getChildren() != null && 
childExpression.getChildren().size() > 0) {
+                getExpressionString(childExpression, sb);
+            }
+        }
+        return sb.insert(0, expression.asSummaryString() + 
GlueCatalogConfig.SPACE +
+                GlueCatalogConfig.AND).toString();
+
+    }
+
+    public Partition getGluePartition(ObjectPath tablePath, 
CatalogPartitionSpec partitionSpec)
+            throws PartitionNotExistException {
+        try {
+            GetPartitionRequest request = 
GetPartitionRequest.builder().catalogId(getGlueCatalogId())
+                    
.databaseName(tablePath.getDatabaseName()).tableName(tablePath.getObjectName())
+                    .build();
+            GetPartitionResponse response = glueClient.getPartition(request);
+            validateGlueResponse(response);
+            Partition partition = response.partition();
+            if (partition.hasValues() &&
+                    specSubset(partitionSpec.getPartitionSpec(), 
partition.storageDescriptor().parameters())) {
+                return partition;
+            }
+        } catch (EntityNotFoundException e) {
+            throw new PartitionNotExistException(catalogName, tablePath, 
partitionSpec);
+        }
+        return null;
+    }
+
+    public boolean gluePartitionExists(ObjectPath tablePath, 
CatalogPartitionSpec partitionSpec)
+            throws CatalogException {
+        GetPartitionRequest request = 
GetPartitionRequest.builder().catalogId(getGlueCatalogId())
+                
.databaseName(tablePath.getDatabaseName()).tableName(tablePath.getObjectName())
+                .build();
+        try {
+            GetPartitionResponse response = glueClient.getPartition(request);
+            validateGlueResponse(response);
+            return 
response.partition().parameters().keySet().containsAll(partitionSpec.getPartitionSpec().keySet());
+        } catch (EntityNotFoundException e) {
+            LOG.warn(String.format("%s is not found", 
partitionSpec.getPartitionSpec()));
+        } catch (GlueException e) {
+            throw new CatalogException(catalogName, e);
+        }
+        return false;
+    }
+
+    private Column getGlueColumn(CatalogBaseTable catalogBaseTable, 
TableSchema tableSchema, String fieldName) throws CatalogException {

Review Comment:
   It shows that `TableSchema` is deprecated. 
   
   ```
   
   This class has been deprecated as part of FLIP-164. It has been replaced by 
two more dedicated classes Schema and ResolvedSchema. Use Schema for 
declaration in APIs. ResolvedSchema is offered by the framework after 
resolution and validation.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to