This is an automated email from the ASF dual-hosted git repository. vinish pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git
The following commit(s) were added to refs/heads/main by this push: new f194f4cb [590] Add Hudi HMS Catalog Sync Implementation f194f4cb is described below commit f194f4cbc8a54a2cfd49c18b14d656abb046baf2 Author: Vamsi <va...@onehouse.ai> AuthorDate: Tue Feb 11 20:00:00 2025 +0530 [590] Add Hudi HMS Catalog Sync Implementation --- ...va => HudiCatalogTablePropertiesExtractor.java} | 14 +- ...> TestHudiCatalogTablePropertiesExtractor.java} | 10 +- xtable-hive-metastore/pom.xml | 8 + .../org/apache/xtable/hms/HMSCatalogConfig.java | 11 + .../hms/HMSCatalogPartitionSyncOperations.java | 221 ++++++++++ .../apache/xtable/hms/HMSCatalogSyncClient.java | 22 +- .../xtable/hms/HMSCatalogTableBuilderFactory.java | 7 +- .../hms/table/HudiHMSCatalogTableBuilder.java | 204 +++++++++ ...ntTestBase.java => HMSCatalogSyncTestBase.java} | 10 +- .../hms/TestHMSCatalogPartitionSyncOperations.java | 470 +++++++++++++++++++++ .../xtable/hms/TestHMSCatalogSyncClient.java | 95 +++-- .../hms/TestHMSCatalogTableBuilderFactory.java | 8 +- .../hms/table/TestDeltaHMSCatalogTableBuilder.java | 4 +- .../hms/table/TestHudiHMSCatalogTableBuilder.java | 143 +++++++ .../table/TestIcebergHMSCatalogTableBuilder.java | 4 +- .../apache/xtable/utilities/RunCatalogSync.java | 25 +- .../src/test/resources/catalogConfig.yaml | 1 + 17 files changed, 1200 insertions(+), 57 deletions(-) diff --git a/xtable-core/src/main/java/org/apache/xtable/hudi/catalog/HudiCatalogTableUtils.java b/xtable-core/src/main/java/org/apache/xtable/hudi/catalog/HudiCatalogTablePropertiesExtractor.java similarity index 91% rename from xtable-core/src/main/java/org/apache/xtable/hudi/catalog/HudiCatalogTableUtils.java rename to xtable-core/src/main/java/org/apache/xtable/hudi/catalog/HudiCatalogTablePropertiesExtractor.java index e6d99706..ff26e600 100644 --- a/xtable-core/src/main/java/org/apache/xtable/hudi/catalog/HudiCatalogTableUtils.java +++ 
b/xtable-core/src/main/java/org/apache/xtable/hudi/catalog/HudiCatalogTablePropertiesExtractor.java @@ -23,6 +23,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import lombok.AccessLevel; +import lombok.NoArgsConstructor; + import org.apache.spark.sql.types.StructType; import org.apache.hudi.common.util.StringUtils; @@ -33,15 +36,22 @@ import org.apache.xtable.model.schema.InternalType; import org.apache.xtable.schema.SparkSchemaExtractor; /** Util class to fetch details about Hudi table */ -public class HudiCatalogTableUtils { +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class HudiCatalogTablePropertiesExtractor { + + private static final HudiCatalogTablePropertiesExtractor INSTANCE = + new HudiCatalogTablePropertiesExtractor(); + public static HudiCatalogTablePropertiesExtractor getInstance() { + return INSTANCE; + } /** * Get Spark Sql related table properties. This is used for spark datasource table. * * @param schema The schema to write to the table. * @return A new parameters added the spark's table properties. 
*/ - public static Map<String, String> getSparkTableProperties( + public Map<String, String> getSparkTableProperties( List<String> partitionNames, String sparkVersion, int schemaLengthThreshold, diff --git a/xtable-core/src/test/java/org/apache/xtable/hudi/catalog/TestHudiCatalogTableUtils.java b/xtable-core/src/test/java/org/apache/xtable/hudi/catalog/TestHudiCatalogTablePropertiesExtractor.java similarity index 94% rename from xtable-core/src/test/java/org/apache/xtable/hudi/catalog/TestHudiCatalogTableUtils.java rename to xtable-core/src/test/java/org/apache/xtable/hudi/catalog/TestHudiCatalogTablePropertiesExtractor.java index e39d63e5..e08674bf 100644 --- a/xtable-core/src/test/java/org/apache/xtable/hudi/catalog/TestHudiCatalogTableUtils.java +++ b/xtable-core/src/test/java/org/apache/xtable/hudi/catalog/TestHudiCatalogTablePropertiesExtractor.java @@ -32,7 +32,7 @@ import org.apache.xtable.model.schema.InternalField; import org.apache.xtable.model.schema.InternalSchema; import org.apache.xtable.model.schema.InternalType; -public class TestHudiCatalogTableUtils { +public class TestHudiCatalogTablePropertiesExtractor { @Test void testGetSparkTableProperties() { @@ -81,8 +81,8 @@ public class TestHudiCatalogTableUtils { .build(); Map<String, String> result = - HudiCatalogTableUtils.getSparkTableProperties( - partitionNames, sparkVersion, schemaLengthThreshold, schema); + HudiCatalogTablePropertiesExtractor.getInstance() + .getSparkTableProperties(partitionNames, sparkVersion, schemaLengthThreshold, schema); // Validate results assertEquals("hudi", result.get("spark.sql.sources.provider")); @@ -127,8 +127,8 @@ public class TestHudiCatalogTableUtils { // Call the method Map<String, String> result = - HudiCatalogTableUtils.getSparkTableProperties( - partitionNames, "", schemaLengthThreshold, schema); + HudiCatalogTablePropertiesExtractor.getInstance() + .getSparkTableProperties(partitionNames, "", schemaLengthThreshold, schema); assertEquals("hudi", 
result.get("spark.sql.sources.provider")); assertNull(result.get("spark.sql.create.version")); diff --git a/xtable-hive-metastore/pom.xml b/xtable-hive-metastore/pom.xml index 01037a1f..20520f13 100644 --- a/xtable-hive-metastore/pom.xml +++ b/xtable-hive-metastore/pom.xml @@ -42,6 +42,14 @@ <scope>provided</scope> </dependency> + <!-- Hudi dependencies --> + <dependency> + <groupId>org.apache.hudi</groupId> + <artifactId>hudi-hive-sync</artifactId> + <version>${hudi.version}</version> + <scope>provided</scope> + </dependency> + <!-- Iceberg dependencies --> <dependency> <groupId>org.apache.iceberg</groupId> diff --git a/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/HMSCatalogConfig.java b/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/HMSCatalogConfig.java index f6f9eabd..36fffdd9 100644 --- a/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/HMSCatalogConfig.java +++ b/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/HMSCatalogConfig.java @@ -26,6 +26,8 @@ import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.ToString; +import org.apache.hudi.hive.MultiPartKeysValueExtractor; + import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.DeserializationFeature; @@ -43,6 +45,15 @@ public class HMSCatalogConfig { @JsonProperty("externalCatalog.hms.serverUrl") private final String serverUrl; + @JsonProperty("externalCatalog.hms.schema_string_length_thresh") + private final int schemaLengthThreshold = 4000; + + @JsonProperty("externalCatalog.hms.partition_extractor_class") + private final String partitionExtractorClass = MultiPartKeysValueExtractor.class.getName(); + + @JsonProperty("externalCatalog.hms.max_partitions_per_request") + private final int maxPartitionsPerRequest = 1000; + protected static HMSCatalogConfig of(Map<String, String> properties) { try { return OBJECT_MAPPER.readValue( diff --git 
a/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/HMSCatalogPartitionSyncOperations.java b/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/HMSCatalogPartitionSyncOperations.java new file mode 100644 index 00000000..89b53ca5 --- /dev/null +++ b/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/HMSCatalogPartitionSyncOperations.java @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.xtable.hms; + +import static org.apache.xtable.catalog.CatalogUtils.toHierarchicalTableIdentifier; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import lombok.extern.log4j.Log4j2; + +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.thrift.TException; + +import org.apache.hudi.common.util.CollectionUtils; +import org.apache.hudi.exception.TableNotFoundException; + +import org.apache.xtable.catalog.CatalogPartition; +import org.apache.xtable.catalog.CatalogPartitionSyncOperations; +import org.apache.xtable.exception.CatalogSyncException; +import org.apache.xtable.model.catalog.CatalogTableIdentifier; +import org.apache.xtable.model.catalog.HierarchicalTableIdentifier; + +@Log4j2 +public class HMSCatalogPartitionSyncOperations implements CatalogPartitionSyncOperations { + + private final IMetaStoreClient metaStoreClient; + private final HMSCatalogConfig catalogConfig; + + public HMSCatalogPartitionSyncOperations( + IMetaStoreClient metaStoreClient, HMSCatalogConfig hmsCatalogConfig) { + this.metaStoreClient = metaStoreClient; + this.catalogConfig = hmsCatalogConfig; + } + + @Override + public List<CatalogPartition> getAllPartitions(CatalogTableIdentifier catalogTableIdentifier) { + HierarchicalTableIdentifier tableIdentifier = + toHierarchicalTableIdentifier(catalogTableIdentifier); + try { + return metaStoreClient + .listPartitions( + tableIdentifier.getDatabaseName(), tableIdentifier.getTableName(), (short) -1) + .stream() + .map(p -> new CatalogPartition(p.getValues(), p.getSd().getLocation())) + .collect(Collectors.toList()); + } catch (TException e) { + throw new CatalogSyncException( + "Failed to get all partitions for table " + tableIdentifier, e); + } + } + + 
@Override + public void addPartitionsToTable( + CatalogTableIdentifier catalogTableIdentifier, List<CatalogPartition> partitionsToAdd) { + HierarchicalTableIdentifier tableIdentifier = + toHierarchicalTableIdentifier(catalogTableIdentifier); + if (partitionsToAdd.isEmpty()) { + log.info("No partitions to add for {}", tableIdentifier); + return; + } + log.info("Adding partitions {} to table {}", partitionsToAdd.size(), tableIdentifier); + try { + StorageDescriptor sd = + metaStoreClient + .getTable(tableIdentifier.getDatabaseName(), tableIdentifier.getTableName()) + .getSd(); + + CollectionUtils.batches(partitionsToAdd, catalogConfig.getMaxPartitionsPerRequest()) + .forEach( + batch -> { + List<Partition> partitionList = new ArrayList<>(); + batch.forEach( + partition -> { + partitionList.add(createPartition(tableIdentifier, partition, sd)); + }); + try { + metaStoreClient.add_partitions(partitionList, true, false); + } catch (TException e) { + log.error("{} add partition failed", tableIdentifier, e); + throw new CatalogSyncException(tableIdentifier + " add partition failed", e); + } + log.info("Add batch partitions done: {}", partitionList.size()); + }); + } catch (TException e) { + log.error("Failed to add partitions to table {}", tableIdentifier, e); + throw new CatalogSyncException(tableIdentifier + " add partition failed", e); + } + } + + @Override + public void updatePartitionsToTable( + CatalogTableIdentifier catalogTableIdentifier, List<CatalogPartition> changedPartitions) { + HierarchicalTableIdentifier tableIdentifier = + toHierarchicalTableIdentifier(catalogTableIdentifier); + try { + Table table = + metaStoreClient.getTable( + tableIdentifier.getDatabaseName(), tableIdentifier.getTableName()); + StorageDescriptor tableSd = table.getSd(); + + List<Partition> updatedPartitions = new ArrayList<>(); + + changedPartitions.forEach( + partition -> { + updatedPartitions.add(createPartition(tableIdentifier, partition, tableSd)); + }); + + // Update partitions + 
metaStoreClient.alter_partitions( + tableIdentifier.getDatabaseName(), tableIdentifier.getTableName(), updatedPartitions); + } catch (TException e) { + throw new CatalogSyncException( + "Failed to update partitions for the table " + tableIdentifier, e); + } + } + + @Override + public void dropPartitions( + CatalogTableIdentifier catalogTableIdentifier, List<CatalogPartition> partitionsToDrop) { + HierarchicalTableIdentifier tableIdentifier = + toHierarchicalTableIdentifier(catalogTableIdentifier); + try { + for (CatalogPartition partition : partitionsToDrop) { + metaStoreClient.dropPartition( + tableIdentifier.getDatabaseName(), + tableIdentifier.getTableName(), + partition.getValues(), + false); + } + } catch (TException e) { + throw new CatalogSyncException("Failed to drop partitions for table " + tableIdentifier, e); + } + } + + @Override + public Map<String, String> getTableProperties( + CatalogTableIdentifier catalogTableIdentifier, List<String> keysToRetrieve) { + HierarchicalTableIdentifier tableIdentifier = + toHierarchicalTableIdentifier(catalogTableIdentifier); + try { + Table table = + metaStoreClient.getTable( + tableIdentifier.getDatabaseName(), tableIdentifier.getTableName()); + Map<String, String> tableParameters = table.getParameters(); + + return keysToRetrieve.stream() + .filter(tableParameters::containsKey) + .collect(Collectors.toMap(key -> key, tableParameters::get)); + } catch (TableNotFoundException | TException e) { + throw new CatalogSyncException( + "failed to fetch last time synced properties for table" + tableIdentifier, e); + } + } + + @Override + public void updateTableProperties( + CatalogTableIdentifier catalogTableIdentifier, Map<String, String> propertiesToUpdate) { + HierarchicalTableIdentifier tableIdentifier = + toHierarchicalTableIdentifier(catalogTableIdentifier); + try { + if (propertiesToUpdate == null || propertiesToUpdate.isEmpty()) { + return; + } + + Table table = + metaStoreClient.getTable( + 
tableIdentifier.getDatabaseName(), tableIdentifier.getTableName()); + Map<String, String> tableParameters = table.getParameters(); + tableParameters.putAll(propertiesToUpdate); + table.setParameters(tableParameters); + metaStoreClient.alter_table( + tableIdentifier.getDatabaseName(), tableIdentifier.getTableName(), table); + } catch (TableNotFoundException | TException e) { + throw new CatalogSyncException( + "failed to update last time synced properties for table" + tableIdentifier, e); + } + } + + private Partition createPartition( + HierarchicalTableIdentifier tableIdentifier, + CatalogPartition partition, + StorageDescriptor sd) { + StorageDescriptor partitionSd = new StorageDescriptor(); + partitionSd.setCols(sd.getCols()); + partitionSd.setInputFormat(sd.getInputFormat()); + partitionSd.setOutputFormat(sd.getOutputFormat()); + partitionSd.setSerdeInfo(sd.getSerdeInfo()); + partitionSd.setLocation(partition.getStorageLocation()); + + return new Partition( + partition.getValues(), + tableIdentifier.getDatabaseName(), + tableIdentifier.getTableName(), + 0, + 0, + partitionSd, + null); + } +} diff --git a/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/HMSCatalogSyncClient.java b/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/HMSCatalogSyncClient.java index 537de834..acfdfbd9 100644 --- a/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/HMSCatalogSyncClient.java +++ b/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/HMSCatalogSyncClient.java @@ -22,6 +22,7 @@ import static org.apache.xtable.catalog.CatalogUtils.toHierarchicalTableIdentifi import java.time.ZonedDateTime; import java.util.Collections; +import java.util.Optional; import lombok.extern.log4j.Log4j2; @@ -36,7 +37,9 @@ import org.apache.thrift.TException; import com.google.common.annotations.VisibleForTesting; +import org.apache.xtable.catalog.CatalogPartitionSyncTool; import org.apache.xtable.catalog.CatalogTableBuilder; +import 
org.apache.xtable.catalog.CatalogUtils; import org.apache.xtable.conversion.ExternalCatalogConfig; import org.apache.xtable.exception.CatalogSyncException; import org.apache.xtable.model.InternalTable; @@ -55,6 +58,7 @@ public class HMSCatalogSyncClient implements CatalogSyncClient<Table> { private Configuration configuration; private IMetaStoreClient metaStoreClient; private CatalogTableBuilder<Table, Table> tableBuilder; + private Optional<CatalogPartitionSyncTool> partitionSyncTool; // For loading the instance using ServiceLoader public HMSCatalogSyncClient() {} @@ -70,12 +74,14 @@ public class HMSCatalogSyncClient implements CatalogSyncClient<Table> { HMSCatalogConfig hmsCatalogConfig, Configuration configuration, IMetaStoreClient metaStoreClient, - CatalogTableBuilder tableBuilder) { + CatalogTableBuilder tableBuilder, + Optional<CatalogPartitionSyncTool> partitionSyncTool) { this.catalogConfig = catalogConfig; this.hmsCatalogConfig = hmsCatalogConfig; this.configuration = configuration; this.metaStoreClient = metaStoreClient; this.tableBuilder = tableBuilder; + this.partitionSyncTool = partitionSyncTool; } @Override @@ -145,6 +151,8 @@ public class HMSCatalogSyncClient implements CatalogSyncClient<Table> { } catch (TException e) { throw new CatalogSyncException("Failed to create table: " + tableIdentifier.getId(), e); } + + partitionSyncTool.ifPresent(tool -> tool.syncPartitions(table, tableIdentifier)); } @Override @@ -158,6 +166,8 @@ public class HMSCatalogSyncClient implements CatalogSyncClient<Table> { } catch (TException e) { throw new CatalogSyncException("Failed to refresh table: " + tableIdentifier.getId(), e); } + + partitionSyncTool.ifPresent(tool -> tool.syncPartitions(table, tableIdentifier)); } @Override @@ -194,7 +204,15 @@ public class HMSCatalogSyncClient implements CatalogSyncClient<Table> { } catch (MetaException | HiveException e) { throw new CatalogSyncException("HiveMetastoreClient could not be created", e); } - this.tableBuilder = 
HMSCatalogTableBuilderFactory.getInstance(tableFormat, this.configuration); + this.tableBuilder = + HMSCatalogTableBuilderFactory.getTableBuilder( + tableFormat, hmsCatalogConfig, this.configuration); + this.partitionSyncTool = + CatalogUtils.getPartitionSyncTool( + tableFormat, + hmsCatalogConfig.getPartitionExtractorClass(), + new HMSCatalogPartitionSyncOperations(metaStoreClient, hmsCatalogConfig), + configuration); } /** diff --git a/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/HMSCatalogTableBuilderFactory.java b/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/HMSCatalogTableBuilderFactory.java index 56c18279..43a77700 100644 --- a/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/HMSCatalogTableBuilderFactory.java +++ b/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/HMSCatalogTableBuilderFactory.java @@ -33,6 +33,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.xtable.catalog.CatalogTableBuilder; import org.apache.xtable.exception.NotSupportedException; import org.apache.xtable.hms.table.DeltaHMSCatalogTableBuilder; +import org.apache.xtable.hms.table.HudiHMSCatalogTableBuilder; import org.apache.xtable.hms.table.IcebergHMSCatalogTableBuilder; import org.apache.xtable.model.catalog.CatalogTableIdentifier; import org.apache.xtable.model.catalog.HierarchicalTableIdentifier; @@ -40,13 +41,15 @@ import org.apache.xtable.model.storage.TableFormat; public class HMSCatalogTableBuilderFactory { - static CatalogTableBuilder<Table, Table> getInstance( - String tableFormat, Configuration configuration) { + public static CatalogTableBuilder<Table, Table> getTableBuilder( + String tableFormat, HMSCatalogConfig hmsCatalogConfig, Configuration configuration) { switch (tableFormat) { case TableFormat.ICEBERG: return new IcebergHMSCatalogTableBuilder(configuration); case TableFormat.DELTA: return new DeltaHMSCatalogTableBuilder(); + case TableFormat.HUDI: + return new 
HudiHMSCatalogTableBuilder(hmsCatalogConfig, configuration); default: throw new NotSupportedException("Unsupported table format: " + tableFormat); } diff --git a/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/table/HudiHMSCatalogTableBuilder.java b/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/table/HudiHMSCatalogTableBuilder.java new file mode 100644 index 00000000..cc4dc3df --- /dev/null +++ b/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/table/HudiHMSCatalogTableBuilder.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.xtable.hms.table; + +import static org.apache.xtable.catalog.CatalogUtils.toHierarchicalTableIdentifier; + +import java.io.IOException; +import java.time.Instant; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + +import lombok.extern.log4j.Log4j2; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.SerDeInfo; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.security.UserGroupInformation; + +import org.apache.hudi.common.model.HoodieFileFormat; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.ConfigUtils; + +import com.google.common.annotations.VisibleForTesting; + +import org.apache.xtable.catalog.CatalogTableBuilder; +import org.apache.xtable.exception.CatalogSyncException; +import org.apache.xtable.hms.HMSCatalogConfig; +import org.apache.xtable.hms.HMSSchemaExtractor; +import org.apache.xtable.hudi.HudiTableManager; +import org.apache.xtable.hudi.catalog.HudiCatalogTablePropertiesExtractor; +import org.apache.xtable.hudi.catalog.HudiInputFormatUtils; +import org.apache.xtable.model.InternalTable; +import org.apache.xtable.model.catalog.CatalogTableIdentifier; +import org.apache.xtable.model.catalog.HierarchicalTableIdentifier; +import org.apache.xtable.model.schema.InternalPartitionField; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.storage.TableFormat; + +@Log4j2 +public class HudiHMSCatalogTableBuilder implements CatalogTableBuilder<Table, Table> { + + private final HudiTableManager hudiTableManager; + private final HMSSchemaExtractor schemaExtractor; + private final HMSCatalogConfig hmsCatalogConfig; + private final HudiCatalogTablePropertiesExtractor 
tablePropertiesExtractor; + + private HoodieTableMetaClient metaClient; + + protected static final String HUDI_METADATA_CONFIG = "hudi.metadata-listing-enabled"; + + public HudiHMSCatalogTableBuilder( + HMSCatalogConfig hmsCatalogConfig, Configuration configuration) { + this.hudiTableManager = HudiTableManager.of(configuration); + this.schemaExtractor = HMSSchemaExtractor.getInstance(); + this.hmsCatalogConfig = hmsCatalogConfig; + this.tablePropertiesExtractor = HudiCatalogTablePropertiesExtractor.getInstance(); + } + + @VisibleForTesting + HudiHMSCatalogTableBuilder( + HMSCatalogConfig hmsCatalogConfig, + HMSSchemaExtractor schemaExtractor, + HudiTableManager hudiTableManager, + HoodieTableMetaClient metaClient, + HudiCatalogTablePropertiesExtractor propertiesExtractor) { + this.hudiTableManager = hudiTableManager; + this.schemaExtractor = schemaExtractor; + this.metaClient = metaClient; + this.hmsCatalogConfig = hmsCatalogConfig; + this.tablePropertiesExtractor = propertiesExtractor; + } + + HoodieTableMetaClient getMetaClient(String basePath) { + if (metaClient == null) { + Optional<HoodieTableMetaClient> metaClientOpt = + hudiTableManager.loadTableMetaClientIfExists(basePath); + + if (!metaClientOpt.isPresent()) { + throw new CatalogSyncException( + "Failed to get meta client since table is not present in the base path " + basePath); + } + + metaClient = metaClientOpt.get(); + } + return metaClient; + } + + @Override + public Table getCreateTableRequest( + InternalTable table, CatalogTableIdentifier catalogTableIdentifier) { + HierarchicalTableIdentifier tableIdentifier = + toHierarchicalTableIdentifier(catalogTableIdentifier); + Table newTb = new Table(); + newTb.setDbName(tableIdentifier.getDatabaseName()); + newTb.setTableName(tableIdentifier.getTableName()); + try { + newTb.setOwner(UserGroupInformation.getCurrentUser().getShortUserName()); + } catch (IOException e) { + throw new CatalogSyncException( + "Failed to set owner for hms table: " + 
tableIdentifier.getTableName(), e); + } + + newTb.setCreateTime((int) Instant.now().toEpochMilli()); + Map<String, String> tableProperties = getTableProperties(table, table.getReadSchema()); + newTb.setParameters(tableProperties); + newTb.setSd(getStorageDescriptor(table)); + newTb.setPartitionKeys(getSchemaPartitionKeys(table)); + return newTb; + } + + @Override + public Table getUpdateTableRequest( + InternalTable table, Table hmsTable, CatalogTableIdentifier tableIdentifier) { + Map<String, String> parameters = hmsTable.getParameters(); + Map<String, String> tableParameters = hmsTable.getParameters(); + tableParameters.putAll(getTableProperties(table, table.getReadSchema())); + hmsTable.setParameters(tableParameters); + hmsTable.setSd(getStorageDescriptor(table)); + + hmsTable.setParameters(parameters); + hmsTable.getSd().setCols(schemaExtractor.toColumns(TableFormat.HUDI, table.getReadSchema())); + return hmsTable; + } + + private Map<String, String> getTableProperties(InternalTable table, InternalSchema schema) { + List<String> partitionFields = + table.getPartitioningFields().stream() + .map(field -> field.getSourceField().getName()) + .collect(Collectors.toList()); + Map<String, String> tableProperties = new HashMap<>(); + tableProperties.put(HUDI_METADATA_CONFIG, "true"); + Map<String, String> sparkTableProperties = + tablePropertiesExtractor.getSparkTableProperties( + partitionFields, "", hmsCatalogConfig.getSchemaLengthThreshold(), schema); + tableProperties.putAll(sparkTableProperties); + return tableProperties; + } + + private StorageDescriptor getStorageDescriptor(InternalTable table) { + final StorageDescriptor storageDescriptor = new StorageDescriptor(); + storageDescriptor.setCols(schemaExtractor.toColumns(TableFormat.HUDI, table.getReadSchema())); + storageDescriptor.setLocation(table.getBasePath()); + HoodieFileFormat fileFormat = + getMetaClient(table.getBasePath()).getTableConfig().getBaseFileFormat(); + String inputFormatClassName = 
HudiInputFormatUtils.getInputFormatClassName(fileFormat, false); + String outputFormatClassName = HudiInputFormatUtils.getOutputFormatClassName(fileFormat); + String serdeClassName = HudiInputFormatUtils.getSerDeClassName(fileFormat); + storageDescriptor.setInputFormat(inputFormatClassName); + storageDescriptor.setOutputFormat(outputFormatClassName); + Map<String, String> serdeProperties = getSerdeProperties(table.getBasePath()); + SerDeInfo serDeInfo = new SerDeInfo(); + serDeInfo.setSerializationLib(serdeClassName); + serDeInfo.setParameters(serdeProperties); + storageDescriptor.setSerdeInfo(serDeInfo); + return storageDescriptor; + } + + private static Map<String, String> getSerdeProperties(String basePath) { + Map<String, String> serdeProperties = new HashMap<>(); + serdeProperties.put(ConfigUtils.TABLE_SERDE_PATH, basePath); + serdeProperties.put(ConfigUtils.IS_QUERY_AS_RO_TABLE, String.valueOf(false)); + return serdeProperties; + } + + private List<FieldSchema> getSchemaPartitionKeys(InternalTable table) { + + List<InternalPartitionField> partitioningFields = table.getPartitioningFields(); + Map<String, FieldSchema> fieldSchemaMap = + schemaExtractor.toColumns(TableFormat.HUDI, table.getReadSchema()).stream() + .collect(Collectors.toMap(FieldSchema::getName, field -> field)); + + return partitioningFields.stream() + .map( + partitionField -> { + if (fieldSchemaMap.containsKey(partitionField.getSourceField().getName())) { + return fieldSchemaMap.get(partitionField.getSourceField().getName()); + } else { + return new FieldSchema(partitionField.getSourceField().getName(), "string", ""); + } + }) + .collect(Collectors.toList()); + } +} diff --git a/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/HMSCatalogSyncClientTestBase.java b/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/HMSCatalogSyncTestBase.java similarity index 95% rename from xtable-hive-metastore/src/test/java/org/apache/xtable/hms/HMSCatalogSyncClientTestBase.java rename to 
xtable-hive-metastore/src/test/java/org/apache/xtable/hms/HMSCatalogSyncTestBase.java index cd76c6be..62531e75 100644 --- a/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/HMSCatalogSyncClientTestBase.java +++ b/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/HMSCatalogSyncTestBase.java @@ -43,7 +43,7 @@ import org.apache.xtable.model.schema.PartitionTransformType; import org.apache.xtable.model.storage.CatalogType; import org.apache.xtable.model.storage.TableFormat; -public class HMSCatalogSyncClientTestBase { +public class HMSCatalogSyncTestBase { @Mock protected IMetaStoreClient mockMetaStoreClient; @Mock protected HMSCatalogConfig mockHMSCatalogConfig; @@ -139,6 +139,14 @@ public class HMSCatalogSyncClientTestBase { .readSchema(INTERNAL_SCHEMA) .partitioningFields(Collections.singletonList(PARTITION_FIELD)) .build(); + protected static final InternalTable TEST_EVOLVED_HUDI_INTERNAL_TABLE = + InternalTable.builder() + .basePath(TEST_BASE_PATH) + .tableFormat(TableFormat.HUDI) + .readSchema(UPDATED_INTERNAL_SCHEMA) + .partitioningFields(Collections.singletonList(PARTITION_FIELD)) + .build(); + protected static final ThreePartHierarchicalTableIdentifier TEST_CATALOG_TABLE_IDENTIFIER = new ThreePartHierarchicalTableIdentifier(TEST_HMS_DATABASE, TEST_HMS_TABLE); diff --git a/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/TestHMSCatalogPartitionSyncOperations.java b/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/TestHMSCatalogPartitionSyncOperations.java new file mode 100644 index 00000000..b1422d94 --- /dev/null +++ b/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/TestHMSCatalogPartitionSyncOperations.java @@ -0,0 +1,470 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.hms; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import lombok.SneakyThrows; + +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.SerDeInfo; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.thrift.TException; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.junit.jupiter.MockitoExtension; + +import 
org.apache.xtable.catalog.CatalogPartition; +import org.apache.xtable.catalog.CatalogPartitionSyncOperations; +import org.apache.xtable.exception.CatalogSyncException; + +@ExtendWith(MockitoExtension.class) +public class TestHMSCatalogPartitionSyncOperations extends HMSCatalogSyncTestBase { + + private CatalogPartitionSyncOperations hmsPartitionSyncOperations; + + void setupCommonMocks() { + hmsPartitionSyncOperations = + new HMSCatalogPartitionSyncOperations(mockMetaStoreClient, mockHMSCatalogConfig); + } + + @SneakyThrows + @Test + void testGetAllPartitions() { + setupCommonMocks(); + + Partition hivePartition1 = new Partition(); + hivePartition1.setValues(Collections.singletonList("value1")); + StorageDescriptor sd1 = new StorageDescriptor(); + sd1.setLocation("location1"); + hivePartition1.setSd(sd1); + + Partition hivePartition2 = new Partition(); + hivePartition2.setValues(Collections.singletonList("value2")); + StorageDescriptor sd2 = new StorageDescriptor(); + sd2.setLocation("location2"); + hivePartition2.setSd(sd2); + + List<Partition> hivePartitions = Arrays.asList(hivePartition1, hivePartition2); + when(mockMetaStoreClient.listPartitions( + TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(), + TEST_CATALOG_TABLE_IDENTIFIER.getTableName(), + (short) -1)) + .thenReturn(hivePartitions); + List<CatalogPartition> partitions = + hmsPartitionSyncOperations.getAllPartitions(TEST_CATALOG_TABLE_IDENTIFIER); + + assertEquals(2, partitions.size()); + assertEquals("location1", partitions.get(0).getStorageLocation()); + assertEquals(1, partitions.get(0).getValues().size()); + assertEquals("value1", partitions.get(0).getValues().get(0)); + assertEquals("location2", partitions.get(1).getStorageLocation()); + assertEquals(1, partitions.get(1).getValues().size()); + assertEquals("value2", partitions.get(1).getValues().get(0)); + + verify(mockMetaStoreClient, times(1)) + .listPartitions( + TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(), + 
TEST_CATALOG_TABLE_IDENTIFIER.getTableName(), + (short) -1); + } + + @Test + void testAddPartitionsToTableSuccess() throws Exception { + setupCommonMocks(); + when(mockHMSCatalogConfig.getMaxPartitionsPerRequest()).thenReturn(100); + CatalogPartition partition1 = + new CatalogPartition(Collections.singletonList("value1"), "location1"); + CatalogPartition partition2 = + new CatalogPartition(Collections.singletonList("value2"), "location2"); + List<CatalogPartition> partitionsToAdd = Arrays.asList(partition1, partition2); + + StorageDescriptor tableSd = new StorageDescriptor(); + tableSd.setCols(Collections.emptyList()); + tableSd.setInputFormat("inputFormat"); + tableSd.setOutputFormat("outputFormat"); + tableSd.setSerdeInfo(new SerDeInfo()); + + Table table = new Table(); + table.setSd(tableSd); + + when(mockMetaStoreClient.getTable( + TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(), + TEST_CATALOG_TABLE_IDENTIFIER.getTableName())) + .thenReturn(table); + + // Execute the method + hmsPartitionSyncOperations.addPartitionsToTable(TEST_CATALOG_TABLE_IDENTIFIER, partitionsToAdd); + + // Verify behavior + ArgumentCaptor<List<Partition>> partitionCaptor = ArgumentCaptor.forClass(List.class); + + verify(mockMetaStoreClient, times(1)) + .add_partitions(partitionCaptor.capture(), eq(true), eq(false)); + + // Validate the captured partitions + List<Partition> capturedPartitions = partitionCaptor.getValue(); + assertEquals(2, capturedPartitions.size()); + + Partition capturedPartition1 = capturedPartitions.get(0); + assertEquals(partition1.getValues(), capturedPartition1.getValues()); + assertEquals(partition1.getStorageLocation(), capturedPartition1.getSd().getLocation()); + + Partition capturedPartition2 = capturedPartitions.get(1); + assertEquals(partition2.getValues(), capturedPartition2.getValues()); + assertEquals(partition2.getStorageLocation(), capturedPartition2.getSd().getLocation()); + } + + @Test + void testAddPartitionsToTableThrowsException() throws Exception { 
+ setupCommonMocks(); + List<CatalogPartition> partitionsToAdd = + Collections.singletonList( + new CatalogPartition(Collections.singletonList("value1"), "location1")); + + when(mockMetaStoreClient.getTable( + TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(), + TEST_CATALOG_TABLE_IDENTIFIER.getTableName())) + .thenThrow(new TException("Test exception")); + + // Execute and validate exception + CatalogSyncException exception = + assertThrows( + CatalogSyncException.class, + () -> + hmsPartitionSyncOperations.addPartitionsToTable( + TEST_CATALOG_TABLE_IDENTIFIER, partitionsToAdd)); + + assertInstanceOf(TException.class, exception.getCause()); + verify(mockMetaStoreClient, times(1)) + .getTable( + TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(), + TEST_CATALOG_TABLE_IDENTIFIER.getTableName()); + verify(mockMetaStoreClient, never()).add_partitions(anyList(), anyBoolean(), anyBoolean()); + } + + @Test + void testUpdatePartitionsToTableSuccess() throws Exception { + setupCommonMocks(); + + CatalogPartition changedPartition1 = + new CatalogPartition(Collections.singletonList("value1"), "location1"); + CatalogPartition changedPartition2 = + new CatalogPartition(Collections.singletonList("value2"), "location2"); + List<CatalogPartition> changedPartitions = Arrays.asList(changedPartition1, changedPartition2); + + StorageDescriptor tableSd = new StorageDescriptor(); + tableSd.setCols(Collections.emptyList()); + tableSd.setInputFormat("inputFormat"); + tableSd.setOutputFormat("outputFormat"); + tableSd.setSerdeInfo(new SerDeInfo()); + + Table table = new Table(); + table.setSd(tableSd); + + when(mockMetaStoreClient.getTable( + TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(), + TEST_CATALOG_TABLE_IDENTIFIER.getTableName())) + .thenReturn(table); + + // Execute the method + hmsPartitionSyncOperations.updatePartitionsToTable( + TEST_CATALOG_TABLE_IDENTIFIER, changedPartitions); + + // Capture the partitions passed to alter_partitions + ArgumentCaptor<List<Partition>> 
partitionCaptor = ArgumentCaptor.forClass(List.class); + verify(mockMetaStoreClient, times(1)) + .alter_partitions( + eq(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName()), + eq(TEST_CATALOG_TABLE_IDENTIFIER.getTableName()), + partitionCaptor.capture()); + List<Partition> capturedPartitions = partitionCaptor.getValue(); + + assertEquals(2, capturedPartitions.size()); + + Partition capturedPartition1 = capturedPartitions.get(0); + assertEquals(changedPartition1.getValues(), capturedPartition1.getValues()); + assertEquals(changedPartition1.getStorageLocation(), capturedPartition1.getSd().getLocation()); + + Partition capturedPartition2 = capturedPartitions.get(1); + assertEquals(changedPartition2.getValues(), capturedPartition2.getValues()); + assertEquals(changedPartition2.getStorageLocation(), capturedPartition2.getSd().getLocation()); + } + + @Test + void testUpdatePartitionsToTableThrowsException() throws Exception { + setupCommonMocks(); + CatalogPartition changedPartition = + new CatalogPartition(Collections.singletonList("value1"), "location1"); + + when(mockMetaStoreClient.getTable( + TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(), + TEST_CATALOG_TABLE_IDENTIFIER.getTableName())) + .thenThrow(new TException("Test exception")); + + // Execute and validate exception + CatalogSyncException exception = + assertThrows( + CatalogSyncException.class, + () -> + hmsPartitionSyncOperations.updatePartitionsToTable( + TEST_CATALOG_TABLE_IDENTIFIER, Collections.singletonList(changedPartition))); + + assertInstanceOf(TException.class, exception.getCause()); + + verify(mockMetaStoreClient, times(1)) + .getTable( + TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(), + TEST_CATALOG_TABLE_IDENTIFIER.getTableName()); + verify(mockMetaStoreClient, never()).alter_partitions(anyString(), anyString(), anyList()); + } + + @Test + void testDropPartitionsSuccess() throws Exception { + setupCommonMocks(); + + CatalogPartition partition1 = + new 
CatalogPartition(Collections.singletonList("value1"), "location1"); + CatalogPartition partition2 = + new CatalogPartition(Collections.singletonList("value2"), "location2"); + List<CatalogPartition> partitionsToDrop = Arrays.asList(partition1, partition2); + + // Execute the method + hmsPartitionSyncOperations.dropPartitions(TEST_CATALOG_TABLE_IDENTIFIER, partitionsToDrop); + + // Capture calls to dropPartition + ArgumentCaptor<List<String>> partitionValuesCaptor = ArgumentCaptor.forClass(List.class); + + verify(mockMetaStoreClient, times(2)) + .dropPartition( + eq(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName()), + eq(TEST_CATALOG_TABLE_IDENTIFIER.getTableName()), + partitionValuesCaptor.capture(), + eq(false)); + + // Validate captured arguments + List<List<String>> capturedPartitionValues = partitionValuesCaptor.getAllValues(); + assertEquals(2, capturedPartitionValues.size()); + assertEquals(partition1.getValues(), capturedPartitionValues.get(0)); + assertEquals(partition2.getValues(), capturedPartitionValues.get(1)); + } + + @Test + void testDropPartitionsEmptyList() throws Exception { + setupCommonMocks(); + List<CatalogPartition> partitionsToDrop = Collections.emptyList(); + + hmsPartitionSyncOperations.dropPartitions(TEST_CATALOG_TABLE_IDENTIFIER, partitionsToDrop); + + // Verify no calls to dropPartition + verify(mockMetaStoreClient, never()) + .dropPartition(anyString(), anyString(), anyList(), anyBoolean()); + } + + @Test + void testDropPartitionsThrowsException() throws Exception { + setupCommonMocks(); + + CatalogPartition partition1 = + new CatalogPartition(Collections.singletonList("value1"), "location1"); + List<CatalogPartition> partitionsToDrop = Collections.singletonList(partition1); + + doThrow(new TException("Test exception")) + .when(mockMetaStoreClient) + .dropPartition( + TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(), + TEST_CATALOG_TABLE_IDENTIFIER.getTableName(), + partition1.getValues(), + false); + + // Execute and validate exception + 
CatalogSyncException exception = + assertThrows( + CatalogSyncException.class, + () -> + hmsPartitionSyncOperations.dropPartitions( + TEST_CATALOG_TABLE_IDENTIFIER, partitionsToDrop)); + + assertInstanceOf(TException.class, exception.getCause()); + + // Verify dropPartition call is made once + verify(mockMetaStoreClient, times(1)) + .dropPartition( + TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(), + TEST_CATALOG_TABLE_IDENTIFIER.getTableName(), + partition1.getValues(), + false); + } + + @Test + void testGetTablePropertiesSuccess() throws Exception { + setupCommonMocks(); + + List<String> lastSyncedKeys = Arrays.asList("key1", "key2", "key3"); + + Map<String, String> mockParameters = new HashMap<>(); + mockParameters.put("key1", "value1"); + mockParameters.put("key2", "value2"); + mockParameters.put("irrelevantKey", "irrelevantKey"); + + Table mockTable = new Table(); + mockTable.setParameters(mockParameters); + + when(mockMetaStoreClient.getTable( + TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(), + TEST_CATALOG_TABLE_IDENTIFIER.getTableName())) + .thenReturn(mockTable); + + // Execute the method + Map<String, String> result = + hmsPartitionSyncOperations.getTableProperties( + TEST_CATALOG_TABLE_IDENTIFIER, lastSyncedKeys); + + // Validate the result + assertEquals(2, result.size()); + assertEquals("value1", result.get("key1")); + assertEquals("value2", result.get("key2")); + assertNull(result.get("key3")); // key3 is not in mockParameters + } + + @Test + void testGetTablePropertiesThrowsException() throws Exception { + setupCommonMocks(); + + List<String> lastSyncedKeys = Arrays.asList("key1", "key2"); + + when(mockMetaStoreClient.getTable( + TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(), + TEST_CATALOG_TABLE_IDENTIFIER.getTableName())) + .thenThrow(new TException("Test exception")); + + CatalogSyncException exception = + assertThrows( + CatalogSyncException.class, + () -> + hmsPartitionSyncOperations.getTableProperties( + TEST_CATALOG_TABLE_IDENTIFIER, 
lastSyncedKeys)); + + assertInstanceOf(TException.class, exception.getCause()); + } + + @Test + void testUpdateTablePropertiesSuccess() throws Exception { + setupCommonMocks(); + + Map<String, String> lastTimeSyncedProperties = new HashMap<>(); + lastTimeSyncedProperties.put("last_synced_time", "2023-12-01T12:00:00Z"); + lastTimeSyncedProperties.put("last_modified_by", "user123"); + + Map<String, String> existingParameters = new HashMap<>(); + existingParameters.put("existing_key", "existing_value"); + + Table mockTable = new Table(); + mockTable.setParameters(existingParameters); + + when(mockMetaStoreClient.getTable( + TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(), + TEST_CATALOG_TABLE_IDENTIFIER.getTableName())) + .thenReturn(mockTable); + + // Execute the method + hmsPartitionSyncOperations.updateTableProperties( + TEST_CATALOG_TABLE_IDENTIFIER, lastTimeSyncedProperties); + + // Verify behavior + verify(mockMetaStoreClient, times(1)) + .alter_table( + eq(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName()), + eq(TEST_CATALOG_TABLE_IDENTIFIER.getTableName()), + eq(mockTable)); + + // Validate updated parameters + Map<String, String> updatedParameters = mockTable.getParameters(); + assertEquals(3, updatedParameters.size()); + assertEquals("2023-12-01T12:00:00Z", updatedParameters.get("last_synced_time")); + assertEquals("user123", updatedParameters.get("last_modified_by")); + assertEquals("existing_value", updatedParameters.get("existing_key")); + } + + @Test + void testUpdateTablePropertiesNoChanges() throws Exception { + setupCommonMocks(); + + // Empty properties map + Map<String, String> lastTimeSyncedProperties = Collections.emptyMap(); + + // Execute the method + hmsPartitionSyncOperations.updateTableProperties( + TEST_CATALOG_TABLE_IDENTIFIER, lastTimeSyncedProperties); + + // Verify no calls to MetaStoreClient + verify(mockMetaStoreClient, never()).getTable(anyString(), anyString()); + verify(mockMetaStoreClient, never()).alter_table(anyString(), 
anyString(), any()); + } + + @Test + void testUpdateTablePropertiesThrowsException() throws Exception { + setupCommonMocks(); + + Map<String, String> lastTimeSyncedProperties = + Collections.singletonMap("last_synced_time", "2023-12-01T12:00:00Z"); + + when(mockMetaStoreClient.getTable( + TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(), + TEST_CATALOG_TABLE_IDENTIFIER.getTableName())) + .thenThrow(new TException("Test exception")); + + CatalogSyncException exception = + assertThrows( + CatalogSyncException.class, + () -> + hmsPartitionSyncOperations.updateTableProperties( + TEST_CATALOG_TABLE_IDENTIFIER, lastTimeSyncedProperties)); + + assertInstanceOf(TException.class, exception.getCause()); + + // Verify getTable was attempted once and no alter table calls are made + verify(mockMetaStoreClient, times(1)) + .getTable( + TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(), + TEST_CATALOG_TABLE_IDENTIFIER.getTableName()); + verify(mockMetaStoreClient, never()).alter_table(anyString(), anyString(), any()); + } +} diff --git a/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/TestHMSCatalogSyncClient.java b/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/TestHMSCatalogSyncClient.java index fad22ad2..9fae7eb2 100644 --- a/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/TestHMSCatalogSyncClient.java +++ b/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/TestHMSCatalogSyncClient.java @@ -26,6 +26,7 @@ import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.never; @@ -37,6 +38,7 @@ import java.time.Instant; import java.time.ZoneId; import java.time.ZonedDateTime; import java.util.Collections; +import 
java.util.Optional; import 
java.util.ServiceLoader; import lombok.SneakyThrows; @@ -54,6 +56,7 @@ import org.mockito.MockedStatic; import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; +import org.apache.xtable.catalog.CatalogPartitionSyncTool; import org.apache.xtable.catalog.CatalogTableBuilder; import org.apache.xtable.exception.CatalogSyncException; import org.apache.xtable.model.catalog.ThreePartHierarchicalTableIdentifier; @@ -61,29 +64,29 @@ import org.apache.xtable.model.storage.CatalogType; import org.apache.xtable.spi.sync.CatalogSyncClient; @ExtendWith(MockitoExtension.class) -public class TestHMSCatalogSyncClient extends HMSCatalogSyncClientTestBase { +public class TestHMSCatalogSyncClient extends HMSCatalogSyncTestBase { @Mock private CatalogTableBuilder<Table, Table> mockTableBuilder; + @Mock private CatalogPartitionSyncTool mockPartitionSyncTool; private HMSCatalogSyncClient hmsCatalogSyncClient; - private HMSCatalogSyncClient createHMSCatalogSyncClient() { + private HMSCatalogSyncClient createHMSCatalogSyncClient(boolean includePartitionSyncTool) { + Optional<CatalogPartitionSyncTool> partitionSyncToolOpt = + includePartitionSyncTool ? 
Optional.of(mockPartitionSyncTool) : Optional.empty(); return new HMSCatalogSyncClient( TEST_CATALOG_CONFIG, mockHMSCatalogConfig, testConfiguration, mockMetaStoreClient, - mockTableBuilder); - } - - void setupCommonMocks() { - hmsCatalogSyncClient = createHMSCatalogSyncClient(); + mockTableBuilder, + partitionSyncToolOpt); } @SneakyThrows @ParameterizedTest @ValueSource(booleans = {true, false}) void testHasDatabase(boolean isDbPresent) { - setupCommonMocks(); + hmsCatalogSyncClient = createHMSCatalogSyncClient(false); Database db = new Database(TEST_HMS_DATABASE, null, null, Collections.emptyMap()); if (isDbPresent) { when(mockMetaStoreClient.getDatabase(TEST_HMS_DATABASE)).thenReturn(db); @@ -103,7 +106,7 @@ public class TestHMSCatalogSyncClient extends HMSCatalogSyncClientTestBase { @SneakyThrows @Test void testHasDatabaseFailure() { - setupCommonMocks(); + hmsCatalogSyncClient = createHMSCatalogSyncClient(false); when(mockMetaStoreClient.getDatabase(TEST_HMS_DATABASE)) .thenThrow(new TException("something went wrong")); CatalogSyncException exception = @@ -119,7 +122,7 @@ public class TestHMSCatalogSyncClient extends HMSCatalogSyncClientTestBase { @ParameterizedTest @ValueSource(booleans = {true, false}) void testGetTable(boolean isTablePresent) { - setupCommonMocks(); + hmsCatalogSyncClient = createHMSCatalogSyncClient(false); Table table = newTable(TEST_HMS_DATABASE, TEST_HMS_TABLE); if (isTablePresent) { when(mockMetaStoreClient.getTable(TEST_HMS_DATABASE, TEST_HMS_TABLE)).thenReturn(table); @@ -139,7 +142,7 @@ public class TestHMSCatalogSyncClient extends HMSCatalogSyncClientTestBase { @SneakyThrows @Test void testGetTableFailure() { - setupCommonMocks(); + hmsCatalogSyncClient = createHMSCatalogSyncClient(false); when(mockMetaStoreClient.getTable(TEST_HMS_DATABASE, TEST_HMS_TABLE)) .thenThrow(new TException("something went wrong")); CatalogSyncException exception = @@ -156,7 +159,7 @@ public class TestHMSCatalogSyncClient extends 
HMSCatalogSyncClientTestBase { @ParameterizedTest @ValueSource(booleans = {false, true}) void testCreateDatabase(boolean shouldFail) { - setupCommonMocks(); + hmsCatalogSyncClient = createHMSCatalogSyncClient(false); Database database = newDatabase(TEST_HMS_DATABASE); if (shouldFail) { Mockito.doThrow(new TException("something went wrong")) @@ -179,7 +182,7 @@ public class TestHMSCatalogSyncClient extends HMSCatalogSyncClientTestBase { @ParameterizedTest @ValueSource(booleans = {false, true}) void testDropTable(boolean shouldFail) { - setupCommonMocks(); + hmsCatalogSyncClient = createHMSCatalogSyncClient(false); if (shouldFail) { Mockito.doThrow(new TException("something went wrong")) .when(mockMetaStoreClient) @@ -200,9 +203,10 @@ public class TestHMSCatalogSyncClient extends HMSCatalogSyncClientTestBase { } @SneakyThrows - @Test - void testCreateTable_Success() { - setupCommonMocks(); + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testCreateTable_Success(boolean syncPartitions) { + hmsCatalogSyncClient = createHMSCatalogSyncClient(syncPartitions); Table testTable = new Table(); when(mockTableBuilder.getCreateTableRequest( TEST_ICEBERG_INTERNAL_TABLE, TEST_CATALOG_TABLE_IDENTIFIER)) @@ -211,12 +215,20 @@ public class TestHMSCatalogSyncClient extends HMSCatalogSyncClientTestBase { verify(mockMetaStoreClient, times(1)).createTable(testTable); verify(mockTableBuilder, times(1)) .getCreateTableRequest(TEST_ICEBERG_INTERNAL_TABLE, TEST_CATALOG_TABLE_IDENTIFIER); + if (syncPartitions) { + verify(mockPartitionSyncTool, times(1)) + .syncPartitions(eq(TEST_ICEBERG_INTERNAL_TABLE), eq(TEST_CATALOG_TABLE_IDENTIFIER)); + } else { + verify(mockPartitionSyncTool, never()) + .syncPartitions(eq(TEST_ICEBERG_INTERNAL_TABLE), eq(TEST_CATALOG_TABLE_IDENTIFIER)); + } } @SneakyThrows - @Test - void testCreateTable_ErrorGettingTableInput() { - setupCommonMocks(); + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void 
testCreateTable_ErrorGettingTableInput(boolean syncPartitions) { + hmsCatalogSyncClient = createHMSCatalogSyncClient(syncPartitions); // error when getting iceberg table input doThrow(new RuntimeException("something went wrong")) @@ -230,12 +242,15 @@ public class TestHMSCatalogSyncClient extends HMSCatalogSyncClientTestBase { verify(mockTableBuilder, times(1)) .getCreateTableRequest(TEST_ICEBERG_INTERNAL_TABLE, TEST_CATALOG_TABLE_IDENTIFIER); verify(mockMetaStoreClient, never()).createTable(any()); + verify(mockPartitionSyncTool, never()) + .syncPartitions(eq(TEST_ICEBERG_INTERNAL_TABLE), eq(TEST_CATALOG_TABLE_IDENTIFIER)); } @SneakyThrows - @Test - void testCreateTable_ErrorCreatingTable() { - setupCommonMocks(); + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testCreateTable_ErrorCreatingTable(boolean syncPartitions) { + hmsCatalogSyncClient = createHMSCatalogSyncClient(syncPartitions); // error when creating table Table testTable = new Table(); @@ -257,12 +272,15 @@ public class TestHMSCatalogSyncClient extends HMSCatalogSyncClientTestBase { verify(mockTableBuilder, times(1)) .getCreateTableRequest(TEST_ICEBERG_INTERNAL_TABLE, TEST_CATALOG_TABLE_IDENTIFIER); verify(mockMetaStoreClient, times(1)).createTable(testTable); + verify(mockPartitionSyncTool, never()) + .syncPartitions(eq(TEST_ICEBERG_INTERNAL_TABLE), eq(TEST_CATALOG_TABLE_IDENTIFIER)); } @SneakyThrows - @Test - void testRefreshTable_Success() { - setupCommonMocks(); + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testRefreshTable_Success(boolean syncPartitions) { + hmsCatalogSyncClient = createHMSCatalogSyncClient(syncPartitions); Table origTable = new Table(); Table updatedTable = new Table(origTable); updatedTable.putToParameters(METADATA_LOCATION_PROP, ICEBERG_METADATA_FILE_LOCATION_V2); @@ -276,12 +294,20 @@ public class TestHMSCatalogSyncClient extends HMSCatalogSyncClientTestBase { verify(mockTableBuilder, times(1)) .getUpdateTableRequest( 
TEST_ICEBERG_INTERNAL_TABLE, origTable, TEST_CATALOG_TABLE_IDENTIFIER); + if (syncPartitions) { + verify(mockPartitionSyncTool, times(1)) + .syncPartitions(eq(TEST_ICEBERG_INTERNAL_TABLE), eq(TEST_CATALOG_TABLE_IDENTIFIER)); + } else { + verify(mockPartitionSyncTool, never()) + .syncPartitions(eq(TEST_ICEBERG_INTERNAL_TABLE), eq(TEST_CATALOG_TABLE_IDENTIFIER)); + } } @SneakyThrows - @Test - void testRefreshTable_ErrorGettingUpdatedTable() { - setupCommonMocks(); + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testRefreshTable_ErrorGettingUpdatedTable(boolean syncPartitions) { + hmsCatalogSyncClient = createHMSCatalogSyncClient(syncPartitions); // error when getting iceberg table input Table testTable = new Table(); @@ -298,12 +324,15 @@ public class TestHMSCatalogSyncClient extends HMSCatalogSyncClientTestBase { .getUpdateTableRequest( TEST_ICEBERG_INTERNAL_TABLE, testTable, TEST_CATALOG_TABLE_IDENTIFIER); verify(mockMetaStoreClient, never()).alter_table(any(), any(), any()); + verify(mockPartitionSyncTool, never()) + .syncPartitions(eq(TEST_ICEBERG_INTERNAL_TABLE), eq(TEST_CATALOG_TABLE_IDENTIFIER)); } @SneakyThrows - @Test - void testRefreshTable_ErrorRefreshingTable() { - setupCommonMocks(); + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testRefreshTable_ErrorRefreshingTable(boolean syncPartitions) { + hmsCatalogSyncClient = createHMSCatalogSyncClient(syncPartitions); // error when creating table Table origTable = new Table(); @@ -329,12 +358,14 @@ public class TestHMSCatalogSyncClient extends HMSCatalogSyncClientTestBase { TEST_ICEBERG_INTERNAL_TABLE, origTable, TEST_CATALOG_TABLE_IDENTIFIER); verify(mockMetaStoreClient, times(1)) .alter_table(TEST_HMS_DATABASE, TEST_HMS_TABLE, updatedTable); + verify(mockPartitionSyncTool, never()) + .syncPartitions(eq(TEST_ICEBERG_INTERNAL_TABLE), eq(TEST_CATALOG_TABLE_IDENTIFIER)); } @SneakyThrows @Test void testCreateOrReplaceTable() { - setupCommonMocks(); + hmsCatalogSyncClient = 
createHMSCatalogSyncClient(false); ZonedDateTime zonedDateTime = Instant.ofEpochMilli(System.currentTimeMillis()).atZone(ZoneId.systemDefault()); diff --git a/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/TestHMSCatalogTableBuilderFactory.java b/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/TestHMSCatalogTableBuilderFactory.java index 45d45a2d..31ea33c9 100644 --- a/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/TestHMSCatalogTableBuilderFactory.java +++ b/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/TestHMSCatalogTableBuilderFactory.java @@ -18,10 +18,10 @@ package org.apache.xtable.hms; -import static org.apache.xtable.hms.HMSCatalogSyncClientTestBase.FIELD_SCHEMA; -import static org.apache.xtable.hms.HMSCatalogSyncClientTestBase.TEST_CATALOG_TABLE_IDENTIFIER; -import static org.apache.xtable.hms.HMSCatalogSyncClientTestBase.TEST_HMS_DATABASE; -import static org.apache.xtable.hms.HMSCatalogSyncClientTestBase.TEST_HMS_TABLE; +import static org.apache.xtable.hms.HMSCatalogSyncTestBase.FIELD_SCHEMA; +import static org.apache.xtable.hms.HMSCatalogSyncTestBase.TEST_CATALOG_TABLE_IDENTIFIER; +import static org.apache.xtable.hms.HMSCatalogSyncTestBase.TEST_HMS_DATABASE; +import static org.apache.xtable.hms.HMSCatalogSyncTestBase.TEST_HMS_TABLE; import static org.apache.xtable.hms.table.TestIcebergHMSCatalogTableBuilder.getTestHmsTableParameters; import static org.apache.xtable.hms.table.TestIcebergHMSCatalogTableBuilder.getTestHmsTableStorageDescriptor; import static org.junit.jupiter.api.Assertions.assertEquals; diff --git a/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/table/TestDeltaHMSCatalogTableBuilder.java b/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/table/TestDeltaHMSCatalogTableBuilder.java index 1a7d0cb4..4e799915 100644 --- a/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/table/TestDeltaHMSCatalogTableBuilder.java +++ 
b/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/table/TestDeltaHMSCatalogTableBuilder.java @@ -38,11 +38,11 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.MockedStatic; import org.mockito.junit.jupiter.MockitoExtension; -import org.apache.xtable.hms.HMSCatalogSyncClientTestBase; +import org.apache.xtable.hms.HMSCatalogSyncTestBase; import org.apache.xtable.model.storage.TableFormat; @ExtendWith(MockitoExtension.class) -public class TestDeltaHMSCatalogTableBuilder extends HMSCatalogSyncClientTestBase { +public class TestDeltaHMSCatalogTableBuilder extends HMSCatalogSyncTestBase { private DeltaHMSCatalogTableBuilder mockDeltaHmsCatalogSyncRequestProvider; diff --git a/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/table/TestHudiHMSCatalogTableBuilder.java b/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/table/TestHudiHMSCatalogTableBuilder.java new file mode 100644 index 00000000..adf88c99 --- /dev/null +++ b/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/table/TestHudiHMSCatalogTableBuilder.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.xtable.hms.table; + +import static org.apache.xtable.hms.table.HudiHMSCatalogTableBuilder.HUDI_METADATA_CONFIG; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.List; +import java.util.stream.Collectors; + +import org.apache.hadoop.hive.metastore.api.Table; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import org.apache.hudi.common.model.HoodieFileFormat; +import org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.common.table.HoodieTableMetaClient; + +import org.apache.xtable.hms.HMSCatalogSyncTestBase; +import org.apache.xtable.hms.HMSSchemaExtractor; +import org.apache.xtable.hudi.HudiTableManager; +import org.apache.xtable.hudi.catalog.HudiCatalogTablePropertiesExtractor; + +@ExtendWith(MockitoExtension.class) +public class TestHudiHMSCatalogTableBuilder extends HMSCatalogSyncTestBase { + + @Mock private HoodieTableMetaClient mockMetaClient; + @Mock private HudiTableManager mockHudiTableManager; + @Mock private HoodieTableConfig mockTableConfig; + @Mock private HudiCatalogTablePropertiesExtractor mockTablePropertiesExtractor; + + private HudiHMSCatalogTableBuilder hudiHMSCatalogTableBuilder; + + private HudiHMSCatalogTableBuilder createMockHudiHMSCatalogSyncRequestProvider() { + return new HudiHMSCatalogTableBuilder( + mockHMSCatalogConfig, + HMSSchemaExtractor.getInstance(), + mockHudiTableManager, + mockMetaClient, + mockTablePropertiesExtractor); + } + + void setupCommonMocks() { + hudiHMSCatalogTableBuilder = 
createMockHudiHMSCatalogSyncRequestProvider(); + when(mockHMSCatalogConfig.getSchemaLengthThreshold()).thenReturn(1000); + } + + void setupMetaClientMocks() { + when(mockTableConfig.getBaseFileFormat()).thenReturn(HoodieFileFormat.PARQUET); + when(mockMetaClient.getTableConfig()).thenReturn(mockTableConfig); + } + + @Test + void testGetCreateTableInput() { + setupCommonMocks(); + setupMetaClientMocks(); + + List<String> partitionFields = + TEST_HUDI_INTERNAL_TABLE.getPartitioningFields().stream() + .map(partitionField -> partitionField.getSourceField().getName()) + .collect(Collectors.toList()); + + Table table = + hudiHMSCatalogTableBuilder.getCreateTableRequest( + TEST_HUDI_INTERNAL_TABLE, TEST_CATALOG_TABLE_IDENTIFIER); + + ArgumentCaptor<List<String>> partitionsCaptor = ArgumentCaptor.forClass(List.class); + verify(mockTablePropertiesExtractor) + .getSparkTableProperties( + partitionsCaptor.capture(), + eq(""), + any(Integer.class), + eq(TEST_HUDI_INTERNAL_TABLE.getReadSchema())); + assertEquals(partitionFields.size(), partitionsCaptor.getValue().size()); + assertEquals(partitionFields, partitionsCaptor.getValue()); + assertEquals(TEST_CATALOG_TABLE_IDENTIFIER.getTableName(), table.getTableName()); + assertEquals(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(), table.getDbName()); + assertEquals(3, table.getSd().getCols().size()); + assertEquals(1, table.getPartitionKeys().size()); + assertNotNull(table.getParameters()); + assertFalse(table.getParameters().isEmpty()); + assertEquals("true", table.getParameters().get(HUDI_METADATA_CONFIG)); + } + + @Test + void testGetUpdateTableInput() { + setupCommonMocks(); + setupMetaClientMocks(); + + List<String> partitionFields = + TEST_HUDI_INTERNAL_TABLE.getPartitioningFields().stream() + .map(partitionField -> partitionField.getSourceField().getName()) + .collect(Collectors.toList()); + Table table = + hudiHMSCatalogTableBuilder.getCreateTableRequest( + TEST_HUDI_INTERNAL_TABLE, TEST_CATALOG_TABLE_IDENTIFIER); + Table 
updatedTable = + hudiHMSCatalogTableBuilder.getUpdateTableRequest( + TEST_EVOLVED_HUDI_INTERNAL_TABLE, table, TEST_CATALOG_TABLE_IDENTIFIER); + + ArgumentCaptor<List<String>> partitionsCaptor = ArgumentCaptor.forClass(List.class); + verify(mockTablePropertiesExtractor) + .getSparkTableProperties( + partitionsCaptor.capture(), + eq(""), + any(Integer.class), + eq(TEST_HUDI_INTERNAL_TABLE.getReadSchema())); + assertEquals(partitionFields.size(), partitionsCaptor.getValue().size()); + assertEquals(partitionFields, partitionsCaptor.getValue()); + assertEquals(TEST_CATALOG_TABLE_IDENTIFIER.getTableName(), updatedTable.getTableName()); + assertEquals(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(), updatedTable.getDbName()); + assertEquals(4, updatedTable.getSd().getCols().size()); + assertEquals(1, updatedTable.getPartitionKeys().size()); + assertNotNull(updatedTable.getParameters()); + assertFalse(updatedTable.getParameters().isEmpty()); + assertEquals("true", updatedTable.getParameters().get(HUDI_METADATA_CONFIG)); + } +} diff --git a/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/table/TestIcebergHMSCatalogTableBuilder.java b/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/table/TestIcebergHMSCatalogTableBuilder.java index 14d39c44..b1e8df77 100644 --- a/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/table/TestIcebergHMSCatalogTableBuilder.java +++ b/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/table/TestIcebergHMSCatalogTableBuilder.java @@ -52,11 +52,11 @@ import org.apache.iceberg.TableMetadata; import org.apache.iceberg.TableOperations; import org.apache.iceberg.hadoop.HadoopTables; -import org.apache.xtable.hms.HMSCatalogSyncClientTestBase; +import org.apache.xtable.hms.HMSCatalogSyncTestBase; import org.apache.xtable.hms.HMSSchemaExtractor; @ExtendWith(MockitoExtension.class) -public class TestIcebergHMSCatalogTableBuilder extends HMSCatalogSyncClientTestBase { +public class TestIcebergHMSCatalogTableBuilder extends 
HMSCatalogSyncTestBase { @Mock private HadoopTables mockIcebergHadoopTables; @Mock private BaseTable mockIcebergBaseTable; diff --git a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java index 349b5ca9..899365e4 100644 --- a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java +++ b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java @@ -64,6 +64,7 @@ import org.apache.xtable.model.catalog.CatalogTableIdentifier; import org.apache.xtable.model.catalog.HierarchicalTableIdentifier; import org.apache.xtable.model.catalog.ThreePartHierarchicalTableIdentifier; import org.apache.xtable.model.storage.CatalogType; +import org.apache.xtable.model.storage.TableFormat; import org.apache.xtable.model.sync.SyncMode; import org.apache.xtable.reflection.ReflectionUtils; import org.apache.xtable.spi.extractor.CatalogConversionSource; @@ -153,7 +154,9 @@ public class RunCatalogSync { TargetTable targetTable = TargetTable.builder() .name(sourceTable.getName()) - .basePath(sourceTable.getBasePath()) + .basePath( + getSourceTableLocation( + targetCatalogTableIdentifier.getTableFormat(), sourceTable)) .namespace(sourceTable.getNamespace()) .formatName(targetCatalogTableIdentifier.getTableFormat()) .additionalProperties(sourceTable.getAdditionalProperties()) @@ -228,15 +231,25 @@ public class RunCatalogSync { .additionalProperties(sourceProperties) .build(); } else if (catalogConversionSource.isPresent()) { + TableIdentifier tableIdentifier = sourceTableIdentifier.getTableIdentifier(); sourceTable = - catalogConversionSource - .get() - .getSourceTable( - getCatalogTableIdentifier(sourceTableIdentifier.getTableIdentifier())); + catalogConversionSource.get().getSourceTable(getCatalogTableIdentifier(tableIdentifier)); + if (tableIdentifier.getPartitionSpec() != null) { + sourceTable + .getAdditionalProperties() + 
.put(HudiSourceConfig.PARTITION_FIELD_SPEC_CONFIG, tableIdentifier.getPartitionSpec()); + } } return sourceTable; } + + static String getSourceTableLocation(String targetTableFormat, SourceTable sourceTable) { /* Returns the source table's data path only when an ICEBERG source is synced to a HUDI target, and its base path otherwise; the constants are the equals receivers so a null format name selects the base path instead of throwing a NullPointerException. */ + return TableFormat.ICEBERG.equals(sourceTable.getFormatName()) + && TableFormat.HUDI.equals(targetTableFormat) + ? sourceTable.getDataPath() + : sourceTable.getBasePath(); + } + static Map<String, ConversionSourceProvider> getConversionSourceProviders( List<String> tableFormats, TableFormatConverters tableFormatConverters, @@ -345,6 +358,8 @@ public class RunCatalogSync { * HierarchicalTableIdentifier} */ String hierarchicalId; + /** Specifies the partition spec of the table */ + String partitionSpec; } /** diff --git a/xtable-utilities/src/test/resources/catalogConfig.yaml b/xtable-utilities/src/test/resources/catalogConfig.yaml index 05b2df4b..88cff2b6 100644 --- a/xtable-utilities/src/test/resources/catalogConfig.yaml +++ b/xtable-utilities/src/test/resources/catalogConfig.yaml @@ -45,6 +45,7 @@ datasets: - sourceCatalogTableIdentifier: tableIdentifier: hierarchicalId: "source-database-1.source-1" + partitionSpec: "cs_sold_date_sk:VALUE" targetCatalogTableIdentifiers: - catalogId: "target-1" tableFormat: "DELTA"