Samrat002 commented on code in PR #47: URL: https://github.com/apache/flink-connector-aws/pull/47#discussion_r1719546294
########## flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/GlueUtils.java: ########## @@ -0,0 +1,384 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.glue.util; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.CatalogDatabaseImpl; +import org.apache.flink.table.catalog.CatalogFunction; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.FunctionLanguage; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.glue.GlueCatalogOptions; +import org.apache.flink.table.catalog.glue.constants.GlueCatalogConstants; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.glue.model.Column; +import software.amazon.awssdk.services.glue.model.Database; +import 
software.amazon.awssdk.services.glue.model.GlueResponse; +import software.amazon.awssdk.services.glue.model.Table; +import software.amazon.awssdk.services.glue.model.UserDefinedFunction; + +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly; + +/** Utilities related glue Operation. */ +@Internal +public class GlueUtils { + + private static final Logger LOG = LoggerFactory.getLogger(GlueUtils.class); + + /** + * Glue supports lowercase naming convention. + * + * @param name fully qualified name. + * @return modified name according to glue convention. + */ + public static String getGlueConventionalName(String name) { + + return name.toLowerCase(Locale.ROOT); + } + + /** + * Extract location from database properties if present and remove location from properties. + * fallback to create default location if not present + * + * @param databaseProperties database properties. + * @param databaseName fully qualified name for database. + * @return location for database. + */ + public static String extractDatabaseLocation( + final Map<String, String> databaseProperties, + final String databaseName, + final String catalogPath) { + if (databaseProperties.containsKey(GlueCatalogConstants.LOCATION_URI)) { + return databaseProperties.remove(GlueCatalogConstants.LOCATION_URI); + } else { + LOG.info("No location URI Set. Using Catalog Path as default"); + return catalogPath + GlueCatalogConstants.LOCATION_SEPARATOR + databaseName; + } + } + + /** + * Extract location from database properties if present and remove location from properties. 
+ * fallback to create default location if not present + * + * @param tableProperties table properties. + * @param tablePath fully qualified object for table. + * @return location for table. + */ + public static String extractTableLocation( + final Map<String, String> tableProperties, + final ObjectPath tablePath, + final String catalogPath) { + if (tableProperties.containsKey(GlueCatalogConstants.LOCATION_URI)) { + return tableProperties.remove(GlueCatalogConstants.LOCATION_URI); + } else { + return catalogPath + + GlueCatalogConstants.LOCATION_SEPARATOR + + tablePath.getDatabaseName() + + GlueCatalogConstants.LOCATION_SEPARATOR + + tablePath.getObjectName(); + } + } + + /** + * Build CatalogDatabase instance using information from glue Database instance. + * + * @param glueDatabase {@link Database } + * @return {@link CatalogDatabase } instance. + */ + public static CatalogDatabase getCatalogDatabase(final Database glueDatabase) { + Map<String, String> properties = new HashMap<>(glueDatabase.parameters()); + return new CatalogDatabaseImpl(properties, glueDatabase.description()); + } + + /** + * A Glue database name cannot be longer than 252 characters. The only acceptable characters are + * lowercase letters, numbers, and the underscore character. More details: <a + * href="https://docs.aws.amazon.com/athena/latest/ug/glue-best-practices.html">...</a> + * + * @param name name + */ + public static void validate(String name) { + checkArgument( + name != null && GlueCatalogConstants.GLUE_DB_PATTERN.matcher(name).find(), + "Database name is not according to Glue Norms, " + + "check here https://docs.aws.amazon.com/athena/latest/ug/glue-best-practices.html"); + } + + /** validate response from client call. 
*/ + public static void validateGlueResponse(GlueResponse response) { + if (response != null && !response.sdkHttpResponse().isSuccessful()) { + throw new CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER); + } + } + + /** + * @param udf Instance of UserDefinedFunction + * @return ClassName for function + */ + public static String getCatalogFunctionClassName(final UserDefinedFunction udf) { + validateUDFClassName(udf.className()); + String[] splitName = udf.className().split(GlueCatalogConstants.DEFAULT_SEPARATOR); + return splitName[splitName.length - 1]; + } + + /** + * Validates UDF class name from glue. + * + * @param name name of UDF. + */ + private static void validateUDFClassName(final String name) { + checkArgument(!isNullOrWhitespaceOnly(name)); + + if (name.split(GlueCatalogConstants.DEFAULT_SEPARATOR).length + != GlueCatalogConstants.UDF_CLASS_NAME_SIZE) { + throw new ValidationException("Improper Classname"); + } + } + + /** + * Derive functionalLanguage from glue function name. Glue doesn't have any attribute to save + * the functionalLanguage Name. Thus, storing FunctionalLanguage in the name itself. + * + * @param glueFunction Function name from glue. + * @return Identifier for FunctionalLanguage. + */ + public static FunctionLanguage getFunctionalLanguage(final UserDefinedFunction glueFunction) { + if (glueFunction.className().startsWith(GlueCatalogConstants.FLINK_JAVA_FUNCTION_PREFIX)) { + return FunctionLanguage.JAVA; + } else if (glueFunction + .className() + .startsWith(GlueCatalogConstants.FLINK_PYTHON_FUNCTION_PREFIX)) { + return FunctionLanguage.PYTHON; + } else if (glueFunction + .className() + .startsWith(GlueCatalogConstants.FLINK_SCALA_FUNCTION_PREFIX)) { + return FunctionLanguage.SCALA; + } else { + throw new CatalogException("Invalid Functional Language"); + } + } + + /** + * Get expanded Query from CatalogBaseTable. + * + * @param table Instance of catalogBaseTable. + * @return expandedQuery for Glue Table. 
+ */ + public static String getExpandedQuery(CatalogBaseTable table) { + // https://issues.apache.org/jira/browse/FLINK-31961 + return ""; Review Comment: Throwing an unsupported exception leads to failures in other operations. For now I wanted to keep it empty and then pick up FLINK-31961 to implement it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
