This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 278f57828fd7a26709b8446a58094de9ce6ee359
Author: empcl <[email protected]>
AuthorDate: Tue Apr 23 13:09:10 2024 +0800

    [HUDI-7608] Fix Flink table creation configuration not taking effect when writing to Spark (#11005)
---
 .../hudi/table/catalog/HoodieHiveCatalog.java      |  5 +++
 .../hudi/table/catalog/TestHoodieHiveCatalog.java  | 40 ++++++++++++++++++++++
 2 files changed, 45 insertions(+)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
index 09bf9460635..98e4cb793a0 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
@@ -556,6 +556,11 @@ public class HoodieHiveCatalog extends AbstractCatalog {
     hiveTable.setCreateTime((int) (System.currentTimeMillis() / 1000));
 
     Map<String, String> properties = new HashMap<>(table.getOptions());
+    if (properties.containsKey(FlinkOptions.INDEX_TYPE.key())
+        && !properties.containsKey(HoodieIndexConfig.INDEX_TYPE.key())) {
+      properties.put(HoodieIndexConfig.INDEX_TYPE.key(), properties.get(FlinkOptions.INDEX_TYPE.key()));
+    }
+    properties.remove(FlinkOptions.INDEX_TYPE.key());
     hiveConf.getAllProperties().forEach((k, v) -> properties.put("hadoop." + k, String.valueOf(v)));
 
     if (external) {
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java
index 1ef03291e9a..24621e1b8d7 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieCatalogException;
 import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
@@ -258,6 +259,45 @@ public class TestHoodieHiveCatalog {
     assertEquals(keyGeneratorClassName, NonpartitionedAvroKeyGenerator.class.getName());
   }
 
+  @Test
+  void testCreateTableWithIndexType() throws TableNotExistException, TableAlreadyExistException, DatabaseNotExistException {
+    Map<String, String> options = new HashMap<>();
+    options.put(FactoryUtil.CONNECTOR.key(), "hudi");
+    // hoodie.index.type
+    options.put(HoodieIndexConfig.INDEX_TYPE.key(), "BUCKET");
+    CatalogTable table =
+        new CatalogTableImpl(schema, partitions, options, "hudi table");
+    hoodieCatalog.createTable(tablePath, table, false);
+    Map<String, String> params = hoodieCatalog.getHiveTable(tablePath).getParameters();
+    assertResult(params, "BUCKET");
+    options.remove(HoodieIndexConfig.INDEX_TYPE.key());
+
+    // index.type
+    options.put(FlinkOptions.INDEX_TYPE.key(), FlinkOptions.INDEX_TYPE.defaultValue());
+    table =
+        new CatalogTableImpl(schema, partitions, options, "hudi table");
+    ObjectPath newTablePath1 = new ObjectPath("default", "test" + System.currentTimeMillis());
+    hoodieCatalog.createTable(newTablePath1, table, false);
+
+    params = hoodieCatalog.getHiveTable(newTablePath1).getParameters();
+    assertResult(params, FlinkOptions.INDEX_TYPE.defaultValue());
+
+    // index.type + hoodie.index.type
+    options.put(HoodieIndexConfig.INDEX_TYPE.key(), "BUCKET");
+    table = new CatalogTableImpl(schema, partitions, options, "hudi table");
+    ObjectPath newTablePath2 = new ObjectPath("default", "test" + System.currentTimeMillis());
+    hoodieCatalog.createTable(newTablePath2, table, false);
+
+    params = hoodieCatalog.getHiveTable(newTablePath2).getParameters();
+    assertResult(params, "BUCKET");
+  }
+
+  private void assertResult(Map<String, String> params, String index) {
+    assertTrue(params.containsKey(HoodieIndexConfig.INDEX_TYPE.key()));
+    assertFalse(params.containsKey(FlinkOptions.INDEX_TYPE.key()));
+    assertThat(params.get(HoodieIndexConfig.INDEX_TYPE.key()), is(index));
+  }
+
   @Test
   void testCreateTableWithoutPreCombineKey() throws TableAlreadyExistException, DatabaseNotExistException, IOException, TableNotExistException {
     String db = "default";

Reply via email to