This is an automated email from the ASF dual-hosted git repository.
fanng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 6a3e0f36b [#3371] feat(flink-connector): support basic table operation
(#3795)
6a3e0f36b is described below
commit 6a3e0f36bf1ef0f5344a0684a22538ae4af79fe5
Author: Peidian li <[email protected]>
AuthorDate: Fri Jul 5 11:43:54 2024 +0800
[#3371] feat(flink-connector): support basic table operation (#3795)
### What changes were proposed in this pull request?
- Support table operation on Flink
### Why are the changes needed?
- Fix: #3371
### Does this PR introduce _any_ user-facing change?
- no
### How was this patch tested?
- add UTs and ITs
---
flink-connector/build.gradle.kts | 1 +
.../flink/connector/PropertiesConverter.java | 20 +++
.../flink/connector/catalog/BaseCatalog.java | 135 ++++++++++++++--
.../flink/connector/hive/GravitinoHiveCatalog.java | 29 ++++
.../connector/hive/HivePropertiesConverter.java | 23 +++
.../gravitino/flink/connector/utils/TypeUtils.java | 58 +++++++
.../hive/TestHivePropertiesConverter.java | 1 +
.../connector/integration/test/FlinkCommonIT.java | 171 ++++++++++++++++++++-
.../connector/integration/test/FlinkEnvIT.java | 28 +++-
.../integration/test/hive/FlinkHiveCatalogIT.java | 38 +++--
.../integration/test/utils/TestUtils.java | 32 ++++
.../flink/connector/utils/TestTypeUtils.java | 56 +++++++
12 files changed, 563 insertions(+), 29 deletions(-)
diff --git a/flink-connector/build.gradle.kts b/flink-connector/build.gradle.kts
index f34f1ff10..ff28a72c4 100644
--- a/flink-connector/build.gradle.kts
+++ b/flink-connector/build.gradle.kts
@@ -131,6 +131,7 @@ dependencies {
exclude("com.google.code.findbugs", "jsr305")
}
testImplementation("org.apache.flink:flink-table-planner_$scalaVersion:$flinkVersion")
+ testImplementation("org.apache.flink:flink-test-utils:$flinkVersion")
testRuntimeOnly(libs.junit.jupiter.engine)
}
diff --git
a/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/PropertiesConverter.java
b/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/PropertiesConverter.java
index 6b03a18b6..981044958 100644
---
a/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/PropertiesConverter.java
+++
b/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/PropertiesConverter.java
@@ -70,4 +70,24 @@ public interface PropertiesConverter {
default Map<String, String> toFlinkDatabaseProperties(Map<String, String>
gravitinoProperties) {
return gravitinoProperties;
}
+
+ /**
+ * Converts properties from Gravitino table properties to Flink connector
table properties.
+ *
+ * @param gravitinoProperties The table properties provided by Gravitino.
+ * @return The table properties for the Flink connector.
+ */
+ default Map<String, String> toFlinkTableProperties(Map<String, String>
gravitinoProperties) {
+ return gravitinoProperties;
+ }
+
+ /**
+ * Converts properties from Flink connector table properties to Gravitino
table properties.
+ *
+ * @param flinkProperties The table properties provided by Flink.
+ * @return The table properties for the Gravitino.
+ */
+ default Map<String, String> toGravitinoTableProperties(Map<String, String>
flinkProperties) {
+ return flinkProperties;
+ }
}
diff --git
a/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/catalog/BaseCatalog.java
b/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/catalog/BaseCatalog.java
index f98670b4c..ae6127c1c 100644
---
a/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/catalog/BaseCatalog.java
+++
b/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/catalog/BaseCatalog.java
@@ -20,19 +20,30 @@
package com.datastrato.gravitino.flink.connector.catalog;
import com.datastrato.gravitino.Catalog;
+import com.datastrato.gravitino.NameIdentifier;
+import com.datastrato.gravitino.Namespace;
import com.datastrato.gravitino.Schema;
import com.datastrato.gravitino.SchemaChange;
import com.datastrato.gravitino.exceptions.NoSuchCatalogException;
import com.datastrato.gravitino.exceptions.NoSuchSchemaException;
+import com.datastrato.gravitino.exceptions.NoSuchTableException;
import com.datastrato.gravitino.exceptions.NonEmptySchemaException;
import com.datastrato.gravitino.exceptions.SchemaAlreadyExistsException;
+import com.datastrato.gravitino.exceptions.TableAlreadyExistsException;
import com.datastrato.gravitino.flink.connector.PropertiesConverter;
+import com.datastrato.gravitino.flink.connector.utils.TypeUtils;
+import com.datastrato.gravitino.rel.Column;
+import com.datastrato.gravitino.rel.Table;
+import com.datastrato.gravitino.rel.TableChange;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.MapDifference;
import com.google.common.collect.Maps;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.apache.commons.compress.utils.Lists;
import org.apache.flink.table.catalog.AbstractCatalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
@@ -41,7 +52,9 @@ import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.CatalogFunction;
import org.apache.flink.table.catalog.CatalogPartition;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
@@ -58,6 +71,7 @@ import
org.apache.flink.table.catalog.exceptions.TablePartitionedException;
import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.types.DataType;
/**
* The BaseCatalog that provides a default implementation for all methods in
the {@link
@@ -149,8 +163,17 @@ public abstract class BaseCatalog extends AbstractCatalog {
}
@Override
- public List<String> listTables(String s) throws DatabaseNotExistException,
CatalogException {
- throw new UnsupportedOperationException();
+ public List<String> listTables(String databaseName)
+ throws DatabaseNotExistException, CatalogException {
+ try {
+ return
Stream.of(catalog().asTableCatalog().listTables(Namespace.of(databaseName)))
+ .map(NameIdentifier::name)
+ .collect(Collectors.toList());
+ } catch (NoSuchSchemaException e) {
+ throw new DatabaseNotExistException(catalogName(), databaseName, e);
+ } catch (Exception e) {
+ throw new CatalogException(e);
+ }
}
@Override
@@ -159,32 +182,95 @@ public abstract class BaseCatalog extends AbstractCatalog
{
}
@Override
- public CatalogBaseTable getTable(ObjectPath objectPath)
+ public CatalogBaseTable getTable(ObjectPath tablePath)
throws TableNotExistException, CatalogException {
- throw new UnsupportedOperationException();
+ try {
+ Table table =
+ catalog()
+ .asTableCatalog()
+ .loadTable(NameIdentifier.of(tablePath.getDatabaseName(),
tablePath.getObjectName()));
+ return toFlinkTable(table);
+ } catch (NoSuchTableException e) {
+ throw new TableNotExistException(catalogName(), tablePath, e);
+ } catch (Exception e) {
+ throw new CatalogException(e);
+ }
}
@Override
- public boolean tableExists(ObjectPath objectPath) throws CatalogException {
- throw new UnsupportedOperationException();
+ public boolean tableExists(ObjectPath tablePath) throws CatalogException {
+ try {
+ return catalog()
+ .asTableCatalog()
+ .tableExists(NameIdentifier.of(tablePath.getDatabaseName(),
tablePath.getObjectName()));
+ } catch (Exception e) {
+ throw new CatalogException(e);
+ }
}
@Override
- public void dropTable(ObjectPath objectPath, boolean b)
+ public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException {
- throw new UnsupportedOperationException();
+ boolean dropped =
+ catalog()
+ .asTableCatalog()
+ .dropTable(NameIdentifier.of(tablePath.getDatabaseName(),
tablePath.getObjectName()));
+ if (!dropped && !ignoreIfNotExists) {
+ throw new TableNotExistException(catalogName(), tablePath);
+ }
}
@Override
- public void renameTable(ObjectPath objectPath, String s, boolean b)
+ public void renameTable(ObjectPath tablePath, String newTableName, boolean
ignoreIfNotExists)
throws TableNotExistException, TableAlreadyExistException,
CatalogException {
- throw new UnsupportedOperationException();
+ NameIdentifier identifier =
+ NameIdentifier.of(Namespace.of(tablePath.getDatabaseName()),
newTableName);
+
+ if (catalog().asTableCatalog().tableExists(identifier)) {
+ throw new TableAlreadyExistException(
+ catalogName(), ObjectPath.fromString(tablePath.getDatabaseName() +
newTableName));
+ }
+
+ try {
+ catalog()
+ .asTableCatalog()
+ .alterTable(
+ NameIdentifier.of(tablePath.getDatabaseName(),
tablePath.getObjectName()),
+ TableChange.rename(newTableName));
+ } catch (NoSuchTableException e) {
+ if (!ignoreIfNotExists) {
+ throw new TableNotExistException(catalogName(), tablePath, e);
+ }
+ } catch (Exception e) {
+ throw new CatalogException(e);
+ }
}
@Override
- public void createTable(ObjectPath objectPath, CatalogBaseTable
catalogBaseTable, boolean b)
+ public void createTable(ObjectPath tablePath, CatalogBaseTable table,
boolean ignoreIfExists)
throws TableAlreadyExistException, DatabaseNotExistException,
CatalogException {
- throw new UnsupportedOperationException();
+ NameIdentifier identifier =
+ NameIdentifier.of(tablePath.getDatabaseName(),
tablePath.getObjectName());
+
+ ResolvedCatalogBaseTable<?> resolvedTable = (ResolvedCatalogBaseTable<?>)
table;
+ Column[] columns =
+ resolvedTable.getResolvedSchema().getColumns().stream()
+ .map(this::toGravitinoColumn)
+ .toArray(Column[]::new);
+ String comment = table.getComment();
+ Map<String, String> properties =
+ propertiesConverter.toGravitinoTableProperties(table.getOptions());
+ try {
+ catalog().asTableCatalog().createTable(identifier, columns, comment,
properties);
+ } catch (NoSuchSchemaException e) {
+ throw new DatabaseNotExistException(catalogName(),
tablePath.getDatabaseName(), e);
+ } catch (TableAlreadyExistsException e) {
+ if (!ignoreIfExists) {
+ throw new TableAlreadyExistException(catalogName(), tablePath, e);
+ }
+ } catch (Exception e) {
+ throw new CatalogException(e);
+ }
}
@Override
@@ -351,6 +437,31 @@ public abstract class BaseCatalog extends AbstractCatalog {
protected abstract PropertiesConverter getPropertiesConverter();
+ protected CatalogBaseTable toFlinkTable(Table table) {
+ org.apache.flink.table.api.Schema.Builder builder =
+ org.apache.flink.table.api.Schema.newBuilder();
+ for (Column column : table.columns()) {
+ DataType flinkType = TypeUtils.toFlinkType(column.dataType());
+ builder
+ .column(column.name(), column.nullable() ? flinkType.nullable() :
flinkType.notNull())
+ .withComment(column.comment());
+ }
+ Map<String, String> flinkTableProperties =
+ propertiesConverter.toFlinkTableProperties(table.properties());
+ return CatalogTable.of(
+ builder.build(), table.comment(), ImmutableList.of(),
flinkTableProperties);
+ }
+
+ private Column toGravitinoColumn(org.apache.flink.table.catalog.Column
column) {
+ return Column.of(
+ column.getName(),
+ TypeUtils.toGravitinoType(column.getDataType().getLogicalType()),
+ column.getComment().orElse(null),
+ column.getDataType().getLogicalType().isNullable(),
+ false,
+ null);
+ }
+
@VisibleForTesting
static SchemaChange[] getSchemaChange(CatalogDatabase current,
CatalogDatabase updated) {
Map<String, String> currentProperties = current.getProperties();
diff --git
a/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/hive/GravitinoHiveCatalog.java
b/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/hive/GravitinoHiveCatalog.java
index aae5fb6ff..857caad70 100644
---
a/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/hive/GravitinoHiveCatalog.java
+++
b/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/hive/GravitinoHiveCatalog.java
@@ -22,7 +22,12 @@ import
com.datastrato.gravitino.flink.connector.PropertiesConverter;
import com.datastrato.gravitino.flink.connector.catalog.BaseCatalog;
import java.util.Optional;
import javax.annotation.Nullable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.catalog.hive.HiveCatalog;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
import org.apache.flink.table.factories.Factory;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -43,6 +48,18 @@ public class GravitinoHiveCatalog extends BaseCatalog {
this.hiveCatalog = new HiveCatalog(catalogName, defaultDatabase, hiveConf,
hiveVersion);
}
+ @Override
+ public void open() throws CatalogException {
+ super.open();
+ hiveCatalog.open();
+ }
+
+ @Override
+ public void close() throws CatalogException {
+ super.close();
+ hiveCatalog.close();
+ }
+
public HiveConf getHiveConf() {
return hiveCatalog.getHiveConf();
}
@@ -56,4 +73,16 @@ public class GravitinoHiveCatalog extends BaseCatalog {
protected PropertiesConverter getPropertiesConverter() {
return HivePropertiesConverter.INSTANCE;
}
+
+ @Override
+ public CatalogTableStatistics getTableStatistics(ObjectPath objectPath)
+ throws TableNotExistException, CatalogException {
+ return hiveCatalog.getTableStatistics(objectPath);
+ }
+
+ @Override
+ public CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath)
+ throws TableNotExistException, CatalogException {
+ return hiveCatalog.getTableColumnStatistics(tablePath);
+ }
}
diff --git
a/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/hive/HivePropertiesConverter.java
b/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/hive/HivePropertiesConverter.java
index b780a5e99..9a2771ee3 100644
---
a/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/hive/HivePropertiesConverter.java
+++
b/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/hive/HivePropertiesConverter.java
@@ -20,10 +20,12 @@
package com.datastrato.gravitino.flink.connector.hive;
import com.datastrato.gravitino.catalog.hive.HiveCatalogPropertiesMeta;
+import com.datastrato.gravitino.catalog.hive.HiveTablePropertiesMetadata;
import com.datastrato.gravitino.flink.connector.PropertiesConverter;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import java.util.Map;
+import java.util.stream.Collectors;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.catalog.CommonCatalogOptions;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -76,4 +78,25 @@ public class HivePropertiesConverter implements
PropertiesConverter {
});
return flinkCatalogProperties;
}
+
+ @Override
+ public Map<String, String> toFlinkTableProperties(Map<String, String>
gravitinoProperties) {
+ Map<String, String> properties =
+ gravitinoProperties.entrySet().stream()
+ .collect(
+ Collectors.toMap(
+ entry -> {
+ String key = entry.getKey();
+ if
(key.startsWith(HiveTablePropertiesMetadata.SERDE_PARAMETER_PREFIX)) {
+ return key.substring(
+
HiveTablePropertiesMetadata.SERDE_PARAMETER_PREFIX.length());
+ } else {
+ return key;
+ }
+ },
+ Map.Entry::getValue,
+ (existingValue, newValue) -> newValue));
+ properties.put("connector", "hive");
+ return properties;
+ }
}
diff --git
a/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/utils/TypeUtils.java
b/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/utils/TypeUtils.java
new file mode 100644
index 000000000..d6907ebb7
--- /dev/null
+++
b/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/utils/TypeUtils.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.datastrato.gravitino.flink.connector.utils;
+
+import com.datastrato.gravitino.rel.types.Type;
+import com.datastrato.gravitino.rel.types.Types;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+
+public class TypeUtils {
+
+ private TypeUtils() {}
+
+ public static Type toGravitinoType(LogicalType logicalType) {
+ switch (logicalType.getTypeRoot()) {
+ case VARCHAR:
+ return Types.StringType.get();
+ case DOUBLE:
+ return Types.DoubleType.get();
+ case INTEGER:
+ return Types.IntegerType.get();
+ default:
+ throw new UnsupportedOperationException(
+ "Not support type: " + logicalType.asSummaryString());
+ }
+ }
+
+ public static DataType toFlinkType(Type gravitinoType) {
+ switch (gravitinoType.name()) {
+ case DOUBLE:
+ return DataTypes.DOUBLE();
+ case STRING:
+ return DataTypes.STRING();
+ case INTEGER:
+ return DataTypes.INT();
+ default:
+ throw new UnsupportedOperationException("Not support " +
gravitinoType.toString());
+ }
+ }
+}
diff --git
a/flink-connector/src/test/java/com/datastrato/gravitino/flink/connector/hive/TestHivePropertiesConverter.java
b/flink-connector/src/test/java/com/datastrato/gravitino/flink/connector/hive/TestHivePropertiesConverter.java
index 397e1220c..807db5b3f 100644
---
a/flink-connector/src/test/java/com/datastrato/gravitino/flink/connector/hive/TestHivePropertiesConverter.java
+++
b/flink-connector/src/test/java/com/datastrato/gravitino/flink/connector/hive/TestHivePropertiesConverter.java
@@ -1,4 +1,5 @@
/*
+ * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
diff --git
a/flink-connector/src/test/java/com/datastrato/gravitino/flink/connector/integration/test/FlinkCommonIT.java
b/flink-connector/src/test/java/com/datastrato/gravitino/flink/connector/integration/test/FlinkCommonIT.java
index a304c106c..a59f29637 100644
---
a/flink-connector/src/test/java/com/datastrato/gravitino/flink/connector/integration/test/FlinkCommonIT.java
+++
b/flink-connector/src/test/java/com/datastrato/gravitino/flink/connector/integration/test/FlinkCommonIT.java
@@ -19,18 +19,31 @@
package com.datastrato.gravitino.flink.connector.integration.test;
+import static
com.datastrato.gravitino.flink.connector.integration.test.utils.TestUtils.assertColumns;
+import static
com.datastrato.gravitino.flink.connector.integration.test.utils.TestUtils.toFlinkPhysicalColumn;
+import static
com.datastrato.gravitino.rel.expressions.transforms.Transforms.EMPTY_TRANSFORM;
+
import com.datastrato.gravitino.Catalog;
+import com.datastrato.gravitino.NameIdentifier;
import com.datastrato.gravitino.Schema;
import com.datastrato.gravitino.catalog.hive.HiveSchemaPropertiesMetadata;
import
com.datastrato.gravitino.flink.connector.integration.test.utils.TestUtils;
+import com.datastrato.gravitino.rel.Column;
+import com.datastrato.gravitino.rel.Table;
+import com.datastrato.gravitino.rel.types.Types;
+import com.google.common.collect.ImmutableMap;
+import java.util.Optional;
+import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ResultKind;
import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.types.Row;
import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
-@Tag("gravitino-docker-it")
public abstract class FlinkCommonIT extends FlinkEnvIT {
protected abstract Catalog currentCatalog();
@@ -163,4 +176,158 @@ public abstract class FlinkCommonIT extends FlinkEnvIT {
}
});
}
+
+ @Test
+ public void testCreateSimpleTable() {
+ String databaseName = "test_create_no_partition_table_db";
+ String tableName = "test_create_no_partition_table";
+ String comment = "test comment";
+ String key = "test key";
+ String value = "test value";
+
+ doWithSchema(
+ currentCatalog(),
+ databaseName,
+ catalog -> {
+ TableResult result =
+ sql(
+ "CREATE TABLE %s "
+ + "(string_type STRING COMMENT 'string_type', "
+ + " double_type DOUBLE COMMENT 'double_type')"
+ + " COMMENT '%s' WITH ("
+ + "'%s' = '%s')",
+ tableName, comment, key, value);
+ TestUtils.assertTableResult(result, ResultKind.SUCCESS);
+
+ Table table =
+
catalog.asTableCatalog().loadTable(NameIdentifier.of(databaseName, tableName));
+ Assertions.assertNotNull(table);
+ Assertions.assertEquals(comment, table.comment());
+ Assertions.assertEquals(value, table.properties().get(key));
+ Column[] columns =
+ new Column[] {
+ Column.of("string_type", Types.StringType.get(),
"string_type", true, false, null),
+ Column.of("double_type", Types.DoubleType.get(), "double_type")
+ };
+ assertColumns(columns, table.columns());
+ Assertions.assertArrayEquals(EMPTY_TRANSFORM, table.partitioning());
+
+ TestUtils.assertTableResult(
+ sql("INSERT INTO %s VALUES ('A', 1.0), ('B', 2.0)", tableName),
+ ResultKind.SUCCESS_WITH_CONTENT,
+ Row.of(-1L));
+ TestUtils.assertTableResult(
+ sql("SELECT * FROM %s", tableName),
+ ResultKind.SUCCESS_WITH_CONTENT,
+ Row.of("A", 1.0),
+ Row.of("B", 2.0));
+ },
+ true);
+ }
+
+ @Test
+ public void testListTables() {
+ String newSchema = "test_list_table_catalog";
+ Column[] columns = new Column[] {Column.of("user_id",
Types.IntegerType.get(), "USER_ID")};
+ doWithSchema(
+ currentCatalog(),
+ newSchema,
+ catalog -> {
+ catalog
+ .asTableCatalog()
+ .createTable(
+ NameIdentifier.of(newSchema, "test_table1"),
+ columns,
+ "comment1",
+ ImmutableMap.of());
+ catalog
+ .asTableCatalog()
+ .createTable(
+ NameIdentifier.of(newSchema, "test_table2"),
+ columns,
+ "comment2",
+ ImmutableMap.of());
+ TableResult result = sql("SHOW TABLES");
+ TestUtils.assertTableResult(
+ result,
+ ResultKind.SUCCESS_WITH_CONTENT,
+ Row.of("test_table1"),
+ Row.of("test_table2"));
+ },
+ true);
+ }
+
+ @Test
+ public void testDropTable() {
+ String databaseName = "test_drop_table_db";
+ doWithSchema(
+ currentCatalog(),
+ databaseName,
+ catalog -> {
+ String tableName = "test_drop_table";
+ Column[] columns =
+ new Column[] {Column.of("user_id", Types.IntegerType.get(),
"USER_ID")};
+ NameIdentifier identifier = NameIdentifier.of(databaseName,
tableName);
+ catalog.asTableCatalog().createTable(identifier, columns,
"comment1", ImmutableMap.of());
+
Assertions.assertTrue(catalog.asTableCatalog().tableExists(identifier));
+
+ TableResult result = sql("DROP TABLE %s", tableName);
+ TestUtils.assertTableResult(result, ResultKind.SUCCESS);
+
Assertions.assertFalse(catalog.asTableCatalog().tableExists(identifier));
+ },
+ true);
+ }
+
+ @Test
+ public void testGetSimpleTable() {
+ String databaseName = "test_get_simple_table";
+ Column[] columns =
+ new Column[] {
+ Column.of("string_type", Types.StringType.get(), "string_type",
true, false, null),
+ Column.of("double_type", Types.DoubleType.get(), "double_type")
+ };
+
+ doWithSchema(
+ currentCatalog(),
+ databaseName,
+ catalog -> {
+ String tableName = "test_desc_table";
+ String comment = "comment1";
+ catalog
+ .asTableCatalog()
+ .createTable(
+ NameIdentifier.of(databaseName, "test_desc_table"),
+ columns,
+ comment,
+ ImmutableMap.of("k1", "v1"));
+
+ Optional<org.apache.flink.table.catalog.Catalog> flinkCatalog =
+ tableEnv.getCatalog(catalog.name());
+ Assertions.assertTrue(flinkCatalog.isPresent());
+ try {
+ CatalogBaseTable table =
+ flinkCatalog.get().getTable(new ObjectPath(databaseName,
tableName));
+ Assertions.assertNotNull(table);
+ Assertions.assertEquals(CatalogBaseTable.TableKind.TABLE,
table.getTableKind());
+ Assertions.assertEquals(comment, table.getComment());
+
+ org.apache.flink.table.catalog.Column[] expected =
+ new org.apache.flink.table.catalog.Column[] {
+
org.apache.flink.table.catalog.Column.physical("string_type",
DataTypes.STRING())
+ .withComment("string_type"),
+
org.apache.flink.table.catalog.Column.physical("double_type",
DataTypes.DOUBLE())
+ .withComment("double_type")
+ };
+ org.apache.flink.table.catalog.Column[] actual =
+
toFlinkPhysicalColumn(table.getUnresolvedSchema().getColumns());
+ Assertions.assertArrayEquals(expected, actual);
+
+ CatalogTable catalogTable = (CatalogTable) table;
+ Assertions.assertFalse(catalogTable.isPartitioned());
+ } catch (TableNotExistException e) {
+ Assertions.fail(e);
+ }
+ },
+ true);
+ }
}
diff --git
a/flink-connector/src/test/java/com/datastrato/gravitino/flink/connector/integration/test/FlinkEnvIT.java
b/flink-connector/src/test/java/com/datastrato/gravitino/flink/connector/integration/test/FlinkEnvIT.java
index 13ef56660..fc574ffae 100644
---
a/flink-connector/src/test/java/com/datastrato/gravitino/flink/connector/integration/test/FlinkEnvIT.java
+++
b/flink-connector/src/test/java/com/datastrato/gravitino/flink/connector/integration/test/FlinkEnvIT.java
@@ -1,4 +1,5 @@
/*
+ * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
@@ -25,12 +26,14 @@ import
com.datastrato.gravitino.integration.test.container.ContainerSuite;
import com.datastrato.gravitino.integration.test.container.HiveContainer;
import com.datastrato.gravitino.integration.test.util.AbstractIT;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
import com.google.errorprone.annotations.FormatMethod;
import com.google.errorprone.annotations.FormatString;
import java.io.IOException;
import java.util.Collections;
import java.util.function.Consumer;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
@@ -125,7 +128,9 @@ public abstract class FlinkEnvIT extends AbstractIT {
"table.catalog-store.kind",
GravitinoCatalogStoreFactoryOptions.GRAVITINO);
configuration.setString("table.catalog-store.gravitino.gravitino.metalake",
GRAVITINO_METALAKE);
configuration.setString("table.catalog-store.gravitino.gravitino.uri",
gravitinoUri);
- tableEnv = TableEnvironment.create(configuration);
+ EnvironmentSettings.Builder builder =
+ EnvironmentSettings.newInstance().withConfiguration(configuration);
+ tableEnv = TableEnvironment.create(builder.inBatchMode().build());
}
private static void stopHdfsEnv() {
@@ -154,6 +159,27 @@ public abstract class FlinkEnvIT extends AbstractIT {
return tableEnv.executeSql(String.format(sql, args));
}
+ protected static void doWithSchema(
+ Catalog catalog, String schemaName, Consumer<Catalog> action, boolean
dropSchema) {
+ Preconditions.checkNotNull(catalog);
+ Preconditions.checkNotNull(schemaName);
+ try {
+ tableEnv.useCatalog(catalog.name());
+ if (!catalog.asSchemas().schemaExists(schemaName)) {
+ catalog
+ .asSchemas()
+ .createSchema(
+ schemaName, null, ImmutableMap.of("location", warehouse + "/"
+ schemaName));
+ }
+ tableEnv.useDatabase(schemaName);
+ action.accept(catalog);
+ } finally {
+ if (dropSchema) {
+ catalog.asSchemas().dropSchema(schemaName, true);
+ }
+ }
+ }
+
protected static void doWithCatalog(Catalog catalog, Consumer<Catalog>
action) {
Preconditions.checkNotNull(catalog);
tableEnv.useCatalog(catalog.name());
diff --git
a/flink-connector/src/test/java/com/datastrato/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java
b/flink-connector/src/test/java/com/datastrato/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java
index b85c0d3f6..4db72cc9e 100644
---
a/flink-connector/src/test/java/com/datastrato/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java
+++
b/flink-connector/src/test/java/com/datastrato/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java
@@ -1,4 +1,5 @@
/*
+ * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
@@ -23,6 +24,7 @@ import
com.datastrato.gravitino.flink.connector.PropertiesConverter;
import com.datastrato.gravitino.flink.connector.hive.GravitinoHiveCatalog;
import
com.datastrato.gravitino.flink.connector.hive.GravitinoHiveCatalogFactoryOptions;
import com.datastrato.gravitino.flink.connector.integration.test.FlinkCommonIT;
+import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import java.util.Arrays;
@@ -42,28 +44,37 @@ import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
@Tag("gravitino-docker-test")
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class FlinkHiveCatalogIT extends FlinkCommonIT {
+ private static final String DEFAULT_HIVE_CATALOG =
"test_flink_hive_schema_catalog";
private static com.datastrato.gravitino.Catalog hiveCatalog;
@BeforeAll
static void hiveStartUp() {
+ initDefaultHiveCatalog();
+ }
+
+ @AfterAll
+ static void hiveStop() {
+ Preconditions.checkNotNull(metalake);
+ metalake.dropCatalog(DEFAULT_HIVE_CATALOG);
+ }
+
+ protected static void initDefaultHiveCatalog() {
+ Preconditions.checkNotNull(metalake);
hiveCatalog =
metalake.createCatalog(
- "test_flink_hive_schema_catalog",
+ DEFAULT_HIVE_CATALOG,
com.datastrato.gravitino.Catalog.Type.RELATIONAL,
"hive",
null,
ImmutableMap.of("metastore.uris", hiveMetastoreUri));
}
- @AfterAll
- static void hiveStop() {
- metalake.dropCatalog("test_flink_hive_schema_catalog");
- }
-
@Test
public void testCreateGravitinoHiveCatalog() {
tableEnv.useCatalog(DEFAULT_CATALOG);
@@ -75,8 +86,7 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT {
configuration.set(
CommonCatalogOptions.CATALOG_TYPE,
GravitinoHiveCatalogFactoryOptions.IDENTIFIER);
configuration.set(HiveCatalogFactoryOptions.HIVE_CONF_DIR,
"src/test/resources/flink-tests");
- configuration.set(
- GravitinoHiveCatalogFactoryOptions.HIVE_METASTORE_URIS,
"thrift://127.0.0.1:9084");
+ configuration.set(GravitinoHiveCatalogFactoryOptions.HIVE_METASTORE_URIS,
hiveMetastoreUri);
CatalogDescriptor catalogDescriptor = CatalogDescriptor.of(catalogName,
configuration);
tableEnv.createCatalog(catalogName, catalogDescriptor);
Assertions.assertTrue(metalake.catalogExists(catalogName));
@@ -84,7 +94,7 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT {
// Check the catalog properties.
com.datastrato.gravitino.Catalog gravitinoCatalog =
metalake.loadCatalog(catalogName);
Map<String, String> properties = gravitinoCatalog.properties();
- Assertions.assertEquals("thrift://127.0.0.1:9084",
properties.get(METASTORE_URIS));
+ Assertions.assertEquals(hiveMetastoreUri, properties.get(METASTORE_URIS));
Map<String, String> flinkProperties =
gravitinoCatalog.properties().entrySet().stream()
.filter(e ->
e.getKey().startsWith(PropertiesConverter.FLINK_PROPERTY_PREFIX))
@@ -141,16 +151,16 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT {
"create catalog %s with ("
+ "'type'='gravitino-hive', "
+ "'hive-conf-dir'='src/test/resources/flink-tests',"
- + "'hive.metastore.uris'='thrift://127.0.0.1:9084',"
+ + "'hive.metastore.uris'='%s',"
+ "'unknown.key'='unknown.value'"
+ ")",
- catalogName));
+ catalogName, hiveMetastoreUri));
Assertions.assertTrue(metalake.catalogExists(catalogName));
// Check the properties of the created catalog.
com.datastrato.gravitino.Catalog gravitinoCatalog =
metalake.loadCatalog(catalogName);
Map<String, String> properties = gravitinoCatalog.properties();
- Assertions.assertEquals("thrift://127.0.0.1:9084",
properties.get(METASTORE_URIS));
+ Assertions.assertEquals(hiveMetastoreUri, properties.get(METASTORE_URIS));
Map<String, String> flinkProperties =
properties.entrySet().stream()
.filter(e ->
e.getKey().startsWith(PropertiesConverter.FLINK_PROPERTY_PREFIX))
@@ -245,7 +255,7 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT {
"flink.bypass.hive.test",
"hive.config",
"metastore.uris",
- "thrift://127.0.0.1:9084"));
+ hiveMetastoreUri));
Assertions.assertNotNull(gravitinoCatalog);
Assertions.assertEquals(catalogName, gravitinoCatalog.name());
Assertions.assertTrue(metalake.catalogExists(catalogName));
@@ -261,7 +271,7 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT {
Assertions.assertTrue(hiveConf.size() > 0, "Should have hive conf");
Assertions.assertEquals("hive.config", hiveConf.get("hive.test"));
Assertions.assertEquals(
- "thrift://127.0.0.1:9084",
hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname));
+ hiveMetastoreUri,
hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname));
// drop the catalog.
tableEnv.useCatalog(DEFAULT_CATALOG);
diff --git
a/flink-connector/src/test/java/com/datastrato/gravitino/flink/connector/integration/test/utils/TestUtils.java
b/flink-connector/src/test/java/com/datastrato/gravitino/flink/connector/integration/test/utils/TestUtils.java
index 08a2f6b1c..e08b1cd7c 100644
---
a/flink-connector/src/test/java/com/datastrato/gravitino/flink/connector/integration/test/utils/TestUtils.java
+++
b/flink-connector/src/test/java/com/datastrato/gravitino/flink/connector/integration/test/utils/TestUtils.java
@@ -1,4 +1,5 @@
/*
+ * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
@@ -17,10 +18,13 @@
*/
package com.datastrato.gravitino.flink.connector.integration.test.utils;
+import com.datastrato.gravitino.rel.Column;
import com.google.common.collect.Lists;
import java.util.List;
import org.apache.flink.table.api.ResultKind;
+import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
import org.junit.jupiter.api.Assertions;
@@ -42,4 +46,32 @@ public class TestUtils {
}
}
}
+
+ public static void assertColumns(Column[] expected, Column[] actual) {
+ Assertions.assertEquals(expected.length, actual.length);
+ for (int i = 0; i < expected.length; i++) {
+ Assertions.assertEquals(expected[i].name(), actual[i].name());
+ Assertions.assertEquals(expected[i].comment(), actual[i].comment());
+ Assertions.assertEquals(
+ expected[i].dataType().simpleString(),
actual[i].dataType().simpleString());
+ Assertions.assertEquals(expected[i].defaultValue(),
actual[i].defaultValue());
+ Assertions.assertEquals(expected[i].autoIncrement(),
actual[i].autoIncrement());
+ Assertions.assertEquals(expected[i].nullable(), actual[i].nullable());
+ }
+ }
+
+ public static org.apache.flink.table.catalog.Column[] toFlinkPhysicalColumn(
+ List<Schema.UnresolvedColumn> unresolvedPhysicalColumns) {
+ return unresolvedPhysicalColumns.stream()
+ .map(
+ column -> {
+ Schema.UnresolvedPhysicalColumn unresolvedPhysicalColumn =
+ (Schema.UnresolvedPhysicalColumn) column;
+ return org.apache.flink.table.catalog.Column.physical(
+ unresolvedPhysicalColumn.getName(),
+ (DataType) unresolvedPhysicalColumn.getDataType())
+
.withComment(unresolvedPhysicalColumn.getComment().orElse(null));
+ })
+ .toArray(org.apache.flink.table.catalog.Column[]::new);
+ }
}
diff --git
a/flink-connector/src/test/java/com/datastrato/gravitino/flink/connector/utils/TestTypeUtils.java
b/flink-connector/src/test/java/com/datastrato/gravitino/flink/connector/utils/TestTypeUtils.java
new file mode 100644
index 000000000..1a381d232
--- /dev/null
+++
b/flink-connector/src/test/java/com/datastrato/gravitino/flink/connector/utils/TestTypeUtils.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.datastrato.gravitino.flink.connector.utils;
+
+import com.datastrato.gravitino.rel.types.Types;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.UnresolvedIdentifier;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.UnresolvedUserDefinedType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestTypeUtils {
+
+ @Test
+ public void testToGravitinoType() {
+ Assertions.assertEquals(
+ Types.StringType.get(), TypeUtils.toGravitinoType(new
VarCharType(Integer.MAX_VALUE)));
+ Assertions.assertEquals(Types.DoubleType.get(),
TypeUtils.toGravitinoType(new DoubleType()));
+ Assertions.assertEquals(Types.IntegerType.get(),
TypeUtils.toGravitinoType(new IntType()));
+ Assertions.assertThrows(
+ UnsupportedOperationException.class,
+ () ->
+ TypeUtils.toGravitinoType(
+ new UnresolvedUserDefinedType(UnresolvedIdentifier.of("a",
"b", "c"))));
+ }
+
+ @Test
+ public void testToFlinkType() {
+ Assertions.assertEquals(DataTypes.DOUBLE(),
TypeUtils.toFlinkType(Types.DoubleType.get()));
+ Assertions.assertEquals(DataTypes.STRING(),
TypeUtils.toFlinkType(Types.StringType.get()));
+ Assertions.assertEquals(DataTypes.INT(),
TypeUtils.toFlinkType(Types.IntegerType.get()));
+ Assertions.assertThrows(
+ UnsupportedOperationException.class,
+ () -> TypeUtils.toFlinkType(Types.UnparsedType.of("unknown")));
+ }
+}