This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch branch-0.x in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 4669a4cad5dc71b44943c9aa357e95fd2895ca55 Author: empcl <[email protected]> AuthorDate: Wed May 15 01:56:06 2024 -0700 [HUDI-7608] Fix Flink table creation configuration not taking effect when writing to Spark (#11005) --- .../hudi/table/catalog/HoodieHiveCatalog.java | 14 +++++--- .../hudi/table/catalog/TestHoodieHiveCatalog.java | 40 ++++++++++++++++++++++ 2 files changed, 50 insertions(+), 4 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java index 09bf9460635..d18e2fe97c9 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java @@ -29,6 +29,7 @@ import org.apache.hudi.common.util.ConfigUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.configuration.OptionsResolver; @@ -105,6 +106,10 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR; +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly; import static org.apache.hudi.adapter.HiveCatalogConstants.ALTER_DATABASE_OP; import static org.apache.hudi.adapter.HiveCatalogConstants.DATABASE_LOCATION_URI; import static org.apache.hudi.adapter.HiveCatalogConstants.DATABASE_OWNER_NAME; @@ -115,10 +120,6 @@ import static org.apache.hudi.configuration.FlinkOptions.PATH; import static org.apache.hudi.table.catalog.TableOptionProperties.COMMENT; import static org.apache.hudi.table.catalog.TableOptionProperties.PK_CONSTRAINT_NAME; import static org.apache.hudi.table.catalog.TableOptionProperties.SPARK_SOURCE_PROVIDER; -import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR; -import static org.apache.flink.util.Preconditions.checkArgument; -import static org.apache.flink.util.Preconditions.checkNotNull; -import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly; /** * A catalog implementation for Hoodie based on MetaStore. @@ -556,6 +557,11 @@ public class HoodieHiveCatalog extends AbstractCatalog { hiveTable.setCreateTime((int) (System.currentTimeMillis() / 1000)); Map<String, String> properties = new HashMap<>(table.getOptions()); + if (properties.containsKey(FlinkOptions.INDEX_TYPE.key()) + && !properties.containsKey(HoodieIndexConfig.INDEX_TYPE.key())) { + properties.put(HoodieIndexConfig.INDEX_TYPE.key(), properties.get(FlinkOptions.INDEX_TYPE.key())); + } + properties.remove(FlinkOptions.INDEX_TYPE.key()); hiveConf.getAllProperties().forEach((k, v) -> properties.put("hadoop." + k, String.valueOf(v))); if (external) { diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java index 1ef03291e9a..24621e1b8d7 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java @@ -26,6 +26,7 @@ import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.exception.HoodieCatalogException; import org.apache.hudi.keygen.ComplexAvroKeyGenerator; @@ -258,6 +259,45 @@ public class TestHoodieHiveCatalog { assertEquals(keyGeneratorClassName, NonpartitionedAvroKeyGenerator.class.getName()); } + @Test + void testCreateTableWithIndexType() throws TableNotExistException, TableAlreadyExistException, DatabaseNotExistException { + Map<String, String> options = new HashMap<>(); + options.put(FactoryUtil.CONNECTOR.key(), "hudi"); + // hoodie.index.type + options.put(HoodieIndexConfig.INDEX_TYPE.key(), "BUCKET"); + CatalogTable table = + new CatalogTableImpl(schema, partitions, options, "hudi table"); + hoodieCatalog.createTable(tablePath, table, false); + Map<String, String> params = hoodieCatalog.getHiveTable(tablePath).getParameters(); + assertResult(params, "BUCKET"); + options.remove(HoodieIndexConfig.INDEX_TYPE.key()); + + // index.type + options.put(FlinkOptions.INDEX_TYPE.key(), FlinkOptions.INDEX_TYPE.defaultValue()); + table = + new CatalogTableImpl(schema, partitions, options, "hudi table"); + ObjectPath newTablePath1 = new ObjectPath("default", "test" + System.currentTimeMillis()); + hoodieCatalog.createTable(newTablePath1, table, false); + + params = hoodieCatalog.getHiveTable(newTablePath1).getParameters(); + assertResult(params, FlinkOptions.INDEX_TYPE.defaultValue()); + + // index.type + hoodie.index.type + options.put(HoodieIndexConfig.INDEX_TYPE.key(), "BUCKET"); + table = new CatalogTableImpl(schema, partitions, options, "hudi table"); + ObjectPath newTablePath2 = new ObjectPath("default", "test" + System.currentTimeMillis()); + hoodieCatalog.createTable(newTablePath2, table, false); + + params = hoodieCatalog.getHiveTable(newTablePath2).getParameters(); + assertResult(params, "BUCKET"); + } + + private void assertResult(Map<String, String> params, String index) { + assertTrue(params.containsKey(HoodieIndexConfig.INDEX_TYPE.key())); + assertFalse(params.containsKey(FlinkOptions.INDEX_TYPE.key())); + assertThat(params.get(HoodieIndexConfig.INDEX_TYPE.key()), is(index)); + } + @Test void testCreateTableWithoutPreCombineKey() throws TableAlreadyExistException, DatabaseNotExistException, IOException, TableNotExistException { String db = "default";
