This is an automated email from the ASF dual-hosted git repository.
vinish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git
The following commit(s) were added to refs/heads/main by this push:
new 2e71e156 [590] Changes in xtable-core for Hudi Catalog Sync
2e71e156 is described below
commit 2e71e156667f9304442e834410e95abd03837fc6
Author: Vamsi <[email protected]>
AuthorDate: Tue Feb 11 20:00:00 2025 +0530
[590] Changes in xtable-core for Hudi Catalog Sync
---
xtable-core/pom.xml | 6 +
.../{CatalogUtils.java => CatalogPartition.java} | 36 +-
.../xtable/catalog/CatalogPartitionEvent.java | 49 +++
.../catalog/CatalogPartitionSyncOperations.java | 99 +++++
...logUtils.java => CatalogPartitionSyncTool.java} | 31 +-
.../org/apache/xtable/catalog/CatalogUtils.java | 31 ++
.../apache/xtable/delta/DeltaConversionTarget.java | 3 +-
.../apache/xtable/delta/DeltaSchemaExtractor.java | 96 -----
.../xtable/exception/CatalogSyncException.java | 4 +
.../org/apache/xtable/hudi/HudiTableManager.java | 4 +-
.../hudi/catalog/HudiCatalogPartitionSyncTool.java | 471 +++++++++++++++++++++
.../xtable/hudi/catalog/HudiCatalogTableUtils.java | 116 +++++
.../xtable/hudi/catalog/HudiInputFormatUtils.java | 112 +++++
.../apache/xtable/schema/SparkSchemaExtractor.java | 133 ++++++
.../xtable/delta/TestDeltaSchemaExtractor.java | 134 ------
.../catalog/TestHudiCatalogPartitionSyncTool.java | 323 ++++++++++++++
.../hudi/catalog/TestHudiCatalogTableUtils.java | 144 +++++++
.../hudi/catalog/TestHudiInputFormatUtils.java | 118 ++++++
.../TestSparkSchemaExtractor.java} | 258 ++---------
.../org/apache/xtable/testutil/ITTestUtils.java | 2 +
20 files changed, 1678 insertions(+), 492 deletions(-)
diff --git a/xtable-core/pom.xml b/xtable-core/pom.xml
index 68d7a176..24bc31df 100644
--- a/xtable-core/pom.xml
+++ b/xtable-core/pom.xml
@@ -82,6 +82,12 @@
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-java-client</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.hudi</groupId>
+ <artifactId>hudi-sync-common</artifactId>
+ <version>${hudi.version}</version>
+ <scope>provided</scope>
+ </dependency>
<!-- Iceberg dependencies -->
<dependency>
diff --git a/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogUtils.java b/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogPartition.java
similarity index 52%
copy from xtable-core/src/main/java/org/apache/xtable/catalog/CatalogUtils.java
copy to xtable-core/src/main/java/org/apache/xtable/catalog/CatalogPartition.java
index 2e00ce1d..f2dfd4d3 100644
--- a/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogUtils.java
+++ b/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogPartition.java
@@ -18,21 +18,31 @@
package org.apache.xtable.catalog;
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
+import java.util.List;
-import org.apache.xtable.model.catalog.CatalogTableIdentifier;
-import org.apache.xtable.model.catalog.HierarchicalTableIdentifier;
+import lombok.Getter;
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public class CatalogUtils {
+/**
+ * This class is designed to encapsulate a set of partition values and the corresponding storage
+ * location where the data for this partition is stored.
+ */
+@Getter
+public class CatalogPartition {
+
+ /**
+ * A list of values defining this partition. For example, these values might correspond to
+ * partition keys in a dataset (e.g., year, month, day).
+ */
+ private final List<String> values;
+
+ /**
+ * The storage location associated with this partition. Typically, this would be a path in a file
+ * system or object store.
+ */
+ private final String storageLocation;
- public static HierarchicalTableIdentifier toHierarchicalTableIdentifier(
- CatalogTableIdentifier tableIdentifier) {
- if (tableIdentifier instanceof HierarchicalTableIdentifier) {
- return (HierarchicalTableIdentifier) tableIdentifier;
- }
- throw new IllegalArgumentException(
- "Invalid tableIdentifier implementation: " +
tableIdentifier.getClass().getName());
+ public CatalogPartition(List<String> values, String storageLocation) {
+ this.values = values;
+ this.storageLocation = storageLocation;
}
}
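For readers new to this class, a minimal sketch of how a year/month/day partition might be represented; the values and the storage location below are made up for illustration:

    import java.util.Arrays;

    import org.apache.xtable.catalog.CatalogPartition;

    class CatalogPartitionExample {
      public static void main(String[] args) {
        // Partition values follow the order of the table's partition keys (year, month, day here).
        CatalogPartition partition =
            new CatalogPartition(
                Arrays.asList("2025", "02", "11"),
                "s3://bucket/warehouse/my_table/year=2025/month=02/day=11");
        // Getters are generated by Lombok's @Getter annotation on the class.
        System.out.println(partition.getValues() + " -> " + partition.getStorageLocation());
      }
    }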
diff --git a/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogPartitionEvent.java b/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogPartitionEvent.java
new file mode 100644
index 00000000..2e58caa0
--- /dev/null
+++ b/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogPartitionEvent.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.catalog;
+
+/** Partition Event captures any partition that needs to be added or updated. */
+public class CatalogPartitionEvent {
+
+ public enum PartitionEventType {
+ ADD,
+ UPDATE,
+ DROP
+ }
+
+ public PartitionEventType eventType;
+ public String storagePartition;
+
+ CatalogPartitionEvent(PartitionEventType eventType, String storagePartition) {
+ this.eventType = eventType;
+ this.storagePartition = storagePartition;
+ }
+
+ public static CatalogPartitionEvent newPartitionAddEvent(String storagePartition) {
+ return new CatalogPartitionEvent(PartitionEventType.ADD, storagePartition);
+ }
+
+ public static CatalogPartitionEvent newPartitionUpdateEvent(String storagePartition) {
+ return new CatalogPartitionEvent(PartitionEventType.UPDATE, storagePartition);
+ }
+
+ public static CatalogPartitionEvent newPartitionDropEvent(String storagePartition) {
+ return new CatalogPartitionEvent(PartitionEventType.DROP, storagePartition);
+ }
+}
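A quick sketch of how the factory methods above are meant to be used; the relative partition paths are hypothetical:

    import org.apache.xtable.catalog.CatalogPartitionEvent;

    class CatalogPartitionEventExample {
      public static void main(String[] args) {
        // Each event pairs a relative storage partition path with the type of change to apply.
        CatalogPartitionEvent added =
            CatalogPartitionEvent.newPartitionAddEvent("year=2025/month=02/day=11");
        CatalogPartitionEvent dropped =
            CatalogPartitionEvent.newPartitionDropEvent("year=2024/month=12/day=31");
        System.out.println(added.eventType + " " + added.storagePartition);
        System.out.println(dropped.eventType + " " + dropped.storagePartition);
      }
    }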
diff --git a/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogPartitionSyncOperations.java b/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogPartitionSyncOperations.java
new file mode 100644
index 00000000..5b3266cc
--- /dev/null
+++ b/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogPartitionSyncOperations.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.catalog;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.xtable.model.catalog.CatalogTableIdentifier;
+
+/**
+ * Defines operations for managing partitions in an external catalog.
+ *
+ * <p>This interface provides methods to perform CRUD (Create, Read, Update, Delete) operations on
+ * partitions associated with a table in an external catalog system.
+ */
+public interface CatalogPartitionSyncOperations {
+
+ /**
+ * Retrieves all partitions associated with the specified table.
+ *
+ * @param tableIdentifier an object identifying the table whose partitions are to be fetched.
+ * @return a list of {@link CatalogPartition} objects representing all partitions of the specified
+ * table.
+ */
+ List<CatalogPartition> getAllPartitions(CatalogTableIdentifier tableIdentifier);
+
+ /**
+ * Adds new partitions to the specified table in the catalog.
+ *
+ * @param tableIdentifier an object identifying the table where partitions are to be added.
+ * @param partitionsToAdd a list of partitions to be added to the table.
+ */
+ void addPartitionsToTable(
+ CatalogTableIdentifier tableIdentifier, List<CatalogPartition> partitionsToAdd);
+
+ /**
+ * Updates the specified partitions for a table in the catalog.
+ *
+ * @param tableIdentifier an object identifying the table whose partitions are to be updated.
+ * @param changedPartitions a list of partitions to be updated in the table.
+ */
+ void updatePartitionsToTable(
+ CatalogTableIdentifier tableIdentifier, List<CatalogPartition> changedPartitions);
+
+ /**
+ * Removes the specified partitions from a table in the catalog.
+ *
+ * @param tableIdentifier an object identifying the table from which partitions are to be dropped.
+ * @param partitionsToDrop a list of partitions to be removed from the table.
+ */
+ void dropPartitions(
+ CatalogTableIdentifier tableIdentifier, List<CatalogPartition> partitionsToDrop);
+
+ /**
+ * Retrieves the properties indicating the last synchronization state for the given table.
+ *
+ * <p>This method provides a default implementation that returns an empty map. Implementations of
+ * this interface can override it to fetch the actual last synced properties from a catalog.
+ *
+ * @param tableIdentifier the identifier of the table whose last synced properties are to be
+ * fetched.
+ * @param keysToRetrieve a list of keys representing the specific properties to retrieve.
+ * @return a map of key-value pairs representing the last synchronization properties.
+ */
+ default Map<String, String> getTableProperties(
+ CatalogTableIdentifier tableIdentifier, List<String> keysToRetrieve) {
+ return new HashMap<>();
+ }
+
+ /**
+ * Updates the properties indicating the last synchronization state for the given table.
+ *
+ * <p>This method provides a default implementation that performs no operation. Implementations of
+ * this interface can override it to update the last synced properties in a catalog.
+ *
+ * @param tableIdentifier the identifier of the table whose last synced properties are to be
+ * updated.
+ * @param propertiesToUpdate a map of key-value pairs representing the updated properties.
+ */
+ default void updateTableProperties(
+ CatalogTableIdentifier tableIdentifier, Map<String, String> propertiesToUpdate) {}
+}
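As a rough, non-authoritative sketch of this contract, an implementation backed by an in-memory map could look like the following; the class name and storage strategy are hypothetical, and a real implementation would talk to a catalog service such as HMS or Glue:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.xtable.catalog.CatalogPartition;
    import org.apache.xtable.catalog.CatalogPartitionSyncOperations;
    import org.apache.xtable.model.catalog.CatalogTableIdentifier;

    class InMemoryPartitionSyncOperations implements CatalogPartitionSyncOperations {
      // Keyed by the table identifier's string form; values are the partitions known to the "catalog".
      private final Map<String, List<CatalogPartition>> partitionsByTable = new HashMap<>();

      @Override
      public List<CatalogPartition> getAllPartitions(CatalogTableIdentifier tableIdentifier) {
        return partitionsByTable.getOrDefault(tableIdentifier.getId(), new ArrayList<>());
      }

      @Override
      public void addPartitionsToTable(
          CatalogTableIdentifier tableIdentifier, List<CatalogPartition> partitionsToAdd) {
        partitionsByTable
            .computeIfAbsent(tableIdentifier.getId(), k -> new ArrayList<>())
            .addAll(partitionsToAdd);
      }

      @Override
      public void updatePartitionsToTable(
          CatalogTableIdentifier tableIdentifier, List<CatalogPartition> changedPartitions) {
        // For this sketch, treat an update as drop-and-add of the changed partitions.
        dropPartitions(tableIdentifier, changedPartitions);
        addPartitionsToTable(tableIdentifier, changedPartitions);
      }

      @Override
      public void dropPartitions(
          CatalogTableIdentifier tableIdentifier, List<CatalogPartition> partitionsToDrop) {
        List<CatalogPartition> existing = getAllPartitions(tableIdentifier);
        existing.removeIf(
            p -> partitionsToDrop.stream().anyMatch(d -> d.getValues().equals(p.getValues())));
        partitionsByTable.put(tableIdentifier.getId(), existing);
      }
    }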
diff --git a/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogUtils.java b/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogPartitionSyncTool.java
similarity index 51%
copy from xtable-core/src/main/java/org/apache/xtable/catalog/CatalogUtils.java
copy to xtable-core/src/main/java/org/apache/xtable/catalog/CatalogPartitionSyncTool.java
index 2e00ce1d..57f2f132 100644
--- a/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogUtils.java
+++ b/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogPartitionSyncTool.java
@@ -18,21 +18,24 @@
package org.apache.xtable.catalog;
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
-
+import org.apache.xtable.model.InternalTable;
import org.apache.xtable.model.catalog.CatalogTableIdentifier;
-import org.apache.xtable.model.catalog.HierarchicalTableIdentifier;
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public class CatalogUtils {
+/**
+ * Defines methods to synchronize all partitions from the storage to the catalog. Implementations of
+ * this interface will handle the logic for syncing partitions, including detecting partition
+ * changes and updating the catalog accordingly.
+ */
+public interface CatalogPartitionSyncTool {
- public static HierarchicalTableIdentifier toHierarchicalTableIdentifier(
- CatalogTableIdentifier tableIdentifier) {
- if (tableIdentifier instanceof HierarchicalTableIdentifier) {
- return (HierarchicalTableIdentifier) tableIdentifier;
- }
- throw new IllegalArgumentException(
- "Invalid tableIdentifier implementation: " + tableIdentifier.getClass().getName());
- }
+ /**
+ * Syncs all partitions on storage to the catalog.
+ *
+ * @param oneTable The object representing the table whose partitions are being synced. This
+ * object contains necessary details to perform the sync operation.
+ * @param tableIdentifier The table in the catalog.
+ * @return {@code true} if one or more partition(s) are changed in the catalog; {@code false}
+ * otherwise.
+ */
+ public boolean syncPartitions(InternalTable oneTable, CatalogTableIdentifier tableIdentifier);
}
diff --git a/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogUtils.java b/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogUtils.java
index 2e00ce1d..7a845255 100644
--- a/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogUtils.java
+++ b/xtable-core/src/main/java/org/apache/xtable/catalog/CatalogUtils.java
@@ -18,11 +18,20 @@
package org.apache.xtable.catalog;
+import java.util.Optional;
+
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.hudi.sync.common.model.PartitionValueExtractor;
+
+import org.apache.xtable.hudi.catalog.HudiCatalogPartitionSyncTool;
import org.apache.xtable.model.catalog.CatalogTableIdentifier;
import org.apache.xtable.model.catalog.HierarchicalTableIdentifier;
+import org.apache.xtable.model.storage.TableFormat;
+import org.apache.xtable.reflection.ReflectionUtils;
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class CatalogUtils {
@@ -35,4 +44,26 @@ public class CatalogUtils {
throw new IllegalArgumentException(
"Invalid tableIdentifier implementation: " +
tableIdentifier.getClass().getName());
}
+
+ public static Optional<CatalogPartitionSyncTool> getPartitionSyncTool(
+ String tableFormat,
+ String partitionValueExtractorClass,
+ CatalogPartitionSyncOperations catalogPartitionSyncOperations,
+ Configuration configuration) {
+
+ if (partitionValueExtractorClass.isEmpty()) {
+ return Optional.empty();
+ }
+
+ // In Iceberg and Delta, partitions are automatically synchronized with catalogs when
+ // table metadata is updated. However, for Hudi, we need to sync them manually
+ if (tableFormat.equals(TableFormat.HUDI)) {
+ PartitionValueExtractor partitionValueExtractor =
+ ReflectionUtils.createInstanceOfClass(partitionValueExtractorClass);
+ return Optional.of(
+ new HudiCatalogPartitionSyncTool(
+ catalogPartitionSyncOperations, partitionValueExtractor, configuration));
+ }
+ return Optional.empty();
+ }
}
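A hedged usage sketch of the new helper: the extractor class name below is only a placeholder for whichever org.apache.hudi.sync.common.model.PartitionValueExtractor implementation is configured, and catalogOperations stands in for any CatalogPartitionSyncOperations implementation.

    import java.util.Optional;

    import org.apache.hadoop.conf.Configuration;

    import org.apache.xtable.catalog.CatalogPartitionSyncOperations;
    import org.apache.xtable.catalog.CatalogPartitionSyncTool;
    import org.apache.xtable.catalog.CatalogUtils;
    import org.apache.xtable.model.storage.TableFormat;

    class PartitionSyncToolWiring {
      static Optional<CatalogPartitionSyncTool> wire(
          CatalogPartitionSyncOperations catalogOperations, Configuration hadoopConf) {
        // An empty extractor class returns Optional.empty(); non-HUDI formats also return Optional.empty().
        return CatalogUtils.getPartitionSyncTool(
            TableFormat.HUDI,
            "com.example.MyPartitionValueExtractor", // placeholder, must implement PartitionValueExtractor
            catalogOperations,
            hadoopConf);
      }
    }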
diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java
index b34fa449..343a2d21 100644
--- a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java
+++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java
@@ -63,6 +63,7 @@ import org.apache.xtable.model.schema.InternalSchema;
import org.apache.xtable.model.storage.DataFilesDiff;
import org.apache.xtable.model.storage.PartitionFileGroup;
import org.apache.xtable.model.storage.TableFormat;
+import org.apache.xtable.schema.SparkSchemaExtractor;
import org.apache.xtable.spi.sync.ConversionTarget;
public class DeltaConversionTarget implements ConversionTarget {
@@ -247,7 +248,7 @@ public class DeltaConversionTarget implements ConversionTarget {
private void setLatestSchema(InternalSchema schema) {
this.latestSchemaInternal = schema;
- this.latestSchema = schemaExtractor.fromInternalSchema(schema);
+ this.latestSchema = SparkSchemaExtractor.getInstance().fromInternalSchema(schema);
}
private void commitTransaction() {
diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java
index a10ee120..d1303e84 100644
--- a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java
+++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java
@@ -29,17 +29,13 @@ import lombok.NoArgsConstructor;
import org.apache.spark.sql.types.ArrayType;
import org.apache.spark.sql.types.DataType;
-import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.DecimalType;
import org.apache.spark.sql.types.MapType;
import org.apache.spark.sql.types.Metadata;
-import org.apache.spark.sql.types.MetadataBuilder;
-import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.xtable.collectors.CustomCollectors;
import org.apache.xtable.exception.NotSupportedException;
-import org.apache.xtable.exception.SchemaExtractorException;
import org.apache.xtable.model.schema.InternalField;
import org.apache.xtable.model.schema.InternalSchema;
import org.apache.xtable.model.schema.InternalType;
@@ -59,104 +55,12 @@ import org.apache.xtable.schema.SchemaUtils;
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class DeltaSchemaExtractor {
private static final String DELTA_COLUMN_MAPPING_ID = "delta.columnMapping.id";
- private static final String COMMENT = "comment";
private static final DeltaSchemaExtractor INSTANCE = new DeltaSchemaExtractor();
public static DeltaSchemaExtractor getInstance() {
return INSTANCE;
}
- public StructType fromInternalSchema(InternalSchema internalSchema) {
- StructField[] fields =
- internalSchema.getFields().stream()
- .map(
- field ->
- new StructField(
- field.getName(),
- convertFieldType(field),
- field.getSchema().isNullable(),
- getMetaData(field.getSchema())))
- .toArray(StructField[]::new);
- return new StructType(fields);
- }
-
- private DataType convertFieldType(InternalField field) {
- switch (field.getSchema().getDataType()) {
- case ENUM:
- case STRING:
- return DataTypes.StringType;
- case INT:
- return DataTypes.IntegerType;
- case LONG:
- case TIMESTAMP_NTZ:
- return DataTypes.LongType;
- case BYTES:
- case FIXED:
- case UUID:
- return DataTypes.BinaryType;
- case BOOLEAN:
- return DataTypes.BooleanType;
- case FLOAT:
- return DataTypes.FloatType;
- case DATE:
- return DataTypes.DateType;
- case TIMESTAMP:
- return DataTypes.TimestampType;
- case DOUBLE:
- return DataTypes.DoubleType;
- case DECIMAL:
- int precision =
- (int)
field.getSchema().getMetadata().get(InternalSchema.MetadataKey.DECIMAL_PRECISION);
- int scale =
- (int)
field.getSchema().getMetadata().get(InternalSchema.MetadataKey.DECIMAL_SCALE);
- return DataTypes.createDecimalType(precision, scale);
- case RECORD:
- return fromInternalSchema(field.getSchema());
- case MAP:
- InternalField key =
- field.getSchema().getFields().stream()
- .filter(
- mapField ->
-
InternalField.Constants.MAP_KEY_FIELD_NAME.equals(mapField.getName()))
- .findFirst()
- .orElseThrow(() -> new SchemaExtractorException("Invalid map
schema"));
- InternalField value =
- field.getSchema().getFields().stream()
- .filter(
- mapField ->
-
InternalField.Constants.MAP_VALUE_FIELD_NAME.equals(mapField.getName()))
- .findFirst()
- .orElseThrow(() -> new SchemaExtractorException("Invalid map
schema"));
- return DataTypes.createMapType(
- convertFieldType(key), convertFieldType(value),
value.getSchema().isNullable());
- case LIST:
- InternalField element =
- field.getSchema().getFields().stream()
- .filter(
- arrayField ->
-
InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME.equals(
- arrayField.getName()))
- .findFirst()
- .orElseThrow(() -> new SchemaExtractorException("Invalid array
schema"));
- return DataTypes.createArrayType(
- convertFieldType(element), element.getSchema().isNullable());
- default:
- throw new NotSupportedException("Unsupported type: " +
field.getSchema().getDataType());
- }
- }
-
- private Metadata getMetaData(InternalSchema schema) {
- InternalType type = schema.getDataType();
- MetadataBuilder metadataBuilder = new MetadataBuilder();
- if (type == InternalType.UUID) {
- metadataBuilder.putString(InternalSchema.XTABLE_LOGICAL_TYPE, "uuid");
- }
- if (schema.getComment() != null) {
- metadataBuilder.putString(COMMENT, schema.getComment());
- }
- return metadataBuilder.build();
- }
-
public InternalSchema toInternalSchema(StructType structType) {
return toInternalSchema(structType, null, false, null, null);
}
diff --git a/xtable-core/src/main/java/org/apache/xtable/exception/CatalogSyncException.java b/xtable-core/src/main/java/org/apache/xtable/exception/CatalogSyncException.java
index 1960641c..5d783004 100644
--- a/xtable-core/src/main/java/org/apache/xtable/exception/CatalogSyncException.java
+++ b/xtable-core/src/main/java/org/apache/xtable/exception/CatalogSyncException.java
@@ -34,4 +34,8 @@ public class CatalogSyncException extends InternalException {
public CatalogSyncException(String message, Throwable e) {
super(ErrorCode.CATALOG_SYNC_GENERIC_EXCEPTION, message, e);
}
+
+ public CatalogSyncException(String message) {
+ super(ErrorCode.CATALOG_SYNC_GENERIC_EXCEPTION, message);
+ }
}
diff --git a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiTableManager.java b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiTableManager.java
index 1ac1b5aa..7c48e88d 100644
--- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiTableManager.java
+++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiTableManager.java
@@ -44,7 +44,7 @@ import org.apache.xtable.model.storage.DataLayoutStrategy;
/** A class used to initialize new Hudi tables and load the metadata of existing tables. */
@Log4j2
@RequiredArgsConstructor(staticName = "of")
-class HudiTableManager {
+public class HudiTableManager {
private static final String NONPARTITIONED_KEY_GENERATOR =
"org.apache.hudi.keygen.NonpartitionedKeyGenerator";
private static final String CUSTOM_KEY_GENERATOR =
"org.apache.hudi.keygen.CustomKeyGenerator";
@@ -61,7 +61,7 @@ class HudiTableManager {
* @param tableDataPath the path for the table
* @return {@link HoodieTableMetaClient} if table exists, otherwise null
*/
- Optional<HoodieTableMetaClient> loadTableMetaClientIfExists(String tableDataPath) {
+ public Optional<HoodieTableMetaClient> loadTableMetaClientIfExists(String tableDataPath) {
try {
return Optional.of(
HoodieTableMetaClient.builder()
diff --git a/xtable-core/src/main/java/org/apache/xtable/hudi/catalog/HudiCatalogPartitionSyncTool.java b/xtable-core/src/main/java/org/apache/xtable/hudi/catalog/HudiCatalogPartitionSyncTool.java
new file mode 100644
index 00000000..792c7063
--- /dev/null
+++ b/xtable-core/src/main/java/org/apache/xtable/hudi/catalog/HudiCatalogPartitionSyncTool.java
@@ -0,0 +1,471 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.hudi.catalog;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import lombok.extern.log4j.Log4j2;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.hudi.common.engine.HoodieLocalEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.VisibleForTesting;
+import org.apache.hudi.hadoop.CachingPath;
+import org.apache.hudi.sync.common.model.PartitionValueExtractor;
+
+import org.apache.xtable.catalog.CatalogPartition;
+import org.apache.xtable.catalog.CatalogPartitionEvent;
+import org.apache.xtable.catalog.CatalogPartitionSyncOperations;
+import org.apache.xtable.catalog.CatalogPartitionSyncTool;
+import org.apache.xtable.exception.CatalogSyncException;
+import org.apache.xtable.hudi.HudiTableManager;
+import org.apache.xtable.model.InternalTable;
+import org.apache.xtable.model.catalog.CatalogTableIdentifier;
+
+@Log4j2
+public class HudiCatalogPartitionSyncTool implements CatalogPartitionSyncTool {
+
+ private final CatalogPartitionSyncOperations catalogClient;
+ private final HudiTableManager hudiTableManager;
+ private final PartitionValueExtractor partitionValuesExtractor;
+ private final Configuration configuration;
+
+ public static final String LAST_COMMIT_TIME_SYNC = "last_commit_time_sync";
+ public static final String LAST_COMMIT_COMPLETION_TIME_SYNC = "last_commit_completion_time_sync";
+
+ public HudiCatalogPartitionSyncTool(
+ CatalogPartitionSyncOperations catalogClient,
+ PartitionValueExtractor partitionValueExtractor,
+ Configuration configuration) {
+ this.catalogClient = catalogClient;
+ this.hudiTableManager = HudiTableManager.of(configuration);
+ this.partitionValuesExtractor = partitionValueExtractor;
+ this.configuration = configuration;
+ }
+
+ @VisibleForTesting
+ HudiCatalogPartitionSyncTool(
+ CatalogPartitionSyncOperations catalogClient,
+ HudiTableManager hudiTableManager,
+ PartitionValueExtractor partitionValueExtractor,
+ Configuration configuration) {
+ this.catalogClient = catalogClient;
+ this.hudiTableManager = hudiTableManager;
+ this.partitionValuesExtractor = partitionValueExtractor;
+ this.configuration = configuration;
+ }
+
+ HoodieTableMetaClient getMetaClient(String basePath) {
+ Optional<HoodieTableMetaClient> metaClientOpt =
+ hudiTableManager.loadTableMetaClientIfExists(basePath);
+
+ if (!metaClientOpt.isPresent()) {
+ throw new CatalogSyncException(
+ "failed to get meta client since table is not present in the base
path " + basePath);
+ }
+
+ return metaClientOpt.get();
+ }
+
+ /**
+ * Syncs all partitions on storage to the catalog by listing every partition on storage and
+ * reconciling it with the catalog.
+ *
+ * @param tableIdentifier The table in the catalog.
+ * @return {@code true} if one or more partition(s) are changed in the catalog; {@code false}
+ * otherwise.
+ */
+ private boolean syncAllPartitions(
+ HoodieTableMetaClient metaClient,
+ InternalTable internalTable,
+ CatalogTableIdentifier tableIdentifier) {
+ try {
+ if (internalTable.getPartitioningFields().isEmpty()) {
+ return false;
+ }
+
+ List<CatalogPartition> allPartitionsInCatalog =
+ catalogClient.getAllPartitions(tableIdentifier);
+ List<String> allPartitionsOnStorage =
+ getAllPartitionPathsOnStorage(internalTable.getBasePath());
+ boolean partitionsChanged =
+ syncPartitions(
+ metaClient,
+ tableIdentifier,
+ getPartitionEvents(metaClient, allPartitionsInCatalog, allPartitionsOnStorage));
+ if (partitionsChanged) {
+ updateLastCommitTimeSynced(metaClient, tableIdentifier);
+ }
+ return partitionsChanged;
+ } catch (Exception e) {
+ throw new CatalogSyncException(
+ "Failed to sync partitions for table " + tableIdentifier.getId(), e);
+ }
+ }
+
+ /**
+ * Syncs all partitions on storage to the catalog, by only making incremental changes.
+ *
+ * @param tableIdentifier The table in the catalog.
+ * @return {@code true} if one or more partition(s) are changed in the catalog; {@code false}
+ * otherwise.
+ */
+ @Override
+ public boolean syncPartitions(InternalTable table, CatalogTableIdentifier tableIdentifier) {
+ Map<String, String> lastCommitTimeSyncedProperties =
+ catalogClient.getTableProperties(
+ tableIdentifier,
+ Arrays.asList(LAST_COMMIT_TIME_SYNC, LAST_COMMIT_COMPLETION_TIME_SYNC));
+ Option<String> lastCommitTimeSynced =
+ Option.ofNullable(lastCommitTimeSyncedProperties.get(LAST_COMMIT_TIME_SYNC));
+ Option<String> lastCommitCompletionTimeSynced =
+ Option.ofNullable(lastCommitTimeSyncedProperties.get(LAST_COMMIT_COMPLETION_TIME_SYNC));
+ HoodieTableMetaClient metaClient = getMetaClient(table.getBasePath());
+ if (!lastCommitTimeSynced.isPresent()
+ || metaClient.getActiveTimeline().isBeforeTimelineStarts(lastCommitTimeSynced.get())) {
+ // If the last commit time synced is before the start of the active timeline,
+ // the Hive sync falls back to list all partitions on storage, instead of
+ // reading active and archived timelines for written partitions.
+ log.info(
+ "Sync all partitions given the last commit time synced is empty or "
+ + "before the start of the active timeline. Listing all partitions in "
+ + table.getBasePath());
+ return syncAllPartitions(metaClient, table, tableIdentifier);
+ } else {
+ List<String> writtenPartitionsSince =
+ getWrittenPartitionsSince(
+ metaClient,
+ Option.ofNullable(lastCommitTimeSynced.get()),
+ Option.ofNullable(lastCommitCompletionTimeSynced.get()));
+ log.info("Storage partitions scan complete. Found " + writtenPartitionsSince.size());
+
+ // Sync the partitions if needed
+ // find dropped partitions, if any, in the latest commit
+ Set<String> droppedPartitions =
+ getDroppedPartitionsSince(
+ metaClient,
+ Option.ofNullable(lastCommitTimeSynced.get()),
+ Option.of(lastCommitCompletionTimeSynced.get()));
+ boolean partitionsChanged =
+ syncPartitions(metaClient, tableIdentifier, writtenPartitionsSince, droppedPartitions);
+ if (partitionsChanged) {
+ updateLastCommitTimeSynced(metaClient, tableIdentifier);
+ }
+ return partitionsChanged;
+ }
+ }
+
+ private void updateLastCommitTimeSynced(
+ HoodieTableMetaClient metaClient, CatalogTableIdentifier tableIdentifier) {
+ HoodieTimeline activeTimeline = metaClient.getActiveTimeline();
+ Option<String> lastCommitSynced = activeTimeline.lastInstant().map(HoodieInstant::getTimestamp);
+ Option<String> lastCommitCompletionSynced =
+ activeTimeline
+ .getInstantsOrderedByStateTransitionTime()
+ .skip(activeTimeline.countInstants() - 1)
+ .findFirst()
+ .map(i -> Option.of(i.getStateTransitionTime()))
+ .orElse(Option.empty());
+
+ if (lastCommitSynced.isPresent()) {
+ Map<String, String> lastSyncedProperties = new HashMap<>();
+ lastSyncedProperties.put(LAST_COMMIT_TIME_SYNC, lastCommitSynced.get());
+ lastSyncedProperties.put(LAST_COMMIT_COMPLETION_TIME_SYNC, lastCommitCompletionSynced.get());
+ catalogClient.updateTableProperties(tableIdentifier, lastSyncedProperties);
+ }
+ }
+
+ /**
+ * Gets all relative partitions paths in the Hudi table on storage.
+ *
+ * @return All relative partitions paths.
+ */
+ public List<String> getAllPartitionPathsOnStorage(String basePath) {
+ HoodieLocalEngineContext engineContext = new HoodieLocalEngineContext(configuration);
+ // TODO: decide whether a config is needed to validate assumeDatePartitioning
+ return FSUtils.getAllPartitionPaths(engineContext, basePath, true, false);
+ }
+
+ public List<String> getWrittenPartitionsSince(
+ HoodieTableMetaClient metaClient,
+ Option<String> lastCommitTimeSynced,
+ Option<String> lastCommitCompletionTimeSynced) {
+ if (!lastCommitTimeSynced.isPresent()) {
+ String basePath = metaClient.getBasePathV2().toUri().toString();
+ log.info("Last commit time synced is not known, listing all partitions
in " + basePath);
+ return getAllPartitionPathsOnStorage(basePath);
+ } else {
+ log.info(
+ "Last commit time synced is "
+ + lastCommitTimeSynced.get()
+ + ", Getting commits since then");
+ return TimelineUtils.getWrittenPartitions(
+ TimelineUtils.getCommitsTimelineAfter(
+ metaClient, lastCommitTimeSynced.get(), lastCommitCompletionTimeSynced));
+ }
+ }
+
+ /**
+ * Get the set of dropped partitions since the last synced commit. If last sync time is not known
+ * then consider only active timeline. Going through archive timeline is a costly operation, and
+ * it should be avoided unless some start time is given.
+ */
+ private Set<String> getDroppedPartitionsSince(
+ HoodieTableMetaClient metaClient,
+ Option<String> lastCommitTimeSynced,
+ Option<String> lastCommitCompletionTimeSynced) {
+ HoodieTimeline timeline =
+ lastCommitTimeSynced.isPresent()
+ ? TimelineUtils.getCommitsTimelineAfter(
+ metaClient, lastCommitTimeSynced.get(), lastCommitCompletionTimeSynced)
+ : metaClient.getActiveTimeline();
+ return new HashSet<>(TimelineUtils.getDroppedPartitions(timeline));
+ }
+
+ /**
+ * Syncs added, updated, and dropped partitions to the catalog.
+ *
+ * @param tableIdentifier The table in the catalog.
+ * @param partitionEventList The partition change event list.
+ * @return {@code true} if one or more partition(s) are changed in the catalog; {@code false}
+ * otherwise.
+ */
+ private boolean syncPartitions(
+ HoodieTableMetaClient metaClient,
+ CatalogTableIdentifier tableIdentifier,
+ List<CatalogPartitionEvent> partitionEventList) {
+ List<CatalogPartition> newPartitions =
+ filterPartitions(
+ metaClient.getBasePathV2(),
+ partitionEventList,
+ CatalogPartitionEvent.PartitionEventType.ADD);
+ if (!newPartitions.isEmpty()) {
+ log.info("New Partitions " + newPartitions);
+ catalogClient.addPartitionsToTable(tableIdentifier, newPartitions);
+ }
+
+ List<CatalogPartition> updatePartitions =
+ filterPartitions(
+ metaClient.getBasePathV2(),
+ partitionEventList,
+ CatalogPartitionEvent.PartitionEventType.UPDATE);
+ if (!updatePartitions.isEmpty()) {
+ log.info("Changed Partitions " + updatePartitions);
+ catalogClient.updatePartitionsToTable(tableIdentifier, updatePartitions);
+ }
+
+ List<CatalogPartition> dropPartitions =
+ filterPartitions(
+ metaClient.getBasePathV2(),
+ partitionEventList,
+ CatalogPartitionEvent.PartitionEventType.DROP);
+ if (!dropPartitions.isEmpty()) {
+ log.info("Drop Partitions " + dropPartitions);
+ catalogClient.dropPartitions(tableIdentifier, dropPartitions);
+ }
+
+ return !updatePartitions.isEmpty() || !newPartitions.isEmpty() || !dropPartitions.isEmpty();
+ }
+
+ private List<CatalogPartition> filterPartitions(
+ Path basePath,
+ List<CatalogPartitionEvent> events,
+ CatalogPartitionEvent.PartitionEventType eventType) {
+ return events.stream()
+ .filter(s -> s.eventType == eventType)
+ .map(
+ s ->
+ new CatalogPartition(
+ partitionValuesExtractor.extractPartitionValuesInPath(s.storagePartition),
+ new Path(basePath, s.storagePartition).toUri().toString()))
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * Syncs the list of storage partitions passed in (checks if the partition is in the catalog, adds
+ * it if missing, or updates the partition path if it does not match).
+ *
+ * @param tableIdentifier The table name in the catalog.
+ * @param writtenPartitionsSince Partitions that have been added, updated, or dropped since the last sync.
+ * @param droppedPartitions Partitions that are dropped since last sync.
+ * @return {@code true} if one or more partition(s) are changed in the catalog; {@code false}
+ * otherwise.
+ */
+ private boolean syncPartitions(
+ HoodieTableMetaClient metaClient,
+ CatalogTableIdentifier tableIdentifier,
+ List<String> writtenPartitionsSince,
+ Set<String> droppedPartitions) {
+ try {
+ if (writtenPartitionsSince.isEmpty()) {
+ return false;
+ }
+
+ List<CatalogPartition> hivePartitions = getTablePartitions(tableIdentifier);
+ return syncPartitions(
+ metaClient,
+ tableIdentifier,
+ getPartitionEvents(
+ metaClient, hivePartitions, writtenPartitionsSince, droppedPartitions));
+ } catch (Exception e) {
+ throw new CatalogSyncException(
+ "Failed to sync partitions for table " + tableIdentifier.getId(), e);
+ }
+ }
+
+ /**
+ * Fetch partitions from meta service, will try to push down more filters to avoid fetching too
+ * many unnecessary partitions.
+ */
+ private List<CatalogPartition> getTablePartitions(CatalogTableIdentifier tableIdentifier) {
+ return catalogClient.getAllPartitions(tableIdentifier);
+ }
+
+ /**
+ * Gets the partition events for changed partitions.
+ *
+ * <p>This compares the list of all partitions of a table stored in the catalog and on the
+ * storage: (1) Partitions exist in the catalog, but NOT the storage: drops them in the catalog;
+ * (2) Partitions exist on the storage, but NOT the catalog: adds them to the catalog; (3)
+ * Partitions exist in both, but the partition path is different: update them in the catalog.
+ *
+ * @param allPartitionsInCatalog All partitions of a table stored in the catalog.
+ * @param allPartitionsOnStorage All partitions of a table stored on the storage.
+ * @return partition events for changed partitions.
+ */
+ private List<CatalogPartitionEvent> getPartitionEvents(
+ HoodieTableMetaClient metaClient,
+ List<CatalogPartition> allPartitionsInCatalog,
+ List<String> allPartitionsOnStorage) {
+ Map<String, String> paths = getPartitionValuesToPathMapping(allPartitionsInCatalog);
+ Set<String> partitionsToDrop = new HashSet<>(paths.keySet());
+
+ List<CatalogPartitionEvent> events = new ArrayList<>();
+ for (String storagePartition : allPartitionsOnStorage) {
+ Path storagePartitionPath =
+ FSUtils.getPartitionPath(metaClient.getBasePathV2(), storagePartition);
+ String fullStoragePartitionPath =
+ Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath();
+ List<String> storagePartitionValues =
+ partitionValuesExtractor.extractPartitionValuesInPath(storagePartition);
+
+ if (!storagePartitionValues.isEmpty()) {
+ String storageValue = String.join(", ", storagePartitionValues);
+ // Remove partitions that exist on storage from the `partitionsToDrop` set,
+ // so the remaining partitions that exist in the catalog should be dropped
+ partitionsToDrop.remove(storageValue);
+ if (!paths.containsKey(storageValue)) {
+ events.add(CatalogPartitionEvent.newPartitionAddEvent(storagePartition));
+ } else if (!paths.get(storageValue).equals(fullStoragePartitionPath)) {
+ events.add(CatalogPartitionEvent.newPartitionUpdateEvent(storagePartition));
+ }
+ }
+ }
+
+ partitionsToDrop.forEach(
+ storageValue -> {
+ String storagePath = paths.get(storageValue);
+ try {
+ String relativePath =
+ FSUtils.getRelativePartitionPath(
+ metaClient.getBasePathV2(), new CachingPath(storagePath));
+ events.add(CatalogPartitionEvent.newPartitionDropEvent(relativePath));
+ } catch (IllegalArgumentException e) {
+ log.error(
+ "Cannot parse the path stored in the catalog, ignoring it for "
+ + "generating DROP partition event: \""
+ + storagePath
+ + "\".",
+ e);
+ }
+ });
+ return events;
+ }
+
+ /**
+ * Iterate over the storage partitions and find if there are any new partitions that need to be
+ * added or updated. Generate a list of PartitionEvent based on the changes required.
+ */
+ public List<CatalogPartitionEvent> getPartitionEvents(
+ HoodieTableMetaClient metaClient,
+ List<CatalogPartition> partitionsInCatalog,
+ List<String> writtenPartitionsOnStorage,
+ Set<String> droppedPartitionsOnStorage) {
+ Map<String, String> paths = getPartitionValuesToPathMapping(partitionsInCatalog);
+
+ List<CatalogPartitionEvent> events = new ArrayList<>();
+ for (String storagePartition : writtenPartitionsOnStorage) {
+ Path storagePartitionPath =
+ FSUtils.getPartitionPath(metaClient.getBasePathV2(), storagePartition);
+ String fullStoragePartitionPath =
+ Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath();
+ List<String> storagePartitionValues =
+ partitionValuesExtractor.extractPartitionValuesInPath(storagePartition);
+
+ if (droppedPartitionsOnStorage.contains(storagePartition)) {
+ events.add(CatalogPartitionEvent.newPartitionDropEvent(storagePartition));
+ } else {
+ if (!storagePartitionValues.isEmpty()) {
+ String storageValue = String.join(", ", storagePartitionValues);
+ if (!paths.containsKey(storageValue)) {
+ events.add(CatalogPartitionEvent.newPartitionAddEvent(storagePartition));
+ } else if (!paths.get(storageValue).equals(fullStoragePartitionPath)) {
+ events.add(CatalogPartitionEvent.newPartitionUpdateEvent(storagePartition));
+ }
+ }
+ }
+ }
+ return events;
+ }
+
+ /**
+ * Gets the partition values to the absolute path mapping based on the partition information from
+ * the catalog.
+ *
+ * @param partitionsInCatalog Partitions in the catalog.
+ * @return The partition values to the absolute path mapping.
+ */
+ private Map<String, String> getPartitionValuesToPathMapping(
+ List<CatalogPartition> partitionsInCatalog) {
+ Map<String, String> paths = new HashMap<>();
+ for (CatalogPartition tablePartition : partitionsInCatalog) {
+ List<String> hivePartitionValues = tablePartition.getValues();
+ String fullTablePartitionPath =
+ Path.getPathWithoutSchemeAndAuthority(new Path(tablePartition.getStorageLocation()))
+ .toUri()
+ .getPath();
+ paths.put(String.join(", ", hivePartitionValues), fullTablePartitionPath);
+ }
+ return paths;
+ }
+}
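To make the compare-and-diff behavior of getPartitionEvents concrete, here is a small self-contained sketch that mirrors the value-to-path mapping logic; it does not call the class itself, and all paths and values are made up:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    class PartitionDiffSketch {
      public static void main(String[] args) {
        // Partitions the catalog currently knows about: partition values -> absolute path.
        Map<String, String> catalogPaths = new HashMap<>();
        catalogPaths.put("2025, 01", "/warehouse/tbl/year=2025/month=01");
        catalogPaths.put("2024, 12", "/warehouse/tbl/year=2024/month=12");

        // Relative partition paths found on storage.
        List<String> storagePartitions = Arrays.asList("year=2025/month=01", "year=2025/month=02");

        Set<String> toDrop = new HashSet<>(catalogPaths.keySet());
        for (String storagePartition : storagePartitions) {
          // A PartitionValueExtractor would derive these values from the path; hard-coded here.
          String values =
              storagePartition.replace("year=", "").replace("month=", "").replace("/", ", ");
          toDrop.remove(values);
          if (!catalogPaths.containsKey(values)) {
            System.out.println("ADD " + storagePartition);
          } else if (!catalogPaths.get(values).equals("/warehouse/tbl/" + storagePartition)) {
            System.out.println("UPDATE " + storagePartition);
          }
        }
        // Whatever remains exists only in the catalog and would generate DROP events.
        toDrop.forEach(values -> System.out.println("DROP " + catalogPaths.get(values)));
      }
    }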
diff --git a/xtable-core/src/main/java/org/apache/xtable/hudi/catalog/HudiCatalogTableUtils.java b/xtable-core/src/main/java/org/apache/xtable/hudi/catalog/HudiCatalogTableUtils.java
new file mode 100644
index 00000000..e6d99706
--- /dev/null
+++ b/xtable-core/src/main/java/org/apache/xtable/hudi/catalog/HudiCatalogTableUtils.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.hudi.catalog;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.spark.sql.types.StructType;
+
+import org.apache.hudi.common.util.StringUtils;
+
+import org.apache.xtable.model.schema.InternalField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.schema.InternalType;
+import org.apache.xtable.schema.SparkSchemaExtractor;
+
+/** Util class to fetch details about Hudi table */
+public class HudiCatalogTableUtils {
+
+ /**
+ * Get Spark SQL related table properties. This is used for Spark datasource tables.
+ *
+ * @param schema The schema to write to the table.
+ * @return The table properties with the Spark SQL datasource properties added.
+ */
+ public static Map<String, String> getSparkTableProperties(
+ List<String> partitionNames,
+ String sparkVersion,
+ int schemaLengthThreshold,
+ InternalSchema schema) {
+ List<InternalField> partitionCols = new ArrayList<>();
+ List<InternalField> dataCols = new ArrayList<>();
+ Map<String, InternalField> column2Field = new HashMap<>();
+
+ for (InternalField field : schema.getFields()) {
+ column2Field.put(field.getName(), field);
+ }
+ // Get partition columns and data columns.
+ for (String partitionName : partitionNames) {
+ // Default the unknown partition fields to be String.
+ // Keep the same logical with HiveSchemaUtil#getPartitionKeyType.
+ partitionCols.add(
+ column2Field.getOrDefault(
+ partitionName,
+ InternalField.builder()
+ .name(partitionName)
+ .schema(
+ InternalSchema.builder()
+ .dataType(InternalType.BYTES)
+ .isNullable(false)
+ .build())
+ .build()));
+ }
+
+ for (InternalField field : schema.getFields()) {
+ if (!partitionNames.contains(field.getName())) {
+ dataCols.add(field);
+ }
+ }
+
+ List<InternalField> reOrderedFields = new ArrayList<>();
+ reOrderedFields.addAll(dataCols);
+ reOrderedFields.addAll(partitionCols);
+ InternalSchema reorderedSchema =
+ InternalSchema.builder()
+ .fields(reOrderedFields)
+ .dataType(InternalType.RECORD)
+ .name(schema.getName())
+ .build();
+
+ StructType sparkSchema = SparkSchemaExtractor.getInstance().fromInternalSchema(reorderedSchema);
+
+ Map<String, String> sparkProperties = new HashMap<>();
+ sparkProperties.put("spark.sql.sources.provider", "hudi");
+ if (!StringUtils.isNullOrEmpty(sparkVersion)) {
+ sparkProperties.put("spark.sql.create.version", sparkVersion);
+ }
+ // Split the schema string into multiple parts according to the schemaLengthThreshold size.
+ String schemaString = sparkSchema.json();
+ int numSchemaPart = (schemaString.length() + schemaLengthThreshold - 1) / schemaLengthThreshold;
+ sparkProperties.put("spark.sql.sources.schema.numParts", String.valueOf(numSchemaPart));
+ // Add each part of schema string to sparkProperties
+ for (int i = 0; i < numSchemaPart; i++) {
+ int start = i * schemaLengthThreshold;
+ int end = Math.min(start + schemaLengthThreshold, schemaString.length());
+ sparkProperties.put("spark.sql.sources.schema.part." + i, schemaString.substring(start, end));
+ }
+ // Add partition columns
+ if (!partitionNames.isEmpty()) {
+ sparkProperties.put(
+ "spark.sql.sources.schema.numPartCols",
String.valueOf(partitionNames.size()));
+ for (int i = 0; i < partitionNames.size(); i++) {
+ sparkProperties.put("spark.sql.sources.schema.partCol." + i,
partitionNames.get(i));
+ }
+ }
+ return sparkProperties;
+ }
+}
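A small sketch of the schema-splitting arithmetic used above, independent of the Hudi and Spark types; the threshold and schema JSON are made-up values:

    import java.util.HashMap;
    import java.util.Map;

    class SchemaSplitSketch {
      public static void main(String[] args) {
        String schemaString = "{\"type\":\"struct\",\"fields\":[...]}"; // stand-in for sparkSchema.json()
        int schemaLengthThreshold = 16;

        // Ceiling division: how many spark.sql.sources.schema.part.N entries are needed.
        int numSchemaPart = (schemaString.length() + schemaLengthThreshold - 1) / schemaLengthThreshold;

        Map<String, String> props = new HashMap<>();
        props.put("spark.sql.sources.schema.numParts", String.valueOf(numSchemaPart));
        for (int i = 0; i < numSchemaPart; i++) {
          int start = i * schemaLengthThreshold;
          int end = Math.min(start + schemaLengthThreshold, schemaString.length());
          props.put("spark.sql.sources.schema.part." + i, schemaString.substring(start, end));
        }
        props.forEach((k, v) -> System.out.println(k + " = " + v));
      }
    }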
diff --git a/xtable-core/src/main/java/org/apache/xtable/hudi/catalog/HudiInputFormatUtils.java b/xtable-core/src/main/java/org/apache/xtable/hudi/catalog/HudiInputFormatUtils.java
new file mode 100644
index 00000000..4d644494
--- /dev/null
+++ b/xtable-core/src/main/java/org/apache/xtable/hudi/catalog/HudiInputFormatUtils.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.hudi.catalog;
+
+import org.apache.hudi.common.model.HoodieFileFormat;
+
+import org.apache.xtable.exception.NotSupportedException;
+
+public class HudiInputFormatUtils {
+
+ /** Input Format, that provides a real-time view of data in a Hoodie table. */
+ private static final String HUDI_PARQUET_REALTIME_INPUT_FORMAT_CLASS =
+ "org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat";
+
+ /**
+ * HoodieInputFormat which understands the Hoodie File Structure and filters files based on the
+ * Hoodie Mode.
+ */
+ private static final String HUDI_PARQUET_INPUT_FORMAT_CLASS =
+ "org.apache.hudi.hadoop.HoodieParquetInputFormat";
+
+ /** HoodieRealtimeInputFormat for HUDI datasets which store data in HFile base file format. */
+ private static final String HUDI_HFILE_REALTIME_INPUT_FORMAT_CLASS =
+ "org.apache.hudi.hadoop.realtime.HoodieHFileRealtimeInputFormat";
+
+ /** HoodieInputFormat for HUDI datasets which store data in HFile base file format. */
+ private static final String HUDI_HFILE_INPUT_FORMAT_CLASS =
+ "org.apache.hudi.hadoop.HoodieHFileInputFormat";
+
+ /** A MapReduce/ Hive input format for ORC files. */
+ private static final String ORC_INPUT_FORMAT_CLASS =
+ "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat";
+
+ /** A Hive OutputFormat for ORC files. */
+ private static final String ORC_OUTPUT_FORMAT_CLASS =
+ "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat";
+
+ /** A Parquet OutputFormat for Hive (with the deprecated package mapred) */
+ private static final String MAPRED_PARQUET_OUTPUT_FORMAT_CLASS =
+ "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat";
+
+ /** A ParquetHiveSerDe for Hive (with the deprecated package mapred) */
+ private static final String PARQUET_SERDE_CLASS =
+ "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe";
+
+ /**
+ * A serde class for ORC. It transparently passes the object to/ from the ORC file reader/ writer.
+ */
+ private static final String ORC_SERDE_CLASS = "org.apache.hadoop.hive.ql.io.orc.OrcSerde";
+
+ public static String getInputFormatClassName(HoodieFileFormat baseFileFormat, boolean realtime) {
+ switch (baseFileFormat) {
+ case PARQUET:
+ if (realtime) {
+ return HUDI_PARQUET_REALTIME_INPUT_FORMAT_CLASS;
+ } else {
+ return HUDI_PARQUET_INPUT_FORMAT_CLASS;
+ }
+ case HFILE:
+ if (realtime) {
+ return HUDI_HFILE_REALTIME_INPUT_FORMAT_CLASS;
+ } else {
+ return HUDI_HFILE_INPUT_FORMAT_CLASS;
+ }
+ case ORC:
+ return ORC_INPUT_FORMAT_CLASS;
+ default:
+ throw new NotSupportedException(
+ "Hudi InputFormat not implemented for base file format " +
baseFileFormat);
+ }
+ }
+
+ public static String getOutputFormatClassName(HoodieFileFormat baseFileFormat) {
+ switch (baseFileFormat) {
+ case PARQUET:
+ case HFILE:
+ return MAPRED_PARQUET_OUTPUT_FORMAT_CLASS;
+ case ORC:
+ return ORC_OUTPUT_FORMAT_CLASS;
+ default:
+ throw new NotSupportedException("No OutputFormat for base file format
" + baseFileFormat);
+ }
+ }
+
+ public static String getSerDeClassName(HoodieFileFormat baseFileFormat) {
+ switch (baseFileFormat) {
+ case PARQUET:
+ case HFILE:
+ return PARQUET_SERDE_CLASS;
+ case ORC:
+ return ORC_SERDE_CLASS;
+ default:
+ throw new NotSupportedException("No SerDe for base file format " +
baseFileFormat);
+ }
+ }
+}
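Example usage of the lookups above for a Parquet-backed table read through its read-optimized view (hence realtime = false):

    import org.apache.hudi.common.model.HoodieFileFormat;

    import org.apache.xtable.hudi.catalog.HudiInputFormatUtils;

    class InputFormatLookupExample {
      public static void main(String[] args) {
        // Resolves to HoodieParquetInputFormat / MapredParquetOutputFormat / ParquetHiveSerDe.
        String inputFormat =
            HudiInputFormatUtils.getInputFormatClassName(HoodieFileFormat.PARQUET, false);
        String outputFormat = HudiInputFormatUtils.getOutputFormatClassName(HoodieFileFormat.PARQUET);
        String serde = HudiInputFormatUtils.getSerDeClassName(HoodieFileFormat.PARQUET);
        System.out.println(inputFormat + ", " + outputFormat + ", " + serde);
      }
    }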
diff --git a/xtable-core/src/main/java/org/apache/xtable/schema/SparkSchemaExtractor.java b/xtable-core/src/main/java/org/apache/xtable/schema/SparkSchemaExtractor.java
new file mode 100644
index 00000000..cda64941
--- /dev/null
+++ b/xtable-core/src/main/java/org/apache/xtable/schema/SparkSchemaExtractor.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.schema;
+
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.MetadataBuilder;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import org.apache.xtable.exception.NotSupportedException;
+import org.apache.xtable.exception.SchemaExtractorException;
+import org.apache.xtable.model.schema.InternalField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.schema.InternalType;
+
+public class SparkSchemaExtractor {
+
+ private static final SparkSchemaExtractor INSTANCE = new SparkSchemaExtractor();
+ private static final String COMMENT = "comment";
+
+ public static SparkSchemaExtractor getInstance() {
+ return INSTANCE;
+ }
+
+ public StructType fromInternalSchema(InternalSchema internalSchema) {
+ StructField[] fields =
+ internalSchema.getFields().stream()
+ .map(
+ field ->
+ new StructField(
+ field.getName(),
+ convertFieldType(field),
+ field.getSchema().isNullable(),
+ getMetaData(field.getSchema())))
+ .toArray(StructField[]::new);
+ return new StructType(fields);
+ }
+
+ private DataType convertFieldType(InternalField field) {
+ switch (field.getSchema().getDataType()) {
+ case ENUM:
+ case STRING:
+ return DataTypes.StringType;
+ case INT:
+ return DataTypes.IntegerType;
+ case LONG:
+ case TIMESTAMP_NTZ:
+ return DataTypes.LongType;
+ case BYTES:
+ case FIXED:
+ case UUID:
+ return DataTypes.BinaryType;
+ case BOOLEAN:
+ return DataTypes.BooleanType;
+ case FLOAT:
+ return DataTypes.FloatType;
+ case DATE:
+ return DataTypes.DateType;
+ case TIMESTAMP:
+ return DataTypes.TimestampType;
+ case DOUBLE:
+ return DataTypes.DoubleType;
+ case DECIMAL:
+ int precision =
+ (int) field.getSchema().getMetadata().get(InternalSchema.MetadataKey.DECIMAL_PRECISION);
+ int scale =
+ (int) field.getSchema().getMetadata().get(InternalSchema.MetadataKey.DECIMAL_SCALE);
+ return DataTypes.createDecimalType(precision, scale);
+ case RECORD:
+ return fromInternalSchema(field.getSchema());
+ case MAP:
+ InternalField key =
+ field.getSchema().getFields().stream()
+ .filter(
+ mapField ->
+
InternalField.Constants.MAP_KEY_FIELD_NAME.equals(mapField.getName()))
+ .findFirst()
+ .orElseThrow(() -> new SchemaExtractorException("Invalid map
schema"));
+ InternalField value =
+ field.getSchema().getFields().stream()
+ .filter(
+ mapField ->
+
InternalField.Constants.MAP_VALUE_FIELD_NAME.equals(mapField.getName()))
+ .findFirst()
+ .orElseThrow(() -> new SchemaExtractorException("Invalid map
schema"));
+ return DataTypes.createMapType(
+ convertFieldType(key), convertFieldType(value),
value.getSchema().isNullable());
+ case LIST:
+ InternalField element =
+ field.getSchema().getFields().stream()
+ .filter(
+ arrayField ->
+
InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME.equals(
+ arrayField.getName()))
+ .findFirst()
+ .orElseThrow(() -> new SchemaExtractorException("Invalid array
schema"));
+ return DataTypes.createArrayType(
+ convertFieldType(element), element.getSchema().isNullable());
+ default:
+ throw new NotSupportedException("Unsupported type: " +
field.getSchema().getDataType());
+ }
+ }
+
+ private Metadata getMetaData(InternalSchema schema) {
+ InternalType type = schema.getDataType();
+ MetadataBuilder metadataBuilder = new MetadataBuilder();
+ if (type == InternalType.UUID) {
+ metadataBuilder.putString(InternalSchema.XTABLE_LOGICAL_TYPE, "uuid");
+ }
+ if (schema.getComment() != null) {
+ metadataBuilder.putString(COMMENT, schema.getComment());
+ }
+ return metadataBuilder.build();
+ }
+}
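A short usage sketch converting an InternalSchema with a single string field into a Spark StructType; the field names are arbitrary:

    import java.util.Collections;

    import org.apache.spark.sql.types.StructType;

    import org.apache.xtable.model.schema.InternalField;
    import org.apache.xtable.model.schema.InternalSchema;
    import org.apache.xtable.model.schema.InternalType;
    import org.apache.xtable.schema.SparkSchemaExtractor;

    class SparkSchemaExtractorExample {
      public static void main(String[] args) {
        InternalSchema schema =
            InternalSchema.builder()
                .name("record")
                .dataType(InternalType.RECORD)
                .isNullable(false)
                .fields(
                    Collections.singletonList(
                        InternalField.builder()
                            .name("id")
                            .schema(
                                InternalSchema.builder()
                                    .name("string")
                                    .dataType(InternalType.STRING)
                                    .isNullable(false)
                                    .build())
                            .build()))
                .build();
        StructType sparkSchema = SparkSchemaExtractor.getInstance().fromInternalSchema(schema);
        System.out.println(sparkSchema.treeString()); // expected: id as a non-nullable string field
      }
    }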
diff --git a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSchemaExtractor.java b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSchemaExtractor.java
index 361245fe..9c235a19 100644
--- a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSchemaExtractor.java
+++ b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSchemaExtractor.java
@@ -246,44 +246,12 @@ public class TestDeltaSchemaExtractor {
.add("requiredDecimal", DataTypes.createDecimalType(10, 2), false)
.add("optionalDecimal", DataTypes.createDecimalType(10, 2), true);
- Assertions.assertEquals(
- structRepresentation,
- DeltaSchemaExtractor.getInstance().fromInternalSchema(internalSchema));
Assertions.assertEquals(
internalSchema,
DeltaSchemaExtractor.getInstance().toInternalSchema(structRepresentation));
}
@Test
public void testFixedBytes() {
- InternalSchema internalSchemaOriginal =
- InternalSchema.builder()
- .name("struct")
- .dataType(InternalType.RECORD)
- .isNullable(false)
- .fields(
- Arrays.asList(
- InternalField.builder()
- .name("requiredFixed")
- .schema(
- InternalSchema.builder()
- .name("fixed")
- .dataType(InternalType.FIXED)
- .isNullable(false)
- .comment("comment")
- .build())
- .build(),
- InternalField.builder()
- .name("optionalFixed")
- .schema(
- InternalSchema.builder()
- .name("fixed")
- .dataType(InternalType.FIXED)
- .isNullable(true)
- .build())
-
.defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
- .build()))
- .build();
-
InternalSchema internalSchemaAfterRoundTrip =
InternalSchema.builder()
.name("struct")
@@ -317,9 +285,6 @@ public class TestDeltaSchemaExtractor {
.add("requiredFixed", DataTypes.BinaryType, false, "comment")
.add("optionalFixed", DataTypes.BinaryType, true);
- Assertions.assertEquals(
- structRepresentation,
-
DeltaSchemaExtractor.getInstance().fromInternalSchema(internalSchemaOriginal));
Assertions.assertEquals(
internalSchemaAfterRoundTrip,
DeltaSchemaExtractor.getInstance().toInternalSchema(structRepresentation));
@@ -360,101 +325,14 @@ public class TestDeltaSchemaExtractor {
.build()))
.build();
- InternalSchema internalSchemaTimestampNtz =
- InternalSchema.builder()
- .name("struct")
- .dataType(InternalType.RECORD)
- .isNullable(false)
- .fields(
- Arrays.asList(
- InternalField.builder()
- .name("requiredTimestampNtz")
- .schema(
- InternalSchema.builder()
- .name("timestampNtz")
- .dataType(InternalType.TIMESTAMP_NTZ)
- .isNullable(false)
- .build())
- .build(),
- InternalField.builder()
- .name("optionalTimestampNtz")
- .schema(
- InternalSchema.builder()
- .name("timestampNtz")
- .dataType(InternalType.TIMESTAMP_NTZ)
- .isNullable(true)
- .build())
-
.defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
- .build()))
- .build();
-
StructType structRepresentationTimestamp =
new StructType()
.add("requiredTimestamp", DataTypes.TimestampType, false)
.add("optionalTimestamp", DataTypes.TimestampType, true);
- StructType structRepresentationTimestampNtz =
- new StructType()
- .add("requiredTimestampNtz", DataTypes.LongType, false)
- .add("optionalTimestampNtz", DataTypes.LongType, true);
-
- Assertions.assertEquals(
- structRepresentationTimestamp,
-
DeltaSchemaExtractor.getInstance().fromInternalSchema(internalSchemaTimestamp));
Assertions.assertEquals(
internalSchemaTimestamp,
DeltaSchemaExtractor.getInstance().toInternalSchema(structRepresentationTimestamp));
- Assertions.assertEquals(
- structRepresentationTimestampNtz,
-
DeltaSchemaExtractor.getInstance().fromInternalSchema(internalSchemaTimestampNtz));
- }
-
- @Test
- public void testEnums() {
- Map<InternalSchema.MetadataKey, Object> requiredEnumMetadata = new
HashMap<>();
- requiredEnumMetadata.put(InternalSchema.MetadataKey.ENUM_VALUES,
Arrays.asList("ONE", "TWO"));
- Map<InternalSchema.MetadataKey, Object> optionalEnumMetadata = new
HashMap<>();
- optionalEnumMetadata.put(
- InternalSchema.MetadataKey.ENUM_VALUES, Arrays.asList("THREE",
"FOUR"));
-
- InternalSchema internalSchema =
- InternalSchema.builder()
- .name("struct")
- .dataType(InternalType.RECORD)
- .isNullable(false)
- .fields(
- Arrays.asList(
- InternalField.builder()
- .name("requiredEnum")
- .schema(
- InternalSchema.builder()
- .name("REQUIRED_ENUM")
- .dataType(InternalType.ENUM)
- .isNullable(false)
- .metadata(requiredEnumMetadata)
- .build())
- .build(),
- InternalField.builder()
- .name("optionalEnum")
- .schema(
- InternalSchema.builder()
- .name("OPTIONAL_ENUM")
- .dataType(InternalType.ENUM)
- .isNullable(true)
- .metadata(optionalEnumMetadata)
- .build())
-
.defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
- .build()))
- .build();
-
- StructType structRepresentation =
- new StructType()
- .add("requiredEnum", DataTypes.StringType, false)
- .add("optionalEnum", DataTypes.StringType, true);
-
- Assertions.assertEquals(
- structRepresentation,
- DeltaSchemaExtractor.getInstance().fromInternalSchema(internalSchema));
}
@Test
@@ -567,9 +445,6 @@ public class TestDeltaSchemaExtractor {
false)
.add("recordMap", DataTypes.createMapType(DataTypes.IntegerType,
mapElement, true));
- Assertions.assertEquals(
- structRepresentation,
- DeltaSchemaExtractor.getInstance().fromInternalSchema(internalSchema));
Assertions.assertEquals(
internalSchema,
DeltaSchemaExtractor.getInstance().toInternalSchema(structRepresentation));
}
@@ -660,9 +535,6 @@ public class TestDeltaSchemaExtractor {
.add("intList", DataTypes.createArrayType(DataTypes.IntegerType,
false), false)
.add("recordList", DataTypes.createArrayType(elementSchema, true),
true);
- Assertions.assertEquals(
- structRepresentation,
- DeltaSchemaExtractor.getInstance().fromInternalSchema(internalSchema));
Assertions.assertEquals(
internalSchema,
DeltaSchemaExtractor.getInstance().toInternalSchema(structRepresentation));
}
@@ -757,9 +629,6 @@ public class TestDeltaSchemaExtractor {
false),
true,
"comment");
- Assertions.assertEquals(
- structRepresentation,
- DeltaSchemaExtractor.getInstance().fromInternalSchema(internalSchema));
Assertions.assertEquals(
internalSchema,
DeltaSchemaExtractor.getInstance().toInternalSchema(structRepresentation));
}
@@ -939,9 +808,6 @@ public class TestDeltaSchemaExtractor {
.defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
.build()))
.build();
- Assertions.assertEquals(
- structRepresentation,
- DeltaSchemaExtractor.getInstance().fromInternalSchema(internalSchema));
Assertions.assertEquals(
internalSchema,
DeltaSchemaExtractor.getInstance().toInternalSchema(structRepresentation));
}
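
The assertions removed above covered the InternalSchema-to-StructType direction, which this commit moves out of DeltaSchemaExtractor; TestDeltaSchemaExtractor now only verifies StructType-to-InternalSchema. A hedged round-trip sketch under that split (the column name is chosen arbitrarily; both extractor calls are taken from the tests in this commit):

// Illustrative only: Delta/Spark -> internal stays with DeltaSchemaExtractor,
// internal -> Spark StructType now goes through the new SparkSchemaExtractor.
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

import org.apache.xtable.delta.DeltaSchemaExtractor;
import org.apache.xtable.model.schema.InternalSchema;
import org.apache.xtable.schema.SparkSchemaExtractor;

public class SchemaRoundTripSample {
  public static void main(String[] args) {
    StructType source = new StructType().add("id", DataTypes.IntegerType, false);
    InternalSchema internal = DeltaSchemaExtractor.getInstance().toInternalSchema(source);
    StructType back = SparkSchemaExtractor.getInstance().fromInternalSchema(internal);
    System.out.println(source.equals(back)); // expected true for simple primitive columns
  }
}
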
diff --git
a/xtable-core/src/test/java/org/apache/xtable/hudi/catalog/TestHudiCatalogPartitionSyncTool.java
b/xtable-core/src/test/java/org/apache/xtable/hudi/catalog/TestHudiCatalogPartitionSyncTool.java
new file mode 100644
index 00000000..0c33013a
--- /dev/null
+++
b/xtable-core/src/test/java/org/apache/xtable/hudi/catalog/TestHudiCatalogPartitionSyncTool.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.hudi.catalog;
+
+import static
org.apache.xtable.hudi.catalog.HudiCatalogPartitionSyncTool.LAST_COMMIT_COMPLETION_TIME_SYNC;
+import static
org.apache.xtable.hudi.catalog.HudiCatalogPartitionSyncTool.LAST_COMMIT_TIME_SYNC;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Stream;
+
+import lombok.SneakyThrows;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockedStatic;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.TimelineUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.sync.common.model.PartitionValueExtractor;
+
+import org.apache.xtable.avro.AvroSchemaConverter;
+import org.apache.xtable.catalog.CatalogPartition;
+import org.apache.xtable.catalog.CatalogPartitionSyncOperations;
+import org.apache.xtable.hudi.HudiTableManager;
+import org.apache.xtable.model.InternalTable;
+import org.apache.xtable.model.catalog.ThreePartHierarchicalTableIdentifier;
+import org.apache.xtable.model.schema.InternalField;
+import org.apache.xtable.model.schema.InternalPartitionField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.schema.InternalType;
+
+@ExtendWith(MockitoExtension.class)
+public class TestHudiCatalogPartitionSyncTool {
+
+ protected static final String HMS_DATABASE = "hms_db";
+ protected static final String HMS_TABLE = "hms_table";
+ protected static final String TEST_BASE_PATH = "test-base-path";
+
+  protected static String avroSchema =
+      "{\"type\":\"record\",\"name\":\"SimpleRecord\",\"namespace\":\"com.example\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"partitionKey\",\"type\":\"string\"}]}";
+ protected static final InternalTable TEST_INTERNAL_TABLE_WITH_SCHEMA =
+ InternalTable.builder()
+ .basePath(TEST_BASE_PATH)
+ .readSchema(
+ AvroSchemaConverter.getInstance()
+ .toInternalSchema(new Schema.Parser().parse(avroSchema)))
+ .partitioningFields(
+ Collections.singletonList(
+ InternalPartitionField.builder()
+ .sourceField(
+ InternalField.builder()
+ .name("partitionKey")
+ .schema(
+
InternalSchema.builder().dataType(InternalType.STRING).build())
+ .build())
+ .build()))
+ .build();
+
+ protected static final ThreePartHierarchicalTableIdentifier
TEST_TABLE_IDENTIFIER =
+ new ThreePartHierarchicalTableIdentifier(HMS_DATABASE, HMS_TABLE);
+
+ @Mock private CatalogPartitionSyncOperations mockCatalogClient;
+ @Mock private HoodieTableMetaClient mockMetaClient;
+ @Mock private PartitionValueExtractor mockPartitionValueExtractor;
+ @Mock private HudiTableManager mockHudiTableManager;
+
+ private final Configuration mockConfiguration = new Configuration();
+ private HudiCatalogPartitionSyncTool mockHudiCatalogPartitionSyncTool;
+
+ private HudiCatalogPartitionSyncTool createMockHudiPartitionSyncTool() {
+ return new HudiCatalogPartitionSyncTool(
+ mockCatalogClient, mockHudiTableManager, mockPartitionValueExtractor,
mockConfiguration);
+ }
+
+ private void setupCommonMocks() {
+ mockHudiCatalogPartitionSyncTool = createMockHudiPartitionSyncTool();
+ }
+
+ @SneakyThrows
+ @Test
+ void testSyncAllPartitions() {
+ setupCommonMocks();
+
+ String partitionKey1 = "key1";
+ String partitionKey2 = "key2";
+ ZonedDateTime zonedDateTime =
+
Instant.ofEpochMilli(System.currentTimeMillis()).atZone(ZoneId.systemDefault());
+ try (MockedStatic<ZonedDateTime> mockZonedDateTime =
mockStatic(ZonedDateTime.class);
+ MockedStatic<FSUtils> mockFSUtils = mockStatic(FSUtils.class)) {
+ mockZonedDateTime.when(ZonedDateTime::now).thenReturn(zonedDateTime);
+ List<String> mockedPartitions = Arrays.asList(partitionKey1,
partitionKey2);
+ mockFSUtils
+ .when(() -> FSUtils.getAllPartitionPaths(any(), eq(TEST_BASE_PATH),
eq(true), eq(false)))
+ .thenReturn(mockedPartitions);
+ mockFSUtils
+ .when(() -> FSUtils.getPartitionPath(new Path(TEST_BASE_PATH),
partitionKey1))
+ .thenReturn(new Path(TEST_BASE_PATH + "/" + partitionKey1));
+ mockFSUtils
+ .when(() -> FSUtils.getPartitionPath(new Path(TEST_BASE_PATH),
partitionKey2))
+ .thenReturn(new Path(TEST_BASE_PATH + "/" + partitionKey2));
+ when(mockHudiTableManager.loadTableMetaClientIfExists(TEST_BASE_PATH))
+ .thenReturn(Optional.of(mockMetaClient));
+ when(mockMetaClient.getBasePathV2()).thenReturn(new
Path(TEST_BASE_PATH));
+
when(mockPartitionValueExtractor.extractPartitionValuesInPath(partitionKey1))
+ .thenReturn(Collections.singletonList(partitionKey1));
+
when(mockPartitionValueExtractor.extractPartitionValuesInPath(partitionKey2))
+ .thenReturn(Collections.singletonList(partitionKey2));
+
+ HoodieActiveTimeline mockTimeline = mock(HoodieActiveTimeline.class);
+ HoodieInstant instant1 =
+ new HoodieInstant(HoodieInstant.State.COMPLETED, "replacecommit",
"100", "1000");
+ HoodieInstant instant2 =
+ new HoodieInstant(HoodieInstant.State.COMPLETED, "replacecommit",
"101", "1100");
+ when(mockTimeline.countInstants()).thenReturn(2);
+ when(mockTimeline.lastInstant()).thenReturn(Option.of(instant2));
+ when(mockTimeline.getInstantsOrderedByStateTransitionTime())
+ .thenReturn(Stream.of(instant1, instant2));
+ when(mockMetaClient.getActiveTimeline()).thenReturn(mockTimeline);
+
+ CatalogPartition p1 =
+ new CatalogPartition(Collections.singletonList(partitionKey1),
partitionKey1);
+ when(mockCatalogClient.getAllPartitions(TEST_TABLE_IDENTIFIER))
+ .thenReturn(Collections.singletonList(p1));
+
+ assertTrue(
+ mockHudiCatalogPartitionSyncTool.syncPartitions(
+ TEST_INTERNAL_TABLE_WITH_SCHEMA, TEST_TABLE_IDENTIFIER));
+
+ ArgumentCaptor<List<CatalogPartition>> addPartitionsCaptor =
+ ArgumentCaptor.forClass(List.class);
+ verify(mockCatalogClient, times(1))
+ .addPartitionsToTable(eq(TEST_TABLE_IDENTIFIER),
addPartitionsCaptor.capture());
+ List<CatalogPartition> addedPartitions = addPartitionsCaptor.getValue();
+ assertEquals(1, addedPartitions.size());
+ assertEquals(
+ TEST_BASE_PATH + "/" + partitionKey2,
addedPartitions.get(0).getStorageLocation());
+ assertEquals(1, addedPartitions.get(0).getValues().size());
+ assertEquals(partitionKey2, addedPartitions.get(0).getValues().get(0));
+
+ verify(mockCatalogClient, times(0)).dropPartitions(any(), anyList());
+
+ ArgumentCaptor<Map<String, String>> lastSyncedPropertiesCaptor =
+ ArgumentCaptor.forClass(Map.class);
+ verify(mockCatalogClient, times(1))
+ .updateTableProperties(eq(TEST_TABLE_IDENTIFIER),
lastSyncedPropertiesCaptor.capture());
+ Map<String, String> lastSyncedProperties =
lastSyncedPropertiesCaptor.getValue();
+ assertEquals("101", lastSyncedProperties.get(LAST_COMMIT_TIME_SYNC));
+ assertEquals("1100",
lastSyncedProperties.get(LAST_COMMIT_COMPLETION_TIME_SYNC));
+ }
+ }
+
+ @SneakyThrows
+ @Test
+ void testSyncPartitionsSinceLastSyncTime() {
+ setupCommonMocks();
+
+ String partitionKey1 = "key1";
+ String partitionKey2 = "key2";
+ String partitionKey3 = "key3";
+ ZonedDateTime zonedDateTime =
+
Instant.ofEpochMilli(System.currentTimeMillis()).atZone(ZoneId.systemDefault());
+ try (MockedStatic<ZonedDateTime> mockZonedDateTime =
mockStatic(ZonedDateTime.class);
+ MockedStatic<FSUtils> mockFSUtils = mockStatic(FSUtils.class);
+ MockedStatic<TimelineUtils> mockedTimelineUtils =
mockStatic(TimelineUtils.class)) {
+ mockZonedDateTime.when(ZonedDateTime::now).thenReturn(zonedDateTime);
+ List<String> mockedPartitions = Arrays.asList(partitionKey1,
partitionKey2);
+ mockFSUtils
+ .when(() -> FSUtils.getAllPartitionPaths(any(), eq(TEST_BASE_PATH),
eq(true), eq(false)))
+ .thenReturn(mockedPartitions);
+ mockFSUtils
+ .when(() -> FSUtils.getPartitionPath(new Path(TEST_BASE_PATH),
partitionKey2))
+ .thenReturn(new Path(TEST_BASE_PATH + "/" + partitionKey2));
+ mockFSUtils
+ .when(() -> FSUtils.getPartitionPath(new Path(TEST_BASE_PATH),
partitionKey3))
+ .thenReturn(new Path(TEST_BASE_PATH + "/" + partitionKey3));
+ when(mockHudiTableManager.loadTableMetaClientIfExists(TEST_BASE_PATH))
+ .thenReturn(Optional.of(mockMetaClient));
+ when(mockMetaClient.getBasePathV2()).thenReturn(new
Path(TEST_BASE_PATH));
+
when(mockPartitionValueExtractor.extractPartitionValuesInPath(partitionKey2))
+ .thenReturn(Collections.singletonList(partitionKey2));
+
when(mockPartitionValueExtractor.extractPartitionValuesInPath(partitionKey3))
+ .thenReturn(Collections.singletonList(partitionKey3));
+
+ HoodieActiveTimeline mockTimeline = mock(HoodieActiveTimeline.class);
+ HoodieInstant instant1 =
+ new HoodieInstant(HoodieInstant.State.COMPLETED, "replacecommit",
"100", "1000");
+ HoodieInstant instant2 =
+ new HoodieInstant(HoodieInstant.State.COMPLETED, "replacecommit",
"101", "1100");
+
+ when(mockTimeline.countInstants()).thenReturn(2);
+ when(mockTimeline.lastInstant()).thenReturn(Option.of(instant2));
+ when(mockTimeline.getInstantsOrderedByStateTransitionTime())
+ .thenReturn(Stream.of(instant1, instant2));
+ when(mockMetaClient.getActiveTimeline()).thenReturn(mockTimeline);
+
+ Map<String, String> lastSyncedTimeProperties = new HashMap<>();
+ lastSyncedTimeProperties.put(LAST_COMMIT_TIME_SYNC, "100");
+ lastSyncedTimeProperties.put(LAST_COMMIT_COMPLETION_TIME_SYNC, "1000");
+ when(mockCatalogClient.getTableProperties(
+ TEST_TABLE_IDENTIFIER,
+ Arrays.asList(LAST_COMMIT_TIME_SYNC,
LAST_COMMIT_COMPLETION_TIME_SYNC)))
+ .thenReturn(lastSyncedTimeProperties);
+ when(mockTimeline.isBeforeTimelineStarts("100")).thenReturn(false);
+
+ // prepare timeline util mocks
+ mockedTimelineUtils
+ .when(() -> TimelineUtils.getWrittenPartitions(any()))
+ .thenReturn(Arrays.asList(partitionKey2, partitionKey3));
+ mockedTimelineUtils
+ .when(
+ () -> TimelineUtils.getCommitsTimelineAfter(mockMetaClient,
"100", Option.of("1000")))
+ .thenReturn(mockTimeline);
+ mockedTimelineUtils
+ .when(() -> TimelineUtils.getDroppedPartitions(mockTimeline))
+ .thenReturn(Collections.singletonList(partitionKey2));
+
+ CatalogPartition p1 =
+ new CatalogPartition(
+ Collections.singletonList(partitionKey1), TEST_BASE_PATH + "/" +
partitionKey1);
+ CatalogPartition p2 =
+ new CatalogPartition(
+ Collections.singletonList(partitionKey2), TEST_BASE_PATH + "/" +
partitionKey2);
+ when(mockCatalogClient.getAllPartitions(TEST_TABLE_IDENTIFIER))
+ .thenReturn(Arrays.asList(p1, p2));
+
+ assertTrue(
+ mockHudiCatalogPartitionSyncTool.syncPartitions(
+ TEST_INTERNAL_TABLE_WITH_SCHEMA, TEST_TABLE_IDENTIFIER));
+
+ // verify add partitions
+ ArgumentCaptor<ThreePartHierarchicalTableIdentifier>
tableIdentifierArgumentCaptor =
+ ArgumentCaptor.forClass(ThreePartHierarchicalTableIdentifier.class);
+ ArgumentCaptor<List<CatalogPartition>> addPartitionsCaptor =
+ ArgumentCaptor.forClass(List.class);
+ verify(mockCatalogClient, times(1))
+ .addPartitionsToTable(
+ tableIdentifierArgumentCaptor.capture(),
addPartitionsCaptor.capture());
+ List<CatalogPartition> addedPartitions = addPartitionsCaptor.getValue();
+ assertEquals(
+ TEST_TABLE_IDENTIFIER.getDatabaseName(),
+ tableIdentifierArgumentCaptor.getValue().getDatabaseName());
+ assertEquals(
+ TEST_TABLE_IDENTIFIER.getTableName(),
+ tableIdentifierArgumentCaptor.getValue().getTableName());
+ assertEquals(1, addedPartitions.size());
+ assertEquals(
+ TEST_BASE_PATH + "/" + partitionKey3,
addedPartitions.get(0).getStorageLocation());
+ assertEquals(1, addedPartitions.get(0).getValues().size());
+ assertEquals(partitionKey3, addedPartitions.get(0).getValues().get(0));
+
+ // verify drop partitions
+ ArgumentCaptor<List<CatalogPartition>> dropPartitionsCaptor =
+ ArgumentCaptor.forClass(List.class);
+ verify(mockCatalogClient, times(1))
+ .dropPartitions(tableIdentifierArgumentCaptor.capture(),
dropPartitionsCaptor.capture());
+ List<CatalogPartition> droppedPartitions =
dropPartitionsCaptor.getValue();
+ assertEquals(
+ TEST_TABLE_IDENTIFIER.getDatabaseName(),
+ tableIdentifierArgumentCaptor.getValue().getDatabaseName());
+ assertEquals(
+ TEST_TABLE_IDENTIFIER.getTableName(),
+ tableIdentifierArgumentCaptor.getValue().getTableName());
+      assertEquals(1, droppedPartitions.size());
+ assertEquals(
+ TEST_BASE_PATH + "/" + partitionKey2,
droppedPartitions.get(0).getStorageLocation());
+ assertEquals(1, droppedPartitions.get(0).getValues().size());
+ assertEquals(partitionKey2, droppedPartitions.get(0).getValues().get(0));
+
+ // verify update last synced properties
+ ArgumentCaptor<Map<String, String>> lastSyncedPropertiesCaptor =
+ ArgumentCaptor.forClass(Map.class);
+ verify(mockCatalogClient, times(1))
+ .updateTableProperties(eq(TEST_TABLE_IDENTIFIER),
lastSyncedPropertiesCaptor.capture());
+ Map<String, String> lastSyncedProperties =
lastSyncedPropertiesCaptor.getValue();
+ assertEquals("101", lastSyncedProperties.get(LAST_COMMIT_TIME_SYNC));
+ assertEquals("1100",
lastSyncedProperties.get(LAST_COMMIT_COMPLETION_TIME_SYNC));
+ }
+ }
+}
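
The test above exercises the new HudiCatalogPartitionSyncTool end to end: it lists partition paths from storage, diffs them against the partitions already registered in the catalog, adds and drops the difference, and records the last synced commit and completion times as table properties. A rough wiring sketch follows, using only the constructor and syncPartitions signature shown above; how the CatalogPartitionSyncOperations, HudiTableManager, and PartitionValueExtractor instances are obtained is left to the caller (the unit test supplies mocks).

// Hedged sketch of driving the sync tool from a catalog integration.
import org.apache.hadoop.conf.Configuration;

import org.apache.hudi.sync.common.model.PartitionValueExtractor;

import org.apache.xtable.catalog.CatalogPartitionSyncOperations;
import org.apache.xtable.hudi.HudiTableManager;
import org.apache.xtable.hudi.catalog.HudiCatalogPartitionSyncTool;
import org.apache.xtable.model.InternalTable;
import org.apache.xtable.model.catalog.ThreePartHierarchicalTableIdentifier;

public class PartitionSyncSample {
  static boolean syncOnce(
      CatalogPartitionSyncOperations catalogOps,
      HudiTableManager tableManager,
      PartitionValueExtractor valueExtractor,
      InternalTable table,
      ThreePartHierarchicalTableIdentifier tableId) {
    HudiCatalogPartitionSyncTool syncTool =
        new HudiCatalogPartitionSyncTool(catalogOps, tableManager, valueExtractor, new Configuration());
    // Adds newly discovered partitions, drops removed ones, and updates the
    // last-commit-time/last-commit-completion-time properties on the catalog table.
    return syncTool.syncPartitions(table, tableId);
  }
}
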
diff --git
a/xtable-core/src/test/java/org/apache/xtable/hudi/catalog/TestHudiCatalogTableUtils.java
b/xtable-core/src/test/java/org/apache/xtable/hudi/catalog/TestHudiCatalogTableUtils.java
new file mode 100644
index 00000000..e39d63e5
--- /dev/null
+++
b/xtable-core/src/test/java/org/apache/xtable/hudi/catalog/TestHudiCatalogTableUtils.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.hudi.catalog;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.junit.jupiter.api.Test;
+
+import org.apache.xtable.model.schema.InternalField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.schema.InternalType;
+
+public class TestHudiCatalogTableUtils {
+
+ @Test
+ void testGetSparkTableProperties() {
+
+ List<String> partitionNames = Arrays.asList("region", "category");
+ String sparkVersion = "3.2.1";
+ int schemaLengthThreshold = 1000;
+ InternalSchema schema =
+ InternalSchema.builder()
+ .fields(
+ Arrays.asList(
+ InternalField.builder()
+ .name("id")
+ .schema(
+ InternalSchema.builder()
+ .dataType(InternalType.INT)
+ .isNullable(false)
+ .build())
+ .build(),
+ InternalField.builder()
+ .name("name")
+ .schema(
+ InternalSchema.builder()
+ .dataType(InternalType.STRING)
+ .isNullable(false)
+ .build())
+ .build(),
+ InternalField.builder()
+ .name("region")
+ .schema(
+ InternalSchema.builder()
+ .dataType(InternalType.STRING)
+ .isNullable(false)
+ .build())
+ .build(),
+ InternalField.builder()
+ .name("category")
+ .schema(
+ InternalSchema.builder()
+ .dataType(InternalType.STRING)
+ .isNullable(false)
+ .build())
+ .build()))
+ .dataType(InternalType.RECORD)
+ .name("testSchema")
+ .build();
+
+ Map<String, String> result =
+ HudiCatalogTableUtils.getSparkTableProperties(
+ partitionNames, sparkVersion, schemaLengthThreshold, schema);
+
+ // Validate results
+ assertEquals("hudi", result.get("spark.sql.sources.provider"));
+ assertEquals("3.2.1", result.get("spark.sql.create.version"));
+ assertEquals("1", result.get("spark.sql.sources.schema.numParts"));
+    assertEquals(
+        "{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":false,\"metadata\":{}},{\"name\":\"name\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}},{\"name\":\"region\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}},{\"name\":\"category\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}]}",
+        result.get("spark.sql.sources.schema.part.0"));
+ assertEquals("2", result.get("spark.sql.sources.schema.numPartCols"));
+ assertEquals("region", result.get("spark.sql.sources.schema.partCol.0"));
+ assertEquals("category", result.get("spark.sql.sources.schema.partCol.1"));
+ }
+
+ @Test
+ void testGetSparkTablePropertiesEmptyPartitions() {
+ // Setup input data with no partitions
+ List<String> partitionNames = Collections.emptyList();
+ int schemaLengthThreshold = 50;
+ InternalSchema schema =
+ InternalSchema.builder()
+ .fields(
+ Arrays.asList(
+ InternalField.builder()
+ .name("id")
+ .schema(
+ InternalSchema.builder()
+ .dataType(InternalType.INT)
+ .isNullable(false)
+ .build())
+ .build(),
+ InternalField.builder()
+ .name("name")
+ .schema(
+ InternalSchema.builder()
+ .dataType(InternalType.STRING)
+ .isNullable(false)
+ .build())
+ .build()))
+ .dataType(InternalType.RECORD)
+ .name("testSchema")
+ .build();
+
+ // Call the method
+ Map<String, String> result =
+ HudiCatalogTableUtils.getSparkTableProperties(
+ partitionNames, "", schemaLengthThreshold, schema);
+
+ assertEquals("hudi", result.get("spark.sql.sources.provider"));
+ assertNull(result.get("spark.sql.create.version"));
+ assertEquals("4", result.get("spark.sql.sources.schema.numParts"));
+    assertEquals(
+        "{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":false,\"metadata\":{}},{\"name\":\"name\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}]}",
+ result.get("spark.sql.sources.schema.part.0")
+ + result.get("spark.sql.sources.schema.part.1")
+ + result.get("spark.sql.sources.schema.part.2")
+ + result.get("spark.sql.sources.schema.part.3"));
+ assertNull(result.get("spark.sql.sources.schema.numPartCols"));
+ }
+}
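
As the assertions above show, HudiCatalogTableUtils.getSparkTableProperties emits the spark.sql.sources.* properties a Hive-compatible catalog expects, splitting the schema JSON into numbered parts bounded by schemaLengthThreshold. An illustrative call, with sample partition columns and threshold values:

// Sketch only: argument shapes match the test above (partition names, Spark version,
// schema length threshold, InternalSchema).
import java.util.Arrays;
import java.util.Map;

import org.apache.xtable.hudi.catalog.HudiCatalogTableUtils;
import org.apache.xtable.model.schema.InternalSchema;

public class SparkTablePropertiesSample {
  static Map<String, String> buildProperties(InternalSchema schema) {
    // Produced keys include spark.sql.sources.provider, spark.sql.create.version,
    // spark.sql.sources.schema.numParts / .part.N, and the partition column entries.
    return HudiCatalogTableUtils.getSparkTableProperties(
        Arrays.asList("region", "category"), "3.2.1", 1000, schema);
  }
}
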
diff --git
a/xtable-core/src/test/java/org/apache/xtable/hudi/catalog/TestHudiInputFormatUtils.java
b/xtable-core/src/test/java/org/apache/xtable/hudi/catalog/TestHudiInputFormatUtils.java
new file mode 100644
index 00000000..3d28efc0
--- /dev/null
+++
b/xtable-core/src/test/java/org/apache/xtable/hudi/catalog/TestHudiInputFormatUtils.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.hudi.catalog;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.junit.jupiter.api.Test;
+
+import org.apache.hudi.common.model.HoodieFileFormat;
+
+import org.apache.xtable.exception.NotSupportedException;
+
+public class TestHudiInputFormatUtils {
+
+ @Test
+ void testGetInputFormatClassName_Parquet() {
+ String inputFormat =
+ HudiInputFormatUtils.getInputFormatClassName(HoodieFileFormat.PARQUET,
false);
+ assertEquals("org.apache.hudi.hadoop.HoodieParquetInputFormat",
inputFormat);
+
+ String realtimeInputFormat =
+ HudiInputFormatUtils.getInputFormatClassName(HoodieFileFormat.PARQUET,
true);
+ assertEquals(
+ "org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat",
realtimeInputFormat);
+ }
+
+ @Test
+ void testGetInputFormatClassName_HFile() {
+ String inputFormat =
+ HudiInputFormatUtils.getInputFormatClassName(HoodieFileFormat.HFILE,
false);
+ assertEquals("org.apache.hudi.hadoop.HoodieHFileInputFormat", inputFormat);
+
+ String realtimeInputFormat =
+ HudiInputFormatUtils.getInputFormatClassName(HoodieFileFormat.HFILE,
true);
+ assertEquals(
+ "org.apache.hudi.hadoop.realtime.HoodieHFileRealtimeInputFormat",
realtimeInputFormat);
+ }
+
+ @Test
+ void testGetInputFormatClassName_Orc() {
+ String inputFormat =
HudiInputFormatUtils.getInputFormatClassName(HoodieFileFormat.ORC, false);
+ assertEquals("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat",
inputFormat);
+ }
+
+ @Test
+ void testGetInputFormatClassName_UnsupportedFormat() {
+ Exception exception =
+ assertThrows(
+ NotSupportedException.class,
+ () ->
HudiInputFormatUtils.getInputFormatClassName(HoodieFileFormat.HOODIE_LOG,
false));
+    assertTrue(
+        exception.getMessage().contains("Hudi InputFormat not implemented for base file format"));
+ }
+
+ @Test
+ void testGetOutputFormatClassName() {
+ String parquetOutputFormat =
+
HudiInputFormatUtils.getOutputFormatClassName(HoodieFileFormat.PARQUET);
+ assertEquals(
+ "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
parquetOutputFormat);
+
+ String hfileOutputFormat =
+ HudiInputFormatUtils.getOutputFormatClassName(HoodieFileFormat.HFILE);
+ assertEquals(
+ "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
hfileOutputFormat);
+
+ String orcOutputFormat =
HudiInputFormatUtils.getOutputFormatClassName(HoodieFileFormat.ORC);
+ assertEquals("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat",
orcOutputFormat);
+ }
+
+ @Test
+ void testGetOutputFormatClassName_UnsupportedFormat() {
+ Exception exception =
+ assertThrows(
+ NotSupportedException.class,
+ () ->
HudiInputFormatUtils.getOutputFormatClassName(HoodieFileFormat.HOODIE_LOG));
+    assertTrue(exception.getMessage().contains("No OutputFormat for base file format"));
+ }
+
+ @Test
+ void testGetSerDeClassName() {
+ String parquetSerDe =
HudiInputFormatUtils.getSerDeClassName(HoodieFileFormat.PARQUET);
+
assertEquals("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe",
parquetSerDe);
+
+ String hfileSerDe =
HudiInputFormatUtils.getSerDeClassName(HoodieFileFormat.HFILE);
+
assertEquals("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe",
hfileSerDe);
+
+ String orcSerDe =
HudiInputFormatUtils.getSerDeClassName(HoodieFileFormat.ORC);
+ assertEquals("org.apache.hadoop.hive.ql.io.orc.OrcSerde", orcSerDe);
+ }
+
+ @Test
+ void testGetSerDeClassName_UnsupportedFormat() {
+ Exception exception =
+ assertThrows(
+ NotSupportedException.class,
+ () ->
HudiInputFormatUtils.getSerDeClassName(HoodieFileFormat.HOODIE_LOG));
+    assertTrue(exception.getMessage().contains("No SerDe for base file format"));
+ }
+}
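
HudiInputFormatUtils resolves a Hudi base file format to the Hive input format, output format, and SerDe class names, throwing NotSupportedException for unsupported formats. A small sketch; the class names in the comments come directly from the assertions above.

// Sketch of the mapping helpers verified by TestHudiInputFormatUtils.
import org.apache.hudi.common.model.HoodieFileFormat;

import org.apache.xtable.hudi.catalog.HudiInputFormatUtils;

public class InputFormatSample {
  public static void main(String[] args) {
    // org.apache.hudi.hadoop.HoodieParquetInputFormat
    System.out.println(HudiInputFormatUtils.getInputFormatClassName(HoodieFileFormat.PARQUET, false));
    // org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat
    System.out.println(HudiInputFormatUtils.getOutputFormatClassName(HoodieFileFormat.PARQUET));
    // org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
    System.out.println(HudiInputFormatUtils.getSerDeClassName(HoodieFileFormat.PARQUET));
  }
}
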
diff --git
a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSchemaExtractor.java
b/xtable-core/src/test/java/org/apache/xtable/schema/TestSparkSchemaExtractor.java
similarity index 73%
copy from
xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSchemaExtractor.java
copy to
xtable-core/src/test/java/org/apache/xtable/schema/TestSparkSchemaExtractor.java
index 361245fe..59385f05 100644
---
a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSchemaExtractor.java
+++
b/xtable-core/src/test/java/org/apache/xtable/schema/TestSparkSchemaExtractor.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.xtable.delta;
+package org.apache.xtable.schema;
import java.util.Arrays;
import java.util.Collections;
@@ -24,8 +24,6 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.spark.sql.types.DataTypes;
-import org.apache.spark.sql.types.Metadata;
-import org.apache.spark.sql.types.MetadataBuilder;
import org.apache.spark.sql.types.StructType;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -34,15 +32,14 @@ import org.apache.xtable.model.schema.InternalField;
import org.apache.xtable.model.schema.InternalSchema;
import org.apache.xtable.model.schema.InternalType;
-public class TestDeltaSchemaExtractor {
-
+public class TestSparkSchemaExtractor {
@Test
public void testPrimitiveTypes() {
Map<InternalSchema.MetadataKey, Object> decimalMetadata = new HashMap<>();
decimalMetadata.put(InternalSchema.MetadataKey.DECIMAL_PRECISION, 10);
decimalMetadata.put(InternalSchema.MetadataKey.DECIMAL_SCALE, 2);
- InternalSchema internalSchema =
+ InternalSchema InternalSchemaRepresentation =
InternalSchema.builder()
.name("struct")
.dataType(InternalType.RECORD)
@@ -56,7 +53,6 @@ public class TestDeltaSchemaExtractor {
.name("boolean")
.dataType(InternalType.BOOLEAN)
.isNullable(false)
- .comment("requiredBooleanComment")
.build())
.build(),
InternalField.builder()
@@ -227,7 +223,7 @@ public class TestDeltaSchemaExtractor {
StructType structRepresentation =
new StructType()
- .add("requiredBoolean", DataTypes.BooleanType, false,
"requiredBooleanComment")
+ .add("requiredBoolean", DataTypes.BooleanType, false)
.add("optionalBoolean", DataTypes.BooleanType, true)
.add("requiredInt", DataTypes.IntegerType, false)
.add("optionalInt", DataTypes.IntegerType, true)
@@ -248,14 +244,12 @@ public class TestDeltaSchemaExtractor {
Assertions.assertEquals(
structRepresentation,
- DeltaSchemaExtractor.getInstance().fromInternalSchema(internalSchema));
- Assertions.assertEquals(
- internalSchema,
DeltaSchemaExtractor.getInstance().toInternalSchema(structRepresentation));
+
SparkSchemaExtractor.getInstance().fromInternalSchema(InternalSchemaRepresentation));
}
@Test
public void testFixedBytes() {
- InternalSchema internalSchemaOriginal =
+ InternalSchema InternalSchemaRepresentationOriginal =
InternalSchema.builder()
.name("struct")
.dataType(InternalType.RECORD)
@@ -269,7 +263,6 @@ public class TestDeltaSchemaExtractor {
.name("fixed")
.dataType(InternalType.FIXED)
.isNullable(false)
- .comment("comment")
.build())
.build(),
InternalField.builder()
@@ -284,7 +277,7 @@ public class TestDeltaSchemaExtractor {
.build()))
.build();
- InternalSchema internalSchemaAfterRoundTrip =
+ InternalSchema InternalSchemaRepresentationAfterRoundTrip =
InternalSchema.builder()
.name("struct")
.dataType(InternalType.RECORD)
@@ -298,7 +291,6 @@ public class TestDeltaSchemaExtractor {
.name("binary")
.dataType(InternalType.BYTES)
.isNullable(false)
- .comment("comment")
.build())
.build(),
InternalField.builder()
@@ -314,15 +306,13 @@ public class TestDeltaSchemaExtractor {
.build();
StructType structRepresentation =
new StructType()
- .add("requiredFixed", DataTypes.BinaryType, false, "comment")
+ .add("requiredFixed", DataTypes.BinaryType, false)
.add("optionalFixed", DataTypes.BinaryType, true);
Assertions.assertEquals(
structRepresentation,
-
DeltaSchemaExtractor.getInstance().fromInternalSchema(internalSchemaOriginal));
- Assertions.assertEquals(
- internalSchemaAfterRoundTrip,
-
DeltaSchemaExtractor.getInstance().toInternalSchema(structRepresentation));
+ SparkSchemaExtractor.getInstance()
+ .fromInternalSchema(InternalSchemaRepresentationOriginal));
}
@Test
@@ -330,7 +320,7 @@ public class TestDeltaSchemaExtractor {
Map<InternalSchema.MetadataKey, Object> metadata =
Collections.singletonMap(
InternalSchema.MetadataKey.TIMESTAMP_PRECISION,
InternalSchema.MetadataValue.MICROS);
- InternalSchema internalSchemaTimestamp =
+ InternalSchema InternalSchemaRepresentationTimestamp =
InternalSchema.builder()
.name("struct")
.dataType(InternalType.RECORD)
@@ -360,7 +350,7 @@ public class TestDeltaSchemaExtractor {
.build()))
.build();
- InternalSchema internalSchemaTimestampNtz =
+ InternalSchema InternalSchemaRepresentationTimestampNtz =
InternalSchema.builder()
.name("struct")
.dataType(InternalType.RECORD)
@@ -400,13 +390,12 @@ public class TestDeltaSchemaExtractor {
Assertions.assertEquals(
structRepresentationTimestamp,
-
DeltaSchemaExtractor.getInstance().fromInternalSchema(internalSchemaTimestamp));
- Assertions.assertEquals(
- internalSchemaTimestamp,
-
DeltaSchemaExtractor.getInstance().toInternalSchema(structRepresentationTimestamp));
+ SparkSchemaExtractor.getInstance()
+ .fromInternalSchema(InternalSchemaRepresentationTimestamp));
Assertions.assertEquals(
structRepresentationTimestampNtz,
-
DeltaSchemaExtractor.getInstance().fromInternalSchema(internalSchemaTimestampNtz));
+ SparkSchemaExtractor.getInstance()
+ .fromInternalSchema(InternalSchemaRepresentationTimestampNtz));
}
@Test
@@ -417,7 +406,7 @@ public class TestDeltaSchemaExtractor {
optionalEnumMetadata.put(
InternalSchema.MetadataKey.ENUM_VALUES, Arrays.asList("THREE",
"FOUR"));
- InternalSchema internalSchema =
+ InternalSchema InternalSchemaRepresentation =
InternalSchema.builder()
.name("struct")
.dataType(InternalType.RECORD)
@@ -454,7 +443,7 @@ public class TestDeltaSchemaExtractor {
Assertions.assertEquals(
structRepresentation,
- DeltaSchemaExtractor.getInstance().fromInternalSchema(internalSchema));
+
SparkSchemaExtractor.getInstance().fromInternalSchema(InternalSchemaRepresentation));
}
@Test
@@ -488,7 +477,7 @@ public class TestDeltaSchemaExtractor {
.build()))
.dataType(InternalType.RECORD)
.build();
- InternalSchema internalSchema =
+ InternalSchema InternalSchemaRepresentation =
InternalSchema.builder()
.name("struct")
.dataType(InternalType.RECORD)
@@ -569,9 +558,7 @@ public class TestDeltaSchemaExtractor {
Assertions.assertEquals(
structRepresentation,
- DeltaSchemaExtractor.getInstance().fromInternalSchema(internalSchema));
- Assertions.assertEquals(
- internalSchema,
DeltaSchemaExtractor.getInstance().toInternalSchema(structRepresentation));
+
SparkSchemaExtractor.getInstance().fromInternalSchema(InternalSchemaRepresentation));
}
@Test
@@ -605,7 +592,7 @@ public class TestDeltaSchemaExtractor {
.build()))
.dataType(InternalType.RECORD)
.build();
- InternalSchema internalSchema =
+ InternalSchema InternalSchemaRepresentation =
InternalSchema.builder()
.name("struct")
.dataType(InternalType.RECORD)
@@ -662,14 +649,12 @@ public class TestDeltaSchemaExtractor {
Assertions.assertEquals(
structRepresentation,
- DeltaSchemaExtractor.getInstance().fromInternalSchema(internalSchema));
- Assertions.assertEquals(
- internalSchema,
DeltaSchemaExtractor.getInstance().toInternalSchema(structRepresentation));
+
SparkSchemaExtractor.getInstance().fromInternalSchema(InternalSchemaRepresentation));
}
@Test
public void testNestedRecords() {
- InternalSchema internalSchema =
+ InternalSchema InternalSchemaRepresentation =
InternalSchema.builder()
.name("struct")
.dataType(InternalType.RECORD)
@@ -684,7 +669,6 @@ public class TestDeltaSchemaExtractor {
.name("struct")
.dataType(InternalType.RECORD)
.isNullable(true)
- .comment("comment")
.fields(
Arrays.asList(
InternalField.builder()
@@ -695,7 +679,6 @@ public class TestDeltaSchemaExtractor {
.name("integer")
.dataType(InternalType.INT)
.isNullable(true)
-
.comment("nestedOptionalIntComment")
.build())
.defaultValue(
InternalField.Constants.NULL_DEFAULT_VALUE)
@@ -745,204 +728,15 @@ public class TestDeltaSchemaExtractor {
.add(
"nestedOne",
new StructType()
- .add(
- "nestedOptionalInt",
- DataTypes.IntegerType,
- true,
- "nestedOptionalIntComment")
+ .add("nestedOptionalInt", DataTypes.IntegerType, true)
.add("nestedRequiredDouble", DataTypes.DoubleType, false)
.add(
"nestedTwo",
new StructType().add("doublyNestedString",
DataTypes.StringType, true),
false),
- true,
- "comment");
- Assertions.assertEquals(
- structRepresentation,
- DeltaSchemaExtractor.getInstance().fromInternalSchema(internalSchema));
- Assertions.assertEquals(
- internalSchema,
DeltaSchemaExtractor.getInstance().toInternalSchema(structRepresentation));
- }
-
- @Test
- public void testFieldIdsInDeltaSchema() {
- StructType structRepresentation =
- new StructType()
- .add(
- "nestedOne",
- new StructType()
- .add(
- "nestedOptionalInt",
- DataTypes.IntegerType,
- true,
- Metadata.fromJson("{\"delta.columnMapping.id\": 3}"))
- .add(
- "nestedRequiredDouble",
- DataTypes.DoubleType,
- false,
- Metadata.fromJson("{\"delta.columnMapping.id\": 5}"))
- .add(
- "nestedTwo",
- new StructType()
- .add(
- "doublyNestedString",
- DataTypes.StringType,
- true,
-
Metadata.fromJson("{\"delta.columnMapping.id\": 12}")),
- false,
- Metadata.fromJson("{\"delta.columnMapping.id\": 10}")),
- true,
- Metadata.fromJson("{\"delta.columnMapping.id\": 2}"));
-
- InternalSchema internalSchema =
- InternalSchema.builder()
- .name("struct")
- .dataType(InternalType.RECORD)
- .isNullable(false)
- .fields(
- Collections.singletonList(
- InternalField.builder()
- .name("nestedOne")
- .fieldId(2)
-
.defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
- .schema(
- InternalSchema.builder()
- .name("struct")
- .dataType(InternalType.RECORD)
- .isNullable(true)
- .fields(
- Arrays.asList(
- InternalField.builder()
- .name("nestedOptionalInt")
- .fieldId(3)
- .parentPath("nestedOne")
- .schema(
- InternalSchema.builder()
- .name("integer")
- .dataType(InternalType.INT)
- .isNullable(true)
- .build())
- .defaultValue(
-
InternalField.Constants.NULL_DEFAULT_VALUE)
- .build(),
- InternalField.builder()
- .name("nestedRequiredDouble")
- .fieldId(5)
- .parentPath("nestedOne")
- .schema(
- InternalSchema.builder()
- .name("double")
-
.dataType(InternalType.DOUBLE)
- .isNullable(false)
- .build())
- .build(),
- InternalField.builder()
- .name("nestedTwo")
- .fieldId(10)
- .parentPath("nestedOne")
- .schema(
- InternalSchema.builder()
- .name("struct")
-
.dataType(InternalType.RECORD)
- .isNullable(false)
- .fields(
-
Collections.singletonList(
-
InternalField.builder()
-
.name("doublyNestedString")
- .fieldId(12)
-
.parentPath("nestedOne.nestedTwo")
- .schema(
-
InternalSchema.builder()
-
.name("string")
-
.dataType(
-
InternalType.STRING)
-
.isNullable(true)
-
.build())
- .defaultValue(
-
InternalField.Constants
-
.NULL_DEFAULT_VALUE)
- .build()))
- .build())
- .build()))
- .build())
- .build()))
- .build();
- Assertions.assertEquals(
- internalSchema,
DeltaSchemaExtractor.getInstance().toInternalSchema(structRepresentation));
- }
-
- @Test
- void generateColumnsAreNotTranslatedToInternalSchema() {
- StructType structRepresentation =
- new StructType()
- .add("birthDate", DataTypes.TimestampType, false)
- .add(
- "birthYear",
- DataTypes.TimestampType,
- true,
-
Metadata.fromJson("{\"delta.generationExpression\":\"YEAR(birthDate)\"}"));
- InternalSchema internalSchema =
- InternalSchema.builder()
- .dataType(InternalType.RECORD)
- .name("struct")
- .fields(
- Collections.singletonList(
- InternalField.builder()
- .schema(
- InternalSchema.builder()
- .name("timestamp")
- .dataType(InternalType.TIMESTAMP)
- .metadata(
- Collections.singletonMap(
-
InternalSchema.MetadataKey.TIMESTAMP_PRECISION,
- InternalSchema.MetadataValue.MICROS))
- .build())
- .name("birthDate")
- .build()))
- .build();
- Assertions.assertEquals(
- internalSchema,
DeltaSchemaExtractor.getInstance().toInternalSchema(structRepresentation));
- }
-
- @Test
- public void testIcebergToDeltaUUIDSupport() {
- Metadata metadata =
- new MetadataBuilder().putString(InternalSchema.XTABLE_LOGICAL_TYPE,
"uuid").build();
- StructType structRepresentation =
- new StructType()
- .add("requiredUUID", DataTypes.BinaryType, false, metadata)
- .add("optionalUUID", DataTypes.BinaryType, true, metadata);
- InternalSchema internalSchema =
- InternalSchema.builder()
- .name("struct")
- .dataType(InternalType.RECORD)
- .isNullable(false)
- .fields(
- Arrays.asList(
- InternalField.builder()
- .name("requiredUUID")
- .schema(
- InternalSchema.builder()
- .name("binary")
- .dataType(InternalType.UUID)
- .isNullable(false)
- .build())
- .build(),
- InternalField.builder()
- .name("optionalUUID")
- .schema(
- InternalSchema.builder()
- .name("binary")
- .dataType(InternalType.UUID)
- .isNullable(true)
- .build())
-
.defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
- .build()))
- .build();
+ true);
Assertions.assertEquals(
structRepresentation,
- DeltaSchemaExtractor.getInstance().fromInternalSchema(internalSchema));
- Assertions.assertEquals(
- internalSchema,
DeltaSchemaExtractor.getInstance().toInternalSchema(structRepresentation));
+
SparkSchemaExtractor.getInstance().fromInternalSchema(InternalSchemaRepresentation));
}
}
diff --git
a/xtable-core/src/test/java/org/apache/xtable/testutil/ITTestUtils.java
b/xtable-core/src/test/java/org/apache/xtable/testutil/ITTestUtils.java
index 4505d851..75c3833a 100644
--- a/xtable-core/src/test/java/org/apache/xtable/testutil/ITTestUtils.java
+++ b/xtable-core/src/test/java/org/apache/xtable/testutil/ITTestUtils.java
@@ -21,6 +21,7 @@ package org.apache.xtable.testutil;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.junit.jupiter.api.Assertions;
@@ -148,6 +149,7 @@ public class ITTestUtils {
.name("source_table_name")
.basePath("file://base_path/v1/")
.formatName("ICEBERG")
+ .additionalProperties(new Properties())
.build();
}