danny0405 commented on a change in pull request #4153:
URL: https://github.com/apache/hudi/pull/4153#discussion_r761809811



##########
File path: 
hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.catalog;
+
+import org.apache.hudi.configuration.FlinkOptions;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UniqueConstraint;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.table.catalog.CatalogOptions.CATALOG_PATH;
+import static org.apache.hudi.table.catalog.CatalogOptions.DEFAULT_DATABASE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test cases for {@link HoodieCatalog}.
+ */
+public class TestHoodieCatalog {
+
+  private static final String TEST_DEFAULT_DATABASE = "test_db";
+  private static final String NONE_EXIST_DATABASE = "none_exist_database";
+  private final List<Column> createColumns = Arrays.asList(
+      Column.physical("uuid", DataTypes.VARCHAR(20)),
+      Column.physical("name", DataTypes.VARCHAR(20)),
+      Column.physical("age", DataTypes.INT()),
+      Column.physical("tss", DataTypes.TIMESTAMP(3)),
+      Column.physical("partition", DataTypes.VARCHAR(10))
+  );
+  private final UniqueConstraint constraints = 
UniqueConstraint.primaryKey("uuid", Arrays.asList("uuid"));
+  private final ResolvedSchema createTableSchema =
+      new ResolvedSchema(
+          createColumns,
+          Collections.emptyList(),
+          constraints);
+
+  private final List<Column> expectedTableColumns =
+      createColumns.stream()
+          .map(
+              col -> {
+                // Flink char/varchar is transform to string in avro.
+                if (col.getDataType()
+                    .getLogicalType()
+                    .getTypeRoot()
+                    .equals(LogicalTypeRoot.VARCHAR)) {
+                  return Column.physical(col.getName(), DataTypes.STRING());
+                } else {
+                  return col;
+                }
+              })
+          .collect(Collectors.toList());
+  private final ResolvedSchema expectedTableSchema =
+      new ResolvedSchema(expectedTableColumns, Collections.emptyList(), 
constraints);
+
+  private TableEnvironment streamTableEnv;
+  private HoodieCatalog catalog;
+
+  @TempDir
+  File tempFile;
+
+  @BeforeEach
+  void beforeEach() {
+    EnvironmentSettings settings = EnvironmentSettings.newInstance().build();
+    streamTableEnv = TableEnvironmentImpl.create(settings);
+    streamTableEnv.getConfig().getConfiguration()
+        
.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
+    File db1 = new File(tempFile, TEST_DEFAULT_DATABASE);
+    db1.mkdir();
+    Map<String, String> catalogOptions = new HashMap<>();
+    catalogOptions.put(CATALOG_PATH.key(), tempFile.getAbsolutePath());
+    catalogOptions.put(DEFAULT_DATABASE.key(), TEST_DEFAULT_DATABASE);
+    catalog = new HoodieCatalog("hudi", Configuration.fromMap(catalogOptions));
+    catalog.open();
+  }
+
+  @Test
+  public void testListDatabases() {
+    List<String> actual = catalog.listDatabases();
+    assertTrue(actual.contains(TEST_DEFAULT_DATABASE));
+    assertFalse(actual.contains(NONE_EXIST_DATABASE));
+  }
+
+  @Test
+  public void testDatabaseExists() {
+    assertTrue(catalog.databaseExists(TEST_DEFAULT_DATABASE));
+    assertFalse(catalog.databaseExists(NONE_EXIST_DATABASE));
+  }
+
+  @Test
+  public void testCreateAndDropDatabase() throws Exception {
+    CatalogDatabase expected = new CatalogDatabaseImpl(Collections.emptyMap(), 
null);
+    catalog.createDatabase("db1", expected, true);
+
+    CatalogDatabase actual = catalog.getDatabase("db1");
+    assertTrue(catalog.listDatabases().contains("db1"));
+    assertEquals(expected.getProperties(), actual.getProperties());
+
+    catalog.dropDatabase("db1", true);
+    assertFalse(catalog.listDatabases().contains("db1"));

Review comment:
       add a test case: drop a non-existent database.

##########
File path: 
hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.catalog;
+
+import org.apache.hudi.configuration.FlinkOptions;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UniqueConstraint;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.table.catalog.CatalogOptions.CATALOG_PATH;
+import static org.apache.hudi.table.catalog.CatalogOptions.DEFAULT_DATABASE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test cases for {@link HoodieCatalog}.
+ */
+public class TestHoodieCatalog {
+
+  private static final String TEST_DEFAULT_DATABASE = "test_db";
+  private static final String NONE_EXIST_DATABASE = "none_exist_database";
+  private final List<Column> createColumns = Arrays.asList(
+      Column.physical("uuid", DataTypes.VARCHAR(20)),
+      Column.physical("name", DataTypes.VARCHAR(20)),
+      Column.physical("age", DataTypes.INT()),
+      Column.physical("tss", DataTypes.TIMESTAMP(3)),
+      Column.physical("partition", DataTypes.VARCHAR(10))
+  );
+  private final UniqueConstraint constraints = 
UniqueConstraint.primaryKey("uuid", Arrays.asList("uuid"));
+  private final ResolvedSchema createTableSchema =
+      new ResolvedSchema(
+          createColumns,
+          Collections.emptyList(),
+          constraints);
+
+  private final List<Column> expectedTableColumns =
+      createColumns.stream()
+          .map(
+              col -> {
+                // Flink char/varchar is transform to string in avro.
+                if (col.getDataType()
+                    .getLogicalType()
+                    .getTypeRoot()
+                    .equals(LogicalTypeRoot.VARCHAR)) {
+                  return Column.physical(col.getName(), DataTypes.STRING());
+                } else {
+                  return col;
+                }
+              })
+          .collect(Collectors.toList());
+  private final ResolvedSchema expectedTableSchema =
+      new ResolvedSchema(expectedTableColumns, Collections.emptyList(), 
constraints);
+
+  private TableEnvironment streamTableEnv;
+  private HoodieCatalog catalog;
+
+  @TempDir
+  File tempFile;
+
+  @BeforeEach
+  void beforeEach() {
+    EnvironmentSettings settings = EnvironmentSettings.newInstance().build();
+    streamTableEnv = TableEnvironmentImpl.create(settings);
+    streamTableEnv.getConfig().getConfiguration()
+        
.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
+    File db1 = new File(tempFile, TEST_DEFAULT_DATABASE);
+    db1.mkdir();

Review comment:
       db1 => testDb

##########
File path: 
hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
##########
@@ -0,0 +1,508 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.catalog;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.avro.Schema;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.catalog.AbstractCatalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogView;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import 
org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.StringUtils;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.hudi.table.catalog.CatalogOptions.CATALOG_PATH;
+import static org.apache.hudi.table.catalog.CatalogOptions.DEFAULT_DATABASE;
+
+/**
+ * Catalog that can set up common options for underneath table.
+ */
+public class HoodieCatalog extends AbstractCatalog {
+  private static final Logger LOG = 
LoggerFactory.getLogger(HoodieCatalog.class);
+
+  private final org.apache.hadoop.conf.Configuration hadoopConf;
+  private final String catalogPathStr;
+  private final Map<String, String> tableCommonOptions;
+
+  private Path catalogPath;
+  private FileSystem fs;
+
+  public HoodieCatalog(String name, Configuration options) {
+    super(name, options.get(DEFAULT_DATABASE));
+    this.catalogPathStr = options.get(CATALOG_PATH);
+    this.hadoopConf = StreamerUtil.getHadoopConf();
+    this.tableCommonOptions = CatalogOptions.tableCommonOptions(options);
+  }
+
+  @Override
+  public void open() throws CatalogException {
+    fs = FSUtils.getFs(catalogPathStr, hadoopConf);
+    catalogPath = new Path(catalogPathStr);
+    try {
+      if (!fs.exists(catalogPath)) {
+        throw new CatalogException(String.format("Catalog %s path %s is not 
exists.", getName(), catalogPathStr));
+      }

Review comment:
       `is not exists` => `does not exist`

##########
File path: 
hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.catalog;
+
+import org.apache.hudi.configuration.FlinkOptions;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UniqueConstraint;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.table.catalog.CatalogOptions.CATALOG_PATH;
+import static org.apache.hudi.table.catalog.CatalogOptions.DEFAULT_DATABASE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test cases for {@link HoodieCatalog}.
+ */
+public class TestHoodieCatalog {
+
+  private static final String TEST_DEFAULT_DATABASE = "test_db";
+  private static final String NONE_EXIST_DATABASE = "none_exist_database";
+  private final List<Column> createColumns = Arrays.asList(
+      Column.physical("uuid", DataTypes.VARCHAR(20)),
+      Column.physical("name", DataTypes.VARCHAR(20)),
+      Column.physical("age", DataTypes.INT()),
+      Column.physical("tss", DataTypes.TIMESTAMP(3)),
+      Column.physical("partition", DataTypes.VARCHAR(10))
+  );
+  private final UniqueConstraint constraints = 
UniqueConstraint.primaryKey("uuid", Arrays.asList("uuid"));
+  private final ResolvedSchema createTableSchema =
+      new ResolvedSchema(
+          createColumns,
+          Collections.emptyList(),
+          constraints);
+
+  private final List<Column> expectedTableColumns =
+      createColumns.stream()
+          .map(
+              col -> {
+                // Flink char/varchar is transform to string in avro.
+                if (col.getDataType()
+                    .getLogicalType()
+                    .getTypeRoot()
+                    .equals(LogicalTypeRoot.VARCHAR)) {
+                  return Column.physical(col.getName(), DataTypes.STRING());
+                } else {
+                  return col;
+                }
+              })
+          .collect(Collectors.toList());
+  private final ResolvedSchema expectedTableSchema =
+      new ResolvedSchema(expectedTableColumns, Collections.emptyList(), 
constraints);
+
+  private TableEnvironment streamTableEnv;
+  private HoodieCatalog catalog;
+
+  @TempDir
+  File tempFile;
+
+  @BeforeEach
+  void beforeEach() {
+    EnvironmentSettings settings = EnvironmentSettings.newInstance().build();
+    streamTableEnv = TableEnvironmentImpl.create(settings);
+    streamTableEnv.getConfig().getConfiguration()
+        
.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
+    File db1 = new File(tempFile, TEST_DEFAULT_DATABASE);
+    db1.mkdir();
+    Map<String, String> catalogOptions = new HashMap<>();
+    catalogOptions.put(CATALOG_PATH.key(), tempFile.getAbsolutePath());
+    catalogOptions.put(DEFAULT_DATABASE.key(), TEST_DEFAULT_DATABASE);
+    catalog = new HoodieCatalog("hudi", Configuration.fromMap(catalogOptions));
+    catalog.open();
+  }
+
+  @Test
+  public void testListDatabases() {
+    List<String> actual = catalog.listDatabases();
+    assertTrue(actual.contains(TEST_DEFAULT_DATABASE));
+    assertFalse(actual.contains(NONE_EXIST_DATABASE));
+  }
+
+  @Test
+  public void testDatabaseExists() {
+    assertTrue(catalog.databaseExists(TEST_DEFAULT_DATABASE));
+    assertFalse(catalog.databaseExists(NONE_EXIST_DATABASE));
+  }
+
+  @Test
+  public void testCreateAndDropDatabase() throws Exception {
+    CatalogDatabase expected = new CatalogDatabaseImpl(Collections.emptyMap(), 
null);
+    catalog.createDatabase("db1", expected, true);
+
+    CatalogDatabase actual = catalog.getDatabase("db1");
+    assertTrue(catalog.listDatabases().contains("db1"));
+    assertEquals(expected.getProperties(), actual.getProperties());
+
+    catalog.dropDatabase("db1", true);
+    assertFalse(catalog.listDatabases().contains("db1"));
+  }
+
+  @Test
+  public void testCreateDatabaseWithOptions() {
+    Map<String, String> options = new HashMap<>();
+    options.put("k1", "v1");
+    options.put("k2", "v2");
+
+    assertThrows(
+        CatalogException.class,
+        () -> catalog.createDatabase("db1", new CatalogDatabaseImpl(options, 
null), true),
+        "Hudi catalog doesn't support to create database with options."
+    );
+  }
+
+  @Test
+  public void testTableRelatedMethod() throws Exception {
+    Map<String, String> expectedOptions = new HashMap<>();

Review comment:
       We'd better split this into specific, focused tests.

##########
File path: 
hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.catalog;
+
+import org.apache.hudi.configuration.FlinkOptions;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UniqueConstraint;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.table.catalog.CatalogOptions.CATALOG_PATH;
+import static org.apache.hudi.table.catalog.CatalogOptions.DEFAULT_DATABASE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test cases for {@link HoodieCatalog}.
+ */
+public class TestHoodieCatalog {
+
+  private static final String TEST_DEFAULT_DATABASE = "test_db";
+  private static final String NONE_EXIST_DATABASE = "none_exist_database";
+  private final List<Column> createColumns = Arrays.asList(
+      Column.physical("uuid", DataTypes.VARCHAR(20)),
+      Column.physical("name", DataTypes.VARCHAR(20)),
+      Column.physical("age", DataTypes.INT()),
+      Column.physical("tss", DataTypes.TIMESTAMP(3)),
+      Column.physical("partition", DataTypes.VARCHAR(10))
+  );
+  private final UniqueConstraint constraints = 
UniqueConstraint.primaryKey("uuid", Arrays.asList("uuid"));
+  private final ResolvedSchema createTableSchema =
+      new ResolvedSchema(
+          createColumns,
+          Collections.emptyList(),
+          constraints);
+
+  private final List<Column> expectedTableColumns =
+      createColumns.stream()
+          .map(
+              col -> {
+                // Flink char/varchar is transform to string in avro.
+                if (col.getDataType()
+                    .getLogicalType()
+                    .getTypeRoot()
+                    .equals(LogicalTypeRoot.VARCHAR)) {
+                  return Column.physical(col.getName(), DataTypes.STRING());
+                } else {
+                  return col;
+                }
+              })
+          .collect(Collectors.toList());
+  private final ResolvedSchema expectedTableSchema =
+      new ResolvedSchema(expectedTableColumns, Collections.emptyList(), 
constraints);

Review comment:
       Can all these final instance variables be switched to static members?

##########
File path: 
hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
##########
@@ -0,0 +1,508 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.catalog;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.avro.Schema;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.catalog.AbstractCatalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogView;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import 
org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.StringUtils;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.hudi.table.catalog.CatalogOptions.CATALOG_PATH;
+import static org.apache.hudi.table.catalog.CatalogOptions.DEFAULT_DATABASE;
+
+/**
+ * Catalog that can set up common options for underneath table.
+ */
+public class HoodieCatalog extends AbstractCatalog {
+  private static final Logger LOG = 
LoggerFactory.getLogger(HoodieCatalog.class);
+
+  private final org.apache.hadoop.conf.Configuration hadoopConf;
+  private final String catalogPathStr;
+  private final Map<String, String> tableCommonOptions;
+
+  private Path catalogPath;
+  private FileSystem fs;
+
+  public HoodieCatalog(String name, Configuration options) {
+    super(name, options.get(DEFAULT_DATABASE));
+    this.catalogPathStr = options.get(CATALOG_PATH);
+    this.hadoopConf = StreamerUtil.getHadoopConf();
+    this.tableCommonOptions = CatalogOptions.tableCommonOptions(options);
+  }
+
+  @Override
+  public void open() throws CatalogException {
+    fs = FSUtils.getFs(catalogPathStr, hadoopConf);
+    catalogPath = new Path(catalogPathStr);
+    try {
+      if (!fs.exists(catalogPath)) {
+        throw new CatalogException(String.format("Catalog %s path %s is not 
exists.", getName(), catalogPathStr));
+      }
+    } catch (IOException e) {
+      throw new CatalogException(String.format("Check catalog path %s exists 
occur error.", catalogPathStr), e);
+    }
+  }
+
+  @Override
+  public void close() throws CatalogException {
+    try {
+      fs.close();
+    } catch (IOException e) {
+      throw new CatalogException("Close FileSystem occur error.", e);
+    }
+  }
+
+  // ------ databases ------
+
+  @Override
+  public List<String> listDatabases() throws CatalogException {
+    try {
+      FileStatus[] fileStatuses = fs.listStatus(catalogPath);
+      return Arrays.stream(fileStatuses)
+          .filter(FileStatus::isDirectory)
+          .map(fileStatus -> fileStatus.getPath().getName())
+          .collect(Collectors.toList());
+    } catch (IOException e) {
+      throw new CatalogException("List databases occur error.", e);
+    }
+  }
+
+  @Override
+  public CatalogDatabase getDatabase(String databaseName) throws 
DatabaseNotExistException, CatalogException {
+    if (databaseExists(databaseName)) {
+      return new CatalogDatabaseImpl(Collections.emptyMap(), null);
+    } else {
+      throw new DatabaseNotExistException(getName(), databaseName);
+    }
+  }
+
+  @Override
+  public boolean databaseExists(String databaseName) throws CatalogException {
+    checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName));
+
+    return listDatabases().contains(databaseName);
+  }
+
+  @Override
+  public void createDatabase(String databaseName, CatalogDatabase 
catalogDatabase, boolean ignoreIfExists)
+      throws DatabaseAlreadyExistException, CatalogException {
+    if (databaseExists(databaseName)) {
+      if (ignoreIfExists) {
+        return;
+      } else {
+        throw new DatabaseAlreadyExistException(getName(), databaseName);
+      }
+    }
+
+    if (!CollectionUtil.isNullOrEmpty(catalogDatabase.getProperties())) {
+      throw new CatalogException("Hudi catalog doesn't support to create 
database with options.");
+    }
+
+    Path dbPath = new Path(catalogPath, databaseName);
+    try {
+      fs.mkdirs(dbPath);
+    } catch (IOException e) {
+      throw new CatalogException(String.format("Create database %s occur 
exception.", databaseName), e);
+    }
+  }
+
+  @Override
+  public void dropDatabase(String databaseName, boolean ignoreIfNotExists, 
boolean cascade)
+      throws DatabaseNotExistException, DatabaseNotEmptyException, 
CatalogException {
+    if (!databaseExists(databaseName)) {
+      if (ignoreIfNotExists) {
+        return;
+      } else {
+        throw new DatabaseNotExistException(getName(), databaseName);
+      }
+    }
+
+    List<String> tables = listTables(databaseName);
+    if (!tables.isEmpty() && !cascade) {
+      throw new DatabaseNotEmptyException(getName(), databaseName);
+    }
+
+    if (databaseName.equals(getDefaultDatabase())) {
+      throw new IllegalArgumentException(
+          "Hudi catalog doesn't support to drop the default database.");
+    }
+
+    Path dbPath = new Path(catalogPath, databaseName);
+    try {
+      fs.delete(dbPath, true);
+    } catch (IOException e) {
+      throw new CatalogException(String.format("Drop database %s occur 
exception.", databaseName), e);
+    }
+  }
+
+  @Override
+  public void alterDatabase(String databaseName, CatalogDatabase 
catalogDatabase, boolean ignoreIfNotExists)
+      throws DatabaseNotExistException, CatalogException {
+    throw new UnsupportedOperationException("alterDatabase is not 
implemented.");
+  }

Review comment:
       "Altering database is not supported yet"




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to