vahmed-hamdy commented on code in PR #47: URL: https://github.com/apache/flink-connector-aws/pull/47#discussion_r1720826288
########## flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/constants/AWSGlueConfigConstants.java: ########## @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.glue.constants; + +import org.apache.flink.annotation.PublicEvolving; + +/** Configuration keys for AWS Glue Data Catalog service usage. */ +@PublicEvolving +public class AWSGlueConfigConstants { + + /** + * Configure an alternative endpoint of the Glue service for GlueCatalog to access. + * + * <p>This could be used to use GlueCatalog with any glue-compatible metastore service that has + * a different endpoint + */ + public static final String GLUE_CATALOG_ENDPOINT = "aws.glue.endpoint"; + + /** + * The ID of the Glue Data Catalog where the tables reside. If none is provided, Glue + * automatically uses the caller's AWS account ID by default. 
+ * + * <p>For more details, see <a + * href="https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-catalog-databases.html">...</a> + */ + public static final String GLUE_CATALOG_ID = "aws.glue.id"; Review Comment: could we use `aws.glue.catalog-id` since we also have `account-id` ########## flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalogOptions.java: ########## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.glue; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.connector.aws.config.AWSConfigConstants; +import org.apache.flink.table.catalog.CommonCatalogOptions; +import org.apache.flink.table.catalog.glue.constants.AWSGlueConfigConstants; +import org.apache.flink.table.catalog.glue.constants.GlueCatalogConstants; + +import software.amazon.awssdk.regions.Region; + +import java.util.HashSet; +import java.util.Set; + +import static org.apache.flink.table.catalog.glue.GlueCatalog.DEFAULT_DB; + +/** Collection of {@link ConfigOption} used in GlueCatalog. 
*/ +@Internal +public class GlueCatalogOptions extends CommonCatalogOptions { + + public static final String IDENTIFIER = "glue"; + public static final ConfigOption<String> DEFAULT_DATABASE = + ConfigOptions.key(CommonCatalogOptions.DEFAULT_DATABASE_KEY) + .stringType() + .defaultValue(DEFAULT_DB); + + public static final ConfigOption<String> INPUT_FORMAT = + ConfigOptions.key(GlueCatalogConstants.TABLE_INPUT_FORMAT) + .stringType() + .noDefaultValue(); + + public static final ConfigOption<String> OUTPUT_FORMAT = + ConfigOptions.key(GlueCatalogConstants.TABLE_OUTPUT_FORMAT) + .stringType() + .noDefaultValue(); + + public static final ConfigOption<String> GLUE_CATALOG_ENDPOINT = + ConfigOptions.key(AWSGlueConfigConstants.GLUE_CATALOG_ENDPOINT) + .stringType() + .noDefaultValue(); + + public static final ConfigOption<String> GLUE_CATALOG_ID = + ConfigOptions.key(AWSGlueConfigConstants.GLUE_CATALOG_ID).stringType().noDefaultValue(); + + public static final ConfigOption<String> GLUE_ACCOUNT_ID = + ConfigOptions.key(AWSGlueConfigConstants.GLUE_ACCOUNT_ID).stringType().noDefaultValue(); + + public static final ConfigOption<String> CREDENTIAL_PROVIDER = + ConfigOptions.key(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER) + .stringType() + .defaultValue(String.valueOf(AWSConfigConstants.CredentialProvider.AUTO)); + + public static final ConfigOption<String> HTTP_CLIENT_TYPE = + ConfigOptions.key(AWSConfigConstants.HTTP_CLIENT_TYPE) + .stringType() + .defaultValue(AWSConfigConstants.CLIENT_TYPE_APACHE); + + public static final ConfigOption<String> REGION = + ConfigOptions.key(AWSConfigConstants.AWS_REGION) + .stringType() + .defaultValue(Region.US_WEST_1.toString()); + + public static Set<ConfigOption<?>> getAllConfigOptions() { + Set<ConfigOption<?>> configOptions = new HashSet<>(); + configOptions.add(INPUT_FORMAT); + configOptions.add(OUTPUT_FORMAT); + configOptions.add(GLUE_CATALOG_ENDPOINT); + configOptions.add(GLUE_ACCOUNT_ID); + configOptions.add(GLUE_CATALOG_ID); + 
configOptions.add(DEFAULT_DATABASE); + configOptions.add(HTTP_CLIENT_TYPE); + configOptions.add(REGION); + configOptions.add(CREDENTIAL_PROVIDER); + return configOptions; + } + + public static Set<ConfigOption<?>> getRequiredConfigOptions() { + return new HashSet<>(); Review Comment: Why not have `GLUE_CATALOG_ID` and `REGION` as required options? ########## flink-catalog-aws/flink-catalog-aws-glue/src/main/resources/META-INF/NOTICE: ########## @@ -0,0 +1,7 @@ +flink-catalog-aws-glue +Copyright 2014-2023 The Apache Software Foundation Review Comment: The copyright year should be updated to 2024 (`Copyright 2014-2024`). ########## flink-catalog-aws/flink-catalog-aws-glue/pom.xml: ########## @@ -0,0 +1,96 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License.
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-catalog-aws-parent</artifactId> + <version>4.4-SNAPSHOT</version> + </parent> + + <artifactId>flink-catalog-aws-glue</artifactId> + <name>Flink : Catalog : AWS : Glue</name> Review Comment: ```suggestion <name>Flink : Catalog : AWS : Glue Data Catalog</name> ``` ########## flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalogOptions.java: ########## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.catalog.glue; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.connector.aws.config.AWSConfigConstants; +import org.apache.flink.table.catalog.CommonCatalogOptions; +import org.apache.flink.table.catalog.glue.constants.AWSGlueConfigConstants; +import org.apache.flink.table.catalog.glue.constants.GlueCatalogConstants; + +import software.amazon.awssdk.regions.Region; + +import java.util.HashSet; +import java.util.Set; + +import static org.apache.flink.table.catalog.glue.GlueCatalog.DEFAULT_DB; + +/** Collection of {@link ConfigOption} used in GlueCatalog. */ +@Internal Review Comment: This is not `Internal`; I suggest going with `PublicEvolving`. ########## flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/GlueUtils.java: ########## @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.table.catalog.glue.util; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.CatalogDatabaseImpl; +import org.apache.flink.table.catalog.CatalogFunction; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.FunctionLanguage; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.ResolvedCatalogBaseTable; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.glue.GlueCatalogOptions; +import org.apache.flink.table.catalog.glue.TypeMapper; +import org.apache.flink.table.catalog.glue.constants.GlueCatalogConstants; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.types.AbstractDataType; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.glue.model.Column; +import software.amazon.awssdk.services.glue.model.Database; +import software.amazon.awssdk.services.glue.model.GlueResponse; +import software.amazon.awssdk.services.glue.model.Table; +import software.amazon.awssdk.services.glue.model.UserDefinedFunction; + +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.commons.lang3.BooleanUtils.FALSE; +import static org.apache.commons.lang3.BooleanUtils.TRUE; +import static org.apache.flink.table.catalog.glue.constants.GlueCatalogConstants.EXPLAIN_EXTRAS; +import static org.apache.flink.table.catalog.glue.constants.GlueCatalogConstants.IS_PERSISTED; +import static 
org.apache.flink.table.catalog.glue.constants.GlueCatalogConstants.IS_PHYSICAL; +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly; + +/** Utilities related glue Operation. */ +@Internal +public class GlueUtils { + + private static final Logger LOG = LoggerFactory.getLogger(GlueUtils.class); + + /** + * Glue supports lowercase naming convention. + * + * @param name fully qualified name. + * @return modified name according to glue convention. + */ + public static String getGlueConventionalName(String name) { + return name.toLowerCase(Locale.ROOT); + } + + /** + * Extract database location from properties and remove location from properties. fallback to + * create default location if not present + * + * @param databaseProperties database properties. + * @param databaseName fully qualified name for database. + * @param catalogPath catalog path. + * @return location for database. + */ + public static String extractDatabaseLocation( + final Map<String, String> databaseProperties, + final String databaseName, + final String catalogPath) { + if (databaseProperties.containsKey(GlueCatalogConstants.LOCATION_URI)) { + return databaseProperties.remove(GlueCatalogConstants.LOCATION_URI); + } else { + LOG.info("No location URI Set. Using Catalog Path as default"); + return catalogPath + GlueCatalogConstants.LOCATION_SEPARATOR + databaseName; + } + } + + /** + * Extract table location from table properties and remove location from properties. fallback to + * create default location if not present + * + * @param tableProperties table properties. + * @param tablePath fully qualified object for table. + * @param catalogPath catalog path. + * @return location for table. 
+ */ + public static String extractTableLocation( + final Map<String, String> tableProperties, + final ObjectPath tablePath, + final String catalogPath) { + if (tableProperties.containsKey(GlueCatalogConstants.LOCATION_URI)) { + return tableProperties.remove(GlueCatalogConstants.LOCATION_URI); + } else { + return catalogPath + + GlueCatalogConstants.LOCATION_SEPARATOR + + tablePath.getDatabaseName() + + GlueCatalogConstants.LOCATION_SEPARATOR + + tablePath.getObjectName(); + } + } + + /** + * Build CatalogDatabase instance using information from glue Database instance. + * + * @param glueDatabase {@link Database } + * @return {@link CatalogDatabase } instance. + */ + public static CatalogDatabase getCatalogDatabase(final Database glueDatabase) { + Map<String, String> properties = new HashMap<>(glueDatabase.parameters()); + return new CatalogDatabaseImpl(properties, glueDatabase.description()); + } + + /** + * A Glue database name cannot be longer than 255 characters. The only acceptable characters are + * lowercase letters, numbers, and the underscore character. More details: <a + * href="https://docs.aws.amazon.com/athena/latest/ug/glue-best-practices.html">...</a> + * + * @param name name + */ + public static void validate(String name) { Review Comment: nit: `validateDatabaseName` ########## flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/operator/GlueOperator.java: ########## @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.glue.operator; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.StringUtils; + +import software.amazon.awssdk.services.glue.GlueClient; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Glue related operation. Important Note : * <a + * href="https://aws.amazon.com/premiumsupport/knowledge-center/glue-crawler-internal-service-exception/">...</a> + */ +@Internal +public abstract class GlueOperator { + + public final String glueCatalogId; + + protected final GlueClient glueClient; + + public final String catalogName; Review Comment: Should this field really be `public`? ########## flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/GlueUtils.java: ########## @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.glue.util; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.CatalogDatabaseImpl; +import org.apache.flink.table.catalog.CatalogFunction; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.FunctionLanguage; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.ResolvedCatalogBaseTable; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.glue.GlueCatalogOptions; +import org.apache.flink.table.catalog.glue.TypeMapper; +import org.apache.flink.table.catalog.glue.constants.GlueCatalogConstants; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.types.AbstractDataType; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.glue.model.Column; +import software.amazon.awssdk.services.glue.model.Database; +import software.amazon.awssdk.services.glue.model.GlueResponse; +import software.amazon.awssdk.services.glue.model.Table; +import software.amazon.awssdk.services.glue.model.UserDefinedFunction; + +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.commons.lang3.BooleanUtils.FALSE; +import static org.apache.commons.lang3.BooleanUtils.TRUE; +import static org.apache.flink.table.catalog.glue.constants.GlueCatalogConstants.EXPLAIN_EXTRAS; +import static 
org.apache.flink.table.catalog.glue.constants.GlueCatalogConstants.IS_PERSISTED; +import static org.apache.flink.table.catalog.glue.constants.GlueCatalogConstants.IS_PHYSICAL; +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly; + +/** Utilities related glue Operation. */ +@Internal +public class GlueUtils { + + private static final Logger LOG = LoggerFactory.getLogger(GlueUtils.class); + + /** + * Glue supports lowercase naming convention. + * + * @param name fully qualified name. + * @return modified name according to glue convention. + */ + public static String getGlueConventionalName(String name) { + return name.toLowerCase(Locale.ROOT); + } + + /** + * Extract database location from properties and remove location from properties. fallback to + * create default location if not present + * + * @param databaseProperties database properties. + * @param databaseName fully qualified name for database. + * @param catalogPath catalog path. + * @return location for database. + */ + public static String extractDatabaseLocation( + final Map<String, String> databaseProperties, + final String databaseName, + final String catalogPath) { + if (databaseProperties.containsKey(GlueCatalogConstants.LOCATION_URI)) { + return databaseProperties.remove(GlueCatalogConstants.LOCATION_URI); + } else { + LOG.info("No location URI Set. Using Catalog Path as default"); + return catalogPath + GlueCatalogConstants.LOCATION_SEPARATOR + databaseName; + } + } + + /** + * Extract table location from table properties and remove location from properties. fallback to + * create default location if not present + * + * @param tableProperties table properties. + * @param tablePath fully qualified object for table. + * @param catalogPath catalog path. + * @return location for table. + */ + public static String extractTableLocation( Review Comment: is this used? 
########## flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalog.java: ########## @@ -0,0 +1,1128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.glue; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.connector.aws.util.AWSClientUtil; +import org.apache.flink.connector.aws.util.AWSGeneralUtil; +import org.apache.flink.table.catalog.AbstractCatalog; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.CatalogFunction; +import org.apache.flink.table.catalog.CatalogPartition; +import org.apache.flink.table.catalog.CatalogPartitionImpl; +import org.apache.flink.table.catalog.CatalogPartitionSpec; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.CatalogView; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.ResolvedCatalogBaseTable; +import org.apache.flink.table.catalog.ResolvedCatalogTable; +import 
org.apache.flink.table.catalog.ResolvedCatalogView; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.FunctionNotExistException; +import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException; +import org.apache.flink.table.catalog.exceptions.PartitionNotExistException; +import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException; +import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException; +import org.apache.flink.table.catalog.exceptions.TablePartitionedException; +import org.apache.flink.table.catalog.glue.constants.GlueCatalogConstants; +import org.apache.flink.table.catalog.glue.operator.GlueDatabaseOperator; +import org.apache.flink.table.catalog.glue.operator.GlueFunctionOperator; +import org.apache.flink.table.catalog.glue.operator.GluePartitionOperator; +import org.apache.flink.table.catalog.glue.operator.GlueTableOperator; +import org.apache.flink.table.catalog.glue.util.GlueUtils; +import org.apache.flink.table.catalog.stats.CatalogColumnStatistics; +import org.apache.flink.table.catalog.stats.CatalogTableStatistics; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.functions.FunctionIdentifier; +import org.apache.flink.util.StringUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.http.apache.ApacheHttpClient; +import software.amazon.awssdk.services.glue.GlueClient; +import 
software.amazon.awssdk.services.glue.model.GetTablesRequest; +import software.amazon.awssdk.services.glue.model.GetTablesResponse; +import software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionsRequest; +import software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionsResponse; +import software.amazon.awssdk.services.glue.model.GlueException; +import software.amazon.awssdk.services.glue.model.Partition; +import software.amazon.awssdk.services.glue.model.Table; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly; + +/** Glue catalog implementation that uses AWS Glue Data Catalog as persistence at backend. */ +@PublicEvolving Review Comment: This should be `@Internal` IMO; it is not user-facing, right? ########## flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalogOptions.java: ########## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.table.catalog.glue; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.connector.aws.config.AWSConfigConstants; +import org.apache.flink.table.catalog.CommonCatalogOptions; +import org.apache.flink.table.catalog.glue.constants.AWSGlueConfigConstants; +import org.apache.flink.table.catalog.glue.constants.GlueCatalogConstants; + +import software.amazon.awssdk.regions.Region; + +import java.util.HashSet; +import java.util.Set; + +import static org.apache.flink.table.catalog.glue.GlueCatalog.DEFAULT_DB; + +/** Collection of {@link ConfigOption} used in GlueCatalog. */ +@Internal +public class GlueCatalogOptions extends CommonCatalogOptions { + + public static final String IDENTIFIER = "glue"; + public static final ConfigOption<String> DEFAULT_DATABASE = + ConfigOptions.key(CommonCatalogOptions.DEFAULT_DATABASE_KEY) + .stringType() + .defaultValue(DEFAULT_DB); + + public static final ConfigOption<String> INPUT_FORMAT = + ConfigOptions.key(GlueCatalogConstants.TABLE_INPUT_FORMAT) + .stringType() + .noDefaultValue(); + + public static final ConfigOption<String> OUTPUT_FORMAT = + ConfigOptions.key(GlueCatalogConstants.TABLE_OUTPUT_FORMAT) + .stringType() + .noDefaultValue(); + + public static final ConfigOption<String> GLUE_CATALOG_ENDPOINT = + ConfigOptions.key(AWSGlueConfigConstants.GLUE_CATALOG_ENDPOINT) + .stringType() + .noDefaultValue(); + + public static final ConfigOption<String> GLUE_CATALOG_ID = + ConfigOptions.key(AWSGlueConfigConstants.GLUE_CATALOG_ID).stringType().noDefaultValue(); + + public static final ConfigOption<String> GLUE_ACCOUNT_ID = + ConfigOptions.key(AWSGlueConfigConstants.GLUE_ACCOUNT_ID).stringType().noDefaultValue(); + + public static final ConfigOption<String> CREDENTIAL_PROVIDER = + ConfigOptions.key(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER) + .stringType() + 
.defaultValue(String.valueOf(AWSConfigConstants.CredentialProvider.AUTO)); + + public static final ConfigOption<String> HTTP_CLIENT_TYPE = + ConfigOptions.key(AWSConfigConstants.HTTP_CLIENT_TYPE) + .stringType() + .defaultValue(AWSConfigConstants.CLIENT_TYPE_APACHE); + + public static final ConfigOption<String> REGION = + ConfigOptions.key(AWSConfigConstants.AWS_REGION) + .stringType() + .defaultValue(Region.US_WEST_1.toString()); Review Comment: Why `us-west-1`? I would prefer no default, making the region a required option as is the case in the connectors. ########## flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/GlueUtils.java: ########## @@ -0,0 +1,419 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.table.catalog.glue.util; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.CatalogDatabaseImpl; +import org.apache.flink.table.catalog.CatalogFunction; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.FunctionLanguage; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.ResolvedCatalogBaseTable; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.glue.GlueCatalogOptions; +import org.apache.flink.table.catalog.glue.TypeMapper; +import org.apache.flink.table.catalog.glue.constants.GlueCatalogConstants; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.types.AbstractDataType; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.glue.model.Column; +import software.amazon.awssdk.services.glue.model.Database; +import software.amazon.awssdk.services.glue.model.GlueResponse; +import software.amazon.awssdk.services.glue.model.Table; +import software.amazon.awssdk.services.glue.model.UserDefinedFunction; + +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.commons.lang3.BooleanUtils.FALSE; +import static org.apache.commons.lang3.BooleanUtils.TRUE; +import static org.apache.flink.table.catalog.glue.constants.GlueCatalogConstants.EXPLAIN_EXTRAS; +import static org.apache.flink.table.catalog.glue.constants.GlueCatalogConstants.IS_PERSISTED; +import static 
org.apache.flink.table.catalog.glue.constants.GlueCatalogConstants.IS_PHYSICAL; +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly; + +/** Utilities related glue Operation. */ +@Internal +public class GlueUtils { + + private static final Logger LOG = LoggerFactory.getLogger(GlueUtils.class); + + /** + * Glue supports lowercase naming convention. + * + * @param name fully qualified name. + * @return modified name according to glue convention. + */ + public static String getGlueConventionalName(String name) { + return name.toLowerCase(Locale.ROOT); + } + + /** + * Extract database location from properties and remove location from properties. fallback to + * create default location if not present + * + * @param databaseProperties database properties. + * @param databaseName fully qualified name for database. + * @return location for database. + */ + public static String extractDatabaseLocation( + final Map<String, String> databaseProperties, + final String databaseName, + final String catalogPath) { + if (databaseProperties.containsKey(GlueCatalogConstants.LOCATION_URI)) { + return databaseProperties.remove(GlueCatalogConstants.LOCATION_URI); + } else { + LOG.info("No location URI Set. Using Catalog Path as default"); + return catalogPath + GlueCatalogConstants.LOCATION_SEPARATOR + databaseName; + } + } + + /** + * Extract table location from table properties and remove location from properties. fallback to + * create default location if not present + * + * @param tableProperties table properties. + * @param tablePath fully qualified object for table. + * @return location for table. 
+ */ + public static String extractTableLocation( + final Map<String, String> tableProperties, + final ObjectPath tablePath, + final String catalogPath) { + if (tableProperties.containsKey(GlueCatalogConstants.LOCATION_URI)) { + return tableProperties.remove(GlueCatalogConstants.LOCATION_URI); + } else { + return catalogPath + + GlueCatalogConstants.LOCATION_SEPARATOR + + tablePath.getDatabaseName() + + GlueCatalogConstants.LOCATION_SEPARATOR + + tablePath.getObjectName(); + } + } + + /** + * Build CatalogDatabase instance using information from glue Database instance. + * + * @param glueDatabase {@link Database } + * @return {@link CatalogDatabase } instance. + */ + public static CatalogDatabase getCatalogDatabase(final Database glueDatabase) { + Map<String, String> properties = new HashMap<>(glueDatabase.parameters()); + return new CatalogDatabaseImpl(properties, glueDatabase.description()); + } + + /** + * A Glue database name cannot be longer than 255 characters. The only acceptable characters are + * lowercase letters, numbers, and the underscore character. More details: <a + * href="https://docs.aws.amazon.com/athena/latest/ug/glue-best-practices.html">...</a> + * + * @param name name + */ + public static void validate(String name) { + checkArgument( + name != null && name.matches(GlueCatalogConstants.GLUE_DB_PATTERN.pattern()), + "Database name does not comply with the Glue naming convention. " + + "Check here https://docs.aws.amazon.com/athena/latest/ug/glue-best-practices.html"); + } + + /** validate response from client call. */ + public static void validateGlueResponse(GlueResponse response) { + if (response != null && !response.sdkHttpResponse().isSuccessful()) { + throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER); Review Comment: This is not descriptive, could we make use of `response.sdkHttpResponse().statusText()`? 
########## flink-catalog-aws/flink-catalog-aws-glue/src/test/java/org/apache/flink/table/catalog/glue/util/GlueCatalogOptionsUtilsTest.java: ########## @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.glue.util; + +import org.junit.jupiter.api.Test; + +class GlueCatalogOptionsUtilsTest { + + @Test + void testGetValidatedConfigurations() {} Review Comment: Please remove if not needed. ########## flink-catalog-aws/flink-catalog-aws-glue/src/test/java/org/apache/flink/table/catalog/glue/factory/GlueCatalogFactoryTest.java: ########## @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.glue.factory; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.catalog.glue.GlueCatalogOptions; +import org.apache.flink.table.factories.CatalogFactory; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.util.TestLogger; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import static org.apache.flink.connector.aws.config.AWSConfigConstants.AWS_REGION; +import static org.apache.flink.table.catalog.glue.GlueCatalogOptions.CREDENTIAL_PROVIDER; +import static org.apache.flink.table.catalog.glue.GlueCatalogOptions.DEFAULT_DATABASE; +import static org.apache.flink.table.catalog.glue.GlueCatalogOptions.GLUE_ACCOUNT_ID; +import static org.apache.flink.table.catalog.glue.GlueCatalogOptions.GLUE_CATALOG_ENDPOINT; +import static org.apache.flink.table.catalog.glue.GlueCatalogOptions.GLUE_CATALOG_ID; +import static org.apache.flink.table.catalog.glue.GlueCatalogOptions.HTTP_CLIENT_TYPE; +import static org.apache.flink.table.catalog.glue.GlueCatalogOptions.INPUT_FORMAT; +import static org.apache.flink.table.catalog.glue.GlueCatalogOptions.OUTPUT_FORMAT; +import static org.apache.flink.table.catalog.glue.GlueCatalogOptions.REGION; + +class GlueCatalogFactoryTest extends 
TestLogger { + + public static GlueCatalogFactory factory; + + @BeforeAll + public static void setup() { + factory = new GlueCatalogFactory(); Review Comment: We should test it using the `FactoryUtil.createCatalog()` method to ensure the catalog is resolved correctly via the planner. ########## flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/config/AWSConfigConstants.java: ########## @@ -154,6 +154,54 @@ public enum CredentialProvider { /** Read Request timeout for {@link SdkAsyncHttpClient}. */ public static final String HTTP_CLIENT_READ_TIMEOUT_MILLIS = "aws.http-client.read-timeout"; + /** + * The type of {@link software.amazon.awssdk.http.SdkHttpClient}. If set, all AWS clients will + * use this specified HTTP client. If not set, HTTP_CLIENT_TYPE_DEFAULT will be used. For + * specific types supported, see HTTP_CLIENT_TYPE_* defined below. + */ + public static final String HTTP_CLIENT_TYPE = "http-client.type"; + + // ---- glue configs + + /** + * Used to configure the connection timeout in milliseconds for {@link + * software.amazon.awssdk.http.apache.ApacheHttpClient.Builder}. This flag only works when + * {@link #HTTP_CLIENT_TYPE} is set to HTTP_CLIENT_TYPE_APACHE + * + * <p>For more details, see <a + * href="https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/apache/ApacheHttpClient.Builder.html">...</a> + */ + public static final String HTTP_CLIENT_CONNECTION_TIMEOUT_MS = + "http-client.connection-timeout-ms"; + + /** + * Used to configure the max connections number for {@link + * software.amazon.awssdk.http.apache.ApacheHttpClient.Builder}.
This flag only works when + * {@link #HTTP_CLIENT_TYPE} is set to HTTP_CLIENT_TYPE_APACHE + * + * <p>For more details, see <a + * href="https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/apache/ApacheHttpClient.Builder.html">...</a> + */ + public static final String HTTP_CLIENT_APACHE_MAX_CONNECTIONS = + "http-client.apache.max-connections"; + + /** + * Used to configure the socket timeout in milliseconds for {@link + * software.amazon.awssdk.http.apache.ApacheHttpClient.Builder}. This flag only works when + * {@link #HTTP_CLIENT_TYPE} is set to HTTP_CLIENT_TYPE_APACHE + * + * <p>For more details, see <a + * href="https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/apache/ApacheHttpClient.Builder.html">...</a> + */ + public static final String HTTP_CLIENT_SOCKET_TIMEOUT_MS = "http-client.socket-timeout-ms"; Review Comment: same as above ########## flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/factory/GlueCatalogFactory.java: ########## @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.catalog.glue.factory; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.glue.GlueCatalog; +import org.apache.flink.table.catalog.glue.GlueCatalogOptions; +import org.apache.flink.table.catalog.glue.util.GlueCatalogOptionsUtils; +import org.apache.flink.table.factories.CatalogFactory; +import org.apache.flink.table.factories.FactoryUtil; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Properties; +import java.util.Set; +import java.util.stream.Collectors; + +/** Catalog factory for {@link GlueCatalog}. */ +@PublicEvolving +public class GlueCatalogFactory implements CatalogFactory { + + private static final Logger LOG = LoggerFactory.getLogger(GlueCatalogFactory.class); + + @Override + public String factoryIdentifier() { + return GlueCatalogOptions.IDENTIFIER; + } + + @Override + public Set<ConfigOption<?>> optionalOptions() { + Set<ConfigOption<?>> allConfigs = GlueCatalogOptions.getAllConfigOptions(); + allConfigs.removeAll(GlueCatalogOptions.getRequiredConfigOptions()); + return allConfigs; + } + + @Override + public Set<ConfigOption<?>> requiredOptions() { + return GlueCatalogOptions.getRequiredConfigOptions(); + } + + @Override + public Catalog createCatalog(Context context) { + final FactoryUtil.CatalogFactoryHelper helper = + FactoryUtil.createCatalogFactoryHelper(this, context); + GlueCatalogOptionsUtils optionsUtils = + new GlueCatalogOptionsUtils(context.getOptions(), context.getConfiguration()); + helper.validateExcept(optionsUtils.getNonValidatedPrefixes().toArray(new String[0])); Review Comment: We should use `validateExcept`, check FLINK-25779 ########## flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/factory/GlueCatalogFactory.java: ########## @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.glue.factory; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.glue.GlueCatalog; +import org.apache.flink.table.catalog.glue.GlueCatalogOptions; +import org.apache.flink.table.catalog.glue.util.GlueCatalogOptionsUtils; +import org.apache.flink.table.factories.CatalogFactory; +import org.apache.flink.table.factories.FactoryUtil; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Properties; +import java.util.Set; +import java.util.stream.Collectors; + +/** Catalog factory for {@link GlueCatalog}. */ +public class GlueCatalogFactory implements CatalogFactory { Review Comment: On second thought, I believe this should be `@Internal`, since this is not user-facing, right? ########## flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/operator/GlueOperator.java: ########## @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.glue.operator; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.StringUtils; + +import software.amazon.awssdk.services.glue.GlueClient; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Glue related operation. Important Note : * <a + * href="https://aws.amazon.com/premiumsupport/knowledge-center/glue-crawler-internal-service-exception/">...</a> + */ +@Internal +public abstract class GlueOperator { + + public final String glueCatalogId; Review Comment: `public` ? ########## flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/GlueUtils.java: ########## @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.glue.util; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.CatalogDatabaseImpl; +import org.apache.flink.table.catalog.CatalogFunction; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.FunctionLanguage; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.ResolvedCatalogBaseTable; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.glue.GlueCatalogOptions; +import org.apache.flink.table.catalog.glue.TypeMapper; +import org.apache.flink.table.catalog.glue.constants.GlueCatalogConstants; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.types.AbstractDataType; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.glue.model.Column; +import software.amazon.awssdk.services.glue.model.Database; +import software.amazon.awssdk.services.glue.model.GlueResponse; +import software.amazon.awssdk.services.glue.model.Table; +import software.amazon.awssdk.services.glue.model.UserDefinedFunction; + +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; 
+import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.commons.lang3.BooleanUtils.FALSE; +import static org.apache.commons.lang3.BooleanUtils.TRUE; +import static org.apache.flink.table.catalog.glue.constants.GlueCatalogConstants.EXPLAIN_EXTRAS; +import static org.apache.flink.table.catalog.glue.constants.GlueCatalogConstants.IS_PERSISTED; +import static org.apache.flink.table.catalog.glue.constants.GlueCatalogConstants.IS_PHYSICAL; +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly; + +/** Utilities related glue Operation. */ +@Internal +public class GlueUtils { + + private static final Logger LOG = LoggerFactory.getLogger(GlueUtils.class); + + /** + * Glue supports lowercase naming convention. + * + * @param name fully qualified name. + * @return modified name according to glue convention. + */ + public static String getGlueConventionalName(String name) { + return name.toLowerCase(Locale.ROOT); + } + + /** + * Extract database location from properties and remove location from properties. fallback to + * create default location if not present + * + * @param databaseProperties database properties. + * @param databaseName fully qualified name for database. + * @param catalogPath catalog path. + * @return location for database. + */ + public static String extractDatabaseLocation( Review Comment: is this used? ########## flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalog.java: ########## @@ -0,0 +1,1128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.glue; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.connector.aws.util.AWSClientUtil; +import org.apache.flink.connector.aws.util.AWSGeneralUtil; +import org.apache.flink.table.catalog.AbstractCatalog; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.CatalogFunction; +import org.apache.flink.table.catalog.CatalogPartition; +import org.apache.flink.table.catalog.CatalogPartitionImpl; +import org.apache.flink.table.catalog.CatalogPartitionSpec; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.CatalogView; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.ResolvedCatalogBaseTable; +import org.apache.flink.table.catalog.ResolvedCatalogTable; +import org.apache.flink.table.catalog.ResolvedCatalogView; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import 
org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.FunctionNotExistException; +import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException; +import org.apache.flink.table.catalog.exceptions.PartitionNotExistException; +import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException; +import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException; +import org.apache.flink.table.catalog.exceptions.TablePartitionedException; +import org.apache.flink.table.catalog.glue.constants.GlueCatalogConstants; +import org.apache.flink.table.catalog.glue.operator.GlueDatabaseOperator; +import org.apache.flink.table.catalog.glue.operator.GlueFunctionOperator; +import org.apache.flink.table.catalog.glue.operator.GluePartitionOperator; +import org.apache.flink.table.catalog.glue.operator.GlueTableOperator; +import org.apache.flink.table.catalog.glue.util.GlueUtils; +import org.apache.flink.table.catalog.stats.CatalogColumnStatistics; +import org.apache.flink.table.catalog.stats.CatalogTableStatistics; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.functions.FunctionIdentifier; +import org.apache.flink.util.StringUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.http.apache.ApacheHttpClient; +import software.amazon.awssdk.services.glue.GlueClient; +import software.amazon.awssdk.services.glue.model.GetTablesRequest; +import software.amazon.awssdk.services.glue.model.GetTablesResponse; +import software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionsRequest; +import software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionsResponse; +import software.amazon.awssdk.services.glue.model.GlueException; +import 
software.amazon.awssdk.services.glue.model.Partition; +import software.amazon.awssdk.services.glue.model.Table; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly; + +/** Glue catalog implementation that uses AWS Glue Data Catalog as persistence at backend. */ +@PublicEvolving +public class GlueCatalog extends AbstractCatalog { + + private static final Logger LOG = LoggerFactory.getLogger(GlueCatalog.class); + + /** instance of GlueOperator to facilitate glue related actions. */ + public GlueDatabaseOperator glueDatabaseOperator; + + public GlueTableOperator glueTableOperator; + public GluePartitionOperator gluePartitionOperator; + public GlueFunctionOperator glueFunctionOperator; + + public GlueClient glueClient; + + /** Default database name if not passed as part of catalog. */ + public static final String DEFAULT_DB = "default"; + + public GlueCatalog( + String catalogName, + String databaseName, + ReadableConfig catalogConfig, + Properties glueClientProperties) { + super(catalogName, databaseName); + checkNotNull(catalogConfig, "Catalog config cannot be null."); + String glueCatalogId = Review Comment: We don't accept a null or empty catalogId in `GlueOperator`, so why not just enforce it as a required option? Then we wouldn't need this workaround. ########## flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/config/AWSConfigConstants.java: ########## @@ -154,6 +154,77 @@ public enum CredentialProvider { /** Read Request timeout for {@link SdkAsyncHttpClient}. */ public static final String HTTP_CLIENT_READ_TIMEOUT_MILLIS = "aws.http-client.read-timeout"; + /** + * The type of {@link software.amazon.awssdk.http.SdkHttpClient}. If set, all AWS clients will + * use this specified HTTP client.
If not set, HTTP_CLIENT_TYPE_DEFAULT will be used. For + * specific types supported, see HTTP_CLIENT_TYPE_* defined below. + */ + public static final String HTTP_CLIENT_TYPE = "http-client.type"; + + // ---- glue configs + + /** + * Used to configure the connection timeout in milliseconds for {@link + * software.amazon.awssdk.http.apache.ApacheHttpClient.Builder}. This flag only works when + * {@link #HTTP_CLIENT_TYPE} is set to HTTP_CLIENT_TYPE_APACHE + * + * <p>For more details, see <a + * href="https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/apache/ApacheHttpClient.Builder.html">...</a> + */ + public static final String HTTP_CLIENT_CONNECTION_TIMEOUT_MS = + "http-client.connection-timeout-ms"; Review Comment: bump ########## flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalog.java: ########## @@ -0,0 +1,1128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.catalog.glue; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.connector.aws.util.AWSClientUtil; +import org.apache.flink.connector.aws.util.AWSGeneralUtil; +import org.apache.flink.table.catalog.AbstractCatalog; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.CatalogFunction; +import org.apache.flink.table.catalog.CatalogPartition; +import org.apache.flink.table.catalog.CatalogPartitionImpl; +import org.apache.flink.table.catalog.CatalogPartitionSpec; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.CatalogView; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.ResolvedCatalogBaseTable; +import org.apache.flink.table.catalog.ResolvedCatalogTable; +import org.apache.flink.table.catalog.ResolvedCatalogView; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.FunctionNotExistException; +import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException; +import org.apache.flink.table.catalog.exceptions.PartitionNotExistException; +import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException; +import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import 
org.apache.flink.table.catalog.exceptions.TableNotPartitionedException; +import org.apache.flink.table.catalog.exceptions.TablePartitionedException; +import org.apache.flink.table.catalog.glue.constants.GlueCatalogConstants; +import org.apache.flink.table.catalog.glue.operator.GlueDatabaseOperator; +import org.apache.flink.table.catalog.glue.operator.GlueFunctionOperator; +import org.apache.flink.table.catalog.glue.operator.GluePartitionOperator; +import org.apache.flink.table.catalog.glue.operator.GlueTableOperator; +import org.apache.flink.table.catalog.glue.util.GlueUtils; +import org.apache.flink.table.catalog.stats.CatalogColumnStatistics; +import org.apache.flink.table.catalog.stats.CatalogTableStatistics; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.functions.FunctionIdentifier; +import org.apache.flink.util.StringUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.http.apache.ApacheHttpClient; +import software.amazon.awssdk.services.glue.GlueClient; +import software.amazon.awssdk.services.glue.model.GetTablesRequest; +import software.amazon.awssdk.services.glue.model.GetTablesResponse; +import software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionsRequest; +import software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionsResponse; +import software.amazon.awssdk.services.glue.model.GlueException; +import software.amazon.awssdk.services.glue.model.Partition; +import software.amazon.awssdk.services.glue.model.Table; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly; + +/** Glue catalog implementation that uses AWS Glue Data Catalog as persistence at backend. 
*/ +@PublicEvolving +public class GlueCatalog extends AbstractCatalog { + + private static final Logger LOG = LoggerFactory.getLogger(GlueCatalog.class); + + /** instance of GlueOperator to facilitate glue related actions. */ + public GlueDatabaseOperator glueDatabaseOperator; + + public GlueTableOperator glueTableOperator; + public GluePartitionOperator gluePartitionOperator; + public GlueFunctionOperator glueFunctionOperator; + + public GlueClient glueClient; + + /** Default database name if not passed as part of catalog. */ + public static final String DEFAULT_DB = "default"; + + public GlueCatalog( + String catalogName, + String databaseName, + ReadableConfig catalogConfig, + Properties glueClientProperties) { + super(catalogName, databaseName); + checkNotNull(catalogConfig, "Catalog config cannot be null."); + String glueCatalogId = + String.valueOf( + catalogConfig.getOptional(GlueCatalogOptions.GLUE_CATALOG_ID).orElse(null)); + glueClient = createClient(glueClientProperties); + this.glueDatabaseOperator = new GlueDatabaseOperator(getName(), glueClient, glueCatalogId); + this.glueTableOperator = new GlueTableOperator(getName(), glueClient, glueCatalogId); + this.gluePartitionOperator = + new GluePartitionOperator(getName(), glueClient, glueCatalogId); + this.glueFunctionOperator = new GlueFunctionOperator(getName(), glueClient, glueCatalogId); + } + + private static GlueClient createClient(Properties glueClientProperties) { + return AWSClientUtil.createAwsSyncClient( + glueClientProperties, + AWSGeneralUtil.createSyncHttpClient( + glueClientProperties, ApacheHttpClient.builder()), + GlueClient.builder(), + GlueCatalogConstants.BASE_GLUE_USER_AGENT_PREFIX_FORMAT, + GlueCatalogConstants.GLUE_CLIENT_USER_AGENT_PREFIX); + } + + @VisibleForTesting + public GlueCatalog( + String catalogName, + String databaseName, + GlueClient glueClient, + GlueDatabaseOperator glueDatabaseOperator, + GlueTableOperator glueTableOperator, + GluePartitionOperator 
gluePartitionOperator, + GlueFunctionOperator glueFunctionOperator) { + super(catalogName, databaseName); + this.glueClient = glueClient; + this.glueDatabaseOperator = glueDatabaseOperator; + this.glueTableOperator = glueTableOperator; + this.gluePartitionOperator = gluePartitionOperator; + this.glueFunctionOperator = glueFunctionOperator; + } + + /** + * Open the catalog. Used for any required preparation in initialization phase. + * + * @throws CatalogException in case of any runtime exception + */ + @Override + public void open() throws CatalogException {} + + /** + * Close the catalog when it is no longer needed and release any resource that it might be + * holding. + * + * @throws CatalogException in case of any runtime exception + */ + @Override + public void close() throws CatalogException { + try { + glueClient.close(); + } catch (Exception e) { + throw new CatalogException("Glue Client is not closed properly!", e); + } + } + + // ------ databases ------ + + /** + * Create a database. + * + * @param databaseName Name of the database to be created + * @param database The database definition + * @param ignoreIfExists Flag to specify behavior when a database with the given name already + * exists: if set to false, throw a DatabaseAlreadyExistException, if set to true, do + * nothing. 
+ * @throws DatabaseAlreadyExistException if the given database already exists and ignoreIfExists + * is false + * @throws CatalogException in case of any runtime exception + */ + @Override + public void createDatabase( + String databaseName, CatalogDatabase database, boolean ignoreIfExists) + throws DatabaseAlreadyExistException, CatalogException { + checkArgument( + !StringUtils.isNullOrWhitespaceOnly(databaseName), + "Database name cannot be null or empty."); + checkNotNull(database, "Database cannot be null."); + databaseName = GlueUtils.getGlueConventionalName(databaseName); + if (databaseExists(databaseName) && !ignoreIfExists) { + throw new DatabaseAlreadyExistException(getName(), databaseName); + } else { + glueDatabaseOperator.createGlueDatabase(databaseName, database); Review Comment: Should we also handle the case where the database already exists and `ignoreIfExists` is `true`? Right now that case falls through to the `else` branch and still calls `createGlueDatabase`, which looks like a bug judging by `GlueDatabaseOperator`. Could we also add tests for this in `GlueFactoryTest`? ########## flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalog.java: ########## @@ -0,0 +1,1128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.table.catalog.glue; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.connector.aws.util.AWSClientUtil; +import org.apache.flink.connector.aws.util.AWSGeneralUtil; +import org.apache.flink.table.catalog.AbstractCatalog; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.CatalogFunction; +import org.apache.flink.table.catalog.CatalogPartition; +import org.apache.flink.table.catalog.CatalogPartitionImpl; +import org.apache.flink.table.catalog.CatalogPartitionSpec; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.CatalogView; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.ResolvedCatalogBaseTable; +import org.apache.flink.table.catalog.ResolvedCatalogTable; +import org.apache.flink.table.catalog.ResolvedCatalogView; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.FunctionNotExistException; +import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException; +import org.apache.flink.table.catalog.exceptions.PartitionNotExistException; +import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException; +import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import 
org.apache.flink.table.catalog.exceptions.TableNotPartitionedException; +import org.apache.flink.table.catalog.exceptions.TablePartitionedException; +import org.apache.flink.table.catalog.glue.constants.GlueCatalogConstants; +import org.apache.flink.table.catalog.glue.operator.GlueDatabaseOperator; +import org.apache.flink.table.catalog.glue.operator.GlueFunctionOperator; +import org.apache.flink.table.catalog.glue.operator.GluePartitionOperator; +import org.apache.flink.table.catalog.glue.operator.GlueTableOperator; +import org.apache.flink.table.catalog.glue.util.GlueUtils; +import org.apache.flink.table.catalog.stats.CatalogColumnStatistics; +import org.apache.flink.table.catalog.stats.CatalogTableStatistics; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.functions.FunctionIdentifier; +import org.apache.flink.util.StringUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.http.apache.ApacheHttpClient; +import software.amazon.awssdk.services.glue.GlueClient; +import software.amazon.awssdk.services.glue.model.GetTablesRequest; +import software.amazon.awssdk.services.glue.model.GetTablesResponse; +import software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionsRequest; +import software.amazon.awssdk.services.glue.model.GetUserDefinedFunctionsResponse; +import software.amazon.awssdk.services.glue.model.GlueException; +import software.amazon.awssdk.services.glue.model.Partition; +import software.amazon.awssdk.services.glue.model.Table; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly; + +/** Glue catalog implementation that uses AWS Glue Data Catalog as persistence at backend. 
*/ +@PublicEvolving +public class GlueCatalog extends AbstractCatalog { + + private static final Logger LOG = LoggerFactory.getLogger(GlueCatalog.class); + + /** instance of GlueOperator to facilitate glue related actions. */ + public GlueDatabaseOperator glueDatabaseOperator; + + public GlueTableOperator glueTableOperator; + public GluePartitionOperator gluePartitionOperator; + public GlueFunctionOperator glueFunctionOperator; + + public GlueClient glueClient; + + /** Default database name if not passed as part of catalog. */ + public static final String DEFAULT_DB = "default"; + + public GlueCatalog( + String catalogName, + String databaseName, + ReadableConfig catalogConfig, + Properties glueClientProperties) { + super(catalogName, databaseName); + checkNotNull(catalogConfig, "Catalog config cannot be null."); + String glueCatalogId = + String.valueOf( + catalogConfig.getOptional(GlueCatalogOptions.GLUE_CATALOG_ID).orElse(null)); + glueClient = createClient(glueClientProperties); + this.glueDatabaseOperator = new GlueDatabaseOperator(getName(), glueClient, glueCatalogId); + this.glueTableOperator = new GlueTableOperator(getName(), glueClient, glueCatalogId); + this.gluePartitionOperator = + new GluePartitionOperator(getName(), glueClient, glueCatalogId); + this.glueFunctionOperator = new GlueFunctionOperator(getName(), glueClient, glueCatalogId); + } + + private static GlueClient createClient(Properties glueClientProperties) { + return AWSClientUtil.createAwsSyncClient( + glueClientProperties, + AWSGeneralUtil.createSyncHttpClient( + glueClientProperties, ApacheHttpClient.builder()), + GlueClient.builder(), + GlueCatalogConstants.BASE_GLUE_USER_AGENT_PREFIX_FORMAT, + GlueCatalogConstants.GLUE_CLIENT_USER_AGENT_PREFIX); + } + + @VisibleForTesting + public GlueCatalog( + String catalogName, + String databaseName, + GlueClient glueClient, + GlueDatabaseOperator glueDatabaseOperator, + GlueTableOperator glueTableOperator, + GluePartitionOperator 
gluePartitionOperator, + GlueFunctionOperator glueFunctionOperator) { + super(catalogName, databaseName); + this.glueClient = glueClient; + this.glueDatabaseOperator = glueDatabaseOperator; + this.glueTableOperator = glueTableOperator; + this.gluePartitionOperator = gluePartitionOperator; + this.glueFunctionOperator = glueFunctionOperator; + } + + /** + * Open the catalog. Used for any required preparation in initialization phase. + * + * @throws CatalogException in case of any runtime exception + */ + @Override + public void open() throws CatalogException {} + + /** + * Close the catalog when it is no longer needed and release any resource that it might be + * holding. + * + * @throws CatalogException in case of any runtime exception + */ + @Override + public void close() throws CatalogException { + try { + glueClient.close(); + } catch (Exception e) { + throw new CatalogException("Glue Client is not closed properly!", e); + } + } + + // ------ databases ------ + + /** + * Create a database. + * + * @param databaseName Name of the database to be created + * @param database The database definition + * @param ignoreIfExists Flag to specify behavior when a database with the given name already + * exists: if set to false, throw a DatabaseAlreadyExistException, if set to true, do + * nothing. 
+ * @throws DatabaseAlreadyExistException if the given database already exists and ignoreIfExists + * is false + * @throws CatalogException in case of any runtime exception + */ + @Override + public void createDatabase( + String databaseName, CatalogDatabase database, boolean ignoreIfExists) + throws DatabaseAlreadyExistException, CatalogException { + checkArgument( + !StringUtils.isNullOrWhitespaceOnly(databaseName), + "Database name cannot be null or empty."); + checkNotNull(database, "Database cannot be null."); + databaseName = GlueUtils.getGlueConventionalName(databaseName); + if (databaseExists(databaseName) && !ignoreIfExists) { + throw new DatabaseAlreadyExistException(getName(), databaseName); + } else { + glueDatabaseOperator.createGlueDatabase(databaseName, database); + LOG.info("Created Database {}.", databaseName); + } + } + + /** + * Drop a database. + * + * @param databaseName Name of the database to be dropped. + * @param ignoreIfNotExists Flag to specify behavior when the database does not exist: if set to + * false, throw an exception, if set to true, do nothing. + * @param cascade Flag to specify behavior when the database contains table or function: if set + * to true, delete all tables and functions in the database and then delete the database, if + * set to false, throw an exception. 
+ * @throws DatabaseNotExistException if the given database does not exist + * @throws DatabaseNotEmptyException if the given database is not empty and isRestrict is true + * @throws CatalogException in case of any runtime exception + */ + @Override + public void dropDatabase(String databaseName, boolean ignoreIfNotExists, boolean cascade) + throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException { + checkArgument( + !StringUtils.isNullOrWhitespaceOnly(databaseName), + "Database name cannot be null or empty."); + databaseName = GlueUtils.getGlueConventionalName(databaseName); + if (databaseExists(databaseName)) { + if (cascade) { + List<String> tables = listTables(databaseName); + if (!tables.isEmpty()) { + glueDatabaseOperator.deleteTablesFromDatabase(databaseName, tables); + LOG.info("{} Tables deleted from Database {}.", tables.size(), databaseName); + } + List<String> functions = listFunctions(databaseName); + if (!functions.isEmpty()) { + glueDatabaseOperator.deleteFunctionsFromDatabase(databaseName, functions); + LOG.info( + "{} Functions deleted from Database {}.", + functions.size(), + databaseName); + } + } + if (!isDatabaseEmpty(databaseName)) { + throw new DatabaseNotEmptyException(getName(), databaseName); + } + glueDatabaseOperator.dropGlueDatabase(databaseName); + LOG.info("Dropped Database: {}.", databaseName); + } else if (!ignoreIfNotExists) { + throw new DatabaseNotExistException(getName(), databaseName); + } + } + + /** + * Modify existing database. + * + * @param name Name of the database to be modified + * @param newDatabase The new database definition + * @param ignoreIfNotExists Flag to specify behavior when the given database does not exist: if + * set to false, throw an exception, if set to true, do nothing. 
+ * @throws DatabaseNotExistException if the given database does not exist + * @throws CatalogException in case of any runtime exception + */ + @Override + public void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists) + throws DatabaseNotExistException, CatalogException { + checkArgument( + !StringUtils.isNullOrWhitespaceOnly(name), + "Database name cannot be null or empty."); + checkNotNull(newDatabase, "Database cannot be null."); + name = GlueUtils.getGlueConventionalName(name); + try { + CatalogDatabase existingDatabase = glueDatabaseOperator.getDatabase(name); + if (existingDatabase != null) { + if (existingDatabase.getClass() != newDatabase.getClass()) { + throw new CatalogException( + String.format( + "Database types don't match. Existing database is '%s' and new database is '%s'.", + existingDatabase.getClass().getName(), + newDatabase.getClass().getName())); + } + glueDatabaseOperator.updateGlueDatabase(name, newDatabase); + } + } catch (DatabaseNotExistException e) { + if (!ignoreIfNotExists) { + throw new DatabaseNotExistException(getName(), name); Review Comment: Why not just `throw e`? Constructing a new exception here unnecessarily masks the one that was caught. ########## flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/table/util/HttpClientOptionUtils.java: ########## @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.aws.table.util; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.connector.aws.config.AWSConfigConstants; +import org.apache.flink.connector.base.table.options.ConfigurationValidator; +import org.apache.flink.connector.base.table.options.TableOptionsUtils; +import org.apache.flink.connector.base.table.util.ConfigurationValidatorUtil; + +import software.amazon.awssdk.http.Protocol; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +/** Class for handling AWS HTTP Client config options. */ +@PublicEvolving +public class HttpClientOptionUtils implements TableOptionsUtils, ConfigurationValidator { + public static final String CLIENT_PREFIX = "http-client."; + private static final String CLIENT_HTTP_PROTOCOL_VERSION_OPTION = "protocol.version"; + private static final String CLIENT_HTTP_MAX_CONNECTION_TIMEOUT_MS = "connection-timeout-ms"; + private static final String CLIENT_HTTP_MAX_SOCKET_TIMEOUT_MS = "socket-timeout-ms"; + private static final String APACHE_MAX_CONNECTIONS = "apache.max-connections"; Review Comment: bump -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
