vahmed-hamdy commented on code in PR #47:
URL:
https://github.com/apache/flink-connector-aws/pull/47#discussion_r1721636724
##########
flink-catalog-aws/flink-catalog-aws-glue/src/test/java/org/apache/flink/table/catalog/glue/TypeMapperTest.java:
##########
@@ -0,0 +1,91 @@
+package org.apache.flink.table.catalog.glue;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/** Unit tests for the {@link TypeMapper} class. */
+public class TypeMapperTest {
+
+ @Test
+ public void testMapFlinkTypeToGlueType_Primitives() {
+ assertEquals("int", TypeMapper.mapFlinkTypeToGlueType(new IntType()));
+ assertEquals("bigint", TypeMapper.mapFlinkTypeToGlueType(new
BigIntType()));
+ assertEquals("string", TypeMapper.mapFlinkTypeToGlueType(new
VarCharType(255)));
+ assertEquals("boolean", TypeMapper.mapFlinkTypeToGlueType(new
BooleanType()));
+ assertEquals("decimal", TypeMapper.mapFlinkTypeToGlueType(new
DecimalType(10, 0)));
+ assertEquals("float", TypeMapper.mapFlinkTypeToGlueType(new
FloatType()));
+ assertEquals("double", TypeMapper.mapFlinkTypeToGlueType(new
DoubleType()));
+ assertEquals("date", TypeMapper.mapFlinkTypeToGlueType(new
DateType()));
+ assertEquals("timestamp", TypeMapper.mapFlinkTypeToGlueType(new
TimestampType(5)));
+ }
+
+ @Test
+ public void testMapFlinkTypeToGlueType_Array() {
+ LogicalType arrayType = new ArrayType(new VarCharType(255));
+ assertEquals("array<string>",
TypeMapper.mapFlinkTypeToGlueType(arrayType));
+ }
+
+ @Test
+ public void testMapFlinkTypeToGlueType_Map() {
+ LogicalType mapType = new MapType(new VarCharType(255), new IntType());
+ assertEquals("map<string,int>",
TypeMapper.mapFlinkTypeToGlueType(mapType));
+ }
+
+ @Test
+ public void testMapFlinkTypeToGlueType_Row() {
+ RowType rowType =
+ RowType.of(
+ new LogicalType[] {new VarCharType(255), new
IntType()},
+ new String[] {"name", "age"});
+ assertEquals("struct<name:string,age:int>",
TypeMapper.mapFlinkTypeToGlueType(rowType));
+ }
+
+ @Test
+ public void testGlueTypeToFlinkType_Primitives() {
+ assertEquals(DataTypes.INT(), TypeMapper.glueTypeToFlinkType("int"));
+ assertEquals(DataTypes.BIGINT(),
TypeMapper.glueTypeToFlinkType("bigint"));
+ assertEquals(DataTypes.STRING(),
TypeMapper.glueTypeToFlinkType("string"));
+ assertEquals(DataTypes.BOOLEAN(),
TypeMapper.glueTypeToFlinkType("boolean"));
+ assertEquals(DataTypes.DECIMAL(10, 0),
TypeMapper.glueTypeToFlinkType("decimal"));
+ assertEquals(DataTypes.FLOAT(),
TypeMapper.glueTypeToFlinkType("float"));
+ assertEquals(DataTypes.DOUBLE(),
TypeMapper.glueTypeToFlinkType("double"));
+ assertEquals(DataTypes.DATE(), TypeMapper.glueTypeToFlinkType("date"));
+ assertEquals(DataTypes.TIMESTAMP(5),
TypeMapper.glueTypeToFlinkType("timestamp"));
+ }
+
+ @Test
+ public void testGlueTypeToFlinkType_Array() {
+ LogicalType arrayType = new ArrayType(new VarCharType(255));
+ assertEquals("array<string>",
TypeMapper.mapFlinkTypeToGlueType(arrayType));
+ }
+
+ @Test
+ public void testGlueTypeToFlinkType_Map() {
+ LogicalType mapType = new MapType(new VarCharType(255), new IntType());
+ assertEquals("map<string,int>",
TypeMapper.mapFlinkTypeToGlueType(mapType));
Review Comment:
This test maps Flink types to Glue types, not Glue to Flink as its name suggests.
##########
flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/constants/GlueCatalogConstants.java:
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue.constants;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.catalog.glue.GlueCatalog;
+
+import java.util.regex.Pattern;
+
+/** Constants and Defined Values used for {@link GlueCatalog}. */
+@Internal
+public class GlueCatalogConstants {
+ public static final String COMMENT = "comment";
+ public static final String DEFAULT_SEPARATOR = ":";
+ public static final String LOCATION_SEPARATOR = "/";
+ public static final String LOCATION_URI = "path";
+ public static final String AND = "and";
+ public static final String NEXT_LINE = "\n";
+ public static final String SPACE = " ";
+
+ public static final String TABLE_OWNER = "owner";
+ public static final String TABLE_INPUT_FORMAT = "table.input.format";
+ public static final String TABLE_OUTPUT_FORMAT = "table.output.format";
+
+ public static final String FLINK_SCALA_FUNCTION_PREFIX = "flink:scala:";
+ public static final String FLINK_PYTHON_FUNCTION_PREFIX = "flink:python:";
+ public static final String FLINK_JAVA_FUNCTION_PREFIX = "flink:java:";
+
+ public static final String FLINK_CATALOG = "FLINK_CATALOG";
+
+ public static final Pattern GLUE_DB_PATTERN =
Pattern.compile("^[a-z0-9_]{1,255}$");
+ public static final String GLUE_EXCEPTION_MSG_IDENTIFIER = "GLUE
EXCEPTION";
+ public static final String TABLE_NOT_EXISTS_IDENTIFIER = "TABLE DOESN'T
EXIST";
Review Comment:
Is `TABLE_NOT_EXISTS_IDENTIFIER` used anywhere?
##########
flink-catalog-aws/flink-catalog-aws-glue/pom.xml:
##########
@@ -0,0 +1,96 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-catalog-aws-parent</artifactId>
+ <version>4.4-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>flink-catalog-aws-glue</artifactId>
+ <name>Flink : Catalog : AWS : Glue</name>
+
+
+ <packaging>jar</packaging>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-java</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-aws-base</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>glue</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>apache-client</artifactId>
+ </dependency>
+
+ <!-- ArchUit test dependencies -->
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-architecture-tests-test</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-common</artifactId>
+ <version>${flink.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-java</artifactId>
+ <version>${flink.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.projectlombok</groupId>
+ <artifactId>lombok</artifactId>
+ <version>1.18.22</version>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
Review Comment:
If I understand correctly, this should be distributed as an uber JAR, right?
If so, do we need to relocate (shade) the AWS SDK dependencies?
##########
flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/TypeMapper.java:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.AbstractDataType;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+/**
+ * The {@code TypeMapper} class provides utility methods to map Flink's {@link
LogicalType} to AWS.
+ * Glue data types and vice versa.
+ *
+ * <p>This class supports conversion between Flink's logical types and Glue
data types, handling
+ * both primitive types and complex types such as arrays, maps, and rows. The
mapping allows for
+ * seamless integration between Flink and AWS Glue, enabling Flink to read
from and write to Glue
+ * tables with the appropriate data types.
+ *
+ * <p>For complex types like arrays, maps, and rows, the conversion is handled
recursively, ensuring
+ * that nested types are also converted accurately.
+ *
+ * <p>This class currently supports the following mappings:
+ *
+ * <ul>
+ * <li>Flink {@code IntType} -> Glue {@code int}
+ * <li>Flink {@code BigIntType} -> Glue {@code bigint}
+ * <li>Flink {@code VarCharType} -> Glue {@code string}
+ * <li>Flink {@code BooleanType} -> Glue {@code boolean}
+ * <li>Flink {@code DecimalType} -> Glue {@code decimal}
+ * <li>Flink {@code FloatType} -> Glue {@code float}
+ * <li>Flink {@code DoubleType} -> Glue {@code double}
+ * <li>Flink {@code DateType} -> Glue {@code date}
+ * <li>Flink {@code TimestampType} -> Glue {@code timestamp}
+ * <li>Flink {@code ArrayType} -> Glue {@code array<elementType>}
+ * <li>Flink {@code MapType} -> Glue {@code map<keyType,valueType>}
+ * <li>Flink {@code RowType} -> Glue {@code struct<fieldName:fieldType, ...>}
+ * </ul>
+ *
+ * <p>Note: Struct type handling in {@code glueTypeToFlinkType} is currently
not supported and will
+ * throw an {@link UnsupportedOperationException}.
+ *
+ * @see org.apache.flink.table.types.logical.LogicalType
+ * @see org.apache.flink.table.api.DataTypes
+ * @see org.apache.flink.table.catalog.CatalogTable
+ * @see org.apache.flink.table.catalog.ResolvedCatalogTable
+ */
+public class TypeMapper {
Review Comment:
The class is missing a stability annotation (e.g. `@Internal`, as used by the other classes in this PR).
##########
flink-catalog-aws/flink-catalog-aws-glue/src/main/resources/META-INF/NOTICE:
##########
@@ -0,0 +1,7 @@
+flink-catalog-aws-glue
+Copyright 2014-2023 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+This project bundles the following dependencies under the Apache Software
License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt)
Review Comment:
Is this file needed? Unless we bundle and relocate dependencies, I don't
think we need it.
##########
flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/GlueUtils.java:
##########
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.FunctionLanguage;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.glue.GlueCatalogOptions;
+import org.apache.flink.table.catalog.glue.TypeMapper;
+import org.apache.flink.table.catalog.glue.constants.GlueCatalogConstants;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.types.AbstractDataType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.glue.model.Column;
+import software.amazon.awssdk.services.glue.model.Database;
+import software.amazon.awssdk.services.glue.model.GlueResponse;
+import software.amazon.awssdk.services.glue.model.Table;
+import software.amazon.awssdk.services.glue.model.UserDefinedFunction;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.commons.lang3.BooleanUtils.FALSE;
+import static org.apache.commons.lang3.BooleanUtils.TRUE;
+import static
org.apache.flink.table.catalog.glue.constants.GlueCatalogConstants.EXPLAIN_EXTRAS;
+import static
org.apache.flink.table.catalog.glue.constants.GlueCatalogConstants.IS_PERSISTED;
+import static
org.apache.flink.table.catalog.glue.constants.GlueCatalogConstants.IS_PHYSICAL;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;
+
+/** Utilities related glue Operation. */
+@Internal
+public class GlueUtils {
+
+ private static final Logger LOG = LoggerFactory.getLogger(GlueUtils.class);
+
+ /**
+ * Glue supports lowercase naming convention.
+ *
+ * @param name fully qualified name.
+ * @return modified name according to glue convention.
+ */
+ public static String getGlueConventionalName(String name) {
+ return name.toLowerCase(Locale.ROOT);
+ }
+
+ /**
+ * Extract database location from properties and remove location from
properties. fallback to
+ * create default location if not present
+ *
+ * @param databaseProperties database properties.
+ * @param databaseName fully qualified name for database.
+ * @param catalogPath catalog path.
+ * @return location for database.
+ */
+ public static String extractDatabaseLocation(
+ final Map<String, String> databaseProperties,
+ final String databaseName,
+ final String catalogPath) {
+ if (databaseProperties.containsKey(GlueCatalogConstants.LOCATION_URI))
{
+ return
databaseProperties.remove(GlueCatalogConstants.LOCATION_URI);
+ } else {
+ LOG.info("No location URI Set. Using Catalog Path as default");
+ return catalogPath + GlueCatalogConstants.LOCATION_SEPARATOR +
databaseName;
+ }
+ }
+
+ /**
+ * Extract table location from table properties and remove location from
properties. fallback to
+ * create default location if not present
+ *
+ * @param tableProperties table properties.
+ * @param tablePath fully qualified object for table.
+ * @param catalogPath catalog path.
+ * @return location for table.
+ */
+ public static String extractTableLocation(
+ final Map<String, String> tableProperties,
+ final ObjectPath tablePath,
+ final String catalogPath) {
+ if (tableProperties.containsKey(GlueCatalogConstants.LOCATION_URI)) {
+ return tableProperties.remove(GlueCatalogConstants.LOCATION_URI);
+ } else {
+ return catalogPath
+ + GlueCatalogConstants.LOCATION_SEPARATOR
+ + tablePath.getDatabaseName()
+ + GlueCatalogConstants.LOCATION_SEPARATOR
+ + tablePath.getObjectName();
+ }
+ }
+
+ /**
+ * Build CatalogDatabase instance using information from glue Database
instance.
+ *
+ * @param glueDatabase {@link Database }
+ * @return {@link CatalogDatabase } instance.
+ */
+ public static CatalogDatabase getCatalogDatabase(final Database
glueDatabase) {
+ Map<String, String> properties = new
HashMap<>(glueDatabase.parameters());
+ return new CatalogDatabaseImpl(properties, glueDatabase.description());
+ }
+
+ /**
+ * A Glue database name cannot be longer than 255 characters. The only
acceptable characters are
+ * lowercase letters, numbers, and the underscore character. More details:
<a
+ *
href="https://docs.aws.amazon.com/athena/latest/ug/glue-best-practices.html">...</a>
+ *
+ * @param name name
+ */
+ public static void validate(String name) {
+ checkArgument(
+ name != null &&
name.matches(GlueCatalogConstants.GLUE_DB_PATTERN.pattern()),
+ "Database name does not comply with the Glue naming
convention. "
+ + "Check here
https://docs.aws.amazon.com/athena/latest/ug/glue-best-practices.html");
+ }
+
+ /** validate response from client call. */
+ public static void validateGlueResponse(GlueResponse response) {
+ if (response != null && !response.sdkHttpResponse().isSuccessful()) {
+ throw new
CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER);
+ }
+ }
+
+ /**
+ * @param udf Instance of UserDefinedFunction
+ * @return ClassName for function
+ */
+ public static String getCatalogFunctionClassName(final UserDefinedFunction
udf) {
+ validateUDFClassName(udf.className());
+ String[] splitName =
udf.className().split(GlueCatalogConstants.DEFAULT_SEPARATOR);
+ return splitName[splitName.length - 1];
+ }
+
+ /**
+ * Validates UDF class name from glue.
+ *
+ * @param name name of UDF.
+ */
+ private static void validateUDFClassName(final String name) {
+ checkArgument(!isNullOrWhitespaceOnly(name));
+
+ if (name.split(GlueCatalogConstants.DEFAULT_SEPARATOR).length
+ != GlueCatalogConstants.UDF_CLASS_NAME_SIZE) {
+ throw new ValidationException("Improper ClassName: " + name);
+ }
+ }
+
+ /**
+ * Derive functionalLanguage from glue function name. Glue doesn't have
any attribute to save
+ * the functionalLanguage Name. Thus, storing FunctionalLanguage in the
name itself.
+ *
+ * @param glueFunction Function name from glue.
+ * @return Identifier for FunctionalLanguage.
+ */
+ public static FunctionLanguage getFunctionalLanguage(final
UserDefinedFunction glueFunction) {
+ if
(glueFunction.className().startsWith(GlueCatalogConstants.FLINK_JAVA_FUNCTION_PREFIX))
{
+ return FunctionLanguage.JAVA;
+ } else if (glueFunction
+ .className()
+
.startsWith(GlueCatalogConstants.FLINK_PYTHON_FUNCTION_PREFIX)) {
+ return FunctionLanguage.PYTHON;
+ } else if (glueFunction
+ .className()
+ .startsWith(GlueCatalogConstants.FLINK_SCALA_FUNCTION_PREFIX))
{
+ return FunctionLanguage.SCALA;
+ } else {
+ throw new CatalogException(
+ "Invalid Functional Language for className: " +
glueFunction.className());
+ }
+ }
+
+ /**
+ * Get expanded Query from CatalogBaseTable.
+ *
+ * @param table Instance of catalogBaseTable.
+ * @return expandedQuery for Glue Table.
+ */
+ public static String getExpandedQuery(CatalogBaseTable table) {
+ // https://issues.apache.org/jira/browse/FLINK-31961
+ return "";
+ }
+
+ /**
+ * Get Original Query from CatalogBaseTable.
+ *
+ * @param table Instance of CatalogBaseTable.
+ * @return OriginalQuery for Glue Table.
+ */
+ public static String getOriginalQuery(CatalogBaseTable table) {
+ // https://issues.apache.org/jira/browse/FLINK-31961
+ return "";
+ }
+
+ /**
+ * Extract table owner name and remove from properties.
+ *
+ * @param properties Map of properties.
+ * @return fully qualified owner name.
+ */
+ public static String extractTableOwner(Map<String, String> properties) {
Review Comment:
- Why do we remove the owner from the properties?
- The removal is not covered by any test.
##########
flink-catalog-aws/flink-catalog-aws-glue/src/test/java/org/apache/flink/table/catalog/glue/TypeMapperTest.java:
##########
@@ -0,0 +1,91 @@
+package org.apache.flink.table.catalog.glue;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/** Unit tests for the {@link TypeMapper} class. */
+public class TypeMapperTest {
+
+ @Test
+ public void testMapFlinkTypeToGlueType_Primitives() {
Review Comment:
nit: JUnit 5 conventions don't call for a `test` prefix in test method names.
https://junit.org/junit5/docs/current/user-guide/
##########
flink-catalog-aws/flink-catalog-aws-glue/src/test/java/org/apache/flink/table/catalog/glue/TypeMapperTest.java:
##########
@@ -0,0 +1,91 @@
+package org.apache.flink.table.catalog.glue;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/** Unit tests for the {@link TypeMapper} class. */
+public class TypeMapperTest {
+
+ @Test
+ public void testMapFlinkTypeToGlueType_Primitives() {
+ assertEquals("int", TypeMapper.mapFlinkTypeToGlueType(new IntType()));
+ assertEquals("bigint", TypeMapper.mapFlinkTypeToGlueType(new
BigIntType()));
+ assertEquals("string", TypeMapper.mapFlinkTypeToGlueType(new
VarCharType(255)));
+ assertEquals("boolean", TypeMapper.mapFlinkTypeToGlueType(new
BooleanType()));
+ assertEquals("decimal", TypeMapper.mapFlinkTypeToGlueType(new
DecimalType(10, 0)));
+ assertEquals("float", TypeMapper.mapFlinkTypeToGlueType(new
FloatType()));
+ assertEquals("double", TypeMapper.mapFlinkTypeToGlueType(new
DoubleType()));
+ assertEquals("date", TypeMapper.mapFlinkTypeToGlueType(new
DateType()));
+ assertEquals("timestamp", TypeMapper.mapFlinkTypeToGlueType(new
TimestampType(5)));
+ }
+
+ @Test
+ public void testMapFlinkTypeToGlueType_Array() {
Review Comment:
nit: We could use parameterized tests (`@ParameterizedTest`) for all of these cases.
##########
flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/operator/GlueTableOperator.java:
##########
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue.operator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogView;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.glue.constants.GlueCatalogConstants;
+import org.apache.flink.table.catalog.glue.util.GlueUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.glue.model.Column;
+import software.amazon.awssdk.services.glue.model.CreateTableRequest;
+import software.amazon.awssdk.services.glue.model.CreateTableResponse;
+import software.amazon.awssdk.services.glue.model.DeleteTableRequest;
+import software.amazon.awssdk.services.glue.model.DeleteTableResponse;
+import software.amazon.awssdk.services.glue.model.EntityNotFoundException;
+import software.amazon.awssdk.services.glue.model.GetTableRequest;
+import software.amazon.awssdk.services.glue.model.GetTableResponse;
+import software.amazon.awssdk.services.glue.model.GetTablesRequest;
+import software.amazon.awssdk.services.glue.model.GetTablesResponse;
+import software.amazon.awssdk.services.glue.model.GlueException;
+import software.amazon.awssdk.services.glue.model.StorageDescriptor;
+import software.amazon.awssdk.services.glue.model.Table;
+import software.amazon.awssdk.services.glue.model.TableInput;
+import software.amazon.awssdk.services.glue.model.UpdateTableRequest;
+import software.amazon.awssdk.services.glue.model.UpdateTableResponse;
+
+import java.time.Instant;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Utilities for Glue Table related operations. */
+@Internal
+public class GlueTableOperator extends GlueOperator {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(GlueTableOperator.class);
+
+ public GlueTableOperator(String catalogName, GlueClient glueClient, String
glueCatalogId) {
+ super(catalogName, glueClient, glueCatalogId);
+ }
+
+ /**
+ * Create table in glue data catalog service.
+ *
+ * @param tablePath Fully qualified name of table. {@link ObjectPath}
+ * @param table instance of {@link CatalogBaseTable} containing table
related information.
+ * @throws CatalogException on unexpected error happens.
+ */
+ public void createGlueTable(final ObjectPath tablePath, final
CatalogBaseTable table)
+ throws CatalogException {
+
+ checkNotNull(tablePath, "tablePath cannot be null");
+ checkNotNull(table, "table cannot be null");
+ checkArgument(table instanceof ResolvedCatalogBaseTable, "table should
be resolved");
+
+ final Map<String, String> tableProperties = new
HashMap<>(table.getOptions());
+ String tableOwner = GlueUtils.extractTableOwner(tableProperties);
+ List<Column> glueTableColumns =
GlueUtils.getGlueColumnsFromCatalogTable(table);
+ StorageDescriptor.Builder storageDescriptorBuilder =
+ StorageDescriptor.builder()
+
.inputFormat(GlueUtils.extractInputFormat(tableProperties))
+
.outputFormat(GlueUtils.extractOutputFormat(tableProperties));
+
+ TableInput.Builder tableInputBuilder =
+ TableInput.builder()
+ .name(tablePath.getObjectName())
+ .description(table.getComment())
+ .tableType(table.getTableKind().name())
+ .lastAccessTime(Instant.now())
+ .owner(tableOwner)
+ .viewExpandedText(GlueUtils.getExpandedQuery(table))
Review Comment:
Since this is not a mandatory parameter in the builder and we don't really
support these yet, why do we set them?
##########
flink-catalog-aws/flink-catalog-aws-glue/src/test/java/org/apache/flink/table/catalog/glue/TypeMapperTest.java:
##########
@@ -0,0 +1,91 @@
+package org.apache.flink.table.catalog.glue;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/** Unit tests for the {@link TypeMapper} class. */
+public class TypeMapperTest {
+
+ @Test
+ public void testMapFlinkTypeToGlueType_Primitives() {
+ assertEquals("int", TypeMapper.mapFlinkTypeToGlueType(new IntType()));
+ assertEquals("bigint", TypeMapper.mapFlinkTypeToGlueType(new
BigIntType()));
+ assertEquals("string", TypeMapper.mapFlinkTypeToGlueType(new
VarCharType(255)));
+ assertEquals("boolean", TypeMapper.mapFlinkTypeToGlueType(new
BooleanType()));
+ assertEquals("decimal", TypeMapper.mapFlinkTypeToGlueType(new
DecimalType(10, 0)));
+ assertEquals("float", TypeMapper.mapFlinkTypeToGlueType(new
FloatType()));
+ assertEquals("double", TypeMapper.mapFlinkTypeToGlueType(new
DoubleType()));
+ assertEquals("date", TypeMapper.mapFlinkTypeToGlueType(new
DateType()));
+ assertEquals("timestamp", TypeMapper.mapFlinkTypeToGlueType(new
TimestampType(5)));
+ }
+
+ @Test
+ public void testMapFlinkTypeToGlueType_Array() {
+ LogicalType arrayType = new ArrayType(new VarCharType(255));
+ assertEquals("array<string>",
TypeMapper.mapFlinkTypeToGlueType(arrayType));
+ }
+
+ @Test
+ public void testMapFlinkTypeToGlueType_Map() {
+ LogicalType mapType = new MapType(new VarCharType(255), new IntType());
+ assertEquals("map<string,int>",
TypeMapper.mapFlinkTypeToGlueType(mapType));
+ }
+
+ @Test
+ public void testMapFlinkTypeToGlueType_Row() {
+ RowType rowType =
+ RowType.of(
+ new LogicalType[] {new VarCharType(255), new
IntType()},
+ new String[] {"name", "age"});
+ assertEquals("struct<name:string,age:int>",
TypeMapper.mapFlinkTypeToGlueType(rowType));
+ }
+
+ @Test
+ public void testGlueTypeToFlinkType_Primitives() {
+ assertEquals(DataTypes.INT(), TypeMapper.glueTypeToFlinkType("int"));
+ assertEquals(DataTypes.BIGINT(),
TypeMapper.glueTypeToFlinkType("bigint"));
+ assertEquals(DataTypes.STRING(),
TypeMapper.glueTypeToFlinkType("string"));
+ assertEquals(DataTypes.BOOLEAN(),
TypeMapper.glueTypeToFlinkType("boolean"));
+ assertEquals(DataTypes.DECIMAL(10, 0),
TypeMapper.glueTypeToFlinkType("decimal"));
+ assertEquals(DataTypes.FLOAT(),
TypeMapper.glueTypeToFlinkType("float"));
+ assertEquals(DataTypes.DOUBLE(),
TypeMapper.glueTypeToFlinkType("double"));
+ assertEquals(DataTypes.DATE(), TypeMapper.glueTypeToFlinkType("date"));
+ assertEquals(DataTypes.TIMESTAMP(5),
TypeMapper.glueTypeToFlinkType("timestamp"));
+ }
+
+ @Test
+ public void testGlueTypeToFlinkType_Array() {
+ LogicalType arrayType = new ArrayType(new VarCharType(255));
+ assertEquals("array<string>",
TypeMapper.mapFlinkTypeToGlueType(arrayType));
Review Comment:
This tests the Flink-to-Glue mapping, not Glue-to-Flink as the test name suggests.
##########
flink-catalog-aws/flink-catalog-aws-glue/src/test/java/org/apache/flink/table/catalog/glue/GlueCatalogTest.java:
##########
@@ -0,0 +1,998 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogFunctionImpl;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionImpl;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogView;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.FunctionLanguage;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedCatalogView;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import
org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.glue.constants.GlueCatalogConstants;
+import org.apache.flink.table.catalog.glue.operator.GlueDatabaseOperator;
+import org.apache.flink.table.catalog.glue.operator.GlueFunctionOperator;
+import org.apache.flink.table.catalog.glue.operator.GluePartitionOperator;
+import org.apache.flink.table.catalog.glue.operator.GlueTableOperator;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.types.DataType;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static
org.apache.flink.table.catalog.glue.GlueCatalogTestUtils.COLUMN_1;
+import static
org.apache.flink.table.catalog.glue.GlueCatalogTestUtils.COLUMN_2;
+import static org.apache.flink.table.catalog.glue.GlueCatalogTestUtils.COMMENT;
+import static
org.apache.flink.table.catalog.glue.GlueCatalogTestUtils.DATABASE_1;
+import static
org.apache.flink.table.catalog.glue.GlueCatalogTestUtils.DATABASE_2;
+import static
org.apache.flink.table.catalog.glue.GlueCatalogTestUtils.DATABASE_DESCRIPTION;
+import static
org.apache.flink.table.catalog.glue.GlueCatalogTestUtils.FUNCTION_1;
+import static org.apache.flink.table.catalog.glue.GlueCatalogTestUtils.TABLE_1;
+import static org.apache.flink.table.catalog.glue.GlueCatalogTestUtils.TABLE_2;
+import static org.apache.flink.table.catalog.glue.GlueCatalogTestUtils.TABLE_3;
+import static org.apache.flink.table.catalog.glue.GlueCatalogTestUtils.TABLE_4;
+import static org.apache.flink.table.catalog.glue.GlueCatalogTestUtils.TABLE_5;
+import static org.apache.flink.table.catalog.glue.GlueCatalogTestUtils.VIEW_1;
+import static org.apache.flink.table.catalog.glue.GlueCatalogTestUtils.VIEW_2;
+import static
org.apache.flink.table.catalog.glue.GlueCatalogTestUtils.getDatabaseParams;
+import static
org.apache.flink.table.catalog.glue.GlueCatalogTestUtils.getDummyCatalogDatabase;
+import static
org.apache.flink.table.catalog.glue.GlueCatalogTestUtils.getDummyCatalogTable;
+import static
org.apache.flink.table.catalog.glue.GlueCatalogTestUtils.getDummyCatalogTableWithPartition;
+import static
org.apache.flink.table.catalog.glue.GlueCatalogTestUtils.getDummyTableParams;
+import static
org.apache.flink.table.catalog.glue.GlueCatalogTestUtils.getPartitionSpecParams;
+
+class GlueCatalogTest {
+
+ public static final String WAREHOUSE_PATH = "s3://bucket";
+ private static final String CATALOG_NAME = "glue";
+ private static DummyGlueClient glue;
+ private static GlueCatalog glueCatalog;
+
+ @BeforeAll
+ static void setUp() {
+ glue = new DummyGlueClient();
+ String glueCatalogId = "dummy-catalog-Id";
+ GlueDatabaseOperator glueDatabaseOperator =
+ new GlueDatabaseOperator(CATALOG_NAME, glue, glueCatalogId);
+ GlueTableOperator glueTableOperator =
+ new GlueTableOperator(CATALOG_NAME, glue, glueCatalogId);
+ GluePartitionOperator gluePartitionOperator =
+ new GluePartitionOperator(CATALOG_NAME, glue, glueCatalogId);
+ GlueFunctionOperator glueFunctionOperator =
+ new GlueFunctionOperator(CATALOG_NAME, glue, glueCatalogId);
+ glueCatalog =
+ new GlueCatalog(
+ CATALOG_NAME,
+ GlueCatalog.DEFAULT_DB,
+ glue,
+ glueDatabaseOperator,
+ glueTableOperator,
+ gluePartitionOperator,
+ glueFunctionOperator);
+ }
+
+ @BeforeEach
+ public void clear() {
+ glue.setDatabaseMap(new HashMap<>());
+ glue.setTableMap(new HashMap<>());
+ glue.setPartitionMap(new HashMap<>());
+ glue.setUserDefinedFunctionMap(new HashMap<>());
+ }
+
+ // ------ Database
+ @Test
+ void testCreateDatabase() throws DatabaseNotExistException {
Review Comment:
The test is hard to follow, IMO. Could you move the happy-path case into its own test?
##########
flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/util/GlueUtils.java:
##########
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.FunctionLanguage;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.glue.GlueCatalogOptions;
+import org.apache.flink.table.catalog.glue.TypeMapper;
+import org.apache.flink.table.catalog.glue.constants.GlueCatalogConstants;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.types.AbstractDataType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.glue.model.Column;
+import software.amazon.awssdk.services.glue.model.Database;
+import software.amazon.awssdk.services.glue.model.GlueResponse;
+import software.amazon.awssdk.services.glue.model.Table;
+import software.amazon.awssdk.services.glue.model.UserDefinedFunction;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.commons.lang3.BooleanUtils.FALSE;
+import static org.apache.commons.lang3.BooleanUtils.TRUE;
+import static
org.apache.flink.table.catalog.glue.constants.GlueCatalogConstants.EXPLAIN_EXTRAS;
+import static
org.apache.flink.table.catalog.glue.constants.GlueCatalogConstants.IS_PERSISTED;
+import static
org.apache.flink.table.catalog.glue.constants.GlueCatalogConstants.IS_PHYSICAL;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;
+
+/** Utilities related glue Operation. */
+@Internal
+public class GlueUtils {
+
+ private static final Logger LOG = LoggerFactory.getLogger(GlueUtils.class); // class logger; extractDatabaseLocation logs through it
+
+ /**
+ * Glue supports lowercase naming convention.
+ *
+ * @param name fully qualified name.
+ * @return modified name according to glue convention.
+ */
+ public static String getGlueConventionalName(String name) {
+ return name.toLowerCase(Locale.ROOT);
+ }
+
+ /**
+ * Extract database location from properties and remove location from
properties. fallback to
+ * create default location if not present
+ *
+ * @param databaseProperties database properties.
+ * @param databaseName fully qualified name for database.
+ * @param catalogPath catalog path.
+ * @return location for database.
+ */
+ public static String extractDatabaseLocation(
+ final Map<String, String> databaseProperties,
+ final String databaseName,
+ final String catalogPath) {
+ if (databaseProperties.containsKey(GlueCatalogConstants.LOCATION_URI))
{
+ return
databaseProperties.remove(GlueCatalogConstants.LOCATION_URI);
+ } else {
+ LOG.info("No location URI Set. Using Catalog Path as default");
+ return catalogPath + GlueCatalogConstants.LOCATION_SEPARATOR +
databaseName;
+ }
+ }
+
+ /**
+ * Extract table location from table properties and remove location from
properties. fallback to
+ * create default location if not present
+ *
+ * @param tableProperties table properties.
+ * @param tablePath fully qualified object for table.
+ * @param catalogPath catalog path.
+ * @return location for table.
+ */
+ public static String extractTableLocation(
+ final Map<String, String> tableProperties,
+ final ObjectPath tablePath,
+ final String catalogPath) {
+ if (tableProperties.containsKey(GlueCatalogConstants.LOCATION_URI)) {
+ return tableProperties.remove(GlueCatalogConstants.LOCATION_URI);
+ } else {
+ return catalogPath
+ + GlueCatalogConstants.LOCATION_SEPARATOR
+ + tablePath.getDatabaseName()
+ + GlueCatalogConstants.LOCATION_SEPARATOR
+ + tablePath.getObjectName();
+ }
+ }
+
+ /**
+ * Build CatalogDatabase instance using information from glue Database
instance.
+ *
+ * @param glueDatabase {@link Database }
+ * @return {@link CatalogDatabase } instance.
+ */
+ public static CatalogDatabase getCatalogDatabase(final Database
glueDatabase) {
+ Map<String, String> properties = new
HashMap<>(glueDatabase.parameters());
+ return new CatalogDatabaseImpl(properties, glueDatabase.description());
+ }
+
+ /**
+ * A Glue database name cannot be longer than 255 characters. The only
acceptable characters are
+ * lowercase letters, numbers, and the underscore character. More details:
<a
+ *
href="https://docs.aws.amazon.com/athena/latest/ug/glue-best-practices.html">...</a>
+ *
+ * @param name name
+ */
+ public static void validate(String name) {
+ checkArgument(
+ name != null &&
name.matches(GlueCatalogConstants.GLUE_DB_PATTERN.pattern()),
+ "Database name does not comply with the Glue naming
convention. "
+ + "Check here
https://docs.aws.amazon.com/athena/latest/ug/glue-best-practices.html");
+ }
+
+ /** validate response from client call. */
+ public static void validateGlueResponse(GlueResponse response) {
+ if (response != null && !response.sdkHttpResponse().isSuccessful()) {
+ throw new
CatalogException(GlueCatalogConstants.GLUE_EXCEPTION_MSG_IDENTIFIER);
+ }
+ }
+
+ /**
+ * @param udf Instance of UserDefinedFunction
+ * @return ClassName for function
+ */
+ public static String getCatalogFunctionClassName(final UserDefinedFunction
udf) {
+ validateUDFClassName(udf.className());
+ String[] splitName =
udf.className().split(GlueCatalogConstants.DEFAULT_SEPARATOR);
+ return splitName[splitName.length - 1];
+ }
+
+ /**
+ * Validates UDF class name from glue.
+ *
+ * @param name name of UDF.
+ */
+ private static void validateUDFClassName(final String name) {
+ checkArgument(!isNullOrWhitespaceOnly(name));
+
+ if (name.split(GlueCatalogConstants.DEFAULT_SEPARATOR).length
+ != GlueCatalogConstants.UDF_CLASS_NAME_SIZE) {
+ throw new ValidationException("Improper ClassName: " + name);
+ }
+ }
+
+ /**
+ * Derive functionalLanguage from glue function name. Glue doesn't have
any attribute to save
+ * the functionalLanguage Name. Thus, storing FunctionalLanguage in the
name itself.
+ *
+ * @param glueFunction Function name from glue.
+ * @return Identifier for FunctionalLanguage.
+ */
+ public static FunctionLanguage getFunctionalLanguage(final
UserDefinedFunction glueFunction) {
+ if
(glueFunction.className().startsWith(GlueCatalogConstants.FLINK_JAVA_FUNCTION_PREFIX))
{
+ return FunctionLanguage.JAVA;
+ } else if (glueFunction
+ .className()
+
.startsWith(GlueCatalogConstants.FLINK_PYTHON_FUNCTION_PREFIX)) {
+ return FunctionLanguage.PYTHON;
+ } else if (glueFunction
+ .className()
+ .startsWith(GlueCatalogConstants.FLINK_SCALA_FUNCTION_PREFIX))
{
+ return FunctionLanguage.SCALA;
+ } else {
+ throw new CatalogException(
+ "Invalid Functional Language for className: " +
glueFunction.className());
+ }
+ }
+
+ /**
+ * Returns the expanded view query to store for a table in Glue.
+ *
+ * <p>Currently a placeholder: it always returns an empty string and does not inspect
+ * {@code table}. The linked FLINK-31961 issue is presumably what proper support depends on.
+ *
+ * @param table instance of {@link CatalogBaseTable}; currently unused.
+ * @return always {@code ""} at present.
+ */
+ public static String getExpandedQuery(CatalogBaseTable table) {
+ // https://issues.apache.org/jira/browse/FLINK-31961
+ return "";
+ }
+
+ /**
+ * Returns the original view query to store for a table in Glue.
+ *
+ * <p>Currently a placeholder: it always returns an empty string and does not inspect
+ * {@code table}. The linked FLINK-31961 issue is presumably what proper support depends on.
+ *
+ * @param table instance of {@link CatalogBaseTable}; currently unused.
+ * @return always {@code ""} at present.
+ */
+ public static String getOriginalQuery(CatalogBaseTable table) {
+ // https://issues.apache.org/jira/browse/FLINK-31961
+ return "";
+ }
+
+ /**
+ * Extract table owner name and remove from properties.
+ *
+ * @param properties Map of properties.
+ * @return fully qualified owner name.
+ */
+ public static String extractTableOwner(Map<String, String> properties) {
Review Comment:
nit: Ideally, this would return an Optional, and we would add the owner to the builder only if it is present.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]