dannycranmer commented on code in PR #47:
URL: https://github.com/apache/flink-connector-aws/pull/47#discussion_r1147301118


##########
flink-catalog-aws-glue/pom.xml:
##########
@@ -0,0 +1,120 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-connector-aws-parent</artifactId>
+        <version>4.1-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>flink-catalog-aws-glue</artifactId>
+    <name>Flink : Catalog : AWS : Glue</name>

Review Comment:
   ```suggestion
       <name>Flink : Catalog : AWS : Glue Data Catalog</name>
   ```



##########
flink-catalog-aws-glue/pom.xml:
##########
@@ -0,0 +1,120 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-connector-aws-parent</artifactId>
+        <version>4.1-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>flink-catalog-aws-glue</artifactId>
+    <name>Flink : Catalog : AWS : Glue</name>
+    <packaging>jar</packaging>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-java</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-aws-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>glue</artifactId>
+            <version>${aws.sdkv2.version}</version>

Review Comment:
   You can remove this line; the version should be managed by the BOM.



##########
flink-catalog-aws-glue/pom.xml:
##########
@@ -0,0 +1,120 @@
+<?xml version="1.0" encoding="UTF-8"?>

Review Comment:
   For formats, we have started adding parent modules to separate connectors from formats, and we should do the same for catalogs. Can you move this module beneath a `flink-catalog-aws` parent, following the layout of `flink-formats-aws`: https://github.com/apache/flink-connector-aws/tree/main/flink-formats-aws



##########
flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalog.java:
##########
@@ -0,0 +1,1030 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue;
+
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.catalog.AbstractCatalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionImpl;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogView;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedCatalogView;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
+import org.apache.flink.table.catalog.glue.util.AwsClientFactories;
+import org.apache.flink.table.catalog.glue.util.AwsProperties;
+import org.apache.flink.table.catalog.glue.util.GlueOperator;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.functions.FunctionIdentifier;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.glue.model.Partition;
+import software.amazon.awssdk.services.glue.model.Table;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** A Glue catalog implementation that uses glue catalog. */
+public class GlueCatalog extends AbstractCatalog {

Review Comment:
   Missing support annotation. This one should probably be `@PublicEvolving`. Please check all files.



##########
flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/GlueOperator.java:
##########
@@ -0,0 +1,1639 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue.util;
+
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogFunctionImpl;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogView;
+import org.apache.flink.table.catalog.FunctionLanguage;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.glue.GlueCatalogConstants;
+import org.apache.flink.table.catalog.glue.GlueCatalogOptions;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.factories.ManagedTableFactory;
+import org.apache.flink.table.resource.ResourceType;
+import org.apache.flink.table.types.DataType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.glue.model.AlreadyExistsException;
+import software.amazon.awssdk.services.glue.model.BatchDeleteTableRequest;
+import software.amazon.awssdk.services.glue.model.BatchDeleteTableResponse;
+import software.amazon.awssdk.services.glue.model.Column;
+import software.amazon.awssdk.services.glue.model.CreateDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.CreateDatabaseResponse;
+import software.amazon.awssdk.services.glue.model.CreatePartitionRequest;
+import software.amazon.awssdk.services.glue.model.CreatePartitionResponse;
+import software.amazon.awssdk.services.glue.model.CreateTableRequest;
+import software.amazon.awssdk.services.glue.model.CreateTableResponse;
+import software.amazon.awssdk.services.glue.model.CreateUserDefinedFunctionRequest;
+import software.amazon.awssdk.services.glue.model.CreateUserDefinedFunctionResponse;
+import software.amazon.awssdk.services.glue.model.Database;
+import software.amazon.awssdk.services.glue.model.DatabaseInput;
+import software.amazon.awssdk.services.glue.model.DeleteDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.DeleteDatabaseResponse;
+import software.amazon.awssdk.services.glue.model.DeletePartitionRequest;
+import software.amazon.awssdk.services.glue.model.DeletePartitionResponse;
+import software.amazon.awssdk.services.glue.model.DeleteTableRequest;
+import software.amazon.awssdk.services.glue.model.DeleteTableResponse;
+import software.amazon.awssdk.services.glue.model.DeleteUserDefinedFunctionRequest;
+import software.amazon.awssdk.services.glue.model.DeleteUserDefinedFunctionResponse;
+import software.amazon.awssdk.services.glue.model.EntityNotFoundException;
+import software.amazon.awssdk.services.glue.model.GetDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.GetDatabaseResponse;
+import software.amazon.awssdk.services.glue.model.GetDatabasesRequest;
+import software.amazon.awssdk.services.glue.model.GetDatabasesResponse;
+import software.amazon.awssdk.services.glue.model.GetPartitionRequest;
+import software.amazon.awssdk.services.glue.model.GetPartitionResponse;
+import software.amazon.awssdk.services.glue.model.GetPartitionsRequest;
+import software.amazon.awssdk.services.glue.model.GetPartitionsResponse;
+import software.amazon.awssdk.services.glue.model.GetTableRequest;
+import software.amazon.awssdk.services.glue.model.GetTableResponse;
+import software.amazon.awssdk.services.glue.model.GetTablesRequest;
+import software.amazon.awssdk.services.glue.model.GetTablesResponse;
+import software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionRequest;
+import software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionResponse;
+import software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionsRequest;
+import software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionsResponse;
+import software.amazon.awssdk.services.glue.model.GlueException;
+import software.amazon.awssdk.services.glue.model.GlueResponse;
+import software.amazon.awssdk.services.glue.model.Partition;
+import software.amazon.awssdk.services.glue.model.PartitionInput;
+import software.amazon.awssdk.services.glue.model.PrincipalType;
+import software.amazon.awssdk.services.glue.model.ResourceUri;
+import software.amazon.awssdk.services.glue.model.StorageDescriptor;
+import software.amazon.awssdk.services.glue.model.Table;
+import software.amazon.awssdk.services.glue.model.TableInput;
+import software.amazon.awssdk.services.glue.model.UpdateDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.UpdateDatabaseResponse;
+import software.amazon.awssdk.services.glue.model.UpdateTableRequest;
+import software.amazon.awssdk.services.glue.model.UpdateTableResponse;
+import software.amazon.awssdk.services.glue.model.UpdateUserDefinedFunctionRequest;
+import software.amazon.awssdk.services.glue.model.UpdateUserDefinedFunctionResponse;
+import software.amazon.awssdk.services.glue.model.UserDefinedFunction;
+import software.amazon.awssdk.services.glue.model.UserDefinedFunctionInput;
+
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;
+
+/**
+ * Utilities for Glue catalog operations. Important Note :
+ * https://aws.amazon.com/premiumsupport/knowledge-center/glue-crawler-internal-service-exception/
+ */
+public class GlueOperator {

Review Comment:
   This class is huge. Is it worth splitting into smaller classes, for example one each for Database, Table, etc.?
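
A minimal sketch of one possible split, with one small class per entity type. The class names `GlueDatabaseOperator` and `GlueTableOperator` are hypothetical, and in-memory maps stand in for the `GlueClient` calls so the snippet runs on its own:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical class: owns database-level operations only.
// The map is a stand-in for calls to the Glue service.
class GlueDatabaseOperator {
    private final Map<String, String> locationByDatabase = new TreeMap<>();

    void createDatabase(String name, String locationUri) {
        if (locationByDatabase.containsKey(name)) {
            throw new IllegalStateException("Database already exists: " + name);
        }
        locationByDatabase.put(name, locationUri);
    }

    boolean databaseExists(String name) {
        return locationByDatabase.containsKey(name);
    }

    List<String> listDatabases() {
        return new ArrayList<>(locationByDatabase.keySet());
    }
}

// Hypothetical class: owns table-level operations and delegates
// database checks to GlueDatabaseOperator instead of duplicating them.
class GlueTableOperator {
    private final GlueDatabaseOperator databaseOperator;
    private final Map<String, List<String>> tablesByDatabase = new TreeMap<>();

    GlueTableOperator(GlueDatabaseOperator databaseOperator) {
        this.databaseOperator = databaseOperator;
    }

    void createTable(String database, String table) {
        if (!databaseOperator.databaseExists(database)) {
            throw new IllegalArgumentException("Database does not exist: " + database);
        }
        tablesByDatabase.computeIfAbsent(database, key -> new ArrayList<>()).add(table);
    }

    List<String> listTables(String database) {
        return new ArrayList<>(tablesByDatabase.getOrDefault(database, new ArrayList<>()));
    }
}
```

Partition and function operations could follow the same pattern, which would keep each class focused on a single Glue entity type.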



##########
flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalogConstants.java:
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue;
+
+import org.apache.flink.table.catalog.glue.util.AwsClientFactory;
+
+import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
+
+import java.util.regex.Pattern;
+
+/** Configs for catalog meta-objects in {@link GlueCatalog}. */
+public class GlueCatalogConstants {
+
+    public static final String COMMENT = "comment";
+    public static final String DEFAULT_SEPARATOR = ":";
+    public static final String LOCATION_SEPARATOR = "/";
+    public static final String LOCATION_URI = "location-uri";
+    public static final String AND = "and";
+    public static final String NEXT_LINE = "\n";
+    public static final String SPACE = " ";
+
+    public static final String TABLE_OWNER = "owner";
+    public static final String TABLE_INPUT_FORMAT = "table.input.format";
+    public static final String TABLE_OUTPUT_FORMAT = "table.output.format";
+
+    public static final String FLINK_SCALA_FUNCTION_PREFIX = "flink:scala:";
+    public static final String FLINK_PYTHON_FUNCTION_PREFIX = "flink:python:";
+    public static final String FLINK_JAVA_FUNCTION_PREFIX = "flink:java:";
+
+    public static final String FLINK_CATALOG = "FLINK_CATALOG";
+
+    public static final Pattern GLUE_DB_PATTERN = Pattern.compile("^[a-z0-9_]{1,252}$");
+    public static final String GLUE_EXCEPTION_MSG_IDENTIFIER = "GLUE EXCEPTION";
+    public static final String TABLE_NOT_EXISTS_IDENTIFIER = "TABLE DOESN'T EXISTS";
+    public static final String DEFAULT_PARTITION_NAME = "__GLUE_DEFAULT_PARTITION__";
+
+    // ---- glue configs
+    /**
+     * The type of {@link software.amazon.awssdk.http.SdkHttpClient} implementation used by {@link
+     * AwsClientFactory} If set, all AWS clients will use this specified HTTP client. If not set,
+     * HTTP_CLIENT_TYPE_DEFAULT will be used. For specific types supported, see HTTP_CLIENT_TYPE_*
+     * defined below.
+     */
+    public static final String HTTP_CLIENT_TYPE = "http-client.type";
+
+    /**
+     * Used to configure the connection acquisition timeout in milliseconds for {@link
+     * software.amazon.awssdk.http.apache.ApacheHttpClient.Builder}. This flag only works when
+     * {@link #HTTP_CLIENT_TYPE} is set to HTTP_CLIENT_TYPE_APACHE
+     *
+     * <p>For more details, see
+     * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/apache/ApacheHttpClient.Builder.html
+     */
+    public static final String HTTP_CLIENT_APACHE_CONNECTION_ACQUISITION_TIMEOUT_MS =
+            "http-client.apache.connection-acquisition-timeout-ms";
+
+    /**
+     * If Glue should skip name validations It is recommended to stick to Glue best practice in
+     * https://docs.aws.amazon.com/athena/latest/ug/glue-best-practices.html to make sure operations
+     * are Hive compatible. This is only added for users that have existing conventions using
+     * non-standard characters. When database name and table name validation are skipped, there is
+     * no guarantee that downstream systems would all support the names.
+     */
+    public static final String GLUE_CATALOG_SKIP_NAME_VALIDATION = "glue.skip-name-validation";
+
+    /**
+     * Used to configure the connection max idle time in milliseconds for {@link
+     * software.amazon.awssdk.http.apache.ApacheHttpClient.Builder}. This flag only works when
+     * {@link #HTTP_CLIENT_TYPE} is set to HTTP_CLIENT_TYPE_APACHE
+     *
+     * <p>For more details, see
+     * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/apache/ApacheHttpClient.Builder.html
+     */
+    public static final String HTTP_CLIENT_APACHE_CONNECTION_MAX_IDLE_TIME_MS =
+            "http-client.apache.connection-max-idle-time-ms";
+
+    /**
+     * Used to configure the connection time to live in milliseconds for {@link
+     * software.amazon.awssdk.http.apache.ApacheHttpClient.Builder}. This flag only works when
+     * {@link #HTTP_CLIENT_TYPE} is set to HTTP_CLIENT_TYPE_APACHE
+     *
+     * <p>For more details, see
+     * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/apache/ApacheHttpClient.Builder.html
+     */
+    public static final String HTTP_CLIENT_APACHE_CONNECTION_TIME_TO_LIVE_MS =
+            "http-client.apache.connection-time-to-live-ms";
+
+    /**
+     * Used to configure the connection timeout in milliseconds for {@link
+     * software.amazon.awssdk.http.apache.ApacheHttpClient.Builder}. This flag only works when
+     * {@link #HTTP_CLIENT_TYPE} is set to HTTP_CLIENT_TYPE_APACHE
+     *
+     * <p>For more details, see
+     * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/apache/ApacheHttpClient.Builder.html
+     */
+    public static final String HTTP_CLIENT_APACHE_CONNECTION_TIMEOUT_MS =

Review Comment:
   All of the AWS SDK/client-specific settings should ideally go in the AWS base module so they can be reused across projects. We are aiming for consistent configuration keys across AWS integrations. Can you please refactor?
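
A rough sketch of the separation being asked for. The class names `SharedAwsHttpClientKeys`, `GlueOnlyKeys`, and `HttpClientTypeLookup` are hypothetical and do not refer to existing classes in the base module; the key strings are the ones defined in this PR:

```java
import java.util.Map;

// Hypothetical holder that would live in the shared AWS base module.
// The key strings are copied from GlueCatalogConstants in this PR.
final class SharedAwsHttpClientKeys {
    static final String HTTP_CLIENT_TYPE = "http-client.type";
    static final String HTTP_CLIENT_APACHE_CONNECTION_ACQUISITION_TIMEOUT_MS =
            "http-client.apache.connection-acquisition-timeout-ms";

    private SharedAwsHttpClientKeys() {}
}

// Hypothetical holder that stays in the Glue module: Glue-specific keys only.
final class GlueOnlyKeys {
    static final String GLUE_CATALOG_SKIP_NAME_VALIDATION = "glue.skip-name-validation";

    private GlueOnlyKeys() {}
}

// Hypothetical lookup showing a consumer reading the shared key.
class HttpClientTypeLookup {
    // Returns the configured HTTP client type, or the given default when unset.
    static String httpClientType(Map<String, String> properties, String defaultType) {
        return properties.getOrDefault(SharedAwsHttpClientKeys.HTTP_CLIENT_TYPE, defaultType);
    }
}
```

With the shared keys in one place, every AWS integration would read the same property names rather than redefining them per module.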



##########
flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalog.java:
##########
@@ -0,0 +1,1030 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue;
+
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.catalog.AbstractCatalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionImpl;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogView;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedCatalogView;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
+import org.apache.flink.table.catalog.glue.util.AwsClientFactories;
+import org.apache.flink.table.catalog.glue.util.AwsProperties;
+import org.apache.flink.table.catalog.glue.util.GlueOperator;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.functions.FunctionIdentifier;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.glue.model.Partition;
+import software.amazon.awssdk.services.glue.model.Table;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** A Glue catalog implementation that uses glue catalog. */
+public class GlueCatalog extends AbstractCatalog {
+
+    /** instance of GlueOperator to facilitate glue related actions. */
+    public GlueOperator glueOperator;
+
+    /** Default database name if not passed as part of catalog. */
+    public static final String DEFAULT_DB = "default";
+
+    private static final Logger LOG = LoggerFactory.getLogger(GlueCatalog.class);
+
+    public GlueCatalog(String catalogName, String databaseName, ReadableConfig glueProperties) {
+        super(catalogName, databaseName);
+        checkNotNull(glueProperties);
+        // setLocationUri for the database level
+        String locationUri = "";
+        if (glueProperties.getOptional(GlueCatalogOptions.LOCATION_KEY).isPresent()) {
+            locationUri = glueProperties.getOptional(GlueCatalogOptions.LOCATION_KEY).get();
+        }
+        // initialize aws client factories
+        AwsProperties awsProperties = new AwsProperties(glueProperties);
+
+        // create glue client
+        GlueClient glueClient = AwsClientFactories.factory(awsProperties).glue();
+        this.glueOperator = new GlueOperator(locationUri, getName(), awsProperties, glueClient);
+    }
+
+    /**
+     * Open the catalog. Used for any required preparation in initialization phase.
+     *
+     * @throws CatalogException in case of any runtime exception
+     */
+    @Override
+    public void open() throws CatalogException {}
+
+    /**
+     * Close the catalog when it is no longer needed and release any resource that it might be
+     * holding.
+     *
+     * @throws CatalogException in case of any runtime exception
+     */
+    @Override
+    public void close() throws CatalogException {
+        try {
+            glueOperator.closeClient();
+        } catch (Exception e) {
+            LOG.warn("Glue Client is not closed properly!");

Review Comment:
   1. Please include the error in the log: `LOG.warn("..", e);`
   2. Should we rethrow here? `throw new CatalogException("...", e);`
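
A minimal, self-contained sketch of both points. `CatalogException` below is a local stand-in for Flink's class, `CloseSketch` and `closeCatalog` are hypothetical names, and `java.util.logging` is used only so the snippet runs without SLF4J (with SLF4J the log call would be `LOG.warn("...", e)`):

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Local stand-in for org.apache.flink.table.catalog.exceptions.CatalogException,
// declared here only so the sketch compiles without Flink on the classpath.
class CatalogException extends RuntimeException {
    CatalogException(String message, Throwable cause) {
        super(message, cause);
    }
}

class CloseSketch {
    private static final Logger LOG = Logger.getLogger(CloseSketch.class.getName());

    // Hypothetical helper: closes the client, logs the failure together with
    // its cause, and rethrows so the caller learns that the close failed.
    static void closeCatalog(AutoCloseable client) {
        try {
            client.close();
        } catch (Exception e) {
            LOG.log(Level.WARNING, "Glue client was not closed properly", e);
            throw new CatalogException("Failed to close Glue client", e);
        }
    }
}
```

Passing the exception as the cause keeps the original stack trace available both in the log and to the caller.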



##########
flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalog.java:
##########
@@ -0,0 +1,1030 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue;
+
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.catalog.AbstractCatalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionImpl;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogView;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedCatalogView;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
+import org.apache.flink.table.catalog.glue.util.AwsClientFactories;
+import org.apache.flink.table.catalog.glue.util.AwsProperties;
+import org.apache.flink.table.catalog.glue.util.GlueOperator;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.functions.FunctionIdentifier;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.glue.model.Partition;
+import software.amazon.awssdk.services.glue.model.Table;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** A Glue catalog implementation that uses glue catalog. */
+public class GlueCatalog extends AbstractCatalog {
+
+    /** instance of GlueOperator to facilitate glue related actions. */
+    public GlueOperator glueOperator;
+
+    /** Default database name if not passed as part of catalog. */
+    public static final String DEFAULT_DB = "default";
+
+    private static final Logger LOG = LoggerFactory.getLogger(GlueCatalog.class);
+
+    public GlueCatalog(String catalogName, String databaseName, ReadableConfig glueProperties) {
+        super(catalogName, databaseName);
+        checkNotNull(glueProperties);
+        // setLocationUri for the database level
+        String locationUri = "";
+        if (glueProperties.getOptional(GlueCatalogOptions.LOCATION_KEY).isPresent()) {
+            locationUri = glueProperties.getOptional(GlueCatalogOptions.LOCATION_KEY).get();
+        }
+        // initialize aws client factories

Review Comment:
   Please review the guidance on comments: 
https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#comments
   
   Comments of this type do not add any extra context or information and can be removed.



##########
flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/AwsClientFactories.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue.util;
+
+import org.apache.flink.configuration.ReadableConfig;
+
+import software.amazon.awssdk.services.glue.GlueClient;
+
+/** Default factories. */
+public class AwsClientFactories {
+
+    private AwsClientFactories() {}
+
+    public static AwsClientFactory factory(AwsProperties properties) {
+        return new DefaultAwsClientFactory(properties);
+    }
+
+    static class DefaultAwsClientFactory implements AwsClientFactory {
+
+        /** instance that holds provides. */
+        private AwsProperties awsProperties;
+
+        DefaultAwsClientFactory(AwsProperties properties) {
+            awsProperties = properties;
+        }
+
+        @Override
+        public GlueClient glue() {
+            return GlueClient.builder()

Review Comment:
   How about credential providers? Can we use the util classes 
[#1](https://github.com/apache/flink-connector-aws/blob/main/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSClientUtil.java) and 
[#2](https://github.com/apache/flink-connector-aws/blob/main/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSGeneralUtil.java) 
to build clients and credential providers, so that the configuration, options, 
and features are consistent and reused across all integrations?



##########
flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/AwsClientFactory.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue.util;
+
+import org.apache.flink.configuration.ReadableConfig;
+
+import software.amazon.awssdk.services.glue.GlueClient;
+
+/**
+ * Interface to customize AWS clients used by Flink. A custom factory must have a no-arg.
+ * constructor, and use {@link #initialize(ReadableConfig)} to initialize the factory.

Review Comment:
   Interesting idea, I like this. Can we extract anything common to aws-base 
for reuse with other integrations?



##########
flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalog.java:
##########
@@ -0,0 +1,1030 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue;
+
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.catalog.AbstractCatalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionImpl;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogView;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedCatalogView;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
+import org.apache.flink.table.catalog.glue.util.AwsClientFactories;
+import org.apache.flink.table.catalog.glue.util.AwsProperties;
+import org.apache.flink.table.catalog.glue.util.GlueOperator;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.functions.FunctionIdentifier;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.glue.model.Partition;
+import software.amazon.awssdk.services.glue.model.Table;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** A Glue catalog implementation that uses glue catalog. */
+public class GlueCatalog extends AbstractCatalog {
+
+    /** instance of GlueOperator to facilitate glue related actions. */
+    public GlueOperator glueOperator;
+
+    /** Default database name if not passed as part of catalog. */
+    public static final String DEFAULT_DB = "default";
+
+    private static final Logger LOG = LoggerFactory.getLogger(GlueCatalog.class);
+
+    public GlueCatalog(String catalogName, String databaseName, ReadableConfig glueProperties) {
+        super(catalogName, databaseName);
+        checkNotNull(glueProperties);
+        // setLocationUri for the database level
+        String locationUri = "";
+        if (glueProperties.getOptional(GlueCatalogOptions.LOCATION_KEY).isPresent()) {
+            locationUri = glueProperties.getOptional(GlueCatalogOptions.LOCATION_KEY).get();
+        }
+        // initialize aws client factories
+        AwsProperties awsProperties = new AwsProperties(glueProperties);
+
+        // create glue client
+        GlueClient glueClient = AwsClientFactories.factory(awsProperties).glue();
+        this.glueOperator = new GlueOperator(locationUri, getName(), awsProperties, glueClient);
+    }
+
+    /**
+     * Open the catalog. Used for any required preparation in initialization phase.
+     *
+     * @throws CatalogException in case of any runtime exception
+     */
+    @Override
+    public void open() throws CatalogException {}
+
+    /**
+     * Close the catalog when it is no longer needed and release any resource that it might be
+     * holding.
+     *
+     * @throws CatalogException in case of any runtime exception
+     */
+    @Override
+    public void close() throws CatalogException {
+        try {
+            glueOperator.closeClient();
+        } catch (Exception e) {
+            LOG.warn("Glue Client is not closed properly!");
+        }
+    }
+
+    // ------ databases ------
+
+    /**
+     * Create a database.
+     *
+     * @param name Name of the database to be created
+     * @param database The database definition
+     * @param ignoreIfExists Flag to specify behavior when a database with the given name already
+     *     exists: if set to false, throw a DatabaseAlreadyExistException, if set to true, do
+     *     nothing.
+     * @throws DatabaseAlreadyExistException if the given database already exists and ignoreIfExists
+     *     is false
+     * @throws CatalogException in case of any runtime exception
+     */
+    @Override
+    public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists)
+            throws DatabaseAlreadyExistException, CatalogException {
+
+        checkArgument(!StringUtils.isNullOrWhitespaceOnly(name));
+        checkNotNull(database, "Database cannot be null.");
+
+        // glue supports lowercase naming convention
+        name = name.toLowerCase(Locale.ROOT);

Review Comment:
   Suggest extracting this to a helper method since it is repeated 
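   A minimal sketch of such a helper, assuming it lives in a small utility class. 
The class name `GlueNameUtils` and method name `toGlueName` are hypothetical and 
not part of the PR, and the blank check uses plain JDK calls as a stand-in for 
`checkArgument(!StringUtils.isNullOrWhitespaceOnly(name))`:

```java
import java.util.Locale;

/** Hypothetical helper class; the name and location are assumptions, not part of the PR. */
final class GlueNameUtils {

    private GlueNameUtils() {}

    /**
     * Validates a database/table name and lower-cases it with Locale.ROOT,
     * mirroring the repeated "checkArgument + toLowerCase" pattern in the catalog methods.
     */
    static String toGlueName(String name) {
        // JDK-only stand-in for Flink's StringUtils.isNullOrWhitespaceOnly check.
        if (name == null || name.trim().isEmpty()) {
            throw new IllegalArgumentException("Name cannot be null or blank.");
        }
        return name.toLowerCase(Locale.ROOT);
    }
}
```

   Each catalog method could then start with `name = GlueNameUtils.toGlueName(name);` 
instead of repeating the validation and lower-casing inline.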



##########
flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/GlueOperator.java:
##########
@@ -0,0 +1,1639 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue.util;
+
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogFunctionImpl;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogView;
+import org.apache.flink.table.catalog.FunctionLanguage;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.glue.GlueCatalogConstants;
+import org.apache.flink.table.catalog.glue.GlueCatalogOptions;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.factories.ManagedTableFactory;
+import org.apache.flink.table.resource.ResourceType;
+import org.apache.flink.table.types.DataType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.glue.model.AlreadyExistsException;
+import software.amazon.awssdk.services.glue.model.BatchDeleteTableRequest;
+import software.amazon.awssdk.services.glue.model.BatchDeleteTableResponse;
+import software.amazon.awssdk.services.glue.model.Column;
+import software.amazon.awssdk.services.glue.model.CreateDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.CreateDatabaseResponse;
+import software.amazon.awssdk.services.glue.model.CreatePartitionRequest;
+import software.amazon.awssdk.services.glue.model.CreatePartitionResponse;
+import software.amazon.awssdk.services.glue.model.CreateTableRequest;
+import software.amazon.awssdk.services.glue.model.CreateTableResponse;
+import software.amazon.awssdk.services.glue.model.CreateUserDefinedFunctionRequest;
+import software.amazon.awssdk.services.glue.model.CreateUserDefinedFunctionResponse;
+import software.amazon.awssdk.services.glue.model.Database;
+import software.amazon.awssdk.services.glue.model.DatabaseInput;
+import software.amazon.awssdk.services.glue.model.DeleteDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.DeleteDatabaseResponse;
+import software.amazon.awssdk.services.glue.model.DeletePartitionRequest;
+import software.amazon.awssdk.services.glue.model.DeletePartitionResponse;
+import software.amazon.awssdk.services.glue.model.DeleteTableRequest;
+import software.amazon.awssdk.services.glue.model.DeleteTableResponse;
+import software.amazon.awssdk.services.glue.model.DeleteUserDefinedFunctionRequest;
+import software.amazon.awssdk.services.glue.model.DeleteUserDefinedFunctionResponse;
+import software.amazon.awssdk.services.glue.model.EntityNotFoundException;
+import software.amazon.awssdk.services.glue.model.GetDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.GetDatabaseResponse;
+import software.amazon.awssdk.services.glue.model.GetDatabasesRequest;
+import software.amazon.awssdk.services.glue.model.GetDatabasesResponse;
+import software.amazon.awssdk.services.glue.model.GetPartitionRequest;
+import software.amazon.awssdk.services.glue.model.GetPartitionResponse;
+import software.amazon.awssdk.services.glue.model.GetPartitionsRequest;
+import software.amazon.awssdk.services.glue.model.GetPartitionsResponse;
+import software.amazon.awssdk.services.glue.model.GetTableRequest;
+import software.amazon.awssdk.services.glue.model.GetTableResponse;
+import software.amazon.awssdk.services.glue.model.GetTablesRequest;
+import software.amazon.awssdk.services.glue.model.GetTablesResponse;
+import software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionRequest;
+import software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionResponse;
+import software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionsRequest;
+import software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionsResponse;
+import software.amazon.awssdk.services.glue.model.GlueException;
+import software.amazon.awssdk.services.glue.model.GlueResponse;
+import software.amazon.awssdk.services.glue.model.Partition;
+import software.amazon.awssdk.services.glue.model.PartitionInput;
+import software.amazon.awssdk.services.glue.model.PrincipalType;
+import software.amazon.awssdk.services.glue.model.ResourceUri;
+import software.amazon.awssdk.services.glue.model.StorageDescriptor;
+import software.amazon.awssdk.services.glue.model.Table;
+import software.amazon.awssdk.services.glue.model.TableInput;
+import software.amazon.awssdk.services.glue.model.UpdateDatabaseRequest;
+import software.amazon.awssdk.services.glue.model.UpdateDatabaseResponse;
+import software.amazon.awssdk.services.glue.model.UpdateTableRequest;
+import software.amazon.awssdk.services.glue.model.UpdateTableResponse;
+import software.amazon.awssdk.services.glue.model.UpdateUserDefinedFunctionRequest;
+import software.amazon.awssdk.services.glue.model.UpdateUserDefinedFunctionResponse;
+import software.amazon.awssdk.services.glue.model.UserDefinedFunction;
+import software.amazon.awssdk.services.glue.model.UserDefinedFunctionInput;
+
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;
+
+/**
+ * Utilities for Glue catalog operations. Important Note :
+ * https://aws.amazon.com/premiumsupport/knowledge-center/glue-crawler-internal-service-exception/
+ */
+public class GlueOperator {
+
+    /** Defines the location URI for database. This locationUri is high level path for catalog */
+    public final String locationUri;
+
+    /**
+     * Instance of AwsProperties which holds the configs related to configure glue and aws setup.
+     */
+    private final AwsProperties awsProperties;
+
+    /** http client for glue client. Current implementation for client is sync type. */
+    private final GlueClient glueClient;
+
+    private final String catalogName;
+
+    private static final Logger LOG = LoggerFactory.getLogger(GlueOperator.class);
+
+    public GlueOperator(
+            String locationUri,
+            String catalogName,
+            AwsProperties awsProperties,
+            GlueClient glueClient) {
+        this.locationUri = locationUri;

Review Comment:
   Validate public args for blank/null etc
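   A rough sketch of what that validation could look like. `ValidatedGlueOperator` 
is a hypothetical stand-in for `GlueOperator` with simplified field types, and it 
uses `java.util.Objects` so the example is self-contained; in the PR the already 
imported `checkNotNull` and `checkArgument` would be the natural choice. It assumes 
`locationUri` may legitimately be empty (the catalog constructor defaults it to 
`""`), so only null is rejected there:

```java
import java.util.Objects;

/** Hypothetical stand-in for GlueOperator; types are simplified for illustration. */
final class ValidatedGlueOperator {

    private final String locationUri;
    private final String catalogName;
    private final Object glueClient; // placeholder for the real GlueClient

    ValidatedGlueOperator(String locationUri, String catalogName, Object glueClient) {
        // An empty locationUri is tolerated (assumption); null is not.
        this.locationUri = Objects.requireNonNull(locationUri, "locationUri cannot be null.");
        this.catalogName = requireNonBlank(catalogName, "catalogName");
        this.glueClient = Objects.requireNonNull(glueClient, "glueClient cannot be null.");
    }

    /** Rejects null with NullPointerException and blank values with IllegalArgumentException. */
    private static String requireNonBlank(String value, String argName) {
        Objects.requireNonNull(value, argName + " cannot be null.");
        if (value.trim().isEmpty()) {
            throw new IllegalArgumentException(argName + " cannot be blank.");
        }
        return value;
    }

    String catalogName() {
        return catalogName;
    }
}
```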



##########
flink-catalog-aws-glue/pom.xml:
##########
@@ -0,0 +1,114 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-connector-aws-parent</artifactId>
+        <version>4.1-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>flink-catalog-aws-glue</artifactId>
+    <name>Flink : Catalog : AWS : Glue</name>
+    <packaging>jar</packaging>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-java</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-aws-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>glue</artifactId>
+            <version>${aws.sdkv2.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>apache-client</artifactId>
+            <version>${aws.sdkv2.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>url-connection-client</artifactId>
+            <version>${aws.sdkv2.version}</version>
+        </dependency>
+
+        <!-- ArchUit test dependencies -->
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-architecture-tests-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-common</artifactId>
+            <version>${flink.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-java</artifactId>
+            <version>${flink.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>

Review Comment:
   Is it expected to shade this? Usually we have a plain jar and an uber (`sql`) jar. 
I would rather not shade this module, in case users want to use it with the Table 
API.


