luoyuxia commented on code in PR #47: URL: https://github.com/apache/flink-connector-aws/pull/47#discussion_r1119553822
########## flink-catalog-aws-glue/pom.xml: ########## @@ -0,0 +1,114 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-aws-parent</artifactId> + <version>4.1-SNAPSHOT</version> + </parent> + + <artifactId>flink-catalog-aws-glue</artifactId> + <name>Flink : Catalog : AWS : Glue</name> + <packaging>jar</packaging> + + <dependencies> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-api-java</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-aws-base</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>software.amazon.awssdk</groupId> + <artifactId>glue</artifactId> + <version>${aws.sdkv2.version}</version> + </dependency> + + <dependency> + <groupId>software.amazon.awssdk</groupId> + 
<artifactId>apache-client</artifactId> + <version>${aws.sdkv2.version}</version> + </dependency> + + <dependency> + <groupId>software.amazon.awssdk</groupId> + <artifactId>url-connection-client</artifactId> + <version>${aws.sdkv2.version}</version> + </dependency> + + <!-- ArchUit test dependencies --> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-architecture-tests-test</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-common</artifactId> + <version>${flink.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-api-java</artifactId> + <version>${flink.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <plugins> Review Comment: Should we add `maven-shade-plugin` in here just like `flink-sql-connector-dynamodb` does? ########## flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalog.java: ########## @@ -0,0 +1,1029 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.catalog.glue; + +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.catalog.AbstractCatalog; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.CatalogFunction; +import org.apache.flink.table.catalog.CatalogPartition; +import org.apache.flink.table.catalog.CatalogPartitionImpl; +import org.apache.flink.table.catalog.CatalogPartitionSpec; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.CatalogView; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.ResolvedCatalogTable; +import org.apache.flink.table.catalog.ResolvedCatalogView; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.FunctionNotExistException; +import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException; +import org.apache.flink.table.catalog.exceptions.PartitionNotExistException; +import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException; +import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException; +import org.apache.flink.table.catalog.exceptions.TablePartitionedException; +import org.apache.flink.table.catalog.glue.util.AwsClientFactories; +import org.apache.flink.table.catalog.glue.util.AwsProperties; +import org.apache.flink.table.catalog.glue.util.GlueOperator; 
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics; +import org.apache.flink.table.catalog.stats.CatalogTableStatistics; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.functions.FunctionIdentifier; +import org.apache.flink.util.StringUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.glue.GlueClient; +import software.amazon.awssdk.services.glue.model.Partition; +import software.amazon.awssdk.services.glue.model.Table; + +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** A Glue catalog implementation that uses glue catalog. */ +public class GlueCatalog extends AbstractCatalog { + + /** instance of GlueOperator to facilitate glue related actions. */ + public GlueOperator glueOperator; + + /** Default database name if not passed as part of catalog. 
*/ + public static final String DEFAULT_DB = "default"; + + private static final Logger LOG = LoggerFactory.getLogger(GlueCatalog.class); + + public GlueCatalog(String catalogName, String databaseName, ReadableConfig glueProperties) { + super(catalogName, databaseName); + checkNotNull(glueProperties); + initialize(glueProperties); + } + + public GlueCatalog(String catalogName) { + super(catalogName, DEFAULT_DB); + } + + public void initialize(ReadableConfig catalogProperties) { + // setLocationUri for the database level + String locationUri = catalogProperties.getOptional(GlueCatalogOptions.LOCATION_KEY).get(); + // .getOrDefault(GlueCatalogConstants.LOCATION_URI, ""); + // initialize aws client factories + AwsProperties awsProperties = new AwsProperties(catalogProperties); + + // create glue client + GlueClient glueClient = AwsClientFactories.factory(awsProperties).glue(); + this.glueOperator = new GlueOperator(locationUri, getName(), awsProperties, glueClient); + } + + /** + * Open the catalog. Used for any required preparation in initialization phase. + * + * @throws CatalogException in case of any runtime exception + */ + @Override + public void open() throws CatalogException {} + + /** + * Close the catalog when it is no longer needed and release any resource that it might be + * holding. + * + * @throws CatalogException in case of any runtime exception + */ + @Override + public void close() throws CatalogException { + try { + glueOperator.closeClient(); + } catch (Exception e) { + LOG.warn("Glue Client is not closed properly!"); + } + } + + // ------ databases ------ + + /** + * Create a database. + * + * @param name Name of the database to be created + * @param database The database definition + * @param ignoreIfExists Flag to specify behavior when a database with the given name already + * exists: if set to false, throw a DatabaseAlreadyExistException, if set to true, do + * nothing. 
+ * @throws DatabaseAlreadyExistException if the given database already exists and ignoreIfExists + * is false + * @throws CatalogException in case of any runtime exception + */ + @Override + public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists) + throws DatabaseAlreadyExistException, CatalogException { + + checkArgument(!StringUtils.isNullOrWhitespaceOnly(name)); + checkNotNull(database); + + // glue supports lowercase naming convention + name = name.toLowerCase(Locale.ROOT); + if (databaseExists(name)) { + if (!ignoreIfExists) { + throw new DatabaseAlreadyExistException(getName(), name); + } + } else { + glueOperator.createGlueDatabase(name, database); + } + } + + /** + * Drop a database. + * + * @param name Name of the database to be dropped. + * @param ignoreIfNotExists Flag to specify behavior when the database does not exist: if set to + * false, throw an exception, if set to true, do nothing. + * @param cascade Flag to specify behavior when the database contains table or function: if set + * to true, delete all tables and functions in the database and then delete the database, if + * set to false, throw an exception. + * @throws DatabaseNotExistException if the given database does not exist + * @throws DatabaseNotEmptyException if the given database is not empty and isRestrict is true + * @throws CatalogException in case of any runtime exception + */ + @Override + public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade) + throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException { + + checkArgument( + !StringUtils.isNullOrWhitespaceOnly(name), "Database name contains empty spaces"); Review Comment: Exception message may be like "DatabaseName cannot be null or empty"? 
########## flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalog.java: ########## @@ -0,0 +1,1029 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.glue; + +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.catalog.AbstractCatalog; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.CatalogFunction; +import org.apache.flink.table.catalog.CatalogPartition; +import org.apache.flink.table.catalog.CatalogPartitionImpl; +import org.apache.flink.table.catalog.CatalogPartitionSpec; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.CatalogView; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.ResolvedCatalogTable; +import org.apache.flink.table.catalog.ResolvedCatalogView; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException; +import 
org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.FunctionNotExistException; +import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException; +import org.apache.flink.table.catalog.exceptions.PartitionNotExistException; +import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException; +import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException; +import org.apache.flink.table.catalog.exceptions.TablePartitionedException; +import org.apache.flink.table.catalog.glue.util.AwsClientFactories; +import org.apache.flink.table.catalog.glue.util.AwsProperties; +import org.apache.flink.table.catalog.glue.util.GlueOperator; +import org.apache.flink.table.catalog.stats.CatalogColumnStatistics; +import org.apache.flink.table.catalog.stats.CatalogTableStatistics; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.functions.FunctionIdentifier; +import org.apache.flink.util.StringUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.glue.GlueClient; +import software.amazon.awssdk.services.glue.model.Partition; +import software.amazon.awssdk.services.glue.model.Table; + +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** A Glue catalog implementation that uses glue catalog. */ +public class GlueCatalog extends AbstractCatalog { + + /** instance of GlueOperator to facilitate glue related actions. 
*/ + public GlueOperator glueOperator; + + /** Default database name if not passed as part of catalog. */ + public static final String DEFAULT_DB = "default"; + + private static final Logger LOG = LoggerFactory.getLogger(GlueCatalog.class); + + public GlueCatalog(String catalogName, String databaseName, ReadableConfig glueProperties) { + super(catalogName, databaseName); + checkNotNull(glueProperties); + initialize(glueProperties); + } + + public GlueCatalog(String catalogName) { + super(catalogName, DEFAULT_DB); + } + + public void initialize(ReadableConfig catalogProperties) { + // setLocationUri for the database level + String locationUri = catalogProperties.getOptional(GlueCatalogOptions.LOCATION_KEY).get(); + // .getOrDefault(GlueCatalogConstants.LOCATION_URI, ""); + // initialize aws client factories + AwsProperties awsProperties = new AwsProperties(catalogProperties); + + // create glue client + GlueClient glueClient = AwsClientFactories.factory(awsProperties).glue(); + this.glueOperator = new GlueOperator(locationUri, getName(), awsProperties, glueClient); + } + + /** + * Open the catalog. Used for any required preparation in initialization phase. + * + * @throws CatalogException in case of any runtime exception + */ + @Override + public void open() throws CatalogException {} + + /** + * Close the catalog when it is no longer needed and release any resource that it might be + * holding. + * + * @throws CatalogException in case of any runtime exception + */ + @Override + public void close() throws CatalogException { + try { + glueOperator.closeClient(); + } catch (Exception e) { + LOG.warn("Glue Client is not closed properly!"); + } + } + + // ------ databases ------ + + /** + * Create a database. 
+ * + * @param name Name of the database to be created + * @param database The database definition + * @param ignoreIfExists Flag to specify behavior when a database with the given name already + * exists: if set to false, throw a DatabaseAlreadyExistException, if set to true, do + * nothing. + * @throws DatabaseAlreadyExistException if the given database already exists and ignoreIfExists + * is false + * @throws CatalogException in case of any runtime exception + */ + @Override + public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists) + throws DatabaseAlreadyExistException, CatalogException { + + checkArgument(!StringUtils.isNullOrWhitespaceOnly(name)); + checkNotNull(database); + + // glue supports lowercase naming convention + name = name.toLowerCase(Locale.ROOT); + if (databaseExists(name)) { + if (!ignoreIfExists) { + throw new DatabaseAlreadyExistException(getName(), name); + } + } else { + glueOperator.createGlueDatabase(name, database); + } + } + + /** + * Drop a database. + * + * @param name Name of the database to be dropped. + * @param ignoreIfNotExists Flag to specify behavior when the database does not exist: if set to + * false, throw an exception, if set to true, do nothing. + * @param cascade Flag to specify behavior when the database contains table or function: if set + * to true, delete all tables and functions in the database and then delete the database, if + * set to false, throw an exception. 
+ * @throws DatabaseNotExistException if the given database does not exist + * @throws DatabaseNotEmptyException if the given database is not empty and isRestrict is true + * @throws CatalogException in case of any runtime exception + */ + @Override + public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade) + throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException { + + checkArgument( + !StringUtils.isNullOrWhitespaceOnly(name), "Database name contains empty spaces"); + + // glue supports only lowercase naming convention + name = name.toLowerCase(Locale.ROOT); + + if (databaseExists(name)) { + if (cascade) { + // delete all tables in database + glueOperator.deleteTablesFromDatabase(name, listTables(name)); + // delete all functions in database + glueOperator.deleteFunctionsFromDatabase(name, listFunctions(name)); + } + + if (glueOperator.isDatabaseEmpty(name)) { + glueOperator.dropGlueDatabase(name); + } else { + throw new DatabaseNotEmptyException(getName(), name); + } + } else if (!ignoreIfNotExists) { + throw new DatabaseNotExistException(getName(), name); + } + } + + /** + * Modify an existing database. + * + * @param name Name of the database to be modified + * @param newDatabase The new database definition + * @param ignoreIfNotExists Flag to specify behavior when the given database does not exist: if + * set to false, throw an exception, if set to true, do nothing. 
+ * @throws DatabaseNotExistException if the given database does not exist + * @throws CatalogException in case of any runtime exception + */ + @Override + public void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists) + throws DatabaseNotExistException, CatalogException { + + // glue supports only lowercase naming convention + name = name.toLowerCase(Locale.ROOT); + + checkArgument( Review Comment: Ditto: as in `dropDatabase`, the exception message could be something like "DatabaseName cannot be null or empty". ########## flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalogOptions.java: ########## @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.glue; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.table.catalog.CommonCatalogOptions; + +import java.util.HashSet; +import java.util.Set; + +import static org.apache.flink.table.catalog.glue.GlueCatalogConstants.GLUE_CATALOG_SKIP_NAME_VALIDATION_DEFAULT; + +/** A collection of {@link ConfigOption} which is used in GlueCatalog. 
*/ +public class GlueCatalogOptions extends CommonCatalogOptions { + + public static final ConfigOption<String> LOCATION_KEY = + ConfigOptions.key(GlueCatalogConstants.LOCATION_URI).stringType().noDefaultValue(); + + public static final ConfigOption<String> INPUT_FORMAT = + ConfigOptions.key(GlueCatalogConstants.TABLE_INPUT_FORMAT) + .stringType() + .noDefaultValue(); Review Comment: It would be better to also add a brief description here via `withDescription(xxx)`. ########## flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalogFactoryOptions.java: ########## @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.glue; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; + +/** {@link ConfigOption}s for {@link GlueCatalog}. 
*/ +@Internal +public class GlueCatalogFactoryOptions { + + public static final String IDENTIFIER = "glue"; + + public static final ConfigOption<String> DEFAULT_DATABASE = + ConfigOptions.key(GlueCatalogOptions.DEFAULT_DATABASE_KEY) + .stringType() + .defaultValue(GlueCatalog.DEFAULT_DB); + + // public static final ConfigOption<String> LOCATION_URI = Review Comment: Please remove them if we don't need them. ########## flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalogFactory.java: ########## @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.glue; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.factories.CatalogFactory; +import org.apache.flink.table.factories.FactoryUtil; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.Set; + +/** Catalog factory for {@link GlueCatalog}. 
*/ +public class GlueCatalogFactory implements CatalogFactory { + + private static final Logger LOG = LoggerFactory.getLogger(GlueCatalogFactory.class); + + @Override + public String factoryIdentifier() { + return GlueCatalogFactoryOptions.IDENTIFIER; + } + + @Override + public Set<ConfigOption<?>> requiredOptions() { + return Collections.emptySet(); + } + + @Override + public Set<ConfigOption<?>> optionalOptions() { + return GlueCatalogOptions.getAllConfigOptions(); + } + + @Override + public Catalog createCatalog(Context context) { + final FactoryUtil.CatalogFactoryHelper helper = + FactoryUtil.createCatalogFactoryHelper(this, context); + helper.validate(); + // String msg = Review Comment: please remove them if we don't need them in product code. ########## flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalogConstants.java: ########## @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.catalog.glue; + +import org.apache.flink.table.catalog.glue.util.AwsClientFactory; + +import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient; + +import java.util.regex.Pattern; + +/** Configs for catalog meta-objects in {@link GlueCatalog}. */ +public class GlueCatalogConstants { + + public static final String COMMENT = "comment"; + public static final String DEFAULT_SEPARATOR = ":"; + public static final String LOCATION_SEPARATOR = "/"; + public static final String LOCATION_URI = "location-uri"; + public static final String AND = "and"; + public static final String NEXT_LINE = "\n"; + public static final String SPACE = " "; + + public static final String TABLE_OWNER = "owner"; + public static final String TABLE_INPUT_FORMAT = "table.input.format"; + public static final String TABLE_OUTPUT_FORMAT = "table.output.format"; + + public static final String FLINK_SCALA_FUNCTION_PREFIX = "flink:scala:"; + public static final String FLINK_PYTHON_FUNCTION_PREFIX = "flink:python:"; + public static final String FLINK_JAVA_FUNCTION_PREFIX = "flink:java:"; + + public static final String FLINK_CATALOG = "FLINK_CATALOG"; + + public static final Pattern GLUE_DB_PATTERN = Pattern.compile("^[a-z0-9_]{1,252}$"); + public static final String GLUE_EXCEPTION_MSG_IDENTIFIER = "GLUE EXCEPTION"; + public static final String TABLE_NOT_EXISTS_IDENTIFIER = "TABLE DOESN'T EXISTS"; + public static final String DEFAULT_PARTITION_NAME = "__GLUE_DEFAULT_PARTITION__"; + + // ---- glue configs + /** + * The type of {@link software.amazon.awssdk.http.SdkHttpClient} implementation used by {@link + * AwsClientFactory} If set, all AWS clients will use this specified HTTP client. If not set, + * HTTP_CLIENT_TYPE_DEFAULT will be used. For specific types supported, see HTTP_CLIENT_TYPE_* + * defined below. 
+ */ + public static final String HTTP_CLIENT_TYPE = "http-client.type"; + + /** + * Used to configure the connection acquisition timeout in milliseconds for {@link + * software.amazon.awssdk.http.apache.ApacheHttpClient.Builder}. This flag only works when + * {@link #HTTP_CLIENT_TYPE} is set to HTTP_CLIENT_TYPE_APACHE + * + * <p>For more details, see + * https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/apache/ApacheHttpClient.Builder.html + */ + public static final String HTTP_CLIENT_APACHE_CONNECTION_ACQUISITION_TIMEOUT_MS = + "http-client.apache.connection-acquisition-timeout-ms"; + + /** + * If Glue should skip name validations It is recommended to stick to Glue best practice in + * https://docs.aws.amazon.com/athena/latest/ug/glue-best-practices.html to make sure operations + * are Hive compatible. This is only added for users that have existing conventions using + * non-standard characters. When database name and table name validation are skipped, there is + * no guarantee that downstream systems would all support the names. + */ + public static final String GLUE_CATALOG_SKIP_NAME_VALIDATION = "glue.skip-name-validation"; + + /** + * Used to configure the connection max idle time in milliseconds for {@link + * software.amazon.awssdk.http.apache.ApacheHttpClient.Builder}. This flag only works when Review Comment: If this flag only works when {@link #HTTP_CLIENT_TYPE} is set to HTTP_CLIENT_TYPE_APACHE, would it be better to validate it in the `createCatalog` method? I mean, if `HTTP_CLIENT_TYPE` is not set to `HTTP_CLIENT_TYPE_APACHE` but `HTTP_CLIENT_APACHE_CONNECTION_MAX_IDLE_TIME_MS` is set, should we throw an exception to tell users, or just ignore it? ########## flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/AwsClientFactories.java: ########## @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.glue.util; + +import org.apache.flink.configuration.ReadableConfig; + +import software.amazon.awssdk.services.glue.GlueClient; + +/** Default factories. */ +public class AwsClientFactories { + + private AwsClientFactories() {} + + public static AwsClientFactory factory(AwsProperties properties) { + return new DefaultAwsClientFactory(properties); + } + + static class DefaultAwsClientFactory implements AwsClientFactory { + Review Comment: Add a serialVersionUID in here if it implements Serializable. ########## flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/AwsClientFactory.java: ########## @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.glue.util; + +import org.apache.flink.configuration.ReadableConfig; + +import software.amazon.awssdk.services.glue.GlueClient; + +import java.io.Serializable; + +/** + * Interface to customize AWS clients used by Flink. A custom factory must have a no-arg. + * constructor, and use {@link #initialize(ReadableConfig)} to initialize the factory. + */ +public interface AwsClientFactory extends Serializable { + + /** + * create a AWS Glue client. + * + * @return glue client + */ + GlueClient glue(); + + /** + * Initialize AWS client factory from catalog properties. + * + * @param properties catalog properties + */ + void initialize(ReadableConfig properties); Review Comment: When will this method be called? I haven't seen where it is used. ########## flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalog.java: ########## @@ -0,0 +1,1029 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.glue; + +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.catalog.AbstractCatalog; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.CatalogFunction; +import org.apache.flink.table.catalog.CatalogPartition; +import org.apache.flink.table.catalog.CatalogPartitionImpl; +import org.apache.flink.table.catalog.CatalogPartitionSpec; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.CatalogView; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.ResolvedCatalogTable; +import org.apache.flink.table.catalog.ResolvedCatalogView; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.FunctionNotExistException; +import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException; +import org.apache.flink.table.catalog.exceptions.PartitionNotExistException; +import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException; +import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; 
+import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException; +import org.apache.flink.table.catalog.exceptions.TablePartitionedException; +import org.apache.flink.table.catalog.glue.util.AwsClientFactories; +import org.apache.flink.table.catalog.glue.util.AwsProperties; +import org.apache.flink.table.catalog.glue.util.GlueOperator; +import org.apache.flink.table.catalog.stats.CatalogColumnStatistics; +import org.apache.flink.table.catalog.stats.CatalogTableStatistics; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.functions.FunctionIdentifier; +import org.apache.flink.util.StringUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.glue.GlueClient; +import software.amazon.awssdk.services.glue.model.Partition; +import software.amazon.awssdk.services.glue.model.Table; + +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** A Glue catalog implementation that uses glue catalog. */ +public class GlueCatalog extends AbstractCatalog { + + /** instance of GlueOperator to facilitate glue related actions. */ + public GlueOperator glueOperator; + + /** Default database name if not passed as part of catalog. 
*/ + public static final String DEFAULT_DB = "default"; + + private static final Logger LOG = LoggerFactory.getLogger(GlueCatalog.class); + + public GlueCatalog(String catalogName, String databaseName, ReadableConfig glueProperties) { + super(catalogName, databaseName); + checkNotNull(glueProperties); + initialize(glueProperties); + } + + public GlueCatalog(String catalogName) { + super(catalogName, DEFAULT_DB); + } + + public void initialize(ReadableConfig catalogProperties) { + // setLocationUri for the database level + String locationUri = catalogProperties.getOptional(GlueCatalogOptions.LOCATION_KEY).get(); Review Comment: Should we check whether `catalogProperties.getOptional(GlueCatalogOptions.LOCATION_KEY)` is present before calling `.get()` on it? ########## flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalog.java: ########## @@ -0,0 +1,1029 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.catalog.glue; + +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.catalog.AbstractCatalog; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.CatalogFunction; +import org.apache.flink.table.catalog.CatalogPartition; +import org.apache.flink.table.catalog.CatalogPartitionImpl; +import org.apache.flink.table.catalog.CatalogPartitionSpec; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.CatalogView; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.ResolvedCatalogTable; +import org.apache.flink.table.catalog.ResolvedCatalogView; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.FunctionNotExistException; +import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException; +import org.apache.flink.table.catalog.exceptions.PartitionNotExistException; +import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException; +import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException; +import org.apache.flink.table.catalog.exceptions.TablePartitionedException; +import org.apache.flink.table.catalog.glue.util.AwsClientFactories; +import org.apache.flink.table.catalog.glue.util.AwsProperties; +import org.apache.flink.table.catalog.glue.util.GlueOperator; 
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics; +import org.apache.flink.table.catalog.stats.CatalogTableStatistics; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.functions.FunctionIdentifier; +import org.apache.flink.util.StringUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.glue.GlueClient; +import software.amazon.awssdk.services.glue.model.Partition; +import software.amazon.awssdk.services.glue.model.Table; + +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** A Glue catalog implementation that uses glue catalog. */ +public class GlueCatalog extends AbstractCatalog { + + /** instance of GlueOperator to facilitate glue related actions. */ + public GlueOperator glueOperator; + + /** Default database name if not passed as part of catalog. */ + public static final String DEFAULT_DB = "default"; + + private static final Logger LOG = LoggerFactory.getLogger(GlueCatalog.class); + + public GlueCatalog(String catalogName, String databaseName, ReadableConfig glueProperties) { + super(catalogName, databaseName); + checkNotNull(glueProperties); + initialize(glueProperties); Review Comment: put it to method `open`? ########## flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalog.java: ########## @@ -0,0 +1,1029 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.glue; + +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.catalog.AbstractCatalog; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.CatalogFunction; +import org.apache.flink.table.catalog.CatalogPartition; +import org.apache.flink.table.catalog.CatalogPartitionImpl; +import org.apache.flink.table.catalog.CatalogPartitionSpec; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.CatalogView; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.ResolvedCatalogTable; +import org.apache.flink.table.catalog.ResolvedCatalogView; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.FunctionNotExistException; +import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException; +import org.apache.flink.table.catalog.exceptions.PartitionNotExistException; +import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException; +import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; 
+import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException; +import org.apache.flink.table.catalog.exceptions.TablePartitionedException; +import org.apache.flink.table.catalog.glue.util.AwsClientFactories; +import org.apache.flink.table.catalog.glue.util.AwsProperties; +import org.apache.flink.table.catalog.glue.util.GlueOperator; +import org.apache.flink.table.catalog.stats.CatalogColumnStatistics; +import org.apache.flink.table.catalog.stats.CatalogTableStatistics; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.functions.FunctionIdentifier; +import org.apache.flink.util.StringUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.glue.GlueClient; +import software.amazon.awssdk.services.glue.model.Partition; +import software.amazon.awssdk.services.glue.model.Table; + +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** A Glue catalog implementation that uses glue catalog. */ +public class GlueCatalog extends AbstractCatalog { + + /** instance of GlueOperator to facilitate glue related actions. */ + public GlueOperator glueOperator; + + /** Default database name if not passed as part of catalog. 
*/ + public static final String DEFAULT_DB = "default"; + + private static final Logger LOG = LoggerFactory.getLogger(GlueCatalog.class); + + public GlueCatalog(String catalogName, String databaseName, ReadableConfig glueProperties) { + super(catalogName, databaseName); + checkNotNull(glueProperties); + initialize(glueProperties); + } + + public GlueCatalog(String catalogName) { + super(catalogName, DEFAULT_DB); + } + + public void initialize(ReadableConfig catalogProperties) { + // setLocationUri for the database level + String locationUri = catalogProperties.getOptional(GlueCatalogOptions.LOCATION_KEY).get(); + // .getOrDefault(GlueCatalogConstants.LOCATION_URI, ""); + // initialize aws client factories + AwsProperties awsProperties = new AwsProperties(catalogProperties); + + // create glue client + GlueClient glueClient = AwsClientFactories.factory(awsProperties).glue(); + this.glueOperator = new GlueOperator(locationUri, getName(), awsProperties, glueClient); + } + + /** + * Open the catalog. Used for any required preparation in initialization phase. + * + * @throws CatalogException in case of any runtime exception + */ + @Override + public void open() throws CatalogException {} + + /** + * Close the catalog when it is no longer needed and release any resource that it might be + * holding. + * + * @throws CatalogException in case of any runtime exception + */ + @Override + public void close() throws CatalogException { + try { + glueOperator.closeClient(); + } catch (Exception e) { + LOG.warn("Glue Client is not closed properly!"); + } + } + + // ------ databases ------ + + /** + * Create a database. + * + * @param name Name of the database to be created + * @param database The database definition + * @param ignoreIfExists Flag to specify behavior when a database with the given name already + * exists: if set to false, throw a DatabaseAlreadyExistException, if set to true, do + * nothing. 
+ * @throws DatabaseAlreadyExistException if the given database already exists and ignoreIfExists + * is false + * @throws CatalogException in case of any runtime exception + */ + @Override + public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists) + throws DatabaseAlreadyExistException, CatalogException { + + checkArgument(!StringUtils.isNullOrWhitespaceOnly(name)); + checkNotNull(database); + + // glue supports lowercase naming convention + name = name.toLowerCase(Locale.ROOT); + if (databaseExists(name)) { + if (!ignoreIfExists) { + throw new DatabaseAlreadyExistException(getName(), name); + } + } else { + glueOperator.createGlueDatabase(name, database); + } + } + + /** + * Drop a database. + * + * @param name Name of the database to be dropped. + * @param ignoreIfNotExists Flag to specify behavior when the database does not exist: if set to + * false, throw an exception, if set to true, do nothing. + * @param cascade Flag to specify behavior when the database contains table or function: if set + * to true, delete all tables and functions in the database and then delete the database, if + * set to false, throw an exception. 
+ * @throws DatabaseNotExistException if the given database does not exist + * @throws DatabaseNotEmptyException if the given database is not empty and isRestrict is true + * @throws CatalogException in case of any runtime exception + */ + @Override + public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade) + throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException { + + checkArgument( + !StringUtils.isNullOrWhitespaceOnly(name), "Database name contains empty spaces"); + + // glue supports only lowercase naming convention + name = name.toLowerCase(Locale.ROOT); + + if (databaseExists(name)) { + if (cascade) { + // delete all tables in database + glueOperator.deleteTablesFromDatabase(name, listTables(name)); + // delete all functions in database + glueOperator.deleteFunctionsFromDatabase(name, listFunctions(name)); + } + + if (glueOperator.isDatabaseEmpty(name)) { + glueOperator.dropGlueDatabase(name); + } else { + throw new DatabaseNotEmptyException(getName(), name); + } + } else if (!ignoreIfNotExists) { + throw new DatabaseNotExistException(getName(), name); + } + } + + /** + * Modify an existing database. + * + * @param name Name of the database to be modified + * @param newDatabase The new database definition + * @param ignoreIfNotExists Flag to specify behavior when the given database does not exist: if + * set to false, throw an exception, if set to true, do nothing. 
+ * @throws DatabaseNotExistException if the given database does not exist + * @throws CatalogException in case of any runtime exception + */ + @Override + public void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists) + throws DatabaseNotExistException, CatalogException { + + // glue supports only lowercase naming convention + name = name.toLowerCase(Locale.ROOT); + + checkArgument( + !StringUtils.isNullOrWhitespaceOnly(name), "Database name contains empty spaces"); + checkNotNull(newDatabase, "Database cannot be Empty"); + + CatalogDatabase existingDatabase = glueOperator.getDatabase(name); + if (existingDatabase != null) { + if (existingDatabase.getClass() != newDatabase.getClass()) { + throw new CatalogException( + String.format( + "Database types don't match. Existing database is '%s' and new database is '%s'.", + existingDatabase.getClass().getName(), + newDatabase.getClass().getName())); + } + glueOperator.updateGlueDatabase(name, newDatabase); + } else if (!ignoreIfNotExists) { + throw new DatabaseNotExistException(getName(), name); + } + } + + /** + * Get the names of all databases in this catalog. + * + * @return a list of the names of all databases + * @throws CatalogException in case of any runtime exception + */ + @Override + public List<String> listDatabases() throws CatalogException { + return glueOperator.listGlueDatabases(); + } + + /** + * Get a database from this catalog. 
+ * + * @param databaseName Name of the database + * @return The requested database + * @throws DatabaseNotExistException if the database does not exist + * @throws CatalogException in case of any runtime exception + */ + @Override + public CatalogDatabase getDatabase(String databaseName) + throws DatabaseNotExistException, CatalogException { + + // glue supports only lowercase naming convention + databaseName = databaseName.toLowerCase(Locale.ROOT); + + checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName)); Review Comment: add exception message. ########## flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalog.java: ########## @@ -0,0 +1,1029 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.catalog.glue; + +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.catalog.AbstractCatalog; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.CatalogFunction; +import org.apache.flink.table.catalog.CatalogPartition; +import org.apache.flink.table.catalog.CatalogPartitionImpl; +import org.apache.flink.table.catalog.CatalogPartitionSpec; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.CatalogView; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.ResolvedCatalogTable; +import org.apache.flink.table.catalog.ResolvedCatalogView; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.FunctionNotExistException; +import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException; +import org.apache.flink.table.catalog.exceptions.PartitionNotExistException; +import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException; +import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException; +import org.apache.flink.table.catalog.exceptions.TablePartitionedException; +import org.apache.flink.table.catalog.glue.util.AwsClientFactories; +import org.apache.flink.table.catalog.glue.util.AwsProperties; +import org.apache.flink.table.catalog.glue.util.GlueOperator; 
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics; +import org.apache.flink.table.catalog.stats.CatalogTableStatistics; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.functions.FunctionIdentifier; +import org.apache.flink.util.StringUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.glue.GlueClient; +import software.amazon.awssdk.services.glue.model.Partition; +import software.amazon.awssdk.services.glue.model.Table; + +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** A Glue catalog implementation that uses glue catalog. */ +public class GlueCatalog extends AbstractCatalog { + + /** instance of GlueOperator to facilitate glue related actions. */ + public GlueOperator glueOperator; + + /** Default database name if not passed as part of catalog. 
*/ + public static final String DEFAULT_DB = "default"; + + private static final Logger LOG = LoggerFactory.getLogger(GlueCatalog.class); + + public GlueCatalog(String catalogName, String databaseName, ReadableConfig glueProperties) { + super(catalogName, databaseName); + checkNotNull(glueProperties); + initialize(glueProperties); + } + + public GlueCatalog(String catalogName) { + super(catalogName, DEFAULT_DB); + } + + public void initialize(ReadableConfig catalogProperties) { + // setLocationUri for the database level + String locationUri = catalogProperties.getOptional(GlueCatalogOptions.LOCATION_KEY).get(); + // .getOrDefault(GlueCatalogConstants.LOCATION_URI, ""); + // initialize aws client factories + AwsProperties awsProperties = new AwsProperties(catalogProperties); + + // create glue client + GlueClient glueClient = AwsClientFactories.factory(awsProperties).glue(); + this.glueOperator = new GlueOperator(locationUri, getName(), awsProperties, glueClient); + } + + /** + * Open the catalog. Used for any required preparation in initialization phase. + * + * @throws CatalogException in case of any runtime exception + */ + @Override + public void open() throws CatalogException {} + + /** + * Close the catalog when it is no longer needed and release any resource that it might be + * holding. + * + * @throws CatalogException in case of any runtime exception + */ + @Override + public void close() throws CatalogException { + try { + glueOperator.closeClient(); + } catch (Exception e) { + LOG.warn("Glue Client is not closed properly!"); + } + } + + // ------ databases ------ + + /** + * Create a database. + * + * @param name Name of the database to be created + * @param database The database definition + * @param ignoreIfExists Flag to specify behavior when a database with the given name already + * exists: if set to false, throw a DatabaseAlreadyExistException, if set to true, do + * nothing. 
+ * @throws DatabaseAlreadyExistException if the given database already exists and ignoreIfExists + * is false + * @throws CatalogException in case of any runtime exception + */ + @Override + public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists) + throws DatabaseAlreadyExistException, CatalogException { + + checkArgument(!StringUtils.isNullOrWhitespaceOnly(name)); + checkNotNull(database); + + // glue supports lowercase naming convention + name = name.toLowerCase(Locale.ROOT); + if (databaseExists(name)) { + if (!ignoreIfExists) { + throw new DatabaseAlreadyExistException(getName(), name); + } + } else { + glueOperator.createGlueDatabase(name, database); + } + } + + /** + * Drop a database. + * + * @param name Name of the database to be dropped. + * @param ignoreIfNotExists Flag to specify behavior when the database does not exist: if set to + * false, throw an exception, if set to true, do nothing. + * @param cascade Flag to specify behavior when the database contains table or function: if set + * to true, delete all tables and functions in the database and then delete the database, if + * set to false, throw an exception. 
+ * @throws DatabaseNotExistException if the given database does not exist + * @throws DatabaseNotEmptyException if the given database is not empty and isRestrict is true + * @throws CatalogException in case of any runtime exception + */ + @Override + public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade) + throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException { + + checkArgument( + !StringUtils.isNullOrWhitespaceOnly(name), "Database name contains empty spaces"); + + // glue supports only lowercase naming convention + name = name.toLowerCase(Locale.ROOT); + + if (databaseExists(name)) { + if (cascade) { + // delete all tables in database + glueOperator.deleteTablesFromDatabase(name, listTables(name)); + // delete all functions in database + glueOperator.deleteFunctionsFromDatabase(name, listFunctions(name)); + } + + if (glueOperator.isDatabaseEmpty(name)) { + glueOperator.dropGlueDatabase(name); + } else { + throw new DatabaseNotEmptyException(getName(), name); + } + } else if (!ignoreIfNotExists) { + throw new DatabaseNotExistException(getName(), name); + } + } + + /** + * Modify an existing database. + * + * @param name Name of the database to be modified + * @param newDatabase The new database definition + * @param ignoreIfNotExists Flag to specify behavior when the given database does not exist: if + * set to false, throw an exception, if set to true, do nothing. 
+ * @throws DatabaseNotExistException if the given database does not exist + * @throws CatalogException in case of any runtime exception + */ + @Override + public void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists) + throws DatabaseNotExistException, CatalogException { + + // glue supports only lowercase naming convention + name = name.toLowerCase(Locale.ROOT); + + checkArgument( + !StringUtils.isNullOrWhitespaceOnly(name), "Database name contains empty spaces"); + checkNotNull(newDatabase, "Database cannot be Empty"); + + CatalogDatabase existingDatabase = glueOperator.getDatabase(name); + if (existingDatabase != null) { + if (existingDatabase.getClass() != newDatabase.getClass()) { + throw new CatalogException( + String.format( + "Database types don't match. Existing database is '%s' and new database is '%s'.", + existingDatabase.getClass().getName(), + newDatabase.getClass().getName())); + } + glueOperator.updateGlueDatabase(name, newDatabase); + } else if (!ignoreIfNotExists) { + throw new DatabaseNotExistException(getName(), name); + } + } + + /** + * Get the names of all databases in this catalog. + * + * @return a list of the names of all databases + * @throws CatalogException in case of any runtime exception + */ + @Override + public List<String> listDatabases() throws CatalogException { + return glueOperator.listGlueDatabases(); + } + + /** + * Get a database from this catalog. 
+ * + * @param databaseName Name of the database + * @return The requested database + * @throws DatabaseNotExistException if the database does not exist + * @throws CatalogException in case of any runtime exception + */ + @Override + public CatalogDatabase getDatabase(String databaseName) + throws DatabaseNotExistException, CatalogException { + + // glue supports only lowercase naming convention + databaseName = databaseName.toLowerCase(Locale.ROOT); + + checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName)); + return glueOperator.getDatabase(databaseName); + } + + /** + * Check if a database exists in this catalog. + * + * @param databaseName Name of the database + * @return true if the given database exists in the catalog false otherwise + * @throws CatalogException in case of any runtime exception + */ + @Override + public boolean databaseExists(String databaseName) throws CatalogException { + checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName)); + try { + CatalogDatabase database = getDatabase(databaseName); + return database != null; + } catch (DatabaseNotExistException e) { + return false; + } + } + + // ------ tables ------ + + /** + * Creates a new table or view. + * + * <p>The framework will make sure to call this method with fully validated {@link + * ResolvedCatalogTable} or {@link ResolvedCatalogView}. Those instances are easy to serialize + * for a durable catalog implementation. + * + * @param tablePath path of the table or view to be created + * @param table the table definition + * @param ignoreIfExists flag to specify behavior when a table or view already exists at the + * given path: if set to false, it throws a TableAlreadyExistException, if set to true, do + * nothing. 
+ * @throws TableAlreadyExistException if table already exists and ignoreIfExists is false + * @throws DatabaseNotExistException if the database in tablePath doesn't exist + * @throws CatalogException in case of any runtime exception + */ + @Override + public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) + throws TableAlreadyExistException, DatabaseNotExistException, CatalogException { + checkNotNull(tablePath); Review Comment: add exception message. ########## flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalogFactory.java: ########## @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.glue; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.factories.CatalogFactory; +import org.apache.flink.table.factories.FactoryUtil; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.Set; + +/** Catalog factory for {@link GlueCatalog}. 
*/ +public class GlueCatalogFactory implements CatalogFactory { + + private static final Logger LOG = LoggerFactory.getLogger(GlueCatalogFactory.class); + + @Override + public String factoryIdentifier() { + return GlueCatalogFactoryOptions.IDENTIFIER; + } + + @Override + public Set<ConfigOption<?>> requiredOptions() { + return Collections.emptySet(); Review Comment: Just want to double check here. GlueCatalog doesn't have any required options? ########## flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalog.java: ########## @@ -0,0 +1,1029 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.catalog.glue; + +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.catalog.AbstractCatalog; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.CatalogFunction; +import org.apache.flink.table.catalog.CatalogPartition; +import org.apache.flink.table.catalog.CatalogPartitionImpl; +import org.apache.flink.table.catalog.CatalogPartitionSpec; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.CatalogView; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.ResolvedCatalogTable; +import org.apache.flink.table.catalog.ResolvedCatalogView; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.FunctionNotExistException; +import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException; +import org.apache.flink.table.catalog.exceptions.PartitionNotExistException; +import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException; +import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException; +import org.apache.flink.table.catalog.exceptions.TablePartitionedException; +import org.apache.flink.table.catalog.glue.util.AwsClientFactories; +import org.apache.flink.table.catalog.glue.util.AwsProperties; +import org.apache.flink.table.catalog.glue.util.GlueOperator; 
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics; +import org.apache.flink.table.catalog.stats.CatalogTableStatistics; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.functions.FunctionIdentifier; +import org.apache.flink.util.StringUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.glue.GlueClient; +import software.amazon.awssdk.services.glue.model.Partition; +import software.amazon.awssdk.services.glue.model.Table; + +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** A Glue catalog implementation that uses glue catalog. */ +public class GlueCatalog extends AbstractCatalog { + + /** instance of GlueOperator to facilitate glue related actions. */ + public GlueOperator glueOperator; + + /** Default database name if not passed as part of catalog. 
*/ + public static final String DEFAULT_DB = "default"; + + private static final Logger LOG = LoggerFactory.getLogger(GlueCatalog.class); + + public GlueCatalog(String catalogName, String databaseName, ReadableConfig glueProperties) { + super(catalogName, databaseName); + checkNotNull(glueProperties); + initialize(glueProperties); + } + + public GlueCatalog(String catalogName) { + super(catalogName, DEFAULT_DB); + } + + public void initialize(ReadableConfig catalogProperties) { + // setLocationUri for the database level + String locationUri = catalogProperties.getOptional(GlueCatalogOptions.LOCATION_KEY).get(); + // .getOrDefault(GlueCatalogConstants.LOCATION_URI, ""); + // initialize aws client factories + AwsProperties awsProperties = new AwsProperties(catalogProperties); + + // create glue client + GlueClient glueClient = AwsClientFactories.factory(awsProperties).glue(); + this.glueOperator = new GlueOperator(locationUri, getName(), awsProperties, glueClient); + } + + /** + * Open the catalog. Used for any required preparation in initialization phase. + * + * @throws CatalogException in case of any runtime exception + */ + @Override + public void open() throws CatalogException {} + + /** + * Close the catalog when it is no longer needed and release any resource that it might be + * holding. + * + * @throws CatalogException in case of any runtime exception + */ + @Override + public void close() throws CatalogException { + try { + glueOperator.closeClient(); + } catch (Exception e) { + LOG.warn("Glue Client is not closed properly!"); + } + } + + // ------ databases ------ + + /** + * Create a database. + * + * @param name Name of the database to be created + * @param database The database definition + * @param ignoreIfExists Flag to specify behavior when a database with the given name already + * exists: if set to false, throw a DatabaseAlreadyExistException, if set to true, do + * nothing. 
+ * @throws DatabaseAlreadyExistException if the given database already exists and ignoreIfExists + * is false + * @throws CatalogException in case of any runtime exception + */ + @Override + public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists) + throws DatabaseAlreadyExistException, CatalogException { + + checkArgument(!StringUtils.isNullOrWhitespaceOnly(name)); + checkNotNull(database); + + // glue supports lowercase naming convention + name = name.toLowerCase(Locale.ROOT); + if (databaseExists(name)) { + if (!ignoreIfExists) { + throw new DatabaseAlreadyExistException(getName(), name); + } + } else { + glueOperator.createGlueDatabase(name, database); + } + } + + /** + * Drop a database. + * + * @param name Name of the database to be dropped. + * @param ignoreIfNotExists Flag to specify behavior when the database does not exist: if set to + * false, throw an exception, if set to true, do nothing. + * @param cascade Flag to specify behavior when the database contains table or function: if set + * to true, delete all tables and functions in the database and then delete the database, if + * set to false, throw an exception. 
+ * @throws DatabaseNotExistException if the given database does not exist + * @throws DatabaseNotEmptyException if the given database is not empty and isRestrict is true + * @throws CatalogException in case of any runtime exception + */ + @Override + public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade) + throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException { + + checkArgument( + !StringUtils.isNullOrWhitespaceOnly(name), "Database name contains empty spaces"); + + // glue supports only lowercase naming convention + name = name.toLowerCase(Locale.ROOT); + + if (databaseExists(name)) { + if (cascade) { + // delete all tables in database + glueOperator.deleteTablesFromDatabase(name, listTables(name)); + // delete all functions in database + glueOperator.deleteFunctionsFromDatabase(name, listFunctions(name)); + } + + if (glueOperator.isDatabaseEmpty(name)) { + glueOperator.dropGlueDatabase(name); + } else { + throw new DatabaseNotEmptyException(getName(), name); + } + } else if (!ignoreIfNotExists) { + throw new DatabaseNotExistException(getName(), name); + } + } + + /** + * Modify an existing database. + * + * @param name Name of the database to be modified + * @param newDatabase The new database definition + * @param ignoreIfNotExists Flag to specify behavior when the given database does not exist: if + * set to false, throw an exception, if set to true, do nothing. 
+ * @throws DatabaseNotExistException if the given database does not exist + * @throws CatalogException in case of any runtime exception + */ + @Override + public void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists) + throws DatabaseNotExistException, CatalogException { + + // glue supports only lowercase naming convention + name = name.toLowerCase(Locale.ROOT); + + checkArgument( + !StringUtils.isNullOrWhitespaceOnly(name), "Database name contains empty spaces"); + checkNotNull(newDatabase, "Database cannot be Empty"); + + CatalogDatabase existingDatabase = glueOperator.getDatabase(name); + if (existingDatabase != null) { + if (existingDatabase.getClass() != newDatabase.getClass()) { + throw new CatalogException( + String.format( + "Database types don't match. Existing database is '%s' and new database is '%s'.", + existingDatabase.getClass().getName(), + newDatabase.getClass().getName())); + } + glueOperator.updateGlueDatabase(name, newDatabase); + } else if (!ignoreIfNotExists) { + throw new DatabaseNotExistException(getName(), name); + } + } + + /** + * Get the names of all databases in this catalog. + * + * @return a list of the names of all databases + * @throws CatalogException in case of any runtime exception + */ + @Override + public List<String> listDatabases() throws CatalogException { + return glueOperator.listGlueDatabases(); + } + + /** + * Get a database from this catalog. 
+ * + * @param databaseName Name of the database + * @return The requested database + * @throws DatabaseNotExistException if the database does not exist + * @throws CatalogException in case of any runtime exception + */ + @Override + public CatalogDatabase getDatabase(String databaseName) + throws DatabaseNotExistException, CatalogException { + + // glue supports only lowercase naming convention + databaseName = databaseName.toLowerCase(Locale.ROOT); + + checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName)); + return glueOperator.getDatabase(databaseName); + } + + /** + * Check if a database exists in this catalog. + * + * @param databaseName Name of the database + * @return true if the given database exists in the catalog false otherwise + * @throws CatalogException in case of any runtime exception + */ + @Override + public boolean databaseExists(String databaseName) throws CatalogException { + checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName)); + try { + CatalogDatabase database = getDatabase(databaseName); + return database != null; + } catch (DatabaseNotExistException e) { + return false; + } + } + + // ------ tables ------ + + /** + * Creates a new table or view. + * + * <p>The framework will make sure to call this method with fully validated {@link + * ResolvedCatalogTable} or {@link ResolvedCatalogView}. Those instances are easy to serialize + * for a durable catalog implementation. + * + * @param tablePath path of the table or view to be created + * @param table the table definition + * @param ignoreIfExists flag to specify behavior when a table or view already exists at the + * given path: if set to false, it throws a TableAlreadyExistException, if set to true, do + * nothing. 
+ * @throws TableAlreadyExistException if table already exists and ignoreIfExists is false + * @throws DatabaseNotExistException if the database in tablePath doesn't exist + * @throws CatalogException in case of any runtime exception + */ + @Override + public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) + throws TableAlreadyExistException, DatabaseNotExistException, CatalogException { + checkNotNull(tablePath); + checkNotNull(table); + + if (!databaseExists(tablePath.getDatabaseName())) { + throw new DatabaseNotExistException(getName(), tablePath.getDatabaseName()); + } + + if (tableExists(tablePath)) { + if (!ignoreIfExists) { + throw new TableAlreadyExistException(getName(), tablePath); + } + } else { + glueOperator.createGlueTable(tablePath, table, false); + } + } + + /** + * Modifies an existing table or view. Note that the new and old {@link CatalogBaseTable} must + * be of the same kind. For example, this doesn't allow altering a regular table to partitioned + * table, or altering a view to a table, and vice versa. + * + * <p>The framework will make sure to call this method with fully validated {@link + * ResolvedCatalogTable} or {@link ResolvedCatalogView}. Those instances are easy to serialize + * for a durable catalog implementation. + * + * @param tablePath path of the table or view to be modified + * @param newTable the new table definition + * @param ignoreIfNotExists flag to specify behavior when the table or view does not exist: if + * set to false, throw an exception, if set to true, do nothing. 
+ * @throws TableNotExistException if the table does not exist + * @throws CatalogException in case of any runtime exception + */ + @Override + public void alterTable( + ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + + checkNotNull(tablePath); Review Comment: Ditto: add an exception message. ########## flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/AwsProperties.java: ########## @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.catalog.glue.util; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.catalog.glue.GlueCatalogOptions; + +import org.apache.flink.shaded.guava30.com.google.common.base.Strings; + +import software.amazon.awssdk.awscore.client.builder.AwsSyncClientBuilder; +import software.amazon.awssdk.core.client.builder.SdkClientBuilder; +import software.amazon.awssdk.http.apache.ApacheHttpClient; +import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient; +import software.amazon.awssdk.services.glue.GlueClientBuilder; + +import java.net.URI; +import java.time.Duration; + +import static org.apache.flink.table.catalog.glue.GlueCatalogConstants.GLUE_CATALOG_SKIP_ARCHIVE_DEFAULT; +import static org.apache.flink.table.catalog.glue.GlueCatalogConstants.GLUE_CATALOG_SKIP_NAME_VALIDATION_DEFAULT; +import static org.apache.flink.table.catalog.glue.GlueCatalogConstants.HTTP_CLIENT_TYPE_APACHE; +import static org.apache.flink.table.catalog.glue.GlueCatalogConstants.HTTP_CLIENT_TYPE_DEFAULT; +import static org.apache.flink.table.catalog.glue.GlueCatalogConstants.HTTP_CLIENT_TYPE_URLCONNECTION; + +/** Aws properties for glue and other clients. 
*/ +public class AwsProperties { + + private Long httpClientUrlConnectionConnectionTimeoutMs; + + private Long httpClientUrlConnectionSocketTimeoutMs; + + private Long httpClientApacheConnectionAcquisitionTimeoutMs; + + private Long httpClientApacheConnectionMaxIdleTimeMs; + + private Long httpClientApacheConnectionTimeToLiveMs; + + private Long httpClientApacheConnectionTimeoutMs; + + private Boolean httpClientApacheExpectContinueEnabled; + + private Integer httpClientApacheMaxConnections; + + private Long httpClientApacheSocketTimeoutMs; + + private Boolean httpClientApacheTcpKeepAliveEnabled; + + private Boolean httpClientApacheUseIdleConnectionReaperEnabled; + + private String glueEndpoint; + + private String glueCatalogId; + + private Boolean glueCatalogSkipArchive; + + private Boolean glueCatalogSkipNameValidation; + + /** http client. */ + private String httpClientType; + + public AwsProperties(ReadableConfig properties) { + + this.httpClientType = properties.getOptional(GlueCatalogOptions.HTTP_CLIENT_TYPE).get(); + + this.httpClientUrlConnectionConnectionTimeoutMs = + properties + .getOptional( + GlueCatalogOptions.HTTP_CLIENT_URLCONNECTION_CONNECTION_TIMEOUT_MS) + .get(); + + this.httpClientUrlConnectionSocketTimeoutMs = + properties + .getOptional(GlueCatalogOptions.HTTP_CLIENT_URLCONNECTION_SOCKET_TIMEOUT_MS) + .get(); + + this.httpClientApacheConnectionAcquisitionTimeoutMs = + properties + .getOptional( + GlueCatalogOptions + .HTTP_CLIENT_APACHE_CONNECTION_ACQUISITION_TIMEOUT_MS) + .get(); + + this.httpClientApacheConnectionMaxIdleTimeMs = + properties + .getOptional( + GlueCatalogOptions.HTTP_CLIENT_APACHE_CONNECTION_MAX_IDLE_TIME_MS) + .get(); + + this.httpClientApacheConnectionTimeToLiveMs = + properties + .getOptional( + GlueCatalogOptions.HTTP_CLIENT_APACHE_CONNECTION_TIME_TO_LIVE_MS) + .get(); + + this.httpClientApacheConnectionTimeoutMs = + properties + .getOptional(GlueCatalogOptions.HTTP_CLIENT_APACHE_CONNECTION_TIMEOUT_MS) + .get(); + + 
this.httpClientApacheExpectContinueEnabled = + properties + .getOptional(GlueCatalogOptions.HTTP_CLIENT_APACHE_EXPECT_CONTINUE_ENABLED) + .orElse(false); + this.httpClientApacheMaxConnections = + properties + .getOptional(GlueCatalogOptions.HTTP_CLIENT_APACHE_MAX_CONNECTIONS) + .orElse(1); + + this.httpClientApacheSocketTimeoutMs = + properties + .getOptional(GlueCatalogOptions.HTTP_CLIENT_APACHE_SOCKET_TIMEOUT_MS) + .orElse(0L); + + this.httpClientApacheTcpKeepAliveEnabled = + properties + .getOptional(GlueCatalogOptions.HTTP_CLIENT_APACHE_TCP_KEEP_ALIVE_ENABLED) + .orElse(false); + this.httpClientApacheUseIdleConnectionReaperEnabled = + properties + .getOptional( + GlueCatalogOptions + .HTTP_CLIENT_APACHE_USE_IDLE_CONNECTION_REAPER_ENABLED) + .orElse(false); + + this.glueEndpoint = properties.getOptional(GlueCatalogOptions.GLUE_CATALOG_ENDPOINT).get(); Review Comment: Will be better to check if the option exists. ########## flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalog.java: ########## @@ -0,0 +1,1029 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.catalog.glue; + +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.catalog.AbstractCatalog; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.CatalogFunction; +import org.apache.flink.table.catalog.CatalogPartition; +import org.apache.flink.table.catalog.CatalogPartitionImpl; +import org.apache.flink.table.catalog.CatalogPartitionSpec; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.CatalogView; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.ResolvedCatalogTable; +import org.apache.flink.table.catalog.ResolvedCatalogView; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.FunctionNotExistException; +import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException; +import org.apache.flink.table.catalog.exceptions.PartitionNotExistException; +import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException; +import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException; +import org.apache.flink.table.catalog.exceptions.TablePartitionedException; +import org.apache.flink.table.catalog.glue.util.AwsClientFactories; +import org.apache.flink.table.catalog.glue.util.AwsProperties; +import org.apache.flink.table.catalog.glue.util.GlueOperator; 
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics; +import org.apache.flink.table.catalog.stats.CatalogTableStatistics; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.functions.FunctionIdentifier; +import org.apache.flink.util.StringUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.glue.GlueClient; +import software.amazon.awssdk.services.glue.model.Partition; +import software.amazon.awssdk.services.glue.model.Table; + +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** A Glue catalog implementation that uses glue catalog. */ +public class GlueCatalog extends AbstractCatalog { + + /** instance of GlueOperator to facilitate glue related actions. */ + public GlueOperator glueOperator; + + /** Default database name if not passed as part of catalog. 
*/ + public static final String DEFAULT_DB = "default"; + + private static final Logger LOG = LoggerFactory.getLogger(GlueCatalog.class); + + public GlueCatalog(String catalogName, String databaseName, ReadableConfig glueProperties) { + super(catalogName, databaseName); + checkNotNull(glueProperties); + initialize(glueProperties); + } + + public GlueCatalog(String catalogName) { + super(catalogName, DEFAULT_DB); + } + + public void initialize(ReadableConfig catalogProperties) { + // setLocationUri for the database level + String locationUri = catalogProperties.getOptional(GlueCatalogOptions.LOCATION_KEY).get(); + // .getOrDefault(GlueCatalogConstants.LOCATION_URI, ""); + // initialize aws client factories + AwsProperties awsProperties = new AwsProperties(catalogProperties); + + // create glue client + GlueClient glueClient = AwsClientFactories.factory(awsProperties).glue(); + this.glueOperator = new GlueOperator(locationUri, getName(), awsProperties, glueClient); + } + + /** + * Open the catalog. Used for any required preparation in initialization phase. + * + * @throws CatalogException in case of any runtime exception + */ + @Override + public void open() throws CatalogException {} + + /** + * Close the catalog when it is no longer needed and release any resource that it might be + * holding. + * + * @throws CatalogException in case of any runtime exception + */ + @Override + public void close() throws CatalogException { + try { + glueOperator.closeClient(); + } catch (Exception e) { + LOG.warn("Glue Client is not closed properly!"); + } + } + + // ------ databases ------ + + /** + * Create a database. + * + * @param name Name of the database to be created + * @param database The database definition + * @param ignoreIfExists Flag to specify behavior when a database with the given name already + * exists: if set to false, throw a DatabaseAlreadyExistException, if set to true, do + * nothing. 
+ * @throws DatabaseAlreadyExistException if the given database already exists and ignoreIfExists + * is false + * @throws CatalogException in case of any runtime exception + */ + @Override + public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists) + throws DatabaseAlreadyExistException, CatalogException { + + checkArgument(!StringUtils.isNullOrWhitespaceOnly(name)); + checkNotNull(database); + + // glue supports lowercase naming convention + name = name.toLowerCase(Locale.ROOT); + if (databaseExists(name)) { + if (!ignoreIfExists) { + throw new DatabaseAlreadyExistException(getName(), name); + } + } else { + glueOperator.createGlueDatabase(name, database); + } + } + + /** + * Drop a database. + * + * @param name Name of the database to be dropped. + * @param ignoreIfNotExists Flag to specify behavior when the database does not exist: if set to + * false, throw an exception, if set to true, do nothing. + * @param cascade Flag to specify behavior when the database contains table or function: if set + * to true, delete all tables and functions in the database and then delete the database, if + * set to false, throw an exception. 
+ * @throws DatabaseNotExistException if the given database does not exist + * @throws DatabaseNotEmptyException if the given database is not empty and isRestrict is true + * @throws CatalogException in case of any runtime exception + */ + @Override + public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade) + throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException { + + checkArgument( + !StringUtils.isNullOrWhitespaceOnly(name), "Database name contains empty spaces"); + + // glue supports only lowercase naming convention + name = name.toLowerCase(Locale.ROOT); + + if (databaseExists(name)) { + if (cascade) { + // delete all tables in database + glueOperator.deleteTablesFromDatabase(name, listTables(name)); + // delete all functions in database + glueOperator.deleteFunctionsFromDatabase(name, listFunctions(name)); + } + + if (glueOperator.isDatabaseEmpty(name)) { + glueOperator.dropGlueDatabase(name); + } else { + throw new DatabaseNotEmptyException(getName(), name); + } + } else if (!ignoreIfNotExists) { + throw new DatabaseNotExistException(getName(), name); + } + } + + /** + * Modify an existing database. + * + * @param name Name of the database to be modified + * @param newDatabase The new database definition + * @param ignoreIfNotExists Flag to specify behavior when the given database does not exist: if + * set to false, throw an exception, if set to true, do nothing. 
+ * @throws DatabaseNotExistException if the given database does not exist + * @throws CatalogException in case of any runtime exception + */ + @Override + public void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists) + throws DatabaseNotExistException, CatalogException { + + // glue supports only lowercase naming convention + name = name.toLowerCase(Locale.ROOT); + + checkArgument( + !StringUtils.isNullOrWhitespaceOnly(name), "Database name contains empty spaces"); + checkNotNull(newDatabase, "Database cannot be Empty"); + + CatalogDatabase existingDatabase = glueOperator.getDatabase(name); + if (existingDatabase != null) { + if (existingDatabase.getClass() != newDatabase.getClass()) { + throw new CatalogException( + String.format( + "Database types don't match. Existing database is '%s' and new database is '%s'.", + existingDatabase.getClass().getName(), + newDatabase.getClass().getName())); + } + glueOperator.updateGlueDatabase(name, newDatabase); + } else if (!ignoreIfNotExists) { + throw new DatabaseNotExistException(getName(), name); + } + } + + /** + * Get the names of all databases in this catalog. + * + * @return a list of the names of all databases + * @throws CatalogException in case of any runtime exception + */ + @Override + public List<String> listDatabases() throws CatalogException { + return glueOperator.listGlueDatabases(); + } + + /** + * Get a database from this catalog. 
+ * + * @param databaseName Name of the database + * @return The requested database + * @throws DatabaseNotExistException if the database does not exist + * @throws CatalogException in case of any runtime exception + */ + @Override + public CatalogDatabase getDatabase(String databaseName) + throws DatabaseNotExistException, CatalogException { + + // glue supports only lowercase naming convention + databaseName = databaseName.toLowerCase(Locale.ROOT); + + checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName)); + return glueOperator.getDatabase(databaseName); + } + + /** + * Check if a database exists in this catalog. + * + * @param databaseName Name of the database + * @return true if the given database exists in the catalog false otherwise + * @throws CatalogException in case of any runtime exception + */ + @Override + public boolean databaseExists(String databaseName) throws CatalogException { + checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName)); Review Comment: add exception message. ########## flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalogFactory.java: ########## @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.catalog.glue; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.factories.CatalogFactory; +import org.apache.flink.table.factories.FactoryUtil; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.Set; + +/** Catalog factory for {@link GlueCatalog}. */ +public class GlueCatalogFactory implements CatalogFactory { + + private static final Logger LOG = LoggerFactory.getLogger(GlueCatalogFactory.class); Review Comment: please remove them if we don't need them in product code. ########## flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/AwsProperties.java: ########## @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.catalog.glue.util; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.catalog.glue.GlueCatalogOptions; + +import org.apache.flink.shaded.guava30.com.google.common.base.Strings; + +import software.amazon.awssdk.awscore.client.builder.AwsSyncClientBuilder; +import software.amazon.awssdk.core.client.builder.SdkClientBuilder; +import software.amazon.awssdk.http.apache.ApacheHttpClient; +import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient; +import software.amazon.awssdk.services.glue.GlueClientBuilder; + +import java.net.URI; +import java.time.Duration; + +import static org.apache.flink.table.catalog.glue.GlueCatalogConstants.GLUE_CATALOG_SKIP_ARCHIVE_DEFAULT; +import static org.apache.flink.table.catalog.glue.GlueCatalogConstants.GLUE_CATALOG_SKIP_NAME_VALIDATION_DEFAULT; +import static org.apache.flink.table.catalog.glue.GlueCatalogConstants.HTTP_CLIENT_TYPE_APACHE; +import static org.apache.flink.table.catalog.glue.GlueCatalogConstants.HTTP_CLIENT_TYPE_DEFAULT; +import static org.apache.flink.table.catalog.glue.GlueCatalogConstants.HTTP_CLIENT_TYPE_URLCONNECTION; + +/** Aws properties for glue and other clients. */ +public class AwsProperties { + + private Long httpClientUrlConnectionConnectionTimeoutMs; Review Comment: These fields can be final. ########## flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/AwsClientFactory.java: ########## @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.glue.util; + +import org.apache.flink.configuration.ReadableConfig; + +import software.amazon.awssdk.services.glue.GlueClient; + +import java.io.Serializable; + +/** + * Interface to customize AWS clients used by Flink. A custom factory must have a no-arg. + * constructor, and use {@link #initialize(ReadableConfig)} to initialize the factory. + */ +public interface AwsClientFactory extends Serializable { Review Comment: Must it extend `Serializable`? ########## flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/AwsProperties.java: ########## @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.catalog.glue.util; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.catalog.glue.GlueCatalogOptions; + +import org.apache.flink.shaded.guava30.com.google.common.base.Strings; + +import software.amazon.awssdk.awscore.client.builder.AwsSyncClientBuilder; +import software.amazon.awssdk.core.client.builder.SdkClientBuilder; +import software.amazon.awssdk.http.apache.ApacheHttpClient; +import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient; +import software.amazon.awssdk.services.glue.GlueClientBuilder; + +import java.net.URI; +import java.time.Duration; + +import static org.apache.flink.table.catalog.glue.GlueCatalogConstants.GLUE_CATALOG_SKIP_ARCHIVE_DEFAULT; +import static org.apache.flink.table.catalog.glue.GlueCatalogConstants.GLUE_CATALOG_SKIP_NAME_VALIDATION_DEFAULT; +import static org.apache.flink.table.catalog.glue.GlueCatalogConstants.HTTP_CLIENT_TYPE_APACHE; +import static org.apache.flink.table.catalog.glue.GlueCatalogConstants.HTTP_CLIENT_TYPE_DEFAULT; +import static org.apache.flink.table.catalog.glue.GlueCatalogConstants.HTTP_CLIENT_TYPE_URLCONNECTION; + +/** Aws properties for glue and other clients. 
*/ +public class AwsProperties { + + private Long httpClientUrlConnectionConnectionTimeoutMs; + + private Long httpClientUrlConnectionSocketTimeoutMs; + + private Long httpClientApacheConnectionAcquisitionTimeoutMs; + + private Long httpClientApacheConnectionMaxIdleTimeMs; + + private Long httpClientApacheConnectionTimeToLiveMs; + + private Long httpClientApacheConnectionTimeoutMs; + + private Boolean httpClientApacheExpectContinueEnabled; + + private Integer httpClientApacheMaxConnections; + + private Long httpClientApacheSocketTimeoutMs; + + private Boolean httpClientApacheTcpKeepAliveEnabled; + + private Boolean httpClientApacheUseIdleConnectionReaperEnabled; + + private String glueEndpoint; + + private String glueCatalogId; + + private Boolean glueCatalogSkipArchive; Review Comment: It seems this hasn't been used. ########## flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/AwsProperties.java: ########## @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.catalog.glue.util; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.catalog.glue.GlueCatalogOptions; + +import org.apache.flink.shaded.guava30.com.google.common.base.Strings; + +import software.amazon.awssdk.awscore.client.builder.AwsSyncClientBuilder; +import software.amazon.awssdk.core.client.builder.SdkClientBuilder; +import software.amazon.awssdk.http.apache.ApacheHttpClient; +import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient; +import software.amazon.awssdk.services.glue.GlueClientBuilder; + +import java.net.URI; +import java.time.Duration; + +import static org.apache.flink.table.catalog.glue.GlueCatalogConstants.GLUE_CATALOG_SKIP_ARCHIVE_DEFAULT; +import static org.apache.flink.table.catalog.glue.GlueCatalogConstants.GLUE_CATALOG_SKIP_NAME_VALIDATION_DEFAULT; +import static org.apache.flink.table.catalog.glue.GlueCatalogConstants.HTTP_CLIENT_TYPE_APACHE; +import static org.apache.flink.table.catalog.glue.GlueCatalogConstants.HTTP_CLIENT_TYPE_DEFAULT; +import static org.apache.flink.table.catalog.glue.GlueCatalogConstants.HTTP_CLIENT_TYPE_URLCONNECTION; + +/** Aws properties for glue and other clients. 
*/ +public class AwsProperties { + + private Long httpClientUrlConnectionConnectionTimeoutMs; + + private Long httpClientUrlConnectionSocketTimeoutMs; + + private Long httpClientApacheConnectionAcquisitionTimeoutMs; + + private Long httpClientApacheConnectionMaxIdleTimeMs; + + private Long httpClientApacheConnectionTimeToLiveMs; + + private Long httpClientApacheConnectionTimeoutMs; + + private Boolean httpClientApacheExpectContinueEnabled; + + private Integer httpClientApacheMaxConnections; + + private Long httpClientApacheSocketTimeoutMs; + + private Boolean httpClientApacheTcpKeepAliveEnabled; + + private Boolean httpClientApacheUseIdleConnectionReaperEnabled; + + private String glueEndpoint; + + private String glueCatalogId; + + private Boolean glueCatalogSkipArchive; + + private Boolean glueCatalogSkipNameValidation; Review Comment: It seems this hasn't been used. ########## flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/mapper/GlueDatatypeMapper.java: ########## @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.catalog.mapper; + +import org.apache.flink.table.types.DataType; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Implementation to define datatype from flink to glue catalog and vice-versa. */ +public class GlueDatatypeMapper<T extends DataType> implements DatatypeMapper<T> { + + private static final Logger LOG = LoggerFactory.getLogger(GlueDatatypeMapper.class); Review Comment: It hasn't been used. ########## flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/AwsProperties.java: ########## @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.catalog.glue.util; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.catalog.glue.GlueCatalogOptions; + +import org.apache.flink.shaded.guava30.com.google.common.base.Strings; + +import software.amazon.awssdk.awscore.client.builder.AwsSyncClientBuilder; +import software.amazon.awssdk.core.client.builder.SdkClientBuilder; +import software.amazon.awssdk.http.apache.ApacheHttpClient; +import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient; +import software.amazon.awssdk.services.glue.GlueClientBuilder; + +import java.net.URI; +import java.time.Duration; + +import static org.apache.flink.table.catalog.glue.GlueCatalogConstants.GLUE_CATALOG_SKIP_ARCHIVE_DEFAULT; +import static org.apache.flink.table.catalog.glue.GlueCatalogConstants.GLUE_CATALOG_SKIP_NAME_VALIDATION_DEFAULT; +import static org.apache.flink.table.catalog.glue.GlueCatalogConstants.HTTP_CLIENT_TYPE_APACHE; +import static org.apache.flink.table.catalog.glue.GlueCatalogConstants.HTTP_CLIENT_TYPE_DEFAULT; +import static org.apache.flink.table.catalog.glue.GlueCatalogConstants.HTTP_CLIENT_TYPE_URLCONNECTION; + +/** Aws properties for glue and other clients. 
*/ +public class AwsProperties { + + private Long httpClientUrlConnectionConnectionTimeoutMs; + + private Long httpClientUrlConnectionSocketTimeoutMs; + + private Long httpClientApacheConnectionAcquisitionTimeoutMs; + + private Long httpClientApacheConnectionMaxIdleTimeMs; + + private Long httpClientApacheConnectionTimeToLiveMs; + + private Long httpClientApacheConnectionTimeoutMs; + + private Boolean httpClientApacheExpectContinueEnabled; + + private Integer httpClientApacheMaxConnections; + + private Long httpClientApacheSocketTimeoutMs; + + private Boolean httpClientApacheTcpKeepAliveEnabled; + + private Boolean httpClientApacheUseIdleConnectionReaperEnabled; + + private String glueEndpoint; + + private String glueCatalogId; + + private Boolean glueCatalogSkipArchive; + + private Boolean glueCatalogSkipNameValidation; + + /** http client. */ + private String httpClientType; + + public AwsProperties(ReadableConfig properties) { + + this.httpClientType = properties.getOptional(GlueCatalogOptions.HTTP_CLIENT_TYPE).get(); Review Comment: If the `ConfigOption` has a default value, we can use `properties.get(xx)` directly. ########## flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/mapper/GlueDatatypeMapper.java: ########## @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.mapper; + +import org.apache.flink.table.types.DataType; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Implementation to define datatype from flink to glue catalog and vice-versa. + */ +public class GlueDatatypeMapper<T extends DataType> implements DatatypeMapper<T> { + + private static final Logger LOG = LoggerFactory.getLogger(GlueDatatypeMapper.class); + + @Override + public String mapFlinkTypeToGlueType(T type) { + StringBuilder sb = new StringBuilder(); + if (type != null) { + getStringifiedDatatype(type, sb); + + } + return sb.toString(); + } + + private void getStringifiedDatatype(DataType dataType, StringBuilder sb) { Review Comment: +1 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
