This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 4669a4cad5dc71b44943c9aa357e95fd2895ca55
Author: empcl <[email protected]>
AuthorDate: Wed May 15 01:56:06 2024 -0700

    [HUDI-7608] Fix Flink table creation configuration not taking effect when writing to Spark (#11005)
---
 .../hudi/table/catalog/HoodieHiveCatalog.java      | 14 +++++---
 .../hudi/table/catalog/TestHoodieHiveCatalog.java  | 40 ++++++++++++++++++++++
 2 files changed, 50 insertions(+), 4 deletions(-)

diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
index 09bf9460635..d18e2fe97c9 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
@@ -29,6 +29,7 @@ import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.OptionsResolver;
@@ -105,6 +106,10 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;
 import static org.apache.hudi.adapter.HiveCatalogConstants.ALTER_DATABASE_OP;
 import static 
org.apache.hudi.adapter.HiveCatalogConstants.DATABASE_LOCATION_URI;
 import static org.apache.hudi.adapter.HiveCatalogConstants.DATABASE_OWNER_NAME;
@@ -115,10 +120,6 @@ import static 
org.apache.hudi.configuration.FlinkOptions.PATH;
 import static org.apache.hudi.table.catalog.TableOptionProperties.COMMENT;
 import static 
org.apache.hudi.table.catalog.TableOptionProperties.PK_CONSTRAINT_NAME;
 import static 
org.apache.hudi.table.catalog.TableOptionProperties.SPARK_SOURCE_PROVIDER;
-import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;
 
 /**
  * A catalog implementation for Hoodie based on MetaStore.
@@ -556,6 +557,11 @@ public class HoodieHiveCatalog extends AbstractCatalog {
     hiveTable.setCreateTime((int) (System.currentTimeMillis() / 1000));
 
     Map<String, String> properties = new HashMap<>(table.getOptions());
+    if (properties.containsKey(FlinkOptions.INDEX_TYPE.key())
+        && !properties.containsKey(HoodieIndexConfig.INDEX_TYPE.key())) {
+      properties.put(HoodieIndexConfig.INDEX_TYPE.key(), 
properties.get(FlinkOptions.INDEX_TYPE.key()));
+    }
+    properties.remove(FlinkOptions.INDEX_TYPE.key());
     hiveConf.getAllProperties().forEach((k, v) -> properties.put("hadoop." + 
k, String.valueOf(v)));
 
     if (external) {
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java
index 1ef03291e9a..24621e1b8d7 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieCatalogException;
 import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
@@ -258,6 +259,45 @@ public class TestHoodieHiveCatalog {
     assertEquals(keyGeneratorClassName, 
NonpartitionedAvroKeyGenerator.class.getName());
   }
 
+  @Test
+  void testCreateTableWithIndexType() throws TableNotExistException, 
TableAlreadyExistException, DatabaseNotExistException {
+    Map<String, String> options = new HashMap<>();
+    options.put(FactoryUtil.CONNECTOR.key(), "hudi");
+    // hoodie.index.type
+    options.put(HoodieIndexConfig.INDEX_TYPE.key(), "BUCKET");
+    CatalogTable table =
+        new CatalogTableImpl(schema, partitions, options, "hudi table");
+    hoodieCatalog.createTable(tablePath, table, false);
+    Map<String, String> params = 
hoodieCatalog.getHiveTable(tablePath).getParameters();
+    assertResult(params, "BUCKET");
+    options.remove(HoodieIndexConfig.INDEX_TYPE.key());
+
+    // index.type
+    options.put(FlinkOptions.INDEX_TYPE.key(), 
FlinkOptions.INDEX_TYPE.defaultValue());
+    table =
+        new CatalogTableImpl(schema, partitions, options, "hudi table");
+    ObjectPath newTablePath1 = new ObjectPath("default", "test" + 
System.currentTimeMillis());
+    hoodieCatalog.createTable(newTablePath1, table, false);
+
+    params = hoodieCatalog.getHiveTable(newTablePath1).getParameters();
+    assertResult(params, FlinkOptions.INDEX_TYPE.defaultValue());
+
+    // index.type + hoodie.index.type
+    options.put(HoodieIndexConfig.INDEX_TYPE.key(), "BUCKET");
+    table = new CatalogTableImpl(schema, partitions, options, "hudi table");
+    ObjectPath newTablePath2 = new ObjectPath("default", "test" + 
System.currentTimeMillis());
+    hoodieCatalog.createTable(newTablePath2, table, false);
+
+    params = hoodieCatalog.getHiveTable(newTablePath2).getParameters();
+    assertResult(params, "BUCKET");
+  }
+
+  private void assertResult(Map<String, String> params, String index) {
+    assertTrue(params.containsKey(HoodieIndexConfig.INDEX_TYPE.key()));
+    assertFalse(params.containsKey(FlinkOptions.INDEX_TYPE.key()));
+    assertThat(params.get(HoodieIndexConfig.INDEX_TYPE.key()), is(index));
+  }
+
   @Test
   void testCreateTableWithoutPreCombineKey() throws 
TableAlreadyExistException, DatabaseNotExistException, IOException, 
TableNotExistException {
     String db = "default";

Reply via email to