This is an automated email from the ASF dual-hosted git repository.

fanng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 6a3e0f36b [#3371] feat(flink-connector): support basic table operation 
(#3795)
6a3e0f36b is described below

commit 6a3e0f36bf1ef0f5344a0684a22538ae4af79fe5
Author: Peidian li <[email protected]>
AuthorDate: Fri Jul 5 11:43:54 2024 +0800

    [#3371] feat(flink-connector): support basic table operation (#3795)
    
    ### What changes were proposed in this pull request?
    
    - Support basic table operations (list, get, exists, create, rename, drop) in the Flink connector
    
    ### Why are the changes needed?
    
    - Fix: #3371
    
    ### Does this PR introduce _any_ user-facing change?
    
    - no
    
    ### How was this patch tested?
    - Added UTs and ITs (unit tests and integration tests)
---
 flink-connector/build.gradle.kts                   |   1 +
 .../flink/connector/PropertiesConverter.java       |  20 +++
 .../flink/connector/catalog/BaseCatalog.java       | 135 ++++++++++++++--
 .../flink/connector/hive/GravitinoHiveCatalog.java |  29 ++++
 .../connector/hive/HivePropertiesConverter.java    |  23 +++
 .../gravitino/flink/connector/utils/TypeUtils.java |  58 +++++++
 .../hive/TestHivePropertiesConverter.java          |   1 +
 .../connector/integration/test/FlinkCommonIT.java  | 171 ++++++++++++++++++++-
 .../connector/integration/test/FlinkEnvIT.java     |  28 +++-
 .../integration/test/hive/FlinkHiveCatalogIT.java  |  38 +++--
 .../integration/test/utils/TestUtils.java          |  32 ++++
 .../flink/connector/utils/TestTypeUtils.java       |  56 +++++++
 12 files changed, 563 insertions(+), 29 deletions(-)

diff --git a/flink-connector/build.gradle.kts b/flink-connector/build.gradle.kts
index f34f1ff10..ff28a72c4 100644
--- a/flink-connector/build.gradle.kts
+++ b/flink-connector/build.gradle.kts
@@ -131,6 +131,7 @@ dependencies {
     exclude("com.google.code.findbugs", "jsr305")
   }
   
testImplementation("org.apache.flink:flink-table-planner_$scalaVersion:$flinkVersion")
+  testImplementation("org.apache.flink:flink-test-utils:$flinkVersion")
 
   testRuntimeOnly(libs.junit.jupiter.engine)
 }
diff --git 
a/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/PropertiesConverter.java
 
b/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/PropertiesConverter.java
index 6b03a18b6..981044958 100644
--- 
a/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/PropertiesConverter.java
+++ 
b/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/PropertiesConverter.java
@@ -70,4 +70,24 @@ public interface PropertiesConverter {
   default Map<String, String> toFlinkDatabaseProperties(Map<String, String> 
gravitinoProperties) {
     return gravitinoProperties;
   }
+
+  /**
+   * Converts properties from Gravitino table properties to Flink connector 
table properties.
+   *
+   * @param gravitinoProperties The table properties provided by Gravitino.
+   * @return The table properties for the Flink connector.
+   */
+  default Map<String, String> toFlinkTableProperties(Map<String, String> 
gravitinoProperties) {
+    return gravitinoProperties;
+  }
+
+  /**
+   * Converts properties from Flink connector table properties to Gravitino 
table properties.
+   *
+   * @param flinkProperties The table properties provided by Flink.
+   * @return The table properties for the Gravitino.
+   */
+  default Map<String, String> toGravitinoTableProperties(Map<String, String> 
flinkProperties) {
+    return flinkProperties;
+  }
 }
diff --git 
a/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/catalog/BaseCatalog.java
 
b/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/catalog/BaseCatalog.java
index f98670b4c..ae6127c1c 100644
--- 
a/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/catalog/BaseCatalog.java
+++ 
b/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/catalog/BaseCatalog.java
@@ -20,19 +20,30 @@
 package com.datastrato.gravitino.flink.connector.catalog;
 
 import com.datastrato.gravitino.Catalog;
+import com.datastrato.gravitino.NameIdentifier;
+import com.datastrato.gravitino.Namespace;
 import com.datastrato.gravitino.Schema;
 import com.datastrato.gravitino.SchemaChange;
 import com.datastrato.gravitino.exceptions.NoSuchCatalogException;
 import com.datastrato.gravitino.exceptions.NoSuchSchemaException;
+import com.datastrato.gravitino.exceptions.NoSuchTableException;
 import com.datastrato.gravitino.exceptions.NonEmptySchemaException;
 import com.datastrato.gravitino.exceptions.SchemaAlreadyExistsException;
+import com.datastrato.gravitino.exceptions.TableAlreadyExistsException;
 import com.datastrato.gravitino.flink.connector.PropertiesConverter;
+import com.datastrato.gravitino.flink.connector.utils.TypeUtils;
+import com.datastrato.gravitino.rel.Column;
+import com.datastrato.gravitino.rel.Table;
+import com.datastrato.gravitino.rel.TableChange;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.MapDifference;
 import com.google.common.collect.Maps;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.commons.compress.utils.Lists;
 import org.apache.flink.table.catalog.AbstractCatalog;
 import org.apache.flink.table.catalog.CatalogBaseTable;
@@ -41,7 +52,9 @@ import org.apache.flink.table.catalog.CatalogDatabaseImpl;
 import org.apache.flink.table.catalog.CatalogFunction;
 import org.apache.flink.table.catalog.CatalogPartition;
 import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
@@ -58,6 +71,7 @@ import 
org.apache.flink.table.catalog.exceptions.TablePartitionedException;
 import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
 import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
 import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.types.DataType;
 
 /**
  * The BaseCatalog that provides a default implementation for all methods in 
the {@link
@@ -149,8 +163,17 @@ public abstract class BaseCatalog extends AbstractCatalog {
   }
 
   @Override
-  public List<String> listTables(String s) throws DatabaseNotExistException, 
CatalogException {
-    throw new UnsupportedOperationException();
+  public List<String> listTables(String databaseName)
+      throws DatabaseNotExistException, CatalogException {
+    try {
+      return 
Stream.of(catalog().asTableCatalog().listTables(Namespace.of(databaseName)))
+          .map(NameIdentifier::name)
+          .collect(Collectors.toList());
+    } catch (NoSuchSchemaException e) {
+      throw new DatabaseNotExistException(catalogName(), databaseName, e);
+    } catch (Exception e) {
+      throw new CatalogException(e);
+    }
   }
 
   @Override
@@ -159,32 +182,95 @@ public abstract class BaseCatalog extends AbstractCatalog 
{
   }
 
   @Override
-  public CatalogBaseTable getTable(ObjectPath objectPath)
+  public CatalogBaseTable getTable(ObjectPath tablePath)
       throws TableNotExistException, CatalogException {
-    throw new UnsupportedOperationException();
+    try {
+      Table table =
+          catalog()
+              .asTableCatalog()
+              .loadTable(NameIdentifier.of(tablePath.getDatabaseName(), 
tablePath.getObjectName()));
+      return toFlinkTable(table);
+    } catch (NoSuchTableException e) {
+      throw new TableNotExistException(catalogName(), tablePath, e);
+    } catch (Exception e) {
+      throw new CatalogException(e);
+    }
   }
 
   @Override
-  public boolean tableExists(ObjectPath objectPath) throws CatalogException {
-    throw new UnsupportedOperationException();
+  public boolean tableExists(ObjectPath tablePath) throws CatalogException {
+    try {
+      return catalog()
+          .asTableCatalog()
+          .tableExists(NameIdentifier.of(tablePath.getDatabaseName(), 
tablePath.getObjectName()));
+    } catch (Exception e) {
+      throw new CatalogException(e);
+    }
   }
 
   @Override
-  public void dropTable(ObjectPath objectPath, boolean b)
+  public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
       throws TableNotExistException, CatalogException {
-    throw new UnsupportedOperationException();
+    boolean dropped =
+        catalog()
+            .asTableCatalog()
+            .dropTable(NameIdentifier.of(tablePath.getDatabaseName(), 
tablePath.getObjectName()));
+    if (!dropped && !ignoreIfNotExists) {
+      throw new TableNotExistException(catalogName(), tablePath);
+    }
   }
 
   @Override
-  public void renameTable(ObjectPath objectPath, String s, boolean b)
+  public void renameTable(ObjectPath tablePath, String newTableName, boolean 
ignoreIfNotExists)
       throws TableNotExistException, TableAlreadyExistException, 
CatalogException {
-    throw new UnsupportedOperationException();
+    NameIdentifier identifier =
+        NameIdentifier.of(Namespace.of(tablePath.getDatabaseName()), 
newTableName);
+
+    if (catalog().asTableCatalog().tableExists(identifier)) {
+      throw new TableAlreadyExistException(
+          catalogName(), ObjectPath.fromString(tablePath.getDatabaseName() + 
newTableName));
+    }
+
+    try {
+      catalog()
+          .asTableCatalog()
+          .alterTable(
+              NameIdentifier.of(tablePath.getDatabaseName(), 
tablePath.getObjectName()),
+              TableChange.rename(newTableName));
+    } catch (NoSuchTableException e) {
+      if (!ignoreIfNotExists) {
+        throw new TableNotExistException(catalogName(), tablePath, e);
+      }
+    } catch (Exception e) {
+      throw new CatalogException(e);
+    }
   }
 
   @Override
-  public void createTable(ObjectPath objectPath, CatalogBaseTable 
catalogBaseTable, boolean b)
+  public void createTable(ObjectPath tablePath, CatalogBaseTable table, 
boolean ignoreIfExists)
       throws TableAlreadyExistException, DatabaseNotExistException, 
CatalogException {
-    throw new UnsupportedOperationException();
+    NameIdentifier identifier =
+        NameIdentifier.of(tablePath.getDatabaseName(), 
tablePath.getObjectName());
+
+    ResolvedCatalogBaseTable<?> resolvedTable = (ResolvedCatalogBaseTable<?>) 
table;
+    Column[] columns =
+        resolvedTable.getResolvedSchema().getColumns().stream()
+            .map(this::toGravitinoColumn)
+            .toArray(Column[]::new);
+    String comment = table.getComment();
+    Map<String, String> properties =
+        propertiesConverter.toGravitinoTableProperties(table.getOptions());
+    try {
+      catalog().asTableCatalog().createTable(identifier, columns, comment, 
properties);
+    } catch (NoSuchSchemaException e) {
+      throw new DatabaseNotExistException(catalogName(), 
tablePath.getDatabaseName(), e);
+    } catch (TableAlreadyExistsException e) {
+      if (!ignoreIfExists) {
+        throw new TableAlreadyExistException(catalogName(), tablePath, e);
+      }
+    } catch (Exception e) {
+      throw new CatalogException(e);
+    }
   }
 
   @Override
@@ -351,6 +437,31 @@ public abstract class BaseCatalog extends AbstractCatalog {
 
   protected abstract PropertiesConverter getPropertiesConverter();
 
+  protected CatalogBaseTable toFlinkTable(Table table) {
+    org.apache.flink.table.api.Schema.Builder builder =
+        org.apache.flink.table.api.Schema.newBuilder();
+    for (Column column : table.columns()) {
+      DataType flinkType = TypeUtils.toFlinkType(column.dataType());
+      builder
+          .column(column.name(), column.nullable() ? flinkType.nullable() : 
flinkType.notNull())
+          .withComment(column.comment());
+    }
+    Map<String, String> flinkTableProperties =
+        propertiesConverter.toFlinkTableProperties(table.properties());
+    return CatalogTable.of(
+        builder.build(), table.comment(), ImmutableList.of(), 
flinkTableProperties);
+  }
+
+  private Column toGravitinoColumn(org.apache.flink.table.catalog.Column 
column) {
+    return Column.of(
+        column.getName(),
+        TypeUtils.toGravitinoType(column.getDataType().getLogicalType()),
+        column.getComment().orElse(null),
+        column.getDataType().getLogicalType().isNullable(),
+        false,
+        null);
+  }
+
   @VisibleForTesting
   static SchemaChange[] getSchemaChange(CatalogDatabase current, 
CatalogDatabase updated) {
     Map<String, String> currentProperties = current.getProperties();
diff --git 
a/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/hive/GravitinoHiveCatalog.java
 
b/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/hive/GravitinoHiveCatalog.java
index aae5fb6ff..857caad70 100644
--- 
a/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/hive/GravitinoHiveCatalog.java
+++ 
b/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/hive/GravitinoHiveCatalog.java
@@ -22,7 +22,12 @@ import 
com.datastrato.gravitino.flink.connector.PropertiesConverter;
 import com.datastrato.gravitino.flink.connector.catalog.BaseCatalog;
 import java.util.Optional;
 import javax.annotation.Nullable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
 import org.apache.flink.table.catalog.hive.HiveCatalog;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
 import org.apache.flink.table.factories.Factory;
 import org.apache.hadoop.hive.conf.HiveConf;
 
@@ -43,6 +48,18 @@ public class GravitinoHiveCatalog extends BaseCatalog {
     this.hiveCatalog = new HiveCatalog(catalogName, defaultDatabase, hiveConf, 
hiveVersion);
   }
 
+  @Override
+  public void open() throws CatalogException {
+    super.open();
+    hiveCatalog.open();
+  }
+
+  @Override
+  public void close() throws CatalogException {
+    super.close();
+    hiveCatalog.close();
+  }
+
   public HiveConf getHiveConf() {
     return hiveCatalog.getHiveConf();
   }
@@ -56,4 +73,16 @@ public class GravitinoHiveCatalog extends BaseCatalog {
   protected PropertiesConverter getPropertiesConverter() {
     return HivePropertiesConverter.INSTANCE;
   }
+
+  @Override
+  public CatalogTableStatistics getTableStatistics(ObjectPath objectPath)
+      throws TableNotExistException, CatalogException {
+    return hiveCatalog.getTableStatistics(objectPath);
+  }
+
+  @Override
+  public CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath)
+      throws TableNotExistException, CatalogException {
+    return hiveCatalog.getTableColumnStatistics(tablePath);
+  }
 }
diff --git 
a/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/hive/HivePropertiesConverter.java
 
b/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/hive/HivePropertiesConverter.java
index b780a5e99..9a2771ee3 100644
--- 
a/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/hive/HivePropertiesConverter.java
+++ 
b/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/hive/HivePropertiesConverter.java
@@ -20,10 +20,12 @@
 package com.datastrato.gravitino.flink.connector.hive;
 
 import com.datastrato.gravitino.catalog.hive.HiveCatalogPropertiesMeta;
+import com.datastrato.gravitino.catalog.hive.HiveTablePropertiesMetadata;
 import com.datastrato.gravitino.flink.connector.PropertiesConverter;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import java.util.Map;
+import java.util.stream.Collectors;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.catalog.CommonCatalogOptions;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -76,4 +78,25 @@ public class HivePropertiesConverter implements 
PropertiesConverter {
         });
     return flinkCatalogProperties;
   }
+
+  @Override
+  public Map<String, String> toFlinkTableProperties(Map<String, String> 
gravitinoProperties) {
+    Map<String, String> properties =
+        gravitinoProperties.entrySet().stream()
+            .collect(
+                Collectors.toMap(
+                    entry -> {
+                      String key = entry.getKey();
+                      if 
(key.startsWith(HiveTablePropertiesMetadata.SERDE_PARAMETER_PREFIX)) {
+                        return key.substring(
+                            
HiveTablePropertiesMetadata.SERDE_PARAMETER_PREFIX.length());
+                      } else {
+                        return key;
+                      }
+                    },
+                    Map.Entry::getValue,
+                    (existingValue, newValue) -> newValue));
+    properties.put("connector", "hive");
+    return properties;
+  }
 }
diff --git 
a/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/utils/TypeUtils.java
 
b/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/utils/TypeUtils.java
new file mode 100644
index 000000000..d6907ebb7
--- /dev/null
+++ 
b/flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/utils/TypeUtils.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.datastrato.gravitino.flink.connector.utils;
+
+import com.datastrato.gravitino.rel.types.Type;
+import com.datastrato.gravitino.rel.types.Types;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+
+public class TypeUtils {
+
+  private TypeUtils() {}
+
+  public static Type toGravitinoType(LogicalType logicalType) {
+    switch (logicalType.getTypeRoot()) {
+      case VARCHAR:
+        return Types.StringType.get();
+      case DOUBLE:
+        return Types.DoubleType.get();
+      case INTEGER:
+        return Types.IntegerType.get();
+      default:
+        throw new UnsupportedOperationException(
+            "Not support type: " + logicalType.asSummaryString());
+    }
+  }
+
+  public static DataType toFlinkType(Type gravitinoType) {
+    switch (gravitinoType.name()) {
+      case DOUBLE:
+        return DataTypes.DOUBLE();
+      case STRING:
+        return DataTypes.STRING();
+      case INTEGER:
+        return DataTypes.INT();
+      default:
+        throw new UnsupportedOperationException("Not support " + 
gravitinoType.toString());
+    }
+  }
+}
diff --git 
a/flink-connector/src/test/java/com/datastrato/gravitino/flink/connector/hive/TestHivePropertiesConverter.java
 
b/flink-connector/src/test/java/com/datastrato/gravitino/flink/connector/hive/TestHivePropertiesConverter.java
index 397e1220c..807db5b3f 100644
--- 
a/flink-connector/src/test/java/com/datastrato/gravitino/flink/connector/hive/TestHivePropertiesConverter.java
+++ 
b/flink-connector/src/test/java/com/datastrato/gravitino/flink/connector/hive/TestHivePropertiesConverter.java
@@ -1,4 +1,5 @@
 /*
+ * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
  * regarding copyright ownership.  The ASF licenses this file
diff --git 
a/flink-connector/src/test/java/com/datastrato/gravitino/flink/connector/integration/test/FlinkCommonIT.java
 
b/flink-connector/src/test/java/com/datastrato/gravitino/flink/connector/integration/test/FlinkCommonIT.java
index a304c106c..a59f29637 100644
--- 
a/flink-connector/src/test/java/com/datastrato/gravitino/flink/connector/integration/test/FlinkCommonIT.java
+++ 
b/flink-connector/src/test/java/com/datastrato/gravitino/flink/connector/integration/test/FlinkCommonIT.java
@@ -19,18 +19,31 @@
 
 package com.datastrato.gravitino.flink.connector.integration.test;
 
+import static 
com.datastrato.gravitino.flink.connector.integration.test.utils.TestUtils.assertColumns;
+import static 
com.datastrato.gravitino.flink.connector.integration.test.utils.TestUtils.toFlinkPhysicalColumn;
+import static 
com.datastrato.gravitino.rel.expressions.transforms.Transforms.EMPTY_TRANSFORM;
+
 import com.datastrato.gravitino.Catalog;
+import com.datastrato.gravitino.NameIdentifier;
 import com.datastrato.gravitino.Schema;
 import com.datastrato.gravitino.catalog.hive.HiveSchemaPropertiesMetadata;
 import 
com.datastrato.gravitino.flink.connector.integration.test.utils.TestUtils;
+import com.datastrato.gravitino.rel.Column;
+import com.datastrato.gravitino.rel.Table;
+import com.datastrato.gravitino.rel.types.Types;
+import com.google.common.collect.ImmutableMap;
+import java.util.Optional;
+import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.ResultKind;
 import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
 import org.apache.flink.types.Row;
 import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
 
-@Tag("gravitino-docker-it")
 public abstract class FlinkCommonIT extends FlinkEnvIT {
 
   protected abstract Catalog currentCatalog();
@@ -163,4 +176,158 @@ public abstract class FlinkCommonIT extends FlinkEnvIT {
           }
         });
   }
+
+  @Test
+  public void testCreateSimpleTable() {
+    String databaseName = "test_create_no_partition_table_db";
+    String tableName = "test_create_no_partition_table";
+    String comment = "test comment";
+    String key = "test key";
+    String value = "test value";
+
+    doWithSchema(
+        currentCatalog(),
+        databaseName,
+        catalog -> {
+          TableResult result =
+              sql(
+                  "CREATE TABLE %s "
+                      + "(string_type STRING COMMENT 'string_type', "
+                      + " double_type DOUBLE COMMENT 'double_type')"
+                      + " COMMENT '%s' WITH ("
+                      + "'%s' = '%s')",
+                  tableName, comment, key, value);
+          TestUtils.assertTableResult(result, ResultKind.SUCCESS);
+
+          Table table =
+              
catalog.asTableCatalog().loadTable(NameIdentifier.of(databaseName, tableName));
+          Assertions.assertNotNull(table);
+          Assertions.assertEquals(comment, table.comment());
+          Assertions.assertEquals(value, table.properties().get(key));
+          Column[] columns =
+              new Column[] {
+                Column.of("string_type", Types.StringType.get(), 
"string_type", true, false, null),
+                Column.of("double_type", Types.DoubleType.get(), "double_type")
+              };
+          assertColumns(columns, table.columns());
+          Assertions.assertArrayEquals(EMPTY_TRANSFORM, table.partitioning());
+
+          TestUtils.assertTableResult(
+              sql("INSERT INTO %s VALUES ('A', 1.0), ('B', 2.0)", tableName),
+              ResultKind.SUCCESS_WITH_CONTENT,
+              Row.of(-1L));
+          TestUtils.assertTableResult(
+              sql("SELECT * FROM %s", tableName),
+              ResultKind.SUCCESS_WITH_CONTENT,
+              Row.of("A", 1.0),
+              Row.of("B", 2.0));
+        },
+        true);
+  }
+
+  @Test
+  public void testListTables() {
+    String newSchema = "test_list_table_catalog";
+    Column[] columns = new Column[] {Column.of("user_id", 
Types.IntegerType.get(), "USER_ID")};
+    doWithSchema(
+        currentCatalog(),
+        newSchema,
+        catalog -> {
+          catalog
+              .asTableCatalog()
+              .createTable(
+                  NameIdentifier.of(newSchema, "test_table1"),
+                  columns,
+                  "comment1",
+                  ImmutableMap.of());
+          catalog
+              .asTableCatalog()
+              .createTable(
+                  NameIdentifier.of(newSchema, "test_table2"),
+                  columns,
+                  "comment2",
+                  ImmutableMap.of());
+          TableResult result = sql("SHOW TABLES");
+          TestUtils.assertTableResult(
+              result,
+              ResultKind.SUCCESS_WITH_CONTENT,
+              Row.of("test_table1"),
+              Row.of("test_table2"));
+        },
+        true);
+  }
+
+  @Test
+  public void testDropTable() {
+    String databaseName = "test_drop_table_db";
+    doWithSchema(
+        currentCatalog(),
+        databaseName,
+        catalog -> {
+          String tableName = "test_drop_table";
+          Column[] columns =
+              new Column[] {Column.of("user_id", Types.IntegerType.get(), 
"USER_ID")};
+          NameIdentifier identifier = NameIdentifier.of(databaseName, 
tableName);
+          catalog.asTableCatalog().createTable(identifier, columns, 
"comment1", ImmutableMap.of());
+          
Assertions.assertTrue(catalog.asTableCatalog().tableExists(identifier));
+
+          TableResult result = sql("DROP TABLE %s", tableName);
+          TestUtils.assertTableResult(result, ResultKind.SUCCESS);
+          
Assertions.assertFalse(catalog.asTableCatalog().tableExists(identifier));
+        },
+        true);
+  }
+
+  @Test
+  public void testGetSimpleTable() {
+    String databaseName = "test_get_simple_table";
+    Column[] columns =
+        new Column[] {
+          Column.of("string_type", Types.StringType.get(), "string_type", 
true, false, null),
+          Column.of("double_type", Types.DoubleType.get(), "double_type")
+        };
+
+    doWithSchema(
+        currentCatalog(),
+        databaseName,
+        catalog -> {
+          String tableName = "test_desc_table";
+          String comment = "comment1";
+          catalog
+              .asTableCatalog()
+              .createTable(
+                  NameIdentifier.of(databaseName, "test_desc_table"),
+                  columns,
+                  comment,
+                  ImmutableMap.of("k1", "v1"));
+
+          Optional<org.apache.flink.table.catalog.Catalog> flinkCatalog =
+              tableEnv.getCatalog(catalog.name());
+          Assertions.assertTrue(flinkCatalog.isPresent());
+          try {
+            CatalogBaseTable table =
+                flinkCatalog.get().getTable(new ObjectPath(databaseName, 
tableName));
+            Assertions.assertNotNull(table);
+            Assertions.assertEquals(CatalogBaseTable.TableKind.TABLE, 
table.getTableKind());
+            Assertions.assertEquals(comment, table.getComment());
+
+            org.apache.flink.table.catalog.Column[] expected =
+                new org.apache.flink.table.catalog.Column[] {
+                  
org.apache.flink.table.catalog.Column.physical("string_type", 
DataTypes.STRING())
+                      .withComment("string_type"),
+                  
org.apache.flink.table.catalog.Column.physical("double_type", 
DataTypes.DOUBLE())
+                      .withComment("double_type")
+                };
+            org.apache.flink.table.catalog.Column[] actual =
+                
toFlinkPhysicalColumn(table.getUnresolvedSchema().getColumns());
+            Assertions.assertArrayEquals(expected, actual);
+
+            CatalogTable catalogTable = (CatalogTable) table;
+            Assertions.assertFalse(catalogTable.isPartitioned());
+          } catch (TableNotExistException e) {
+            Assertions.fail(e);
+          }
+        },
+        true);
+  }
 }
diff --git 
a/flink-connector/src/test/java/com/datastrato/gravitino/flink/connector/integration/test/FlinkEnvIT.java
 
b/flink-connector/src/test/java/com/datastrato/gravitino/flink/connector/integration/test/FlinkEnvIT.java
index 13ef56660..fc574ffae 100644
--- 
a/flink-connector/src/test/java/com/datastrato/gravitino/flink/connector/integration/test/FlinkEnvIT.java
+++ 
b/flink-connector/src/test/java/com/datastrato/gravitino/flink/connector/integration/test/FlinkEnvIT.java
@@ -1,4 +1,5 @@
 /*
+ * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
  * regarding copyright ownership.  The ASF licenses this file
@@ -25,12 +26,14 @@ import 
com.datastrato.gravitino.integration.test.container.ContainerSuite;
 import com.datastrato.gravitino.integration.test.container.HiveContainer;
 import com.datastrato.gravitino.integration.test.util.AbstractIT;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
 import com.google.errorprone.annotations.FormatMethod;
 import com.google.errorprone.annotations.FormatString;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.function.Consumer;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.internal.TableEnvironmentImpl;
@@ -125,7 +128,9 @@ public abstract class FlinkEnvIT extends AbstractIT {
         "table.catalog-store.kind", 
GravitinoCatalogStoreFactoryOptions.GRAVITINO);
     
configuration.setString("table.catalog-store.gravitino.gravitino.metalake", 
GRAVITINO_METALAKE);
     configuration.setString("table.catalog-store.gravitino.gravitino.uri", 
gravitinoUri);
-    tableEnv = TableEnvironment.create(configuration);
+    EnvironmentSettings.Builder builder =
+        EnvironmentSettings.newInstance().withConfiguration(configuration);
+    tableEnv = TableEnvironment.create(builder.inBatchMode().build());
   }
 
   private static void stopHdfsEnv() {
@@ -154,6 +159,27 @@ public abstract class FlinkEnvIT extends AbstractIT {
     return tableEnv.executeSql(String.format(sql, args));
   }
 
+  protected static void doWithSchema(
+      Catalog catalog, String schemaName, Consumer<Catalog> action, boolean 
dropSchema) {
+    Preconditions.checkNotNull(catalog);
+    Preconditions.checkNotNull(schemaName);
+    try {
+      tableEnv.useCatalog(catalog.name());
+      if (!catalog.asSchemas().schemaExists(schemaName)) {
+        catalog
+            .asSchemas()
+            .createSchema(
+                schemaName, null, ImmutableMap.of("location", warehouse + "/" 
+ schemaName));
+      }
+      tableEnv.useDatabase(schemaName);
+      action.accept(catalog);
+    } finally {
+      if (dropSchema) {
+        catalog.asSchemas().dropSchema(schemaName, true);
+      }
+    }
+  }
+
   protected static void doWithCatalog(Catalog catalog, Consumer<Catalog> 
action) {
     Preconditions.checkNotNull(catalog);
     tableEnv.useCatalog(catalog.name());
diff --git 
a/flink-connector/src/test/java/com/datastrato/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java
 
b/flink-connector/src/test/java/com/datastrato/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java
index b85c0d3f6..4db72cc9e 100644
--- 
a/flink-connector/src/test/java/com/datastrato/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java
+++ 
b/flink-connector/src/test/java/com/datastrato/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java
@@ -1,4 +1,5 @@
 /*
+ * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
  * regarding copyright ownership.  The ASF licenses this file
@@ -23,6 +24,7 @@ import com.datastrato.gravitino.flink.connector.PropertiesConverter;
 import com.datastrato.gravitino.flink.connector.hive.GravitinoHiveCatalog;
 import com.datastrato.gravitino.flink.connector.hive.GravitinoHiveCatalogFactoryOptions;
 import com.datastrato.gravitino.flink.connector.integration.test.FlinkCommonIT;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import java.util.Arrays;
@@ -42,28 +44,37 @@ import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
 
 @Tag("gravitino-docker-test")
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
 public class FlinkHiveCatalogIT extends FlinkCommonIT {
+  private static final String DEFAULT_HIVE_CATALOG = "test_flink_hive_schema_catalog";
 
   private static com.datastrato.gravitino.Catalog hiveCatalog;
 
   @BeforeAll
   static void hiveStartUp() {
+    initDefaultHiveCatalog();
+  }
+
+  @AfterAll
+  static void hiveStop() {
+    Preconditions.checkNotNull(metalake);
+    metalake.dropCatalog(DEFAULT_HIVE_CATALOG);
+  }
+
+  protected static void initDefaultHiveCatalog() {
+    Preconditions.checkNotNull(metalake);
     hiveCatalog =
         metalake.createCatalog(
-            "test_flink_hive_schema_catalog",
+            DEFAULT_HIVE_CATALOG,
             com.datastrato.gravitino.Catalog.Type.RELATIONAL,
             "hive",
             null,
             ImmutableMap.of("metastore.uris", hiveMetastoreUri));
   }
 
-  @AfterAll
-  static void hiveStop() {
-    metalake.dropCatalog("test_flink_hive_schema_catalog");
-  }
-
   @Test
   public void testCreateGravitinoHiveCatalog() {
     tableEnv.useCatalog(DEFAULT_CATALOG);
@@ -75,8 +86,7 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT {
     configuration.set(
         CommonCatalogOptions.CATALOG_TYPE, GravitinoHiveCatalogFactoryOptions.IDENTIFIER);
     configuration.set(HiveCatalogFactoryOptions.HIVE_CONF_DIR, "src/test/resources/flink-tests");
-    configuration.set(
-        GravitinoHiveCatalogFactoryOptions.HIVE_METASTORE_URIS, "thrift://127.0.0.1:9084");
+    configuration.set(GravitinoHiveCatalogFactoryOptions.HIVE_METASTORE_URIS, hiveMetastoreUri);
     CatalogDescriptor catalogDescriptor = CatalogDescriptor.of(catalogName, configuration);
     tableEnv.createCatalog(catalogName, catalogDescriptor);
     Assertions.assertTrue(metalake.catalogExists(catalogName));
@@ -84,7 +94,7 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT {
     // Check the catalog properties.
     com.datastrato.gravitino.Catalog gravitinoCatalog = metalake.loadCatalog(catalogName);
     Map<String, String> properties = gravitinoCatalog.properties();
-    Assertions.assertEquals("thrift://127.0.0.1:9084", properties.get(METASTORE_URIS));
+    Assertions.assertEquals(hiveMetastoreUri, properties.get(METASTORE_URIS));
     Map<String, String> flinkProperties =
         gravitinoCatalog.properties().entrySet().stream()
             .filter(e -> e.getKey().startsWith(PropertiesConverter.FLINK_PROPERTY_PREFIX))
@@ -141,16 +151,16 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT {
             "create catalog %s with ("
                 + "'type'='gravitino-hive', "
                 + "'hive-conf-dir'='src/test/resources/flink-tests',"
-                + "'hive.metastore.uris'='thrift://127.0.0.1:9084',"
+                + "'hive.metastore.uris'='%s',"
                 + "'unknown.key'='unknown.value'"
                 + ")",
-            catalogName));
+            catalogName, hiveMetastoreUri));
     Assertions.assertTrue(metalake.catalogExists(catalogName));
 
     // Check the properties of the created catalog.
     com.datastrato.gravitino.Catalog gravitinoCatalog = metalake.loadCatalog(catalogName);
     Map<String, String> properties = gravitinoCatalog.properties();
-    Assertions.assertEquals("thrift://127.0.0.1:9084", properties.get(METASTORE_URIS));
+    Assertions.assertEquals(hiveMetastoreUri, properties.get(METASTORE_URIS));
     Map<String, String> flinkProperties =
         properties.entrySet().stream()
             .filter(e -> e.getKey().startsWith(PropertiesConverter.FLINK_PROPERTY_PREFIX))
@@ -245,7 +255,7 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT {
                 "flink.bypass.hive.test",
                 "hive.config",
                 "metastore.uris",
-                "thrift://127.0.0.1:9084"));
+                hiveMetastoreUri));
     Assertions.assertNotNull(gravitinoCatalog);
     Assertions.assertEquals(catalogName, gravitinoCatalog.name());
     Assertions.assertTrue(metalake.catalogExists(catalogName));
@@ -261,7 +271,7 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT {
     Assertions.assertTrue(hiveConf.size() > 0, "Should have hive conf");
     Assertions.assertEquals("hive.config", hiveConf.get("hive.test"));
     Assertions.assertEquals(
-        "thrift://127.0.0.1:9084", hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname));
+        hiveMetastoreUri, hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname));
 
     // drop the catalog.
     tableEnv.useCatalog(DEFAULT_CATALOG);
diff --git a/flink-connector/src/test/java/com/datastrato/gravitino/flink/connector/integration/test/utils/TestUtils.java b/flink-connector/src/test/java/com/datastrato/gravitino/flink/connector/integration/test/utils/TestUtils.java
index 08a2f6b1c..e08b1cd7c 100644
--- a/flink-connector/src/test/java/com/datastrato/gravitino/flink/connector/integration/test/utils/TestUtils.java
+++ b/flink-connector/src/test/java/com/datastrato/gravitino/flink/connector/integration/test/utils/TestUtils.java
@@ -1,4 +1,5 @@
 /*
+ * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
  * regarding copyright ownership.  The ASF licenses this file
@@ -17,10 +18,13 @@
  */
 package com.datastrato.gravitino.flink.connector.integration.test.utils;
 
+import com.datastrato.gravitino.rel.Column;
 import com.google.common.collect.Lists;
 import java.util.List;
 import org.apache.flink.table.api.ResultKind;
+import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.types.DataType;
 import org.apache.flink.types.Row;
 import org.junit.jupiter.api.Assertions;
 
@@ -42,4 +46,32 @@ public class TestUtils {
       }
     }
   }
+
+  public static void assertColumns(Column[] expected, Column[] actual) {
+    Assertions.assertEquals(expected.length, actual.length);
+    for (int i = 0; i < expected.length; i++) {
+      Assertions.assertEquals(expected[i].name(), actual[i].name());
+      Assertions.assertEquals(expected[i].comment(), actual[i].comment());
+      Assertions.assertEquals(
+          expected[i].dataType().simpleString(), actual[i].dataType().simpleString());
+      Assertions.assertEquals(expected[i].defaultValue(), actual[i].defaultValue());
+      Assertions.assertEquals(expected[i].autoIncrement(), actual[i].autoIncrement());
+      Assertions.assertEquals(expected[i].nullable(), actual[i].nullable());
+    }
+  }
+
+  public static org.apache.flink.table.catalog.Column[] toFlinkPhysicalColumn(
+      List<Schema.UnresolvedColumn> unresolvedPhysicalColumns) {
+    return unresolvedPhysicalColumns.stream()
+        .map(
+            column -> {
+              Schema.UnresolvedPhysicalColumn unresolvedPhysicalColumn =
+                  (Schema.UnresolvedPhysicalColumn) column;
+              return org.apache.flink.table.catalog.Column.physical(
+                      unresolvedPhysicalColumn.getName(),
+                      (DataType) unresolvedPhysicalColumn.getDataType())
+                  .withComment(unresolvedPhysicalColumn.getComment().orElse(null));
+            })
+        .toArray(org.apache.flink.table.catalog.Column[]::new);
+  }
 }
diff --git a/flink-connector/src/test/java/com/datastrato/gravitino/flink/connector/utils/TestTypeUtils.java b/flink-connector/src/test/java/com/datastrato/gravitino/flink/connector/utils/TestTypeUtils.java
new file mode 100644
index 000000000..1a381d232
--- /dev/null
+++ b/flink-connector/src/test/java/com/datastrato/gravitino/flink/connector/utils/TestTypeUtils.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.datastrato.gravitino.flink.connector.utils;
+
+import com.datastrato.gravitino.rel.types.Types;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.UnresolvedIdentifier;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.UnresolvedUserDefinedType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestTypeUtils {
+
+  @Test
+  public void testToGravitinoType() {
+    Assertions.assertEquals(
+        Types.StringType.get(), TypeUtils.toGravitinoType(new VarCharType(Integer.MAX_VALUE)));
+    Assertions.assertEquals(Types.DoubleType.get(), TypeUtils.toGravitinoType(new DoubleType()));
+    Assertions.assertEquals(Types.IntegerType.get(), TypeUtils.toGravitinoType(new IntType()));
+    Assertions.assertThrows(
+        UnsupportedOperationException.class,
+        () ->
+            TypeUtils.toGravitinoType(
+                new UnresolvedUserDefinedType(UnresolvedIdentifier.of("a", "b", "c"))));
+  }
+
+  @Test
+  public void testToFlinkType() {
+    Assertions.assertEquals(DataTypes.DOUBLE(), TypeUtils.toFlinkType(Types.DoubleType.get()));
+    Assertions.assertEquals(DataTypes.STRING(), TypeUtils.toFlinkType(Types.StringType.get()));
+    Assertions.assertEquals(DataTypes.INT(), TypeUtils.toFlinkType(Types.IntegerType.get()));
+    Assertions.assertThrows(
+        UnsupportedOperationException.class,
+        () -> TypeUtils.toFlinkType(Types.UnparsedType.of("unknown")));
+  }
+}


Reply via email to