This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 8930e8ced1b [HUDI-7270] Support schema evolution by Flink SQL using 
HoodieCatalog (#10494)
8930e8ced1b is described below

commit 8930e8ced1be7cfa379b2febfda563b96b212a5c
Author: Jing Zhang <[email protected]>
AuthorDate: Thu Jan 18 11:19:16 2024 +0800

    [HUDI-7270] Support schema evolution by Flink SQL using HoodieCatalog 
(#10494)
---
 .../apache/hudi/table/catalog/HoodieCatalog.java   |  42 ++++---
 .../hudi/table/catalog/HoodieCatalogUtil.java      | 129 +++++++++++++++++++++
 .../hudi/table/catalog/HoodieHiveCatalog.java      |  85 +-------------
 .../hudi/table/catalog/TableOptionProperties.java  |  22 +++-
 .../hudi/table/ITTestSchemaEvolutionBySQL.java     |  37 +++---
 .../ITTestSchemaEvolutionBySQLWithDFSCatalog.java  |  55 +++++++++
 .../ITTestSchemaEvolutionBySQLWithHMSCatalog.java  |  35 ++++++
 7 files changed, 291 insertions(+), 114 deletions(-)

diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
index 2b8703a00b9..754fe05a6fb 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
@@ -25,14 +25,12 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.StringUtils;
-import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
 import org.apache.hudi.exception.HoodieMetadataException;
 import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
 import org.apache.hudi.util.AvroSchemaConverter;
 import org.apache.hudi.util.DataTypeUtils;
-import org.apache.hudi.util.FlinkWriteClients;
 import org.apache.hudi.util.StreamerUtil;
 
 import org.apache.avro.Schema;
@@ -392,9 +390,15 @@ public class HoodieCatalog extends AbstractCatalog {
   }
 
   @Override
-  public void alterTable(ObjectPath tablePath, CatalogBaseTable 
catalogBaseTable, boolean ignoreIfNotExists)
+  public void alterTable(
+      ObjectPath tablePath, CatalogBaseTable newCatalogTable, boolean 
ignoreIfNotExists)
       throws TableNotExistException, CatalogException {
-    throw new UnsupportedOperationException("alterTable is not implemented.");
+    HoodieCatalogUtil.alterTable(this, tablePath, newCatalogTable, 
Collections.emptyList(), ignoreIfNotExists, hadoopConf, this::inferTablePath, 
this::refreshTableProperties);
+  }
+
+  public void alterTable(ObjectPath tablePath, CatalogBaseTable 
newCatalogTable, List tableChanges,
+      boolean ignoreIfNotExists) throws TableNotExistException, 
CatalogException {
+    HoodieCatalogUtil.alterTable(this, tablePath, newCatalogTable, 
tableChanges, ignoreIfNotExists, hadoopConf, this::inferTablePath, 
this::refreshTableProperties);
   }
 
   @Override
@@ -470,9 +474,7 @@ public class HoodieCatalog extends AbstractCatalog {
       }
     }
 
-    // enable auto-commit though ~
-    options.put(HoodieWriteConfig.AUTO_COMMIT_ENABLE.key(), "true");
-    try (HoodieFlinkWriteClient<?> writeClient = createWriteClient(options, 
tablePathStr, tablePath)) {
+    try (HoodieFlinkWriteClient<?> writeClient = 
HoodieCatalogUtil.createWriteClient(options, tablePathStr, tablePath, 
hadoopConf)) {
       writeClient.deletePartitions(Collections.singletonList(partitionPathStr),
               writeClient.createNewInstantTime())
           .forEach(writeStatus -> {
@@ -594,20 +596,26 @@ public class HoodieCatalog extends AbstractCatalog {
     return newOptions;
   }
 
-  private HoodieFlinkWriteClient<?> createWriteClient(
-      Map<String, String> options,
-      String tablePathStr,
-      ObjectPath tablePath) {
-    return FlinkWriteClients.createWriteClientV2(
-        Configuration.fromMap(options)
-            .set(FlinkOptions.TABLE_NAME, tablePath.getObjectName())
-            .set(FlinkOptions.SOURCE_AVRO_SCHEMA,
-                StreamerUtil.createMetaClient(tablePathStr, hadoopConf)
-                    
.getTableConfig().getTableCreateSchema().get().toString()));
+  private void refreshTableProperties(ObjectPath tablePath, CatalogBaseTable 
newCatalogTable) {
+    Map<String, String> options = newCatalogTable.getOptions();
+    final String avroSchema = AvroSchemaConverter.convertToSchema(
+        ((ResolvedCatalogTable) 
newCatalogTable).getResolvedSchema().toPhysicalRowDataType().getLogicalType(),
+        
AvroSchemaUtils.getAvroRecordQualifiedName(tablePath.getObjectName())).toString();
+    options.put(FlinkOptions.SOURCE_AVRO_SCHEMA.key(), avroSchema);
+    String tablePathStr = inferTablePath(catalogPathStr, tablePath);
+    try {
+      TableOptionProperties.overwriteProperties(tablePathStr, hadoopConf, 
options);
+    } catch (IOException e) {
+      throw new CatalogException(String.format("Update table path %s 
exception.", tablePathStr), e);
+    }
   }
 
   @VisibleForTesting
   protected String inferTablePath(String catalogPath, ObjectPath tablePath) {
     return String.format("%s/%s/%s", catalogPath, tablePath.getDatabaseName(), 
tablePath.getObjectName());
   }
+
+  private String inferTablePath(ObjectPath tablePath, CatalogBaseTable table) {
+    return inferTablePath(catalogPathStr, tablePath);
+  }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogUtil.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogUtil.java
index f0ab2ebc755..99a23ded5e8 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogUtil.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogUtil.java
@@ -18,15 +18,34 @@
 
 package org.apache.hudi.table.catalog;
 
+import org.apache.hudi.adapter.Utils;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
+import org.apache.hudi.exception.HoodieCatalogException;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.Type;
+import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter;
+import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.FlinkWriteClients;
+import org.apache.hudi.util.StreamerUtil;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.table.catalog.AbstractCatalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogView;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -44,8 +63,13 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;
 import static org.apache.hudi.table.catalog.CatalogOptions.HIVE_SITE_FILE;
 
@@ -172,4 +196,109 @@ public class HoodieCatalogUtil {
 
     return values;
   }
+
+  /**
+   * Modifies an existing hoodie table. Note that the new and old {@link 
CatalogBaseTable} must be of the same kind. For example, this doesn't allow 
altering a regular table to partitioned table, or
+   * altering a mor table to a cow table, or altering index type and vice 
versa.
+   *
+   * @param catalog            hoodie catalog
+   * @param tablePath          path of the table or view to be modified
+   * @param newTable           the new table definition
+   * @param tableChanges       change to describe the modification between the 
newTable and the original table
+   * @param ignoreIfNotExists  flag to specify behavior when the table or view 
does not exist: if set to false, throw an exception, if set to true, do nothing.
+   * @param hadoopConf         hadoop configuration
+   * @param inferTablePathFunc function to infer hoodie table path
+   * @param postAlterTableFunc function to do post process after alter table
+   * @throws TableNotExistException if the table does not exist
+   * @throws CatalogException       in case of any runtime exception
+   */
+  protected static void alterTable(
+      AbstractCatalog catalog,
+      ObjectPath tablePath,
+      CatalogBaseTable newTable,
+      List tableChanges,
+      boolean ignoreIfNotExists,
+      org.apache.hadoop.conf.Configuration hadoopConf,
+      BiFunction<ObjectPath, CatalogBaseTable, String> inferTablePathFunc,
+      BiConsumer<ObjectPath, CatalogBaseTable> postAlterTableFunc) throws 
TableNotExistException, CatalogException {
+    checkNotNull(tablePath, "Table path cannot be null");
+    checkNotNull(newTable, "New catalog table cannot be null");
+
+    if (!isUpdatePermissible(catalog, tablePath, newTable, ignoreIfNotExists)) 
{
+      return;
+    }
+    if (!tableChanges.isEmpty()) {
+      CatalogBaseTable oldTable = catalog.getTable(tablePath);
+      HoodieFlinkWriteClient<?> writeClient = createWriteClient(tablePath, 
oldTable, hadoopConf, inferTablePathFunc);
+      Pair<InternalSchema, HoodieTableMetaClient> pair = 
writeClient.getInternalSchemaAndMetaClient();
+      InternalSchema oldSchema = pair.getLeft();
+      Function<LogicalType, Type> convertFunc = (LogicalType logicalType) -> 
AvroInternalSchemaConverter.convertToField(AvroSchemaConverter.convertToSchema(logicalType));
+      InternalSchema newSchema = Utils.applyTableChange(oldSchema, 
tableChanges, convertFunc);
+      if (!oldSchema.equals(newSchema)) {
+        writeClient.setOperationType(WriteOperationType.ALTER_SCHEMA);
+        writeClient.commitTableChange(newSchema, pair.getRight());
+      }
+    }
+    postAlterTableFunc.accept(tablePath, newTable);
+  }
+
+  protected static HoodieFlinkWriteClient<?> createWriteClient(
+      ObjectPath tablePath,
+      CatalogBaseTable table,
+      org.apache.hadoop.conf.Configuration hadoopConf,
+      BiFunction<ObjectPath, CatalogBaseTable, String> inferTablePathFunc) {
+    Map<String, String> options = table.getOptions();
+    String tablePathStr = inferTablePathFunc.apply(tablePath, table);
+    return createWriteClient(options, tablePathStr, tablePath, hadoopConf);
+  }
+
+  protected static HoodieFlinkWriteClient<?> createWriteClient(
+      Map<String, String> options,
+      String tablePathStr,
+      ObjectPath tablePath,
+      org.apache.hadoop.conf.Configuration hadoopConf) {
+    // enable auto-commit though ~
+    options.put(HoodieWriteConfig.AUTO_COMMIT_ENABLE.key(), "true");
+    return FlinkWriteClients.createWriteClientV2(
+        org.apache.flink.configuration.Configuration.fromMap(options)
+            .set(FlinkOptions.TABLE_NAME, tablePath.getObjectName())
+            .set(
+                FlinkOptions.SOURCE_AVRO_SCHEMA,
+                StreamerUtil.createMetaClient(tablePathStr, hadoopConf)
+                    
.getTableConfig().getTableCreateSchema().get().toString()));
+  }
+
+  private static boolean sameOptions(Map<String, String> parameters1, 
Map<String, String> parameters2, ConfigOption<String> option) {
+    return parameters1.getOrDefault(option.key(), 
String.valueOf(option.defaultValue()))
+        .equalsIgnoreCase(parameters2.getOrDefault(option.key(), 
String.valueOf(option.defaultValue())));
+  }
+
+  private static boolean isUpdatePermissible(AbstractCatalog catalog, 
ObjectPath tablePath, CatalogBaseTable newCatalogTable, boolean 
ignoreIfNotExists) throws TableNotExistException {
+    if (!newCatalogTable.getOptions().getOrDefault(CONNECTOR.key(), 
"").equalsIgnoreCase("hudi")) {
+      throw new HoodieCatalogException(String.format("The %s is not hoodie 
table", tablePath.getObjectName()));
+    }
+    if (newCatalogTable instanceof CatalogView) {
+      throw new HoodieCatalogException("Hoodie catalog does not support to 
ALTER VIEW");
+    }
+
+    if (!catalog.tableExists(tablePath)) {
+      if (!ignoreIfNotExists) {
+        throw new TableNotExistException(catalog.getName(), tablePath);
+      } else {
+        return false;
+      }
+    }
+    CatalogBaseTable oldCatalogTable = catalog.getTable(tablePath);
+    List<String> oldPartitionKeys = 
HoodieCatalogUtil.getPartitionKeys((CatalogTable) oldCatalogTable);
+    List<String> newPartitionKeys = 
HoodieCatalogUtil.getPartitionKeys((CatalogTable) newCatalogTable);
+    if (!oldPartitionKeys.equals(newPartitionKeys)) {
+      throw new HoodieCatalogException("Hoodie catalog does not support to 
alter table partition keys");
+    }
+    Map<String, String> oldOptions = oldCatalogTable.getOptions();
+    if (!sameOptions(oldOptions, newCatalogTable.getOptions(), 
FlinkOptions.TABLE_TYPE)
+        || !sameOptions(oldOptions, newCatalogTable.getOptions(), 
FlinkOptions.INDEX_TYPE)) {
+      throw new HoodieCatalogException("Hoodie catalog does not support to 
alter table type and index type");
+    }
+    return true;
+  }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
index f94218f52f5..b9f2e882cae 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
@@ -19,38 +19,30 @@
 package org.apache.hudi.table.catalog;
 
 import org.apache.hudi.adapter.HiveCatalogConstants.AlterHiveDatabaseOp;
-import org.apache.hudi.adapter.Utils;
 import org.apache.hudi.avro.AvroSchemaUtils;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieFileFormat;
-import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.exception.HoodieCatalogException;
 import org.apache.hudi.exception.HoodieMetadataException;
 import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
-import org.apache.hudi.internal.schema.InternalSchema;
-import org.apache.hudi.internal.schema.Type;
-import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter;
 import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
 import org.apache.hudi.table.HoodieTableFactory;
 import org.apache.hudi.table.format.FilePathUtils;
 import org.apache.hudi.util.AvroSchemaConverter;
 import org.apache.hudi.util.DataTypeUtils;
-import org.apache.hudi.util.FlinkWriteClients;
 import org.apache.hudi.util.StreamerUtil;
 
 import org.apache.avro.Schema;
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.catalog.AbstractCatalog;
 import org.apache.flink.table.catalog.CatalogBaseTable;
@@ -80,7 +72,6 @@ import 
org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
 import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
@@ -109,7 +100,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.function.Function;
 
 import static org.apache.hudi.adapter.HiveCatalogConstants.ALTER_DATABASE_OP;
 import static 
org.apache.hudi.adapter.HiveCatalogConstants.DATABASE_LOCATION_URI;
@@ -734,43 +724,16 @@ public class HoodieHiveCatalog extends AbstractCatalog {
     }
   }
 
-  private boolean sameOptions(Map<String, String> existingOptions, Map<String, 
String> newOptions, ConfigOption option) {
-    return existingOptions.getOrDefault(option.key(), 
String.valueOf(option.defaultValue()))
-        .equalsIgnoreCase(newOptions.getOrDefault(option.key(), 
String.valueOf(option.defaultValue())));
-  }
-
   @Override
   public void alterTable(
       ObjectPath tablePath, CatalogBaseTable newCatalogTable, boolean 
ignoreIfNotExists)
       throws TableNotExistException, CatalogException {
-    checkNotNull(tablePath, "Table path cannot be null");
-    checkNotNull(newCatalogTable, "New catalog table cannot be null");
-
-    if (!isUpdatePermissible(tablePath, newCatalogTable, ignoreIfNotExists)) {
-      return;
-    }
-    refreshHMSTable(tablePath, newCatalogTable);
+    HoodieCatalogUtil.alterTable(this, tablePath, newCatalogTable, 
Collections.emptyList(), ignoreIfNotExists, hiveConf, this::inferTablePath, 
this::refreshHMSTable);
   }
 
   public void alterTable(ObjectPath tablePath, CatalogBaseTable 
newCatalogTable, List tableChanges,
       boolean ignoreIfNotExists) throws TableNotExistException, 
CatalogException {
-    checkNotNull(tablePath, "Table path cannot be null");
-    checkNotNull(newCatalogTable, "New catalog table cannot be null");
-
-    if (!isUpdatePermissible(tablePath, newCatalogTable, ignoreIfNotExists)) {
-      return;
-    }
-    CatalogBaseTable oldTable = getTable(tablePath);
-    HoodieFlinkWriteClient<?> writeClient = createWriteClient(tablePath, 
oldTable);
-    Pair<InternalSchema, HoodieTableMetaClient> pair = 
writeClient.getInternalSchemaAndMetaClient();
-    InternalSchema oldSchema = pair.getLeft();
-    Function<LogicalType, Type> convertFunc = (LogicalType logicalType) -> 
AvroInternalSchemaConverter.convertToField(AvroSchemaConverter.convertToSchema(logicalType));
-    InternalSchema newSchema = Utils.applyTableChange(oldSchema, tableChanges, 
convertFunc);
-    if (!oldSchema.equals(newSchema)) {
-      writeClient.setOperationType(WriteOperationType.ALTER_SCHEMA);
-      writeClient.commitTableChange(newSchema, pair.getRight());
-    }
-    refreshHMSTable(tablePath, newCatalogTable);
+    HoodieCatalogUtil.alterTable(this, tablePath, newCatalogTable, 
tableChanges, ignoreIfNotExists, hiveConf, this::inferTablePath, 
this::refreshHMSTable);
   }
 
   @Override
@@ -835,7 +798,7 @@ public class HoodieHiveCatalog extends AbstractCatalog {
         return;
       }
     }
-    try (HoodieFlinkWriteClient<?> writeClient = createWriteClient(tablePath, 
table)) {
+    try (HoodieFlinkWriteClient<?> writeClient = 
HoodieCatalogUtil.createWriteClient(tablePath, table, hiveConf, 
this::inferTablePath)) {
       boolean hiveStylePartitioning = 
Boolean.parseBoolean(table.getOptions().get(FlinkOptions.HIVE_STYLE_PARTITIONING.key()));
       writeClient.deletePartitions(
           
Collections.singletonList(HoodieCatalogUtil.inferPartitionPath(hiveStylePartitioning,
 partitionSpec)),
@@ -974,34 +937,6 @@ public class HoodieHiveCatalog extends AbstractCatalog {
     throw new HoodieCatalogException("Not supported.");
   }
 
-  private boolean isUpdatePermissible(ObjectPath tablePath, CatalogBaseTable 
newCatalogTable, boolean ignoreIfNotExists) throws TableNotExistException {
-    if (!newCatalogTable.getOptions().getOrDefault(CONNECTOR.key(), 
"").equalsIgnoreCase("hudi")) {
-      throw new HoodieCatalogException(String.format("The %s is not hoodie 
table", tablePath.getObjectName()));
-    }
-    if (newCatalogTable instanceof CatalogView) {
-      throw new HoodieCatalogException("Hoodie catalog does not support to 
ALTER VIEW");
-    }
-
-    try {
-      Table hiveTable = getHiveTable(tablePath);
-      List<String> oldPartitionKeys = 
HiveSchemaUtils.getFieldNames(hiveTable.getPartitionKeys());
-      List<String> newPartitionKeys = 
HoodieCatalogUtil.getPartitionKeys((CatalogTable) newCatalogTable);
-      if (!oldPartitionKeys.equals(newPartitionKeys)) {
-        throw new HoodieCatalogException("Hoodie catalog does not support to 
alter table partition keys");
-      }
-      if (!sameOptions(hiveTable.getParameters(), 
newCatalogTable.getOptions(), FlinkOptions.TABLE_TYPE)
-          || !sameOptions(hiveTable.getParameters(), 
newCatalogTable.getOptions(), FlinkOptions.INDEX_TYPE)) {
-        throw new HoodieCatalogException("Hoodie catalog does not support to 
alter table type and index type");
-      }
-      return true;
-    } catch (TableNotExistException e) {
-      if (!ignoreIfNotExists) {
-        throw e;
-      }
-      return false;
-    }
-  }
-
   private void refreshHMSTable(ObjectPath tablePath, CatalogBaseTable 
newCatalogTable) {
     try {
       boolean isMorTable = 
OptionsResolver.isMorTable(newCatalogTable.getOptions());
@@ -1032,20 +967,6 @@ public class HoodieHiveCatalog extends AbstractCatalog {
     }
   }
 
-  private HoodieFlinkWriteClient<?> createWriteClient(
-      ObjectPath tablePath,
-      CatalogBaseTable table) {
-    Map<String, String> options = table.getOptions();
-    // enable auto-commit though ~
-    options.put(HoodieWriteConfig.AUTO_COMMIT_ENABLE.key(), "true");
-    return FlinkWriteClients.createWriteClientV2(
-        Configuration.fromMap(options)
-            .set(FlinkOptions.TABLE_NAME, tablePath.getObjectName())
-            .set(FlinkOptions.SOURCE_AVRO_SCHEMA,
-                StreamerUtil.createMetaClient(inferTablePath(tablePath, 
table), hiveConf)
-                    
.getTableConfig().getTableCreateSchema().get().toString()));
-  }
-
   @VisibleForTesting
   public IMetaStoreClient getClient() {
     return client;
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java
index 8f3e88417be..d32c0f3bda9 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java
@@ -104,15 +104,33 @@ public class TableOptionProperties {
   public static void createProperties(String basePath,
                                       Configuration hadoopConf,
                                       Map<String, String> options) throws 
IOException {
+    Path propertiesFilePath = writePropertiesFile(basePath, hadoopConf, 
options, false);
+    LOG.info(String.format("Create file %s success.", propertiesFilePath));
+  }
+
+  /**
+   * Overwrite the {@link #FILE_NAME} meta file.
+   */
+  public static void overwriteProperties(String basePath,
+      Configuration hadoopConf,
+      Map<String, String> options) throws IOException {
+    Path propertiesFilePath = writePropertiesFile(basePath, hadoopConf, 
options, true);
+    LOG.info(String.format("Update file %s success.", propertiesFilePath));
+  }
+
+  private static Path writePropertiesFile(String basePath,
+      Configuration hadoopConf,
+      Map<String, String> options,
+      boolean isOverwrite) throws IOException {
     Path propertiesFilePath = getPropertiesFilePath(basePath);
     FileSystem fs = FSUtils.getFs(basePath, hadoopConf);
-    try (FSDataOutputStream outputStream = fs.create(propertiesFilePath)) {
+    try (FSDataOutputStream outputStream = fs.create(propertiesFilePath, 
isOverwrite)) {
       Properties properties = new Properties();
       properties.putAll(options);
       properties.store(outputStream,
           "Table option properties saved on " + new 
Date(System.currentTimeMillis()));
     }
-    LOG.info(String.format("Create file %s success.", propertiesFilePath));
+    return propertiesFilePath;
   }
 
   /**
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolutionBySQL.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolutionBySQL.java
index 8040f6df279..a98e1172798 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolutionBySQL.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolutionBySQL.java
@@ -21,8 +21,6 @@ package org.apache.hudi.table;
 import org.apache.hudi.adapter.TestHoodieCatalogs;
 import org.apache.hudi.exception.HoodieCatalogException;
 import org.apache.hudi.exception.HoodieNotSupportedException;
-import org.apache.hudi.table.catalog.HoodieCatalogTestUtils;
-import org.apache.hudi.table.catalog.HoodieHiveCatalog;
 import org.apache.hudi.utils.FlinkMiniCluster;
 import org.apache.hudi.utils.TestTableEnvs;
 
@@ -30,10 +28,12 @@ import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CollectionUtil;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.condition.EnabledIf;
@@ -49,13 +49,15 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
- * IT cases for schema evolution by alter table SQL using {@link 
HoodieHiveCatalog}.
+ * IT cases for schema evolution by alter table SQL using catalog.
  */
 @EnabledIf("supportAdvancedAlterTableSyntax")
 @ExtendWith(FlinkMiniCluster.class)
-public class ITTestSchemaEvolutionBySQL {
+public abstract class ITTestSchemaEvolutionBySQL {
+  protected static final String CATALOG_NAME = "hudi_catalog";
+  protected static final String DB_NAME = "hudi";
   private TableEnvironment tableEnv;
-  private HoodieHiveCatalog hoodieCatalog;
+  protected Catalog catalog;
 
   private static final String CREATE_TABLE_DDL = ""
       + "create table t1("
@@ -84,15 +86,22 @@ public class ITTestSchemaEvolutionBySQL {
     tableEnv = TestTableEnvs.getBatchTableEnv();
     tableEnv.getConfig().getConfiguration()
         
.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
-    hoodieCatalog = HoodieCatalogTestUtils.createHiveCatalog("hudi_catalog");
-    tableEnv.registerCatalog("hudi_catalog", hoodieCatalog);
-    tableEnv.executeSql("use catalog hudi_catalog");
-    String dbName = "hudi";
-    tableEnv.executeSql("create database " + dbName);
-    tableEnv.executeSql("use " + dbName);
+    catalog = createCatalog();
+    catalog.open();
+    tableEnv.registerCatalog(CATALOG_NAME, catalog);
+    tableEnv.executeSql("use catalog " + CATALOG_NAME);
+    tableEnv.executeSql("create database if not exists " + DB_NAME);
+    tableEnv.executeSql("use " + DB_NAME);
     tableEnv.executeSql(CREATE_TABLE_DDL);
   }
 
+  @AfterEach
+  void afterEach() {
+    if (catalog != null) {
+      catalog.close();
+    }
+  }
+
   @Test
   void testAddColumns() {
     execInsertSql(tableEnv, INITIALIZE_INSERT_SQL);
@@ -189,11 +198,11 @@ public class ITTestSchemaEvolutionBySQL {
   @Test
   void testSetAndResetProperty() throws Exception {
     tableEnv.executeSql("alter table t1 set ('k' = 'v')");
-    CatalogBaseTable table = hoodieCatalog.getTable(new ObjectPath("hudi", 
"t1"));
+    CatalogBaseTable table = catalog.getTable(new ObjectPath("hudi", "t1"));
     assertEquals(table.getOptions().get("k"), "v");
 
     tableEnv.executeSql("alter table t1 reset ('k')");
-    table = hoodieCatalog.getTable(new ObjectPath("hudi", "t1"));
+    table = catalog.getTable(new ObjectPath("hudi", "t1"));
     assertFalse(table.getOptions().containsKey("k"));
   }
 
@@ -266,4 +275,6 @@ public class ITTestSchemaEvolutionBySQL {
       // ignored
     }
   }
+
+  protected abstract Catalog createCatalog();
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolutionBySQLWithDFSCatalog.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolutionBySQLWithDFSCatalog.java
new file mode 100644
index 00000000000..91778af905b
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolutionBySQLWithDFSCatalog.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table;
+
+import org.apache.hudi.table.catalog.HoodieCatalog;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.Catalog;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.hudi.table.catalog.CatalogOptions.CATALOG_PATH;
+import static org.apache.hudi.table.catalog.CatalogOptions.DEFAULT_DATABASE;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/**
+ * IT cases for schema evolution by alter table SQL using {@link 
HoodieCatalog}.
+ */
+public class ITTestSchemaEvolutionBySQLWithDFSCatalog extends 
ITTestSchemaEvolutionBySQL {
+
+  @TempDir
+  File tempFile;
+
+  @Override
+  protected Catalog createCatalog() {
+    Map<String, String> catalogOptions = new HashMap<>();
+    assertThrows(
+        ValidationException.class,
+        () -> catalog = new HoodieCatalog(CATALOG_NAME, 
Configuration.fromMap(catalogOptions)));
+    String catalogPathStr = tempFile.getAbsolutePath();
+    catalogOptions.put(CATALOG_PATH.key(), catalogPathStr);
+    catalogOptions.put(DEFAULT_DATABASE.key(), DB_NAME);
+    return new HoodieCatalog(DB_NAME, Configuration.fromMap(catalogOptions));
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolutionBySQLWithHMSCatalog.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolutionBySQLWithHMSCatalog.java
new file mode 100644
index 00000000000..57910b1b250
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolutionBySQLWithHMSCatalog.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table;
+
+import org.apache.hudi.table.catalog.HoodieCatalogTestUtils;
+import org.apache.hudi.table.catalog.HoodieHiveCatalog;
+
+import org.apache.flink.table.catalog.Catalog;
+
+/**
+ * IT cases for schema evolution by alter table SQL using {@link 
HoodieHiveCatalog}.
+ */
+public class ITTestSchemaEvolutionBySQLWithHMSCatalog extends 
ITTestSchemaEvolutionBySQL {
+
+  @Override
+  protected Catalog createCatalog() {
+    return HoodieCatalogTestUtils.createHiveCatalog(CATALOG_NAME);
+  }
+}

Reply via email to