vahmed-hamdy commented on code in PR #47:
URL: 
https://github.com/apache/flink-connector-aws/pull/47#discussion_r1223084883


##########
flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalogOptions.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.connector.aws.config.AWSConfigConstants;
+import org.apache.flink.table.catalog.CommonCatalogOptions;
+import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.catalog.glue.constants.GlueCatalogConstants;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/** A collection of {@link ConfigOption} which is used in GlueCatalog. */
+public class GlueCatalogOptions extends CommonCatalogOptions {
+
+    public static final ConfigOption<String> DEFAULT_DATABASE =
+            ConfigOptions.key(CommonCatalogOptions.DEFAULT_DATABASE_KEY)
+                    .stringType()
+                    .defaultValue(GenericInMemoryCatalog.DEFAULT_DB);
+
+    public static final ConfigOption<String> INPUT_FORMAT =
+            ConfigOptions.key(GlueCatalogConstants.TABLE_INPUT_FORMAT)
+                    .stringType()
+                    .noDefaultValue();
+
+    public static final ConfigOption<String> OUTPUT_FORMAT =
+            ConfigOptions.key(GlueCatalogConstants.TABLE_OUTPUT_FORMAT)
+                    .stringType()
+                    .noDefaultValue();
+
+    public static final ConfigOption<String> GLUE_CATALOG_ENDPOINT =
+            ConfigOptions.key(AWSConfigConstants.GLUE_CATALOG_ENDPOINT)
+                    .stringType()
+                    .noDefaultValue();
+
+    public static final ConfigOption<String> GLUE_CATALOG_ID =
+            
ConfigOptions.key(AWSConfigConstants.GLUE_CATALOG_ID).stringType().noDefaultValue();
+
+    public static final ConfigOption<String> GLUE_ACCOUNT_ID =
+            
ConfigOptions.key(AWSConfigConstants.GLUE_ACCOUNT_ID).stringType().noDefaultValue();
+    public static final ConfigOption<String> CREDENTIAL_PROVIDER =

Review Comment:
   nit: inconsistent line spacing



##########
flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/GlueCatalogOptionsUtils.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.aws.config.AWSConfigConstants;
+import org.apache.flink.connector.aws.table.util.AWSOptionUtils;
+import org.apache.flink.connector.aws.table.util.HttpClientOptionUtils;
+import org.apache.flink.connector.base.table.options.ConfigurationValidator;
+import org.apache.flink.connector.base.table.options.TableOptionsUtils;
+import org.apache.flink.table.catalog.glue.GlueCatalogOptions;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/** Option Handler for Glue Catalog. */
+@Internal
+public class GlueCatalogOptionsUtils implements TableOptionsUtils, 
ConfigurationValidator {
+
+    /** Allowed Http Client Types. */
+    private static final String[] ALLOWED_GLUE_HTTP_CLIENTS =
+            new String[] {
+                AWSConfigConstants.CLIENT_TYPE_URLCONNECTION,

Review Comment:
   I think we only support apache and urlconnection, right?



##########
flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/GlueUtils.java:
##########
@@ -0,0 +1,394 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue.util;
+
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.FunctionLanguage;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.glue.GlueCatalogOptions;
+import org.apache.flink.table.catalog.glue.constants.GlueCatalogConstants;
+import org.apache.flink.table.types.DataType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.glue.model.Column;
+import software.amazon.awssdk.services.glue.model.Database;
+import software.amazon.awssdk.services.glue.model.GlueResponse;
+import software.amazon.awssdk.services.glue.model.Table;
+import software.amazon.awssdk.services.glue.model.UserDefinedFunction;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;
+
+/** Utilities related glue Operation. */
+public class GlueUtils {
+
+    private static final Logger LOG = LoggerFactory.getLogger(GlueUtils.class);
+
+    /**
+     * Glue supports lowercase naming convention.
+     *
+     * @param name fully qualified name.
+     * @return modified name according to glue convention.
+     */
+    public static String getGlueConventionalName(String name) {
+
+        return name.toLowerCase(Locale.ROOT);
+    }
+
+    /**
+     * Extract location from database properties if present and remove 
location from properties.
+     * fallback to create default location if not present
+     *
+     * @param databaseProperties database properties.
+     * @param databaseName fully qualified name for database.
+     * @return location for database.
+     */
+    public static String extractDatabaseLocation(
+            final Map<String, String> databaseProperties,
+            final String databaseName,
+            final String catalogPath) {
+        if (databaseProperties.containsKey(GlueCatalogConstants.LOCATION_URI)) 
{
+            return 
databaseProperties.remove(GlueCatalogConstants.LOCATION_URI);
+        } else {
+            LOG.info("No location URI Set. Using Catalog Path as default");
+            return catalogPath + GlueCatalogConstants.LOCATION_SEPARATOR + 
databaseName;
+        }
+    }
+
+    /**
+     * Extract location from database properties if present and remove 
location from properties.
+     * fallback to create default location if not present
+     *
+     * @param tableProperties table properties.
+     * @param tablePath fully qualified object for table.
+     * @return location for table.
+     */
+    public static String extractTableLocation(
+            final Map<String, String> tableProperties,
+            final ObjectPath tablePath,
+            final String catalogPath) {
+        if (tableProperties.containsKey(GlueCatalogConstants.LOCATION_URI)) {
+            return tableProperties.remove(GlueCatalogConstants.LOCATION_URI);
+        } else {
+            return catalogPath
+                    + GlueCatalogConstants.LOCATION_SEPARATOR
+                    + tablePath.getDatabaseName()
+                    + GlueCatalogConstants.LOCATION_SEPARATOR
+                    + tablePath.getObjectName();
+        }
+    }
+
+    /**
+     * Build CatalogDatabase instance using information from glue Database 
instance.
+     *
+     * @param glueDatabase {@link Database }
+     * @return {@link CatalogDatabase } instance.
+     */
+    public static CatalogDatabase getCatalogDatabase(final Database 
glueDatabase) {
+        Map<String, String> properties = new 
HashMap<>(glueDatabase.parameters());
+        return new CatalogDatabaseImpl(properties, glueDatabase.description());
+    }
+
+    /**
+     * A Glue database name cannot be longer than 252 characters. The only 
acceptable characters are
+     * lowercase letters, numbers, and the underscore character. More details: 
<a
+     * 
href="https://docs.aws.amazon.com/athena/latest/ug/glue-best-practices.html";>...</a>
+     *
+     * @param name name
+     */
+    public static void validate(String name) {
+        checkArgument(
+                name != null && 
GlueCatalogConstants.GLUE_DB_PATTERN.matcher(name).find(),
+                "Database name is not according to Glue Norms, "
+                        + "check here 
https://docs.aws.amazon.com/athena/latest/ug/glue-best-practices.html";);
+    }
+
+    /** validate response from client call. */
+    public static void validateGlueResponse(GlueResponse response) {
+        if (response != null && !response.sdkHttpResponse().isSuccessful()) {
+            throw new 
CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER);
+        }
+    }
+
+    /**
+     * @param udf Instance of UserDefinedFunction
+     * @return ClassName for function
+     */
+    public static String getCatalogFunctionClassName(final UserDefinedFunction 
udf) {
+        validateUDFClassName(udf.className());
+        String[] splitedName = 
udf.className().split(GlueCatalogConstants.DEFAULT_SEPARATOR);
+        return splitedName[splitedName.length - 1];
+    }
+
+    /**
+     * Validates UDF class name from glue.
+     *
+     * @param name name of UDF.
+     */
+    private static void validateUDFClassName(final String name) {
+        checkArgument(!isNullOrWhitespaceOnly(name));
+
+        if (name.split(GlueCatalogConstants.DEFAULT_SEPARATOR).length
+                != GlueCatalogConstants.UDF_CLASS_NAME_SIZE) {
+            throw new ValidationException("Improper Classname");
+        }
+    }
+
+    /**
+     * Derive functionalLanguage from glue function name. Glue doesn't have 
any attribute to save
+     * the functionalLanguage Name. Thus, storing FunctionalLanguage in the 
name itself.
+     *
+     * @param glueFunction Function name from glue.
+     * @return Identifier for FunctionalLanguage.
+     */
+    public static FunctionLanguage getFunctionalLanguage(final 
UserDefinedFunction glueFunction) {
+        if 
(glueFunction.className().startsWith(GlueCatalogConstants.FLINK_JAVA_FUNCTION_PREFIX))
 {
+            return FunctionLanguage.JAVA;
+        } else if (glueFunction
+                .className()
+                
.startsWith(GlueCatalogConstants.FLINK_PYTHON_FUNCTION_PREFIX)) {
+            return FunctionLanguage.PYTHON;
+        } else if (glueFunction
+                .className()
+                .startsWith(GlueCatalogConstants.FLINK_SCALA_FUNCTION_PREFIX)) 
{
+            return FunctionLanguage.SCALA;
+        } else {
+            throw new CatalogException("Invalid Functional Language");
+        }
+    }
+
+    /**
+     * Get expanded Query from CatalogBaseTable.
+     *
+     * @param table Instance of catalogBaseTable.
+     * @return expandedQuery for Glue Table.
+     */
+    public static String getExpandedQuery(CatalogBaseTable table) {
+        // https://issues.apache.org/jira/browse/FLINK-31961
+        return "";
+    }
+
+    /**
+     * Get Original Query from CatalogBaseTable.
+     *
+     * @param table Instance of CatalogBaseTable.
+     * @return OriginalQuery for Glue Table.
+     */
+    public static String getOriginalQuery(CatalogBaseTable table) {
+        // https://issues.apache.org/jira/browse/FLINK-31961
+        return "";
+    }
+
+    /**
+     * Extract table owner name and remove from properties.
+     *
+     * @param properties Map of properties.
+     * @return fully qualified owner name.
+     */
+    public static String extractTableOwner(Map<String, String> properties) {
+        return properties.containsKey(GlueCatalogConstants.TABLE_OWNER)
+                ? properties.remove(GlueCatalogConstants.TABLE_OWNER)
+                : null;
+    }
+
+    /**
+     * Derive Instance of Glue Column from {@link CatalogBaseTable}.
+     *
+     * @param catalogBaseTable Instance of {@link CatalogBaseTable}.
+     * @param tableSchema TableSchema.
+     * @param fieldName name of {@link Column}.
+     * @return Instance of {@link Column}.
+     * @throws CatalogException Throws exception in case of failure.
+     */
+    public static Column getGlueColumn(
+            final CatalogBaseTable catalogBaseTable,
+            final TableSchema tableSchema,
+            final String fieldName)
+            throws CatalogException {
+        LOG.info("Getting glue column details.");
+        Optional<DataType> dataType = tableSchema.getFieldDataType(fieldName);
+        if (dataType.isPresent()) {
+            String glueDataType = dataType.get().toString();
+            return Column.builder()
+                    .comment(catalogBaseTable.getComment())
+                    .type(glueDataType)
+                    .name(fieldName)
+                    .build();
+        } else {
+            throw new CatalogException("DataType information missing from 
table schema");
+        }
+    }
+
+    /**
+     * Build set of {@link Column} associated with table.
+     *
+     * @param catalogBaseTable instance of {@link CatalogBaseTable}.
+     * @return Set of Column
+     */
+    public static Set<Column> getGlueColumnsFromCatalogTable(
+            final CatalogBaseTable catalogBaseTable) {
+        checkNotNull(catalogBaseTable);
+        TableSchema tableSchema = catalogBaseTable.getSchema();
+        return Arrays.stream(tableSchema.getFieldNames())
+                .map(fieldName -> getGlueColumn(catalogBaseTable, tableSchema, 
fieldName))
+                .collect(Collectors.toSet());
+    }
+
+    /**
+     * Extract InputFormat from properties if present and remove inputFormat 
from properties.
+     * fallback to default format if not present
+     *
+     * @param tableProperties Key/Value properties
+     * @return input Format.
+     */
+    public static String extractInputFormat(final Map<String, String> 
tableProperties) {
+        return 
tableProperties.containsKey(GlueCatalogConstants.TABLE_INPUT_FORMAT)
+                ? 
tableProperties.remove(GlueCatalogConstants.TABLE_INPUT_FORMAT)
+                : GlueCatalogOptions.INPUT_FORMAT.defaultValue();
+    }
+
+    /**
+     * Extract OutputFormat from properties if present and remove outputFormat 
from properties.
+     * fallback to default format if not present
+     *
+     * @param tableProperties Key/Value properties
+     * @return output Format.
+     */
+    public static String extractOutputFormat(Map<String, String> 
tableProperties) {
+        return 
tableProperties.containsKey(GlueCatalogConstants.TABLE_OUTPUT_FORMAT)
+                ? 
tableProperties.remove(GlueCatalogConstants.TABLE_OUTPUT_FORMAT)
+                : GlueCatalogOptions.OUTPUT_FORMAT.defaultValue();
+    }
+
+    /**
+     * Get list of filtered columns which are partition columns.
+     *
+     * @param catalogTable {@link CatalogTable} instance.
+     * @param columns List of all column in table.
+     * @return List of column marked as partition key.
+     */
+    public static Collection<Column> getPartitionKeys(
+            CatalogTable catalogTable, Collection<Column> columns) {
+        Set<String> partitionKeys = new 
HashSet<>(catalogTable.getPartitionKeys());
+        return columns.stream()
+                .filter(column -> partitionKeys.contains(column.name()))
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * check spec1 is subset of spec2.
+     *
+     * @param subsetProps Key/Value pair spec
+     * @param props Key/Value pair spec
+     * @return is spec1 is subset of spec2
+     */
+    public static boolean specSubset(Map<String, String> subsetProps, 
Map<String, String> props) {

Review Comment:
   nit: this method is not used!



##########
flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/constants/GlueCatalogConstants.java:
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue.constants;
+
+import org.apache.flink.table.catalog.glue.GlueCatalog;
+
+import java.util.regex.Pattern;
+
+/** Configs for catalog meta-objects in {@link GlueCatalog}. */
+public class GlueCatalogConstants {

Review Comment:
   Can we add `@Internal` annotation to class?



##########
flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/operator/GlueDatabaseOperator.java:
##########
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue.operator;
+
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.glue.constants.GlueCatalogConstants;
+import org.apache.flink.table.catalog.glue.util.GlueUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.glue.model.BatchDeleteTableRequest;
+import software.amazon.awssdk.services.glue.model.BatchDeleteTableResponse;
+import software.amazon.awssdk.services.glue.model.CreateDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.CreateDatabaseResponse;
+import software.amazon.awssdk.services.glue.model.Database;
+import software.amazon.awssdk.services.glue.model.DatabaseInput;
+import software.amazon.awssdk.services.glue.model.DeleteDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.DeleteDatabaseResponse;
+import 
software.amazon.awssdk.services.glue.model.DeleteUserDefinedFunctionRequest;
+import 
software.amazon.awssdk.services.glue.model.DeleteUserDefinedFunctionResponse;
+import software.amazon.awssdk.services.glue.model.EntityNotFoundException;
+import software.amazon.awssdk.services.glue.model.GetDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.GetDatabaseResponse;
+import software.amazon.awssdk.services.glue.model.GetDatabasesRequest;
+import software.amazon.awssdk.services.glue.model.GetDatabasesResponse;
+import software.amazon.awssdk.services.glue.model.GlueException;
+import software.amazon.awssdk.services.glue.model.UpdateDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.UpdateDatabaseResponse;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/** Utilities for Glue catalog Database related operations. */
+public class GlueDatabaseOperator extends GlueOperator {

Review Comment:
   `@Internal` will leave the rest since this needs a quick scan holistically 



##########
flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/operator/GlueTableOperator.java:
##########
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue.operator;
+
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogView;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.glue.constants.GlueCatalogConstants;
+import org.apache.flink.table.catalog.glue.util.GlueUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.glue.model.Column;
+import software.amazon.awssdk.services.glue.model.CreateTableRequest;
+import software.amazon.awssdk.services.glue.model.CreateTableResponse;
+import software.amazon.awssdk.services.glue.model.DeleteTableRequest;
+import software.amazon.awssdk.services.glue.model.DeleteTableResponse;
+import software.amazon.awssdk.services.glue.model.EntityNotFoundException;
+import software.amazon.awssdk.services.glue.model.GetTableRequest;
+import software.amazon.awssdk.services.glue.model.GetTableResponse;
+import software.amazon.awssdk.services.glue.model.GetTablesRequest;
+import software.amazon.awssdk.services.glue.model.GetTablesResponse;
+import software.amazon.awssdk.services.glue.model.GlueException;
+import software.amazon.awssdk.services.glue.model.StorageDescriptor;
+import software.amazon.awssdk.services.glue.model.Table;
+import software.amazon.awssdk.services.glue.model.TableInput;
+import software.amazon.awssdk.services.glue.model.UpdateTableRequest;
+import software.amazon.awssdk.services.glue.model.UpdateTableResponse;
+
+import java.time.Instant;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Utilities for Glue Table related operations. */
+public class GlueTableOperator extends GlueOperator {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(GlueTableOperator.class);
+
+    public GlueTableOperator(String catalogName, GlueClient glueClient, String 
glueCatalogId) {
+        super(catalogName, glueClient, glueCatalogId);
+    }
+
+    /**
+     * Create table in glue data catalog.
+     *
+     * @param tablePath Fully qualified name of table. {@link ObjectPath}
+     * @param table instance of {@link CatalogBaseTable} containing table 
related information.
+     * @throws CatalogException Any Exception thrown due to glue error
+     */
+    public void createGlueTable(final ObjectPath tablePath, final 
CatalogBaseTable table)
+            throws CatalogException {
+
+        checkNotNull(table, "Table cannot be Null");
+        checkNotNull(tablePath, "TablePath cannot be Null");
+
+        final Map<String, String> tableProperties = new 
HashMap<>(table.getOptions());
+        String tableOwner = GlueUtils.extractTableOwner(tableProperties);
+        Set<Column> glueTableColumns = 
GlueUtils.getGlueColumnsFromCatalogTable(table);
+
+        StorageDescriptor.Builder storageDescriptorBuilder =
+                StorageDescriptor.builder()
+                        
.inputFormat(GlueUtils.extractInputFormat(tableProperties))
+                        
.outputFormat(GlueUtils.extractOutputFormat(tableProperties));
+
+        TableInput.Builder tableInputBuilder =
+                TableInput.builder()
+                        .name(tablePath.getObjectName())
+                        .description(table.getComment())
+                        .tableType(table.getTableKind().name())
+                        .lastAccessTime(Instant.now())
+                        .owner(tableOwner)
+                        .viewExpandedText(GlueUtils.getExpandedQuery(table))
+                        .viewOriginalText(GlueUtils.getOriginalQuery(table));
+
+        CreateTableRequest.Builder requestBuilder =
+                CreateTableRequest.builder()
+                        .catalogId(getGlueCatalogId())
+                        .databaseName(tablePath.getDatabaseName());
+
+        if (table instanceof CatalogTable) {
+            CatalogTable catalogTable = (CatalogTable) table;
+            if (catalogTable.isPartitioned()) {
+                LOG.info("Catalog table is partitioned");
+                Collection<Column> partitionKeys =
+                        GlueUtils.getPartitionKeys(catalogTable, 
glueTableColumns);
+                LOG.info(
+                        "Partition columns are -> "
+                                + partitionKeys.stream()
+                                        .map(Column::name)
+                                        .collect(Collectors.joining(",")));
+                tableInputBuilder.partitionKeys(partitionKeys);
+            }
+        }
+
+        try {
+            storageDescriptorBuilder.columns(glueTableColumns);
+            
tableInputBuilder.storageDescriptor(storageDescriptorBuilder.build());
+            tableInputBuilder.parameters(tableProperties);
+            requestBuilder.tableInput(tableInputBuilder.build());
+
+            CreateTableResponse response = 
glueClient.createTable(requestBuilder.build());
+            GlueUtils.validateGlueResponse(response);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(GlueUtils.getDebugLog(response));
+            }
+
+            LOG.info(String.format("Table created. %s", 
tablePath.getFullName()));
+        } catch (GlueException e) {
+            throw new 
CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+    }
+
+    /**
+     * Update Table in glue data catalog.
+     *
+     * @param tablePath fully Qualified Table Path.
+     * @param newTable instance of {@link CatalogBaseTable} containing 
information for table.
+     * @throws CatalogException Glue related exception.
+     */
+    public void alterGlueTable(ObjectPath tablePath, CatalogBaseTable newTable)
+            throws CatalogException {
+
+        Map<String, String> tableProperties = new 
HashMap<>(newTable.getOptions());
+        String tableOwner = GlueUtils.extractTableOwner(tableProperties);
+        Set<Column> glueColumns = 
GlueUtils.getGlueColumnsFromCatalogTable(newTable);
+
+        StorageDescriptor.Builder storageDescriptorBuilder =
+                StorageDescriptor.builder()
+                        
.inputFormat(GlueUtils.extractInputFormat(tableProperties))
+                        
.outputFormat(GlueUtils.extractOutputFormat(tableProperties))
+                        .parameters(tableProperties)
+                        .columns(glueColumns);
+
+        TableInput.Builder tableInputBuilder =
+                TableInput.builder()
+                        .name(tablePath.getObjectName())
+                        .description(newTable.getComment())
+                        .tableType(newTable.getTableKind().name())
+                        .lastAccessTime(Instant.now())
+                        .owner(tableOwner);
+
+        UpdateTableRequest.Builder requestBuilder =
+                UpdateTableRequest.builder()
+                        .tableInput(tableInputBuilder.build())
+                        .catalogId(getGlueCatalogId())
+                        .databaseName(tablePath.getDatabaseName());
+
+        if (newTable instanceof CatalogTable) {
+            CatalogTable catalogTable = (CatalogTable) newTable;
+            if (catalogTable.isPartitioned()) {
+                tableInputBuilder.partitionKeys(
+                        GlueUtils.getPartitionKeys(catalogTable, glueColumns));
+            }
+        }
+
+        tableInputBuilder.storageDescriptor(storageDescriptorBuilder.build());
+        requestBuilder.tableInput(tableInputBuilder.build());
+
+        try {
+            UpdateTableResponse response = 
glueClient.updateTable(requestBuilder.build());
+            LOG.debug(GlueUtils.getDebugLog(response));
+            GlueUtils.validateGlueResponse(response);
+            LOG.info(String.format("Table updated. %s", 
tablePath.getFullName()));
+        } catch (GlueException e) {
+            throw new 
CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+    }
+
+    /**
+     * Get names of all tables or views under this database based on type 
identifier. An empty list
+     * is returned if none exists.
+     *
+     * @param databaseName fully qualified database name.
+     * @return a list of the names of all tables or views in this database 
based on type identifier.
+     * @throws CatalogException in case of any runtime exception
+     */
+    public List<String> getGlueTableList(String databaseName, String type) 
throws CatalogException {
+        GetTablesRequest.Builder tablesRequestBuilder =
+                
GetTablesRequest.builder().databaseName(databaseName).catalogId(getGlueCatalogId());
+        GetTablesResponse response = 
glueClient.getTables(tablesRequestBuilder.build());
+        GlueUtils.validateGlueResponse(response);
+        List<String> finalTableList =
+                response.tableList().stream()
+                        .filter(table -> 
table.tableType().equalsIgnoreCase(type))
+                        .map(Table::name)
+                        .collect(Collectors.toList());
+        String tableResultNextToken = response.nextToken();
+
+        if (Optional.ofNullable(tableResultNextToken).isPresent()) {
+            do {
+                tablesRequestBuilder.nextToken(tableResultNextToken);
+                response = glueClient.getTables(tablesRequestBuilder.build());
+                GlueUtils.validateGlueResponse(response);
+                finalTableList.addAll(
+                        response.tableList().stream()
+                                .filter(table -> 
table.tableType().equalsIgnoreCase(type))
+                                .map(Table::name)
+                                .collect(Collectors.toList()));
+                tableResultNextToken = response.nextToken();
+            } while (Optional.ofNullable(tableResultNextToken).isPresent());
+        }
+        return finalTableList;
+    }
+
+    /**
+     * Returns a {@link Table} identified by the given Table Path. {@link 
ObjectPath}.
+     *
+     * @param tablePath Path of the table or view
+     * @return The requested table. Glue encapsulates whether table or view in 
its attribute called
+     *     type.
+     * @throws TableNotExistException if the target does not exist
+     * @throws CatalogException in case of any runtime exception
+     */
+    public Table getGlueTable(ObjectPath tablePath)
+            throws TableNotExistException, CatalogException {
+
+        checkNotNull(tablePath, "TablePath cannot be Null");
+
+        GetTableRequest tablesRequest =
+                GetTableRequest.builder()
+                        .databaseName(tablePath.getDatabaseName())
+                        .name(tablePath.getObjectName())
+                        .catalogId(getGlueCatalogId())
+                        .build();
+        try {
+            GetTableResponse response = glueClient.getTable(tablesRequest);
+            GlueUtils.validateGlueResponse(response);
+            return response.table();
+        } catch (EntityNotFoundException e) {
+            throw new TableNotExistException(catalogName, tablePath, e);
+        } catch (GlueException e) {
+            throw new 
CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+    }
+
+    /**
+     * Check if a table or view exists in glue data catalog.
+     *
+     * @param tablePath Path of the table or view
+     * @return true if the given table exists in the catalog false otherwise
+     * @throws CatalogException in case of any runtime exception
+     */
+    public boolean glueTableExists(ObjectPath tablePath) throws 
CatalogException {
+        try {
+            Table glueTable = getGlueTable(tablePath);
+            return glueTable != null && 
glueTable.name().equals(tablePath.getObjectName());
+        } catch (TableNotExistException e) {
+            LOG.warn(
+                    String.format(
+                            "%s\nDatabase: %s Table: %s",
+                            GlueCatalogConstants.TABLE_NOT_EXISTS_IDENTIFIER,
+                            tablePath.getDatabaseName(),
+                            tablePath.getObjectName()));
+            return false;
+        } catch (CatalogException e) {
+            throw new CatalogException(
+                    GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, 
e.getCause());
+        }
+    }
+
+    /**
+     * Drop a table or view from glue data catalog.
+     *
+     * @param tablePath fully qualified Table Path
+     * @throws CatalogException on runtime errors.
+     */
+    public void dropGlueTable(ObjectPath tablePath) throws CatalogException {
+        DeleteTableRequest.Builder tableRequestBuilder =
+                DeleteTableRequest.builder()
+                        .databaseName(tablePath.getDatabaseName())
+                        .name(tablePath.getObjectName())
+                        .catalogId(getGlueCatalogId());
+        try {
+            DeleteTableResponse response = 
glueClient.deleteTable(tableRequestBuilder.build());
+            GlueUtils.validateGlueResponse(response);
+            LOG.info(String.format("Dropped Table %s.", 
tablePath.getObjectName()));
+        } catch (GlueException e) {
+            throw new 
CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER, e);
+        }
+    }
+
+    /**
+     * Create a {@link CatalogTable} using all the information from {@link 
Table}.
+     *
+     * @param glueTable Instance of Table from glue Data catalog.
+     * @return {@link CatalogTable}.
+     */
+    public CatalogBaseTable getCatalogBaseTableFromGlueTable(Table glueTable) {
+
+        checkNotNull(glueTable, "Glue Table cannot be null");
+        Schema schemaInfo = GlueUtils.getSchemaFromGlueTable(glueTable);
+
+        List<String> partitionKeys =
+                
glueTable.partitionKeys().stream().map(Column::name).collect(Collectors.toList());
+        Map<String, String> properties = new HashMap<>(glueTable.parameters());
+
+        if (glueTable.owner() != null) {
+            properties.put(GlueCatalogConstants.TABLE_OWNER, 
glueTable.owner());
+        }
+
+        if (glueTable.storageDescriptor().hasParameters()) {
+            properties.putAll(glueTable.storageDescriptor().parameters());
+        }
+
+        if (glueTable.storageDescriptor().inputFormat() != null) {
+            properties.put(
+                    GlueCatalogConstants.TABLE_INPUT_FORMAT,
+                    glueTable.storageDescriptor().inputFormat());
+        }
+
+        if (glueTable.storageDescriptor().outputFormat() != null) {
+            properties.put(
+                    GlueCatalogConstants.TABLE_OUTPUT_FORMAT,
+                    glueTable.storageDescriptor().outputFormat());
+        }
+
+        if 
(glueTable.tableType().equals(CatalogBaseTable.TableKind.TABLE.name())) {
+            return CatalogTable.of(schemaInfo, glueTable.description(), 
partitionKeys, properties);
+        } else if 
(glueTable.tableType().equals(CatalogBaseTable.TableKind.VIEW.name())) {
+            return CatalogView.of(
+                    schemaInfo,
+                    glueTable.description(),
+                    glueTable.viewOriginalText(),
+                    glueTable.viewExpandedText(),
+                    properties);
+
+        } else {
+            throw new CatalogException("Unknown TableType.");
+        }
+    }
+
+    /**
+     * Rename glue table. Glue catalog don't support renaming table. For 
renaming in Flink, it has
+     * to be done in 3 step. 1. fetch existing table info from glue 2. Create 
a table with new-name
+     * and use properties of existing table 3. Delete existing table Note: 
This above steps are not
+     * Atomic in nature.
+     *
+     * <p>Associated issue :- <a 
href="https://issues.apache.org/jira/browse/FLINK-31926";>...</a>
+     *
+     * @param oldTablePath old table name
+     * @param newTablePath new renamed table
+     */
+    public void renameGlueTable(ObjectPath oldTablePath, ObjectPath 
newTablePath)

Review Comment:
   Should we throw UnsupportedOperation meanwhile, if we release before 
implementation, it would be very confusing for users.



##########
flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/config/AWSConfigConstants.java:
##########
@@ -154,6 +154,77 @@ public enum CredentialProvider {
     /** Read Request timeout for {@link SdkAsyncHttpClient}. */
     public static final String HTTP_CLIENT_READ_TIMEOUT_MILLIS = 
"aws.http-client.read-timeout";
 
+    /**
+     * The type of {@link software.amazon.awssdk.http.SdkHttpClient}. If set, 
all AWS clients will
+     * use this specified HTTP client. If not set, HTTP_CLIENT_TYPE_DEFAULT 
will be used. For
+     * specific types supported, see HTTP_CLIENT_TYPE_* defined below.
+     */
+    public static final String HTTP_CLIENT_TYPE = "http-client.type";
+
+    // ---- glue configs
+
+    /**
+     * Used to configure the connection timeout in milliseconds for {@link
+     * software.amazon.awssdk.http.apache.ApacheHttpClient.Builder}. This flag 
only works when
+     * {@link #HTTP_CLIENT_TYPE} is set to HTTP_CLIENT_TYPE_APACHE
+     *
+     * <p>For more details, see <a
+     * 
href="https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/apache/ApacheHttpClient.Builder.html";>...</a>
+     */
+    public static final String HTTP_CLIENT_CONNECTION_TIMEOUT_MS =
+            "http-client.connection-timeout-ms";

Review Comment:
   hmmm, do you think we need to make it consistent with existing http-client 
configs like `HTTP_CLIENT_READ_TIMEOUT_MILLIS` where prefix is 
`aws.http-cllient` I am aware this might need some rework of OptionsUtils 
implementations. Wdyt? can loop in Danny for a second opinion.



##########
flink-catalog-aws/flink-catalog-aws-glue/src/test/java/org/apache/flink/table/catalog/glue/DummyGlueClient.java:
##########
@@ -0,0 +1,241 @@
+package org.apache.flink.table.catalog.glue;
+
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.ObjectPath;
+
+import lombok.Getter;
+import lombok.Setter;
+import software.amazon.awssdk.awscore.exception.AwsServiceException;
+import software.amazon.awssdk.core.exception.SdkClientException;
+import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.glue.GlueServiceClientConfiguration;
+import software.amazon.awssdk.services.glue.model.AlreadyExistsException;
+import software.amazon.awssdk.services.glue.model.BatchDeleteTableRequest;
+import software.amazon.awssdk.services.glue.model.BatchDeleteTableResponse;
+import 
software.amazon.awssdk.services.glue.model.ConcurrentModificationException;
+import software.amazon.awssdk.services.glue.model.CreateDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.CreateDatabaseResponse;
+import software.amazon.awssdk.services.glue.model.CreatePartitionRequest;
+import software.amazon.awssdk.services.glue.model.CreatePartitionResponse;
+import software.amazon.awssdk.services.glue.model.CreateTableRequest;
+import software.amazon.awssdk.services.glue.model.CreateTableResponse;
+import 
software.amazon.awssdk.services.glue.model.CreateUserDefinedFunctionRequest;
+import 
software.amazon.awssdk.services.glue.model.CreateUserDefinedFunctionResponse;
+import software.amazon.awssdk.services.glue.model.DeleteDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.DeleteDatabaseResponse;
+import software.amazon.awssdk.services.glue.model.EntityNotFoundException;
+import software.amazon.awssdk.services.glue.model.GetDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.GetDatabaseResponse;
+import software.amazon.awssdk.services.glue.model.GetDatabasesRequest;
+import software.amazon.awssdk.services.glue.model.GetDatabasesResponse;
+import software.amazon.awssdk.services.glue.model.GetPartitionRequest;
+import software.amazon.awssdk.services.glue.model.GetPartitionResponse;
+import software.amazon.awssdk.services.glue.model.GetTableRequest;
+import software.amazon.awssdk.services.glue.model.GetTableResponse;
+import software.amazon.awssdk.services.glue.model.GetTablesRequest;
+import software.amazon.awssdk.services.glue.model.GetTablesResponse;
+import 
software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionRequest;
+import 
software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionResponse;
+import 
software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionsRequest;
+import 
software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionsResponse;
+import software.amazon.awssdk.services.glue.model.GlueEncryptionException;
+import software.amazon.awssdk.services.glue.model.GlueException;
+import software.amazon.awssdk.services.glue.model.InternalServiceException;
+import software.amazon.awssdk.services.glue.model.InvalidInputException;
+import software.amazon.awssdk.services.glue.model.OperationTimeoutException;
+import software.amazon.awssdk.services.glue.model.ResourceNotReadyException;
+import 
software.amazon.awssdk.services.glue.model.ResourceNumberLimitExceededException;
+import software.amazon.awssdk.services.glue.model.UpdateDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.UpdateDatabaseResponse;
+import software.amazon.awssdk.services.glue.model.UpdatePartitionRequest;
+import software.amazon.awssdk.services.glue.model.UpdatePartitionResponse;
+
+/** Dummy Glue client for Test. */
+public class DummyGlueClient implements GlueClient {
+
+    @Getter @Setter public boolean isTableExists;
+
+    @Getter @Setter public boolean isPartitionedTable;
+
+    @Getter @Setter public String tableType;
+
+    @Getter @Setter public ObjectPath tablePath;
+
+    @Getter @Setter public ObjectPath functionPath;
+
+    @Getter @Setter public String databaseName;
+
+    @Getter @Setter public String warehousePath;

Review Comment:
   nit: Could just use `@Data` over the class instead of multiple getters and 
setters.



##########
flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalogOptions.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.connector.aws.config.AWSConfigConstants;
+import org.apache.flink.table.catalog.CommonCatalogOptions;
+import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.catalog.glue.constants.GlueCatalogConstants;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/** A collection of {@link ConfigOption} which is used in GlueCatalog. */
+public class GlueCatalogOptions extends CommonCatalogOptions {
+
+    public static final ConfigOption<String> DEFAULT_DATABASE =
+            ConfigOptions.key(CommonCatalogOptions.DEFAULT_DATABASE_KEY)
+                    .stringType()
+                    .defaultValue(GenericInMemoryCatalog.DEFAULT_DB);
+
+    public static final ConfigOption<String> INPUT_FORMAT =
+            ConfigOptions.key(GlueCatalogConstants.TABLE_INPUT_FORMAT)
+                    .stringType()
+                    .noDefaultValue();
+
+    public static final ConfigOption<String> OUTPUT_FORMAT =
+            ConfigOptions.key(GlueCatalogConstants.TABLE_OUTPUT_FORMAT)
+                    .stringType()
+                    .noDefaultValue();
+
+    public static final ConfigOption<String> GLUE_CATALOG_ENDPOINT =
+            ConfigOptions.key(AWSConfigConstants.GLUE_CATALOG_ENDPOINT)
+                    .stringType()
+                    .noDefaultValue();
+
+    public static final ConfigOption<String> GLUE_CATALOG_ID =
+            
ConfigOptions.key(AWSConfigConstants.GLUE_CATALOG_ID).stringType().noDefaultValue();
+
+    public static final ConfigOption<String> GLUE_ACCOUNT_ID =
+            
ConfigOptions.key(AWSConfigConstants.GLUE_ACCOUNT_ID).stringType().noDefaultValue();
+    public static final ConfigOption<String> CREDENTIAL_PROVIDER =
+            ConfigOptions.key(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER)
+                    .stringType()
+                    .noDefaultValue();
+
+    public static final ConfigOption<String> HTTP_CLIENT_TYPE =
+            ConfigOptions.key(AWSConfigConstants.HTTP_CLIENT_TYPE)
+                    .stringType()
+                    .defaultValue(AWSConfigConstants.CLIENT_TYPE_APACHE);
+
+    public static final ConfigOption<String> REGION =
+            
ConfigOptions.key(AWSConfigConstants.AWS_REGION).stringType().noDefaultValue();
+
+    public static final ConfigOption<String> HTTP_PROTOCOL_VERSION =

Review Comment:
   nit: This is not needed as it is handled by `AWSOptionsUtils`



##########
flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/factory/GlueCatalogFactory.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue.factory;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.glue.GlueCatalog;
+import org.apache.flink.table.catalog.glue.GlueCatalogOptions;
+import org.apache.flink.table.catalog.glue.util.GlueCatalogOptionsUtils;
+import org.apache.flink.table.factories.CatalogFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/** Catalog factory for {@link GlueCatalog}. */
+public class GlueCatalogFactory implements CatalogFactory {

Review Comment:
   `@publicEvolving` or `@Public` could we double check all `public` classes 
are annotated?



##########
flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/factory/GlueCatalogFactoryOptions.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue.factory;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.table.catalog.glue.GlueCatalog;
+import org.apache.flink.table.catalog.glue.GlueCatalogOptions;
+
+/** {@link ConfigOption}s for {@link GlueCatalog}. */
+@Internal
+public class GlueCatalogFactoryOptions {

Review Comment:
   What is difference between this and `GlueCatalogOptions`



##########
flink-connector-aws-base/pom.xml:
##########
@@ -88,6 +88,14 @@ under the License.
             <artifactId>flink-architecture-tests-test</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>

Review Comment:
   We shouldn't depend on glue for this module!
   Should only be needed for the catalog module!



##########
flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/config/AWSConfigConstants.java:
##########
@@ -154,6 +154,77 @@ public enum CredentialProvider {
     /** Read Request timeout for {@link SdkAsyncHttpClient}. */
     public static final String HTTP_CLIENT_READ_TIMEOUT_MILLIS = 
"aws.http-client.read-timeout";
 
+    /**
+     * The type of {@link software.amazon.awssdk.http.SdkHttpClient}. If set, 
all AWS clients will
+     * use this specified HTTP client. If not set, HTTP_CLIENT_TYPE_DEFAULT 
will be used. For
+     * specific types supported, see HTTP_CLIENT_TYPE_* defined below.
+     */
+    public static final String HTTP_CLIENT_TYPE = "http-client.type";
+
+    // ---- glue configs
+
+    /**
+     * Used to configure the connection timeout in milliseconds for {@link
+     * software.amazon.awssdk.http.apache.ApacheHttpClient.Builder}. This flag 
only works when
+     * {@link #HTTP_CLIENT_TYPE} is set to HTTP_CLIENT_TYPE_APACHE
+     *
+     * <p>For more details, see <a
+     * 
href="https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/apache/ApacheHttpClient.Builder.html";>...</a>
+     */
+    public static final String HTTP_CLIENT_CONNECTION_TIMEOUT_MS =
+            "http-client.connection-timeout-ms";
+
+    /**
+     * Used to configure the max connections number for {@link
+     * software.amazon.awssdk.http.apache.ApacheHttpClient.Builder}. This flag 
only works when
+     * {@link #HTTP_CLIENT_TYPE} is set to HTTP_CLIENT_TYPE_APACHE
+     *
+     * <p>For more details, see <a
+     * 
href="https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/apache/ApacheHttpClient.Builder.html";>...</a>
+     */
+    public static final String HTTP_CLIENT_APACHE_MAX_CONNECTIONS =
+            "http-client.apache.max-connections";
+
+    /**
+     * Used to configure the socket timeout in milliseconds for {@link
+     * software.amazon.awssdk.http.apache.ApacheHttpClient.Builder}. This flag 
only works when
+     * {@link #HTTP_CLIENT_TYPE} is set to HTTP_CLIENT_TYPE_APACHE
+     *
+     * <p>For more details, see <a
+     * 
href="https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/apache/ApacheHttpClient.Builder.html";>...</a>
+     */
+    public static final String HTTP_CLIENT_SOCKET_TIMEOUT_MS = 
"http-client.socket-timeout-ms";
+
+    /**
+     * Configure an alternative endpoint of the Glue service for GlueCatalog 
to access.
+     *
+     * <p>This could be used to use GlueCatalog with any glue-compatible 
metastore service that has
+     * a different endpoint
+     */
+    public static final String GLUE_CATALOG_ENDPOINT = "aws.glue.endpoint";
+
+    /**
+     * The ID of the Glue Data Catalog where the tables reside. If none is 
provided, Glue
+     * automatically uses the caller's AWS account ID by default.
+     *
+     * <p>For more details, see <a
+     * 
href="https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-catalog-databases.html";>...</a>
+     */
+    public static final String GLUE_CATALOG_ID = "aws.glue.id";
+
+    /**
+     * The account ID used in a Glue resource ARN, e.g.
+     * arn:aws:glue:us-east-1:1000000000000:table/db1/table1
+     */
+    public static final String GLUE_ACCOUNT_ID = "aws.glue.account-id";
+

Review Comment:
   These are the only configs that are glue specific. We have kept this package 
abstract as possible, we can move those to the glue catalog package



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to