This is an automated email from the ASF dual-hosted git repository. vinish pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git
The following commit(s) were added to refs/heads/main by this push: new fc4d6e89 Add implementation for XTable REST Service (#704) fc4d6e89 is described below commit fc4d6e8960eb47b722ee2f319496e11fc7d33cfc Author: Rahil C <32500120+rahi...@users.noreply.github.com> AuthorDate: Fri May 9 22:20:02 2025 -0700 Add implementation for XTable REST Service (#704) * Add module for xtable rest service * Use builder pattern, spotless fix * Add unit test for conversion resource * Refactor code for easier testing, add unit test for service * Fix other failing modules tests by moving quarkus bom into xtable service * Add metadata helper utils, test other formats * Address Vinish intial set of comments * add unit test for util * minor fix for other unit test * spotless * fix dependency conflict to make unit tests run in ci * minor mvn fix * Address Vinish remaining comments * Address Vinish comments * Make sourceProviders private final --------- Co-authored-by: Vinish Reddy <vin...@apache.org> --- pom.xml | 11 +- xtable-service/README.md | 52 ++++ ...enshot 2025-05-01 at 9.04.59\342\200\257AM.png" | Bin 0 -> 277148 bytes ...enshot 2025-05-01 at 9.05.10\342\200\257AM.png" | Bin 0 -> 289179 bytes xtable-service/pom.xml | 174 +++++++++++++ .../apache/xtable/service/ConversionResource.java | 45 ++++ .../apache/xtable/service/ConversionService.java | 269 +++++++++++++++++++++ .../xtable/service/ConversionServiceConfig.java | 36 +++ .../xtable/service/models/ConvertTableRequest.java | 64 +++++ .../service/models/ConvertTableResponse.java | 39 +++ .../xtable/service/models/ConvertedTable.java | 48 ++++ .../src/main/resources/application.properties | 17 ++ .../src/main/resources/xtable-hadoop-defaults.xml | 91 +++++++ .../xtable/service/TestConversionResource.java | 79 ++++++ .../xtable/service/TestConversionService.java | 216 +++++++++++++++++ 15 files changed, 1139 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index d70d45de..bed4d63b 100644 --- a/pom.xml +++ b/pom.xml 
@@ -53,6 +53,7 @@ <module>xtable-utilities</module> <module>xtable-aws</module> <module>xtable-hive-metastore</module> + <module>xtable-service</module> </modules> <properties> @@ -63,6 +64,7 @@ <avro.version>1.11.4</avro.version> <log4j.version>2.22.0</log4j.version> <junit.version>5.11.4</junit.version> + <junit.platform.runner.version>1.11.4</junit.platform.runner.version> <lombok.version>1.18.36</lombok.version> <lombok-maven-plugin.version>1.18.20.0</lombok-maven-plugin.version> <hadoop.version>3.4.1</hadoop.version> @@ -97,6 +99,11 @@ <apache-jar-resource-bundle.version>1.7</apache-jar-resource-bundle.version> <apache-incubator-disclaimer-resource-bundle.version>1.7</apache-incubator-disclaimer-resource-bundle.version> <scala-collection-compat.version>2.12.0</scala-collection-compat.version> + <quarkus.platform.artifact-id>quarkus-bom</quarkus.platform.artifact-id> + <quarkus.platform.group-id>io.quarkus.platform</quarkus.platform.group-id> + <quarkus.platform.version>3.2.12.Final</quarkus.platform.version> <!-- compatible with Java 11 --> + <antlr4.version>4.9.3</antlr4.version> + <jol.core.version>0.16</jol.core.version> <!-- Test properties --> <skipTests>false</skipTests> @@ -457,7 +464,7 @@ <dependency> <groupId>org.openjdk.jol</groupId> <artifactId>jol-core</artifactId> - <version>0.16</version> + <version>${jol.core.version}</version> <scope>test</scope> </dependency> @@ -483,7 +490,7 @@ <dependency> <groupId>org.junit.platform</groupId> <artifactId>junit-platform-runner</artifactId> - <version>1.11.4</version> + <version>${junit.platform.runner.version}</version> <scope>test</scope> </dependency> <dependency> diff --git a/xtable-service/README.md b/xtable-service/README.md new file mode 100644 index 00000000..00a0b119 --- /dev/null +++ b/xtable-service/README.md @@ -0,0 +1,52 @@ +<!-- + - Licensed to the Apache Software Foundation (ASF) under one + - or more contributor license agreements. 
See the NOTICE file + - distributed with this work for additional information + - regarding copyright ownership. The ASF licenses this file + - to you under the Apache License, Version 2.0 (the + - "License"); you may not use this file except in compliance + - with the License. You may obtain a copy of the License at + - + - http://www.apache.org/licenses/LICENSE-2.0 + - + - Unless required by applicable law or agreed to in writing, + - software distributed under the License is distributed on an + - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + - KIND, either express or implied. See the License for the + - specific language governing permissions and limitations + - under the License. + --> + +# XTable REST Service + +The `rest-service-open-api.yaml` defines the api contract for running table format conversion using XTable's REST service. +See XTable's `spec` module for more details: https://github.com/apache/incubator-xtable/tree/main/spec + +## How to run the service locally + +#### Before running the service, ensure that you have the required credentials set in your environment needed to read and write to cloud storage. + +To run the service locally, first ensure you have built the project with +```sh +mvn clean install -DskipTests +``` + + +Then you can start the Quarkus service using the following command: +```sh +mvn quarkus:dev -pl xtable-service +``` +This will start the service on `http://localhost:8080`. + +Note quarkus will automatically reload the service when you make changes to the code. + +## Testing with Postman + +If you would like to test the service with an api client, you can download Postman https://www.postman.com/downloads/ + +Ensure that when you are testing you have set the service URL, headers, and request body correctly. +See the screenshots below for an example. 
+ + + + \ No newline at end of file diff --git "a/xtable-service/examples/Screenshot 2025-05-01 at 9.04.59\342\200\257AM.png" "b/xtable-service/examples/Screenshot 2025-05-01 at 9.04.59\342\200\257AM.png" new file mode 100644 index 00000000..2a7eb4da Binary files /dev/null and "b/xtable-service/examples/Screenshot 2025-05-01 at 9.04.59\342\200\257AM.png" differ diff --git "a/xtable-service/examples/Screenshot 2025-05-01 at 9.05.10\342\200\257AM.png" "b/xtable-service/examples/Screenshot 2025-05-01 at 9.05.10\342\200\257AM.png" new file mode 100644 index 00000000..5c541b25 Binary files /dev/null and "b/xtable-service/examples/Screenshot 2025-05-01 at 9.05.10\342\200\257AM.png" differ diff --git a/xtable-service/pom.xml b/xtable-service/pom.xml new file mode 100644 index 00000000..12dff680 --- /dev/null +++ b/xtable-service/pom.xml @@ -0,0 +1,174 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.xtable</groupId> + <artifactId>xtable</artifactId> + <version>0.2.0-SNAPSHOT</version> + </parent> + + <artifactId>xtable-service</artifactId> + + <dependencyManagement> + <dependencies> + <dependency> + <groupId>${quarkus.platform.group-id}</groupId> + <artifactId>${quarkus.platform.artifact-id}</artifactId> + <version>${quarkus.platform.version}</version> + <type>pom</type> + <scope>import</scope> + </dependency> + <dependency> + <groupId>org.antlr</groupId> + <artifactId>antlr4-runtime</artifactId> + <version>${antlr4.version}</version> + </dependency> + <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>scala-reflect</artifactId> + <version>${scala.version}</version> + </dependency> + </dependencies> + </dependencyManagement> + + <dependencies> + <dependency> + <groupId>org.apache.xtable</groupId> + <artifactId>xtable-core_${scala.binary.version}</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-aws</artifactId> + </dependency> + + <!-- Spark --> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_${scala.binary.version}</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-sql_${scala.binary.version}</artifactId> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-annotations</artifactId> + <version>${jackson.version}</version> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + <version>${jackson.version}</version> + 
</dependency> + + <dependency> + <groupId>io.quarkus</groupId> + <artifactId>quarkus-arc</artifactId> + </dependency> + <dependency> + <groupId>io.quarkus</groupId> + <artifactId>quarkus-resteasy-reactive</artifactId> + </dependency> + <dependency> + <groupId>io.quarkus</groupId> + <artifactId>quarkus-resteasy-reactive-jackson</artifactId> + </dependency> + <dependency> + <groupId>io.rest-assured</groupId> + <artifactId>rest-assured</artifactId> + <scope>test</scope> + </dependency> + + <!-- Junit --> + <dependency> + <groupId>org.junit.jupiter</groupId> + <artifactId>junit-jupiter-api</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.junit.jupiter</groupId> + <artifactId>junit-jupiter-params</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.junit.jupiter</groupId> + <artifactId>junit-jupiter-engine</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.junit.platform</groupId> + <artifactId>junit-platform-commons</artifactId> + <version>${junit.platform.runner.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.junit.platform</groupId> + <artifactId>junit-platform-engine</artifactId> + <version>${junit.platform.runner.version}</version> + <scope>test</scope> + </dependency> + + <!-- Mockito --> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-junit-jupiter</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.openjdk.jol</groupId> + <artifactId>jol-core</artifactId> + <version>${jol.core.version}</version> + <scope>runtime</scope> + </dependency> + + </dependencies> + <build> + <plugins> + <plugin> + <groupId>${quarkus.platform.group-id}</groupId> + <artifactId>quarkus-maven-plugin</artifactId> + <version>${quarkus.platform.version}</version> + 
<extensions>true</extensions> + <executions> + <execution> + <goals> + <goal>build</goal> + <goal>generate-code</goal> + <goal>generate-code-tests</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> + + +</project> diff --git a/xtable-service/src/main/java/org/apache/xtable/service/ConversionResource.java b/xtable-service/src/main/java/org/apache/xtable/service/ConversionResource.java new file mode 100644 index 00000000..ba70b040 --- /dev/null +++ b/xtable-service/src/main/java/org/apache/xtable/service/ConversionResource.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.xtable.service; + +import org.apache.xtable.service.models.ConvertTableRequest; +import org.apache.xtable.service.models.ConvertTableResponse; + +import io.smallrye.common.annotation.Blocking; +import jakarta.inject.Inject; +import jakarta.ws.rs.Consumes; +import jakarta.ws.rs.POST; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.Produces; +import jakarta.ws.rs.core.MediaType; + +@Path("/v1/conversion") +@Produces(MediaType.APPLICATION_JSON) +@Consumes(MediaType.APPLICATION_JSON) +public class ConversionResource { + + @Inject ConversionService conversionService; + + @POST + @Path("/table") + @Blocking + public ConvertTableResponse convertTable(ConvertTableRequest convertTableRequest) { + return conversionService.convertTable(convertTableRequest); + } +} diff --git a/xtable-service/src/main/java/org/apache/xtable/service/ConversionService.java b/xtable-service/src/main/java/org/apache/xtable/service/ConversionService.java new file mode 100644 index 00000000..1d4ad32e --- /dev/null +++ b/xtable-service/src/main/java/org/apache/xtable/service/ConversionService.java @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.xtable.service; + +import static org.apache.xtable.conversion.ConversionUtils.convertToSourceTable; +import static org.apache.xtable.model.storage.TableFormat.DELTA; +import static org.apache.xtable.model.storage.TableFormat.HUDI; +import static org.apache.xtable.model.storage.TableFormat.ICEBERG; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import lombok.extern.log4j.Log4j2; + +import org.apache.hadoop.conf.Configuration; + +import org.apache.hudi.common.table.timeline.HoodieInstant; + +import org.apache.iceberg.SchemaParser; + +import com.google.common.annotations.VisibleForTesting; + +import org.apache.xtable.avro.AvroSchemaConverter; +import org.apache.xtable.conversion.ConversionConfig; +import org.apache.xtable.conversion.ConversionController; +import org.apache.xtable.conversion.ConversionSourceProvider; +import org.apache.xtable.conversion.SourceTable; +import org.apache.xtable.conversion.TargetTable; +import org.apache.xtable.delta.DeltaConversionSourceProvider; +import org.apache.xtable.hudi.HudiConversionSourceProvider; +import org.apache.xtable.iceberg.IcebergConversionSourceProvider; +import org.apache.xtable.iceberg.IcebergSchemaExtractor; +import org.apache.xtable.model.InternalTable; +import org.apache.xtable.model.storage.TableFormat; +import org.apache.xtable.schema.SparkSchemaExtractor; +import org.apache.xtable.service.models.ConvertTableRequest; +import org.apache.xtable.service.models.ConvertTableResponse; +import org.apache.xtable.service.models.ConvertedTable; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; + +/** + * Service for managing table format conversions. + * + * <p>It supports formats such as ICEBERG, HUDI, and DELTA. The conversion process involves creating + * a source table, generating target tables, and then executing the conversion via a designated + * conversion controller. 
+ */ +@Log4j2 +@ApplicationScoped +public class ConversionService { + private final ConversionController conversionController; + private final ConversionServiceConfig serviceConfig; + private final Configuration hadoopConf; + private final Map<String, ConversionSourceProvider<?>> sourceProviders; + + /** + * Constructs a ConversionService instance with required dependencies. + * + * <p>This constructor initializes the ConversionService using the provided service configuration. + * It retrieves the Hadoop configuration, creates a new ConversionController with the Hadoop + * configuration, and initializes conversion source providers based on the Hadoop configuration. + * + * @param serviceConfig the conversion service configuration + */ + @Inject + public ConversionService(ConversionServiceConfig serviceConfig) { + this.serviceConfig = serviceConfig; + this.hadoopConf = getHadoopConf(); + this.conversionController = new ConversionController(hadoopConf); + this.sourceProviders = initSourceProviders(hadoopConf); + } + + /** + * Retrieves the Hadoop configuration. + * + * <p>This method creates a new {@code Configuration} instance, reads the Hadoop configuration + * file path from the service configuration, and attempts to load the configuration from the + * specified XML file. If no resources are loaded, it logs a warning. If an error occurs during + * configuration loading, it logs an error message. + * + * @return the initialized Hadoop {@code Configuration} + */ + private Configuration getHadoopConf() { + Configuration conf = new Configuration(); + String hadoopConfigPath = serviceConfig.getHadoopConfigPath(); + try { + // Load configuration from the specified XML file + conf.addResource(hadoopConfigPath); + + // If the resource wasn’t found, log a warning + if (conf.size() == 0) { + log.warn( + "Could not load Hadoop configuration from: {}. 
Using default Hadoop configuration.", + hadoopConfigPath); + } + } catch (Exception e) { + log.error( + "Error loading Hadoop configuration from: {}. Exception: {}", + hadoopConfigPath, + e.getMessage(), + e); + } + return conf; + } + + /** + * Initializes conversion source providers for different table formats using the provided Hadoop + * configuration. + * + * <p>This method creates and initializes source providers for HUDI, DELTA, and ICEBERG formats. + * Each provider is initialized with the given Hadoop configuration and then mapped to its + * respective table format identifier. + * + * @param hadoopConf the Hadoop configuration used to initialize the source providers + * @return a map mapping table format identifiers to their corresponding initialized conversion + * source providers + */ + private Map<String, ConversionSourceProvider<?>> initSourceProviders(Configuration hadoopConf) { + Map<String, ConversionSourceProvider<?>> sourceProviders = new HashMap<>(); + ConversionSourceProvider<HoodieInstant> hudiConversionSourceProvider = + new HudiConversionSourceProvider(); + ConversionSourceProvider<Long> deltaConversionSourceProvider = + new DeltaConversionSourceProvider(); + ConversionSourceProvider<org.apache.iceberg.Snapshot> icebergConversionSourceProvider = + new IcebergConversionSourceProvider(); + + hudiConversionSourceProvider.init(hadoopConf); + deltaConversionSourceProvider.init(hadoopConf); + icebergConversionSourceProvider.init(hadoopConf); + + sourceProviders.put(HUDI, hudiConversionSourceProvider); + sourceProviders.put(DELTA, deltaConversionSourceProvider); + sourceProviders.put(ICEBERG, icebergConversionSourceProvider); + + return sourceProviders; + } + + /** + * Constructs a new ConversionService instance for testing purposes. + * + * <p>This constructor is visible for testing using dependency injection. It allows the injection + * of a preconfigured ConversionController, Hadoop configuration, and source providers. 
+ * + * @param serviceConfig the conversion service configuration + * @param conversionController a preconfigured conversion controller + * @param hadoopConf the Hadoop configuration to be used for initializing resources + * @param sourceProviders a map of conversion source providers keyed by table format + */ + @VisibleForTesting + public ConversionService( + ConversionServiceConfig serviceConfig, + ConversionController conversionController, + Configuration hadoopConf, + Map<String, ConversionSourceProvider<?>> sourceProviders) { + this.serviceConfig = serviceConfig; + this.conversionController = conversionController; + this.hadoopConf = hadoopConf; + this.sourceProviders = sourceProviders; + } + + /** + * Converts a source table to one or more target table formats. + * + * @param convertTableRequest the conversion request containing source table details and target + * formats + * @return a ConvertTableResponse containing details of the converted target tables + */ + public ConvertTableResponse convertTable(ConvertTableRequest convertTableRequest) { + SourceTable sourceTable = + SourceTable.builder() + .name(convertTableRequest.getSourceTableName()) + .basePath(convertTableRequest.getSourceTablePath()) + .formatName(convertTableRequest.getSourceFormat()) + .build(); + + List<TargetTable> targetTables = new ArrayList<>(); + for (String targetFormat : convertTableRequest.getTargetFormats()) { + TargetTable targetTable = + TargetTable.builder() + .name(convertTableRequest.getSourceTableName()) + .basePath(convertTableRequest.getSourceTablePath()) + .formatName(targetFormat) + .build(); + targetTables.add(targetTable); + } + + ConversionConfig conversionConfig = + ConversionConfig.builder().sourceTable(sourceTable).targetTables(targetTables).build(); + + conversionController.sync( + conversionConfig, sourceProviders.get(convertTableRequest.getSourceFormat())); + + List<ConvertedTable> convertedTables = new ArrayList<>(); + for (TargetTable targetTable : targetTables) 
{ + InternalTable internalTable = + sourceProviders + .get(targetTable.getFormatName()) + .getConversionSourceInstance(convertToSourceTable(targetTable)) + .getCurrentTable(); + String schemaString = extractSchemaString(targetTable, internalTable); + convertedTables.add( + ConvertedTable.builder() + .targetFormat(internalTable.getName()) + .targetSchema(schemaString) + .targetMetadataPath(internalTable.getLatestMetdataPath()) + .build()); + } + return new ConvertTableResponse(convertedTables); + } + + /** + * Extracts the schema string from the given internal table based on the target table format. + * + * <p>This method supports the following table formats: + * + * <ul> + * <li><b>HUDI</b>: Converts the internal schema to an Avro schema and returns its string + * representation. + * <li><b>ICEBERG</b>: Converts the internal schema to an Iceberg schema and returns its JSON + * representation. + * <li><b>DELTA</b>: Converts the internal schema to a Spark schema and returns its JSON + * representation. 
+ * </ul> + * + * @param targetTable the target table containing the desired format information + * @param internalTable the internal table from which the schema is read + * @return the string representation of the converted schema + * @throws UnsupportedOperationException if the target table format is not supported + */ + private String extractSchemaString(TargetTable targetTable, InternalTable internalTable) { + switch (targetTable.getFormatName()) { + case TableFormat.HUDI: + return AvroSchemaConverter.getInstance() + .fromInternalSchema(internalTable.getReadSchema()) + .toString(); + case TableFormat.ICEBERG: + org.apache.iceberg.Schema iceSchema = + IcebergSchemaExtractor.getInstance().toIceberg(internalTable.getReadSchema()); + return SchemaParser.toJson(iceSchema); + case TableFormat.DELTA: + return SparkSchemaExtractor.getInstance() + .fromInternalSchema(internalTable.getReadSchema()) + .json(); + default: + throw new UnsupportedOperationException( + "Unsupported table format: " + targetTable.getFormatName()); + } + } +} diff --git a/xtable-service/src/main/java/org/apache/xtable/service/ConversionServiceConfig.java b/xtable-service/src/main/java/org/apache/xtable/service/ConversionServiceConfig.java new file mode 100644 index 00000000..1da7c059 --- /dev/null +++ b/xtable-service/src/main/java/org/apache/xtable/service/ConversionServiceConfig.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.service; + +import org.eclipse.microprofile.config.inject.ConfigProperty; + +import jakarta.enterprise.context.ApplicationScoped; + +@ApplicationScoped +public class ConversionServiceConfig { + + public static final String HADOOP_DEFAULTS_XML = "xtable-hadoop-defaults.xml"; + + @ConfigProperty(name = "xtable.hadoop-config-path", defaultValue = HADOOP_DEFAULTS_XML) + private String hadoopConfigPath; + + public String getHadoopConfigPath() { + return hadoopConfigPath; + } +} diff --git a/xtable-service/src/main/java/org/apache/xtable/service/models/ConvertTableRequest.java b/xtable-service/src/main/java/org/apache/xtable/service/models/ConvertTableRequest.java new file mode 100644 index 00000000..465c3c0c --- /dev/null +++ b/xtable-service/src/main/java/org/apache/xtable/service/models/ConvertTableRequest.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.service.models; + +import java.util.List; +import java.util.Map; + +import lombok.Builder; +import lombok.Getter; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +@Getter +@Builder +public class ConvertTableRequest { + @JsonProperty("source-format") + private String sourceFormat; + + @JsonProperty("source-table-name") + private String sourceTableName; + + @JsonProperty("source-table-path") + private String sourceTablePath; + + @JsonProperty("target-formats") + private List<String> targetFormats; + + @JsonProperty("configurations") + private Map<String, String> configurations; + + public ConvertTableRequest() {} + + @JsonCreator + public ConvertTableRequest( + @JsonProperty("source-format") String sourceFormat, + @JsonProperty("source-table-name") String sourceTableName, + @JsonProperty("source-table-path") String sourceTablePath, + @JsonProperty("target-format") List<String> targetFormat, + @JsonProperty("configurations") Map<String, String> configurations) { + + this.sourceFormat = sourceFormat; + this.sourceTableName = sourceTableName; + this.sourceTablePath = sourceTablePath; + this.targetFormats = targetFormat; + this.configurations = configurations; + } +} diff --git a/xtable-service/src/main/java/org/apache/xtable/service/models/ConvertTableResponse.java b/xtable-service/src/main/java/org/apache/xtable/service/models/ConvertTableResponse.java new file mode 100644 index 00000000..1581ea19 --- /dev/null +++ b/xtable-service/src/main/java/org/apache/xtable/service/models/ConvertTableResponse.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.service.models; + +import java.util.List; + +import lombok.Builder; +import lombok.Getter; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +@Getter +@Builder +public class ConvertTableResponse { + @JsonProperty("convertedTables") + private List<ConvertedTable> convertedTables; + + @JsonCreator + public ConvertTableResponse(@JsonProperty List<ConvertedTable> convertedTables) { + this.convertedTables = convertedTables; + } +} diff --git a/xtable-service/src/main/java/org/apache/xtable/service/models/ConvertedTable.java b/xtable-service/src/main/java/org/apache/xtable/service/models/ConvertedTable.java new file mode 100644 index 00000000..12bc915e --- /dev/null +++ b/xtable-service/src/main/java/org/apache/xtable/service/models/ConvertedTable.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.service.models; + +import lombok.Builder; +import lombok.Getter; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +@Getter +@Builder +public class ConvertedTable { + @JsonProperty("target-format") + private String targetFormat; + + @JsonProperty("target-metadata-path") + private String targetMetadataPath; + + @JsonProperty("target-schema") + private String targetSchema; + + @JsonCreator + public ConvertedTable( + @JsonProperty String targetFormat, + @JsonProperty String targetMetadataPath, + @JsonProperty String targetSchema) { + this.targetFormat = targetFormat; + this.targetMetadataPath = targetMetadataPath; + this.targetSchema = targetSchema; + } +} diff --git a/xtable-service/src/main/resources/application.properties b/xtable-service/src/main/resources/application.properties new file mode 100644 index 00000000..f00ee7c8 --- /dev/null +++ b/xtable-service/src/main/resources/application.properties @@ -0,0 +1,17 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +quarkus.log.level=INFO \ No newline at end of file diff --git a/xtable-service/src/main/resources/xtable-hadoop-defaults.xml b/xtable-service/src/main/resources/xtable-hadoop-defaults.xml new file mode 100644 index 00000000..0262bd83 --- /dev/null +++ b/xtable-service/src/main/resources/xtable-hadoop-defaults.xml @@ -0,0 +1,91 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. 
+--> +<configuration> + + <!-- Default file system for local file scheme, file:/// --> + <property> + <name>fs.file.impl</name> + <value>org.apache.hadoop.fs.LocalFileSystem</value> + </property> + + <!-- Default configs for Azure storage scheme, abfs:// --> + <property> + <name>fs.azure.account.auth.type</name> + <value>OAuth</value> + </property> + <property> + <name>fs.azure.account.oauth.provider.type</name> + <value>org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider</value> + </property> + <!-- other required properties for OAuth --> + <!-- + <property> + <name>fs.azure.account.oauth2.client.endpoint</name> + <value>https://login.microsoftonline.com/ TENANT-ID /oauth2/token</value> + </property> + <property> + <name>fs.azure.account.oauth2.client.id</name> + <value> APPLICATION-ID </value> + </property> + <property> + <name>fs.azure.account.oauth2.client.secret</name> + <value> APPLICATION-SECRET </value> + </property> + --> + + <!-- Default file system for AWS S3/S3A scheme, s3:// --> + <property> + <name>fs.s3.impl</name> + <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value> + </property> + <property> + <name>fs.s3.aws.credentials.provider</name> + <value>software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider</value> + </property> + <property> + <name>fs.s3a.impl</name> + <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value> + </property> + <property> + <name>fs.s3a.aws.credentials.provider</name> + <value>software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider</value> + </property> + + <!-- Default file system for GCP scheme, gs:// --> + <property> + <name>fs.gs.impl</name> + <value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem</value> + </property> + <property> + <name>fs.AbstractFileSystem.gs.impl</name> + <value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS</value> + </property> + + <!-- Default Spark configs for Delta Table conversions --> + <property> + <name>spark.master</name> + <value>local[2]</value> + 
</property> + + <!-- Whether to write avro list structures in the old way (2 levels) or the new one (3 levels) --> + <property> + <name>parquet.avro.write-old-list-structure</name> + <value>false</value> + </property> + +</configuration> diff --git a/xtable-service/src/test/java/org/apache/xtable/service/TestConversionResource.java b/xtable-service/src/test/java/org/apache/xtable/service/TestConversionResource.java new file mode 100644 index 00000000..5591db86 --- /dev/null +++ b/xtable-service/src/test/java/org/apache/xtable/service/TestConversionResource.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.xtable.service;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.Collections;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

import org.apache.xtable.model.storage.TableFormat;
import org.apache.xtable.service.models.ConvertTableRequest;
import org.apache.xtable.service.models.ConvertTableResponse;
import org.apache.xtable.service.models.ConvertedTable;

/**
 * Unit tests for {@link ConversionResource}: the resource is a thin delegate, so the test
 * verifies the request is forwarded to {@link ConversionService} and the service's response is
 * returned unmodified.
 */
@ExtendWith(MockitoExtension.class)
class TestConversionResource {

  private static final String SOURCE_TABLE_NAME = "users";
  private static final String SOURCE_TABLE_BASE_PATH = "s3://bucket/tables/users";
  private static final String TARGET_ICEBERG_METADATA_PATH = "s3://bucket/tables/users/metadata";

  @Mock private ConversionService conversionService;

  @InjectMocks private ConversionResource resource;

  @Test
  void testConvertTableResource() {
    ConvertTableRequest req =
        ConvertTableRequest.builder()
            .sourceFormat(TableFormat.DELTA)
            .sourceTableName(SOURCE_TABLE_NAME)
            .sourceTablePath(SOURCE_TABLE_BASE_PATH)
            // singletonList: exactly one target format is exercised here
            .targetFormats(Collections.singletonList(TableFormat.ICEBERG))
            .build();

    ConvertedTable icebergTable =
        ConvertedTable.builder()
            .targetFormat(TableFormat.ICEBERG)
            .targetMetadataPath(TARGET_ICEBERG_METADATA_PATH)
            .build();

    ConvertTableResponse expected =
        ConvertTableResponse.builder()
            .convertedTables(Collections.singletonList(icebergTable))
            .build();

    when(conversionService.convertTable(req)).thenReturn(expected);

    ConvertTableResponse actual = resource.convertTable(req);

    verify(conversionService).convertTable(req);
    assertNotNull(actual);
    // The resource must not copy or wrap the service response — same instance expected.
    assertSame(expected, actual, "Resource should return the exact response from the service");
    assertEquals(1, actual.getConvertedTables().size());
    assertEquals(TableFormat.ICEBERG, actual.getConvertedTables().get(0).getTargetFormat());
    assertEquals(
        TARGET_ICEBERG_METADATA_PATH, actual.getConvertedTables().get(0).getTargetMetadataPath());
  }
}
package org.apache.xtable.service;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.sql.types.StructType;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.MockedStatic;
import org.mockito.junit.jupiter.MockitoExtension;

import org.apache.iceberg.SchemaParser;

import org.apache.xtable.avro.AvroSchemaConverter;
import org.apache.xtable.conversion.ConversionController;
import org.apache.xtable.conversion.ConversionSourceProvider;
import org.apache.xtable.iceberg.IcebergSchemaExtractor;
import org.apache.xtable.model.InternalTable;
import org.apache.xtable.model.schema.InternalSchema;
import org.apache.xtable.model.storage.TableFormat;
import org.apache.xtable.schema.SparkSchemaExtractor;
import org.apache.xtable.service.models.ConvertTableRequest;
import org.apache.xtable.service.models.ConvertTableResponse;
import org.apache.xtable.service.models.ConvertedTable;
import org.apache.xtable.spi.extractor.ConversionSource;

/**
 * Unit tests for {@link ConversionService}. One test per target format (Hudi, Iceberg, Delta);
 * each mocks the conversion pipeline (controller, source provider, source table) and the
 * format-specific schema extractor via {@link MockedStatic}, then asserts the response carries
 * the expected format, metadata path, and serialized schema.
 */
@ExtendWith(MockitoExtension.class)
class TestConversionService {
  private static final String SOURCE_NAME = "users";
  private static final String SOURCE_PATH = "s3://bucket/tables/users";
  private static final String HUDI_META_PATH = "s3://bucket/tables/users/.hoodie";
  private static final String ICEBERG_META_PATH =
      "s3://bucket/tables/users/metadata/v1.metadata.json";
  private static final String DELTA_META_PATH = "s3://bucket/tables/users/delta_log";

  // Avro schema returned by the mocked AvroSchemaConverter in the Hudi test.
  private static final String HUDI_SCHEMA_JSON =
      "{\n"
          + "  \"type\":\"record\",\n"
          + "  \"name\":\"Users\",\n"
          + "  \"fields\":[{\"name\":\"id\",\"type\":\"string\"}]\n"
          + "}";

  // Iceberg schema JSON returned by the mocked SchemaParser in the Iceberg test.
  private static final String ICEBERG_JSON =
      "{\"type\":\"record\",\"name\":\"Users\","
          + "\"fields\":[{\"name\":\"id\",\"type\":\"string\",\"field-id\":1}]}";

  // Spark StructType JSON returned by the mocked SparkSchemaExtractor in the Delta test.
  private static final String DELTA_JSON =
      "{\"type\":\"struct\",\"fields\":["
          + "{\"name\":\"id\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}]}";

  @Mock private ConversionServiceConfig serviceConfig;

  @Mock private ConversionController controller;

  // Single provider mock registered for all three source formats in setUp().
  @Mock ConversionSourceProvider provider;

  @Mock ConversionSource conversionSrc;

  @Mock InternalTable internalTbl;

  @Mock InternalSchema internalSchema;

  private ConversionService service;
  private Configuration conf;

  /**
   * Builds the service under test with the mocked controller and a provider map that routes
   * every supported source format to the same provider mock.
   */
  @BeforeEach
  void setUp() {
    this.conf = new Configuration();
    Map<String, ConversionSourceProvider<?>> providers = new HashMap<>();
    providers.put(TableFormat.DELTA, provider);
    providers.put(TableFormat.HUDI, provider);
    providers.put(TableFormat.ICEBERG, provider);
    service = new ConversionService(serviceConfig, controller, this.conf, providers);
  }

  /** Delta source converted to a Hudi target; schema is rendered through AvroSchemaConverter. */
  @Test
  void convertToTargetHudi() {
    ConvertTableRequest req =
        ConvertTableRequest.builder()
            .sourceFormat(TableFormat.DELTA)
            .sourceTableName(SOURCE_NAME)
            .sourceTablePath(SOURCE_PATH)
            .targetFormats(Collections.singletonList(TableFormat.HUDI))
            .build();

    Schema avroSchema = new Schema.Parser().parse(HUDI_SCHEMA_JSON);
    try (MockedStatic<AvroSchemaConverter> avroConv = mockStatic(AvroSchemaConverter.class)) {
      when(controller.sync(any(), eq(provider))).thenReturn(null);
      when(provider.getConversionSourceInstance(any())).thenReturn(conversionSrc);
      when(conversionSrc.getCurrentTable()).thenReturn(internalTbl);

      // NOTE(review): the table *name* is stubbed with a format constant; presumably the
      // service reads getName() when building the response — confirm against ConversionService.
      when(internalTbl.getName()).thenReturn(TableFormat.HUDI);
      // "Metdata" spelling mirrors the accessor name on InternalTable.
      when(internalTbl.getLatestMetdataPath()).thenReturn(HUDI_META_PATH);
      when(internalTbl.getReadSchema()).thenReturn(internalSchema);

      // Route the static singleton accessor to a local mock so the schema output is fixed.
      AvroSchemaConverter converter = mock(AvroSchemaConverter.class);
      avroConv.when(AvroSchemaConverter::getInstance).thenReturn(converter);
      when(converter.fromInternalSchema(internalSchema)).thenReturn(avroSchema);

      ConvertTableResponse resp = service.convertTable(req);

      verify(controller).sync(any(), eq(provider));
      assertEquals(1, resp.getConvertedTables().size());
      ConvertedTable ct = resp.getConvertedTables().get(0);
      assertEquals(TableFormat.HUDI, ct.getTargetFormat());
      assertEquals(HUDI_META_PATH, ct.getTargetMetadataPath());
      assertEquals(avroSchema.toString(), ct.getTargetSchema());
    }
  }

  /** Delta source converted to an Iceberg target; schema rendered via SchemaParser.toJson. */
  @Test
  void convertToTargetIceberg() {
    ConvertTableRequest req =
        ConvertTableRequest.builder()
            .sourceFormat(TableFormat.DELTA)
            .sourceTableName(SOURCE_NAME)
            .sourceTablePath(SOURCE_PATH)
            .targetFormats(Collections.singletonList(TableFormat.ICEBERG))
            .build();

    org.apache.iceberg.Schema icebergSchema = mock(org.apache.iceberg.Schema.class);
    // Two static mocks: the extractor singleton and the static JSON serializer.
    try (MockedStatic<IcebergSchemaExtractor> iceExt = mockStatic(IcebergSchemaExtractor.class);
        MockedStatic<SchemaParser> parserMock = mockStatic(SchemaParser.class)) {

      when(controller.sync(any(), eq(provider))).thenReturn(null);
      when(provider.getConversionSourceInstance(any())).thenReturn(conversionSrc);
      when(conversionSrc.getCurrentTable()).thenReturn(internalTbl);

      when(internalTbl.getName()).thenReturn(TableFormat.ICEBERG);
      when(internalTbl.getLatestMetdataPath()).thenReturn(ICEBERG_META_PATH);
      when(internalTbl.getReadSchema()).thenReturn(internalSchema);

      IcebergSchemaExtractor extractor = mock(IcebergSchemaExtractor.class);
      iceExt.when(IcebergSchemaExtractor::getInstance).thenReturn(extractor);
      when(extractor.toIceberg(internalSchema)).thenReturn(icebergSchema);

      parserMock.when(() -> SchemaParser.toJson(icebergSchema)).thenReturn(ICEBERG_JSON);

      ConvertTableResponse resp = service.convertTable(req);

      verify(controller).sync(any(), eq(provider));
      assertEquals(1, resp.getConvertedTables().size());
      ConvertedTable ct = resp.getConvertedTables().get(0);
      assertEquals(TableFormat.ICEBERG, ct.getTargetFormat());
      assertEquals(ICEBERG_META_PATH, ct.getTargetMetadataPath());
      assertEquals(ICEBERG_JSON, ct.getTargetSchema());
    }
  }

  /** Iceberg source converted to a Delta target; schema rendered via Spark StructType.json(). */
  @Test
  void convertToTargetDelta() {
    ConvertTableRequest req =
        ConvertTableRequest.builder()
            .sourceFormat(TableFormat.ICEBERG)
            .sourceTableName(SOURCE_NAME)
            .sourceTablePath(SOURCE_PATH)
            .targetFormats(Collections.singletonList(TableFormat.DELTA))
            .build();

    StructType structType = mock(StructType.class);
    try (MockedStatic<SparkSchemaExtractor> sparkExt = mockStatic(SparkSchemaExtractor.class)) {
      when(controller.sync(any(), eq(provider))).thenReturn(null);
      when(provider.getConversionSourceInstance(any())).thenReturn(conversionSrc);
      when(conversionSrc.getCurrentTable()).thenReturn(internalTbl);

      when(internalTbl.getName()).thenReturn(TableFormat.DELTA);
      when(internalTbl.getLatestMetdataPath()).thenReturn(DELTA_META_PATH);
      when(internalTbl.getReadSchema()).thenReturn(internalSchema);

      SparkSchemaExtractor extractor = mock(SparkSchemaExtractor.class);
      sparkExt.when(SparkSchemaExtractor::getInstance).thenReturn(extractor);
      when(extractor.fromInternalSchema(internalSchema)).thenReturn(structType);
      when(structType.json()).thenReturn(DELTA_JSON);

      ConvertTableResponse resp = service.convertTable(req);

      verify(controller).sync(any(), eq(provider));
      assertEquals(1, resp.getConvertedTables().size());
      ConvertedTable ct = resp.getConvertedTables().get(0);
      assertEquals(TableFormat.DELTA, ct.getTargetFormat());
      assertEquals(DELTA_META_PATH, ct.getTargetMetadataPath());
      assertEquals(DELTA_JSON, ct.getTargetSchema());
    }
  }
}