This is an automated email from the ASF dual-hosted git repository.
vinish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git
The following commit(s) were added to refs/heads/main by this push:
new fb724f0c [590] Add interfaces for CatalogSyncClient and CatalogSync
fb724f0c is described below
commit fb724f0c8989c684259c24ebad3cefbefdc43432
Author: Vinish Reddy <[email protected]>
AuthorDate: Thu Dec 26 11:10:59 2024 -0800
[590] Add interfaces for CatalogSyncClient and CatalogSync
---
pom.xml | 15 ++-
rfc/template.md | 55 ---------
xtable-api/pom.xml | 4 +
.../org/apache/xtable/conversion/SourceTable.java | 2 +-
.../CatalogTableIdentifier.java} | 25 +---
.../HierarchicalTableIdentifier.java} | 30 ++---
.../ThreePartHierarchicalTableIdentifier.java | 105 +++++++++++++++++
...ErrorCode.java => CatalogRefreshException.java} | 23 ++--
.../apache/xtable/model/exception/ErrorCode.java | 3 +-
.../org/apache/xtable/model/sync/SyncResult.java | 28 ++++-
.../extractor/CatalogConversionSource.java} | 30 ++---
.../xtable/spi/extractor/ConversionSource.java | 9 ++
.../org/apache/xtable/spi/sync/CatalogSync.java | 129 +++++++++++++++++++++
.../apache/xtable/spi/sync/CatalogSyncClient.java | 71 ++++++++++++
.../org/apache/xtable/spi/sync/CatalogUtils.java | 63 ++++++++++
.../apache/xtable/spi/sync/TableFormatSync.java | 13 ++-
.../TestThreePartHierarchicalTableIdentifier.java | 50 ++++++++
.../apache/xtable/spi/sync/TestCatalogSync.java | 128 ++++++++++++++++++++
.../apache/xtable/spi/sync/TestCatalogUtils.java | 72 ++++++++++++
.../xtable/spi/sync/TestTableFormatSync.java | 38 +++---
.../xtable/conversion/ConversionController.java | 4 +-
.../apache/xtable/conversion/ConversionUtils.java | 57 ---------
.../apache/xtable/delta/DeltaConversionSource.java | 7 ++
.../apache/xtable/hudi/HudiConversionSource.java | 13 +++
.../xtable/iceberg/IcebergConversionSource.java | 7 ++
.../org/apache/xtable/ITConversionController.java | 3 +-
.../conversion/TestConversionController.java | 2 +-
.../xtable/conversion/TestConversionUtils.java | 111 ------------------
.../delta/ITDeltaConversionTargetSource.java | 55 ++++++---
.../xtable/hudi/ITHudiConversionSourceSource.java | 125 ++++++++++++++++++++
.../iceberg/ITIcebergConversionTargetSource.java | 69 +++++++++++
.../xtable/iceberg/TestIcebergDataHelper.java | 8 ++
.../org/apache/xtable/testutil/ITTestUtils.java | 47 ++++++++
.../apache/xtable/hudi/sync/XTableSyncTool.java | 2 +-
34 files changed, 1061 insertions(+), 342 deletions(-)
diff --git a/pom.xml b/pom.xml
index 99b4fe1a..7a597342 100644
--- a/pom.xml
+++ b/pom.xml
@@ -68,6 +68,7 @@
<maven-gpg-plugin.version>3.2.4</maven-gpg-plugin.version>
<maven-deploy-plugin.version>3.1.1</maven-deploy-plugin.version>
<maven-release-plugin.version>2.5.3</maven-release-plugin.version>
+ <mockito.version>5.2.0</mockito.version>
<parquet.version>1.12.2</parquet.version>
<protobuf.version>3.25.5</protobuf.version>
<scala12.version>2.12.20</scala12.version>
@@ -438,7 +439,19 @@
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
- <version>4.8.0</version>
+ <version>${mockito.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-junit-jupiter</artifactId>
+ <scope>test</scope>
+ <version>${mockito.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-inline</artifactId>
+ <version>${mockito.version}</version>
<scope>test</scope>
</dependency>
diff --git a/rfc/template.md b/rfc/template.md
deleted file mode 100644
index 75ab32a4..00000000
--- a/rfc/template.md
+++ /dev/null
@@ -1,55 +0,0 @@
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-# RFC-[number]: [Title]
-
-## Proposers
-
-- @<proposer1 github username>
-- @<proposer2 github username>
-
-## Approvers
-- @<approver1 github username>
-- @<approver2 github username>
-
-## Status
-
-GH Feature Request: <link to umbrella JIRA>
-
-> Please keep the status updated in `rfc/README.md`.
-
-## Abstract
-
-Describe the problem you are trying to solve and a brief description of why
it’s needed.
-
-## Background
-Introduce any background context which is relevant or necessary to understand
the feature and design choices.
-
-## Implementation
-Describe the new thing you want to do in appropriate detail, how it fits into
the project architecture.<br>
-Provide a detailed description of how you intend to implement this feature,
this may be fairly extensive and have large subsections of its own or it may be
a few sentences.<br>
-Use judgement to decide on how detailed the description needs to be based on
the scope of the change. If unclear, you can ask questions in
[email protected].
-
-## Rollout/Adoption Plan
-
-- Are there any breaking changes as part of this new feature/functionality?
-- What impact (if any) will there be on existing users?
-- If we are changing behavior how will we phase out the older behavior? When
will we remove the existing behavior?
-- If we need special migration tools, describe them here.
-
-## Test Plan
-
-Describe in few sentences how the RFC will be tested. How will we know that
the implementation works as expected? How will we know nothing breaks?
\ No newline at end of file
diff --git a/xtable-api/pom.xml b/xtable-api/pom.xml
index 9436b775..43aa7bac 100644
--- a/xtable-api/pom.xml
+++ b/xtable-api/pom.xml
@@ -84,5 +84,9 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-junit-jupiter</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git
a/xtable-api/src/main/java/org/apache/xtable/conversion/SourceTable.java
b/xtable-api/src/main/java/org/apache/xtable/conversion/SourceTable.java
index b37e1c1e..f3e1c359 100644
--- a/xtable-api/src/main/java/org/apache/xtable/conversion/SourceTable.java
+++ b/xtable-api/src/main/java/org/apache/xtable/conversion/SourceTable.java
@@ -28,7 +28,7 @@ import lombok.NonNull;
@EqualsAndHashCode(callSuper = true)
@Getter
public class SourceTable extends ExternalTable {
- /** The path to the data files, defaults to the metadataPath */
+ /** The path to the data files, defaults to the basePath */
@NonNull private final String dataPath;
@Builder(toBuilder = true)
diff --git
a/xtable-api/src/main/java/org/apache/xtable/model/exception/ErrorCode.java
b/xtable-api/src/main/java/org/apache/xtable/model/catalog/CatalogTableIdentifier.java
similarity index 63%
copy from
xtable-api/src/main/java/org/apache/xtable/model/exception/ErrorCode.java
copy to
xtable-api/src/main/java/org/apache/xtable/model/catalog/CatalogTableIdentifier.java
index 920a95f4..7679035f 100644
--- a/xtable-api/src/main/java/org/apache/xtable/model/exception/ErrorCode.java
+++
b/xtable-api/src/main/java/org/apache/xtable/model/catalog/CatalogTableIdentifier.java
@@ -16,25 +16,10 @@
* limitations under the License.
*/
-package org.apache.xtable.model.exception;
+package org.apache.xtable.model.catalog;
-import lombok.Getter;
-
-@Getter
-public enum ErrorCode {
- INVALID_CONFIGURATION(10001),
- INVALID_PARTITION_SPEC(10002),
- INVALID_PARTITION_VALUE(10003),
- READ_EXCEPTION(10004),
- UPDATE_EXCEPTION(10005),
- INVALID_SCHEMA(10006),
- UNSUPPORTED_SCHEMA_TYPE(10007),
- UNSUPPORTED_FEATURE(10008),
- PARSE_EXCEPTION(10009);
-
- private final int errorCode;
-
- ErrorCode(int errorCode) {
- this.errorCode = errorCode;
- }
+/** Represents a catalog table identifier in a multi-level catalog system. */
+public interface CatalogTableIdentifier {
+ /** Returns the string identifier for the table within its catalog context */
+ String getId();
}
diff --git
a/xtable-api/src/main/java/org/apache/xtable/model/exception/ErrorCode.java
b/xtable-api/src/main/java/org/apache/xtable/model/catalog/HierarchicalTableIdentifier.java
similarity index 60%
copy from
xtable-api/src/main/java/org/apache/xtable/model/exception/ErrorCode.java
copy to
xtable-api/src/main/java/org/apache/xtable/model/catalog/HierarchicalTableIdentifier.java
index 920a95f4..e6d7cf0e 100644
--- a/xtable-api/src/main/java/org/apache/xtable/model/exception/ErrorCode.java
+++
b/xtable-api/src/main/java/org/apache/xtable/model/catalog/HierarchicalTableIdentifier.java
@@ -16,25 +16,19 @@
* limitations under the License.
*/
-package org.apache.xtable.model.exception;
+package org.apache.xtable.model.catalog;
-import lombok.Getter;
-
-@Getter
-public enum ErrorCode {
- INVALID_CONFIGURATION(10001),
- INVALID_PARTITION_SPEC(10002),
- INVALID_PARTITION_VALUE(10003),
- READ_EXCEPTION(10004),
- UPDATE_EXCEPTION(10005),
- INVALID_SCHEMA(10006),
- UNSUPPORTED_SCHEMA_TYPE(10007),
- UNSUPPORTED_FEATURE(10008),
- PARSE_EXCEPTION(10009);
+/**
+ * Represents a hierarchical table identifier, often including catalog,
database (or schema), and
+ * table names. Some catalogs may omit the catalog name.
+ */
+public interface HierarchicalTableIdentifier extends CatalogTableIdentifier {
+ /** @return the catalog name if present, otherwise null */
+ String getCatalogName();
- private final int errorCode;
+ /** @return the database (or schema) name; required */
+ String getDatabaseName();
- ErrorCode(int errorCode) {
- this.errorCode = errorCode;
- }
+ /** @return the table name; required */
+ String getTableName();
}
diff --git
a/xtable-api/src/main/java/org/apache/xtable/model/catalog/ThreePartHierarchicalTableIdentifier.java
b/xtable-api/src/main/java/org/apache/xtable/model/catalog/ThreePartHierarchicalTableIdentifier.java
new file mode 100644
index 00000000..2608d36a
--- /dev/null
+++
b/xtable-api/src/main/java/org/apache/xtable/model/catalog/ThreePartHierarchicalTableIdentifier.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.model.catalog;
+
+import lombok.NonNull;
+import lombok.Value;
+
+/**
+ * An internal representation of a fully qualified table identifier within a
catalog. The naming
+ * convention follows a three-level hierarchy; a few examples can be found below.
+ *
+ * <ul>
+ * <li>1. catalog.database.table
+ * <li>2. catalog.schema.table
+ * <li>3. database.schema.table
+ * </ul>
+ *
+ * We have selected the first naming convention and will interoperate among
other catalogs following
+ * a different convention.
+ */
+@Value
+public class ThreePartHierarchicalTableIdentifier implements
HierarchicalTableIdentifier {
+ /**
+ * The top level hierarchy/namespace for organizing tables. Each catalog can
have multiple
+ * databases/schemas. This is an optional field as many catalogs have a
"default" catalog whose
+ * name varies depending on the catalogType.
+ */
+ String catalogName;
+ /**
+ * Catalogs have the ability to group tables logically, databaseName is the
identifier for such
+ * logical classification. The alternate names for this field include
namespace, schemaName etc.
+ */
+ @NonNull String databaseName;
+
+ /**
+ * The table name used when syncing the table to the catalog. NOTE: This
name can be different
+ * from the table name in storage.
+ */
+ @NonNull String tableName;
+
+ public ThreePartHierarchicalTableIdentifier(
+ String catalogName, @NonNull String databaseName, @NonNull String
tableName) {
+ this.catalogName = catalogName;
+ this.databaseName = databaseName;
+ this.tableName = tableName;
+ }
+
+ public ThreePartHierarchicalTableIdentifier(String databaseName, String
tableName) {
+ this(null, databaseName, tableName);
+ }
+
+ /**
+ * Constructs a new {@code ThreePartHierarchicalTableIdentifier} from a hierarchical
string identifier.
+ *
+ * <p>The identifier is expected to be in one of the following formats:
+ *
+ * <ul>
+ * <li>{@code database.table} (two parts, no catalog)
+ * <li>{@code catalog.database.table} (three parts)
+ * </ul>
+ *
+ * If the identifier does not match one of these formats, an {@link
IllegalArgumentException} is
+ * thrown.
+ *
+ * @param hierarchicalTableIdentifier The hierarchical string identifier
(e.g.,
+ * "myCatalog.myDatabase.myTable" or "myDatabase.myTable").
+ * @throws IllegalArgumentException If the provided string does not match a
valid two-part or
+ * three-part identifier.
+ */
+ public static ThreePartHierarchicalTableIdentifier
fromDotSeparatedIdentifier(
+ String hierarchicalTableIdentifier) {
+ String[] parts = hierarchicalTableIdentifier.split("\\.");
+ if (parts.length == 2) {
+ return new ThreePartHierarchicalTableIdentifier(parts[0], parts[1]);
+ } else if (parts.length == 3) {
+ return new ThreePartHierarchicalTableIdentifier(parts[0], parts[1],
parts[2]);
+ } else {
+ throw new IllegalArgumentException("Invalid table identifier " +
hierarchicalTableIdentifier);
+ }
+ }
+
+ @Override
+ public String getId() {
+ if (catalogName != null && !catalogName.isEmpty()) {
+ return catalogName + "." + databaseName + "." + tableName;
+ }
+ return databaseName + "." + tableName;
+ }
+}
diff --git
a/xtable-api/src/main/java/org/apache/xtable/model/exception/ErrorCode.java
b/xtable-api/src/main/java/org/apache/xtable/model/exception/CatalogRefreshException.java
similarity index 67%
copy from
xtable-api/src/main/java/org/apache/xtable/model/exception/ErrorCode.java
copy to
xtable-api/src/main/java/org/apache/xtable/model/exception/CatalogRefreshException.java
index 920a95f4..dd322292 100644
--- a/xtable-api/src/main/java/org/apache/xtable/model/exception/ErrorCode.java
+++
b/xtable-api/src/main/java/org/apache/xtable/model/exception/CatalogRefreshException.java
@@ -18,23 +18,14 @@
package org.apache.xtable.model.exception;
-import lombok.Getter;
+/** Exception thrown when a refresh operation (updating table format metadata)
in the catalog fails. */
+public class CatalogRefreshException extends InternalException {
-@Getter
-public enum ErrorCode {
- INVALID_CONFIGURATION(10001),
- INVALID_PARTITION_SPEC(10002),
- INVALID_PARTITION_VALUE(10003),
- READ_EXCEPTION(10004),
- UPDATE_EXCEPTION(10005),
- INVALID_SCHEMA(10006),
- UNSUPPORTED_SCHEMA_TYPE(10007),
- UNSUPPORTED_FEATURE(10008),
- PARSE_EXCEPTION(10009);
-
- private final int errorCode;
+ public CatalogRefreshException(String message, Throwable e) {
+ super(ErrorCode.CATALOG_REFRESH_EXCEPTION, message, e);
+ }
- ErrorCode(int errorCode) {
- this.errorCode = errorCode;
+ public CatalogRefreshException(String message) {
+ super(ErrorCode.CATALOG_REFRESH_EXCEPTION, message);
}
}
diff --git
a/xtable-api/src/main/java/org/apache/xtable/model/exception/ErrorCode.java
b/xtable-api/src/main/java/org/apache/xtable/model/exception/ErrorCode.java
index 920a95f4..af85c900 100644
--- a/xtable-api/src/main/java/org/apache/xtable/model/exception/ErrorCode.java
+++ b/xtable-api/src/main/java/org/apache/xtable/model/exception/ErrorCode.java
@@ -30,7 +30,8 @@ public enum ErrorCode {
INVALID_SCHEMA(10006),
UNSUPPORTED_SCHEMA_TYPE(10007),
UNSUPPORTED_FEATURE(10008),
- PARSE_EXCEPTION(10009);
+ PARSE_EXCEPTION(10009),
+ CATALOG_REFRESH_EXCEPTION(10010);
private final int errorCode;
diff --git
a/xtable-api/src/main/java/org/apache/xtable/model/sync/SyncResult.java
b/xtable-api/src/main/java/org/apache/xtable/model/sync/SyncResult.java
index d158b38c..824f626c 100644
--- a/xtable-api/src/main/java/org/apache/xtable/model/sync/SyncResult.java
+++ b/xtable-api/src/main/java/org/apache/xtable/model/sync/SyncResult.java
@@ -20,6 +20,7 @@ package org.apache.xtable.model.sync;
import java.time.Duration;
import java.time.Instant;
+import java.util.List;
import lombok.Builder;
import lombok.Value;
@@ -30,7 +31,7 @@ import lombok.Value;
* @since 0.1
*/
@Value
-@Builder
+@Builder(toBuilder = true)
public class SyncResult {
// Mode used for the sync
SyncMode mode;
@@ -38,10 +39,12 @@ public class SyncResult {
Instant syncStartTime;
// Duration
Duration syncDuration;
- // Status of the sync
- SyncStatus status;
+ // Status of the tableFormat sync
+ SyncStatus tableFormatSyncStatus;
// The Sync Mode recommended for the next sync (Usually filled on an error)
SyncMode recommendedSyncMode;
+ // The sync status for each catalog.
+ List<CatalogSyncStatus> catalogSyncStatusList;
public enum SyncStatusCode {
SUCCESS,
@@ -57,6 +60,25 @@ public class SyncResult {
SyncStatus.builder().statusCode(SyncStatusCode.SUCCESS).build();
// Status code
SyncStatusCode statusCode;
+ // errorDetails if any
+ ErrorDetails errorDetails;
+ }
+
+ /** Represents the status of a catalog sync operation */
+ @Value
+ @Builder
+ public static class CatalogSyncStatus {
+ // A user defined unique catalog identifier.
+ String catalogId;
+ // Status code
+ SyncStatusCode statusCode;
+ // errorDetails if any
+ ErrorDetails errorDetails;
+ }
+
+ @Value
+ @Builder
+ public static class ErrorDetails {
// error Message if any
String errorMessage;
// Readable description of the error
diff --git
a/xtable-api/src/main/java/org/apache/xtable/model/exception/ErrorCode.java
b/xtable-api/src/main/java/org/apache/xtable/spi/extractor/CatalogConversionSource.java
similarity index 57%
copy from
xtable-api/src/main/java/org/apache/xtable/model/exception/ErrorCode.java
copy to
xtable-api/src/main/java/org/apache/xtable/spi/extractor/CatalogConversionSource.java
index 920a95f4..616f3d45 100644
--- a/xtable-api/src/main/java/org/apache/xtable/model/exception/ErrorCode.java
+++
b/xtable-api/src/main/java/org/apache/xtable/spi/extractor/CatalogConversionSource.java
@@ -16,25 +16,17 @@
* limitations under the License.
*/
-package org.apache.xtable.model.exception;
+package org.apache.xtable.spi.extractor;
-import lombok.Getter;
+import org.apache.xtable.conversion.SourceTable;
+import org.apache.xtable.model.catalog.CatalogTableIdentifier;
-@Getter
-public enum ErrorCode {
- INVALID_CONFIGURATION(10001),
- INVALID_PARTITION_SPEC(10002),
- INVALID_PARTITION_VALUE(10003),
- READ_EXCEPTION(10004),
- UPDATE_EXCEPTION(10005),
- INVALID_SCHEMA(10006),
- UNSUPPORTED_SCHEMA_TYPE(10007),
- UNSUPPORTED_FEATURE(10008),
- PARSE_EXCEPTION(10009);
-
- private final int errorCode;
-
- ErrorCode(int errorCode) {
- this.errorCode = errorCode;
- }
+/**
+ * A client for converting the table with tableIdentifier {@link
CatalogTableIdentifier} in source
+ * catalog to SourceTable object {@link SourceTable}, which can be used by
downstream consumers for
+ * syncing it to multiple {@link org.apache.xtable.conversion.TargetTable}
+ */
+public interface CatalogConversionSource {
+ /** Returns the source table object present in the catalog. */
+ SourceTable getSourceTable(CatalogTableIdentifier tableIdentifier);
}
diff --git
a/xtable-api/src/main/java/org/apache/xtable/spi/extractor/ConversionSource.java
b/xtable-api/src/main/java/org/apache/xtable/spi/extractor/ConversionSource.java
index 2500454c..21f7f63f 100644
---
a/xtable-api/src/main/java/org/apache/xtable/spi/extractor/ConversionSource.java
+++
b/xtable-api/src/main/java/org/apache/xtable/spi/extractor/ConversionSource.java
@@ -41,6 +41,15 @@ public interface ConversionSource<COMMIT> extends Closeable {
*/
InternalTable getTable(COMMIT commit);
+ /**
+ * Extracts the {@link InternalTable} as of latest state. This method is
less expensive as
+ * compared to {@link ConversionSource#getCurrentSnapshot()} as it doesn't
load the files present
+ * in the table.
+ *
+ * @return {@link InternalTable} representing the current table.
+ */
+ InternalTable getCurrentTable();
+
/**
* Extracts the {@link InternalSnapshot} as of latest state.
*
diff --git
a/xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogSync.java
b/xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogSync.java
new file mode 100644
index 00000000..e76d5906
--- /dev/null
+++ b/xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogSync.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.spi.sync;
+
+import static
org.apache.xtable.spi.sync.CatalogUtils.hasStorageDescriptorLocationChanged;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import lombok.extern.log4j.Log4j2;
+
+import org.apache.commons.lang3.StringUtils;
+
+import org.apache.xtable.model.InternalTable;
+import org.apache.xtable.model.catalog.CatalogTableIdentifier;
+import org.apache.xtable.model.sync.SyncResult;
+import org.apache.xtable.model.sync.SyncResult.CatalogSyncStatus;
+
+/** Provides the functionality to sync metadata from InternalTable to multiple
target catalogs */
+@Log4j2
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class CatalogSync {
+ private static final CatalogSync INSTANCE = new CatalogSync();
+
+ public static CatalogSync getInstance() {
+ return INSTANCE;
+ }
+
+ public SyncResult syncTable(
+ Map<CatalogTableIdentifier, CatalogSyncClient> catalogSyncClients,
InternalTable table) {
+ List<CatalogSyncStatus> results = new ArrayList<>();
+ Instant startTime = Instant.now();
+ catalogSyncClients.forEach(
+ ((tableIdentifier, catalogSyncClient) -> {
+ try {
+ results.add(syncCatalog(catalogSyncClient, tableIdentifier,
table));
+ log.info(
+ "Catalog sync is successful for table {} with format {} using
catalogSync {}",
+ table.getBasePath(),
+ table.getTableFormat(),
+ catalogSyncClient.getClass().getName());
+ } catch (Exception e) {
+ log.error(
+ "Catalog sync failed for table {} with format {} using
catalogSync {}",
+ table.getBasePath(),
+ table.getTableFormat(),
+ catalogSyncClient.getClass().getName());
+ results.add(
+ getCatalogSyncFailureStatus(
+ catalogSyncClient.getCatalogId(),
catalogSyncClient.getClass().getName(), e));
+ }
+ }));
+ return SyncResult.builder()
+ .lastInstantSynced(table.getLatestCommitTime())
+ .syncStartTime(startTime)
+ .syncDuration(Duration.between(startTime, Instant.now()))
+ .catalogSyncStatusList(results)
+ .build();
+ }
+
+ private <TABLE> CatalogSyncStatus syncCatalog(
+ CatalogSyncClient<TABLE> catalogSyncClient,
+ CatalogTableIdentifier tableIdentifier,
+ InternalTable table) {
+ if (!catalogSyncClient.hasDatabase(tableIdentifier)) {
+ catalogSyncClient.createDatabase(tableIdentifier);
+ }
+ TABLE catalogTable = catalogSyncClient.getTable(tableIdentifier);
+ String storageDescriptorLocation =
catalogSyncClient.getStorageLocation(catalogTable);
+ if (catalogTable == null) {
+ catalogSyncClient.createTable(table, tableIdentifier);
+ } else if (hasStorageDescriptorLocationChanged(
+ storageDescriptorLocation, table.getBasePath())) {
+ // Replace table if there is a mismatch between the catalog table location and
XTable basePath.
+ // Possible reasons could be:
+ // 1) catalog table was (manually) created with a different location before and
needs to be
+ // re-created with a new basePath
+ // 2) XTable basePath changes due to migration or other reasons
+ String oldLocation =
+ StringUtils.isEmpty(storageDescriptorLocation) ? "null" :
storageDescriptorLocation;
+ log.warn(
+ "StorageDescriptor location changed from {} to {}, re-creating
table",
+ oldLocation,
+ table.getBasePath());
+ catalogSyncClient.createOrReplaceTable(table, tableIdentifier);
+ } else {
+ log.debug("Table metadata changed, refreshing table");
+ catalogSyncClient.refreshTable(table, catalogTable, tableIdentifier);
+ }
+ return CatalogSyncStatus.builder()
+ .catalogId(catalogSyncClient.getCatalogId())
+ .statusCode(SyncResult.SyncStatusCode.SUCCESS)
+ .build();
+ }
+
+ private CatalogSyncStatus getCatalogSyncFailureStatus(
+ String catalogId, String catalogImpl, Exception e) {
+ return CatalogSyncStatus.builder()
+ .catalogId(catalogId)
+ .statusCode(SyncResult.SyncStatusCode.ERROR)
+ .errorDetails(
+ SyncResult.ErrorDetails.builder()
+ .errorMessage(e.getMessage())
+ .errorDescription("catalogSync failed for " + catalogImpl)
+ .build())
+ .build();
+ }
+}
diff --git
a/xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogSyncClient.java
b/xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogSyncClient.java
new file mode 100644
index 00000000..62de9379
--- /dev/null
+++ b/xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogSyncClient.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.spi.sync;
+
+import org.apache.xtable.model.InternalTable;
+import org.apache.xtable.model.catalog.CatalogTableIdentifier;
+
+/**
+ * An interface for syncing {@link InternalTable} object to {@code TABLE}
object defined by the
+ * catalog.
+ *
+ * @param <TABLE> the table type used by the catalog implementation
+ */
+public interface CatalogSyncClient<TABLE> extends AutoCloseable {
+ /**
+ * Returns the user-defined unique identifier for the catalog, allows user
to sync table to
+ * multiple catalogs of the same name/type eg: HMS catalog with url1, HMS
catalog with url2.
+ */
+ String getCatalogId();
+
+ /** Returns the storage location of the table synced to the catalog. */
+ String getStorageLocation(TABLE table);
+
+ /** Checks whether a database used by tableIdentifier exists in the catalog.
*/
+ boolean hasDatabase(CatalogTableIdentifier tableIdentifier);
+
+ /** Creates a database used by tableIdentifier in the catalog. */
+ void createDatabase(CatalogTableIdentifier tableIdentifier);
+
+ /**
+ * Return the TABLE object used by the catalog implementation. Eg: HMS uses
+ * org.apache.hadoop.hive.metastore.api.Table, Glue uses
+ * software.amazon.awssdk.services.glue.model.Table etc.
+ */
+ TABLE getTable(CatalogTableIdentifier tableIdentifier);
+
+ /**
+ * Create a table in the catalog using the canonical InternalTable
representation and
+ * tableIdentifier.
+ */
+ void createTable(InternalTable table, CatalogTableIdentifier
tableIdentifier);
+
+ /** Refreshes the table metadata in the catalog with tableIdentifier. */
+ void refreshTable(
+ InternalTable table, TABLE catalogTable, CatalogTableIdentifier
tableIdentifier);
+
+ /**
+ * Tries to re-create the table in the catalog replacing state with the new
canonical
+ * InternalTable representation and tableIdentifier.
+ */
+ void createOrReplaceTable(InternalTable table, CatalogTableIdentifier
tableIdentifier);
+
+ /** Drops a table from the catalog. */
+ void dropTable(InternalTable table, CatalogTableIdentifier tableIdentifier);
+}
diff --git
a/xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogUtils.java
b/xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogUtils.java
new file mode 100644
index 00000000..cd3d6d5d
--- /dev/null
+++ b/xtable-api/src/main/java/org/apache/xtable/spi/sync/CatalogUtils.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.spi.sync;
+
+import java.net.URI;
+import java.util.Objects;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.xtable.model.exception.CatalogRefreshException;
+
+/** Utility methods used by CatalogSync. */
+public class CatalogUtils {
+
+ /**
+ * Returns whether the location of the table in catalog differs from the one
currently in storage.
+ *
+ * @param storageDescriptorLocation location of the table in catalog.
+ * @param tableBasePath location of the table in source table.
+ * @return true if the locations differ (or the catalog location is empty), false otherwise.
+ */
+ static boolean hasStorageDescriptorLocationChanged(
+ String storageDescriptorLocation, String tableBasePath) {
+
+ if (StringUtils.isEmpty(storageDescriptorLocation)) {
+ return true;
+ }
+ URI storageDescriptorUri = new Path(storageDescriptorLocation).toUri();
+ URI basePathUri = new Path(tableBasePath).toUri();
+
+ if (storageDescriptorUri.equals(basePathUri)
+ || storageDescriptorUri.getScheme() == null
+ || basePathUri.getScheme() == null
+ || storageDescriptorUri.getScheme().startsWith(basePathUri.getScheme())
+ ||
basePathUri.getScheme().startsWith(storageDescriptorUri.getScheme())) {
+ String storageDescriptorLocationIdentifier =
+ storageDescriptorUri.getAuthority() + storageDescriptorUri.getPath();
+ String tableBasePathIdentifier = basePathUri.getAuthority() +
basePathUri.getPath();
+ return !Objects.equals(storageDescriptorLocationIdentifier,
tableBasePathIdentifier);
+ }
+ throw new CatalogRefreshException(
+ String.format(
+ "Storage scheme has changed for table catalogStorageDescriptorUri
%s basePathUri %s",
+ storageDescriptorUri, basePathUri));
+ }
+}
diff --git
a/xtable-api/src/main/java/org/apache/xtable/spi/sync/TableFormatSync.java
b/xtable-api/src/main/java/org/apache/xtable/spi/sync/TableFormatSync.java
index 7cd0b384..bb340669 100644
--- a/xtable-api/src/main/java/org/apache/xtable/spi/sync/TableFormatSync.java
+++ b/xtable-api/src/main/java/org/apache/xtable/spi/sync/TableFormatSync.java
@@ -166,7 +166,7 @@ public class TableFormatSync {
return SyncResult.builder()
.mode(mode)
- .status(SyncResult.SyncStatus.SUCCESS)
+ .tableFormatSyncStatus(SyncResult.SyncStatus.SUCCESS)
.syncStartTime(startTime)
.syncDuration(Duration.between(startTime, Instant.now()))
.lastInstantSynced(tableState.getLatestCommitTime())
@@ -181,12 +181,15 @@ public class TableFormatSync {
private SyncResult buildResultForError(SyncMode mode, Instant startTime,
Exception e) {
return SyncResult.builder()
.mode(mode)
- .status(
+ .tableFormatSyncStatus(
SyncResult.SyncStatus.builder()
.statusCode(SyncResult.SyncStatusCode.ERROR)
- .errorMessage(e.getMessage())
- .errorDescription("Failed to sync " + mode.name())
- .canRetryOnFailure(true)
+ .errorDetails(
+ SyncResult.ErrorDetails.builder()
+ .errorMessage(e.getMessage())
+ .errorDescription("Failed to sync " + mode.name())
+ .canRetryOnFailure(true)
+ .build())
.build())
.syncStartTime(startTime)
.syncDuration(Duration.between(startTime, Instant.now()))
diff --git a/xtable-api/src/test/java/org/apache/xtable/model/catalog/TestThreePartHierarchicalTableIdentifier.java b/xtable-api/src/test/java/org/apache/xtable/model/catalog/TestThreePartHierarchicalTableIdentifier.java
new file mode 100644
index 00000000..5f10be0b
--- /dev/null
+++ b/xtable-api/src/test/java/org/apache/xtable/model/catalog/TestThreePartHierarchicalTableIdentifier.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.model.catalog;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+class TestThreePartHierarchicalTableIdentifier {
+
+ @Test
+ void testGetId() {
+ ThreePartHierarchicalTableIdentifier catalogTableIdentifier =
+ ThreePartHierarchicalTableIdentifier.fromDotSeparatedIdentifier(
+ "catalogName.databaseName.tableName");
+ assertEquals("catalogName.databaseName.tableName",
catalogTableIdentifier.getId());
+ }
+
+ @Test
+ void testConstructorForHierarchicalTableIdentifier() {
+ Assertions.assertDoesNotThrow(
+ () ->
+ ThreePartHierarchicalTableIdentifier.fromDotSeparatedIdentifier(
+ "catalogName.databaseName.tableName"));
+ Assertions.assertDoesNotThrow(
+ () ->
+ ThreePartHierarchicalTableIdentifier.fromDotSeparatedIdentifier(
+ "databaseName.tableName"));
+ Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () -> ThreePartHierarchicalTableIdentifier.fromDotSeparatedIdentifier("tableName"));
+ }
+}
diff --git
a/xtable-api/src/test/java/org/apache/xtable/spi/sync/TestCatalogSync.java
b/xtable-api/src/test/java/org/apache/xtable/spi/sync/TestCatalogSync.java
new file mode 100644
index 00000000..2f4fbdc9
--- /dev/null
+++ b/xtable-api/src/test/java/org/apache/xtable/spi/sync/TestCatalogSync.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.spi.sync;
+
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.xtable.model.InternalTable;
+import org.apache.xtable.model.catalog.CatalogTableIdentifier;
+import org.apache.xtable.model.catalog.ThreePartHierarchicalTableIdentifier;
+import org.apache.xtable.model.schema.InternalField;
+import org.apache.xtable.model.schema.InternalPartitionField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.schema.PartitionTransformType;
+import org.apache.xtable.model.sync.SyncResult;
+
+@ExtendWith(MockitoExtension.class)
+public class TestCatalogSync<TABLE> {
+
+ @Mock CatalogSyncClient<TABLE> mockClient1;
+ @Mock CatalogSyncClient<TABLE> mockClient2;
+ @Mock CatalogSyncClient<TABLE> mockClient3;
+ @Mock CatalogSyncClient<TABLE> mockClient4;
+
+ private final ThreePartHierarchicalTableIdentifier tableIdentifier1 =
+ new ThreePartHierarchicalTableIdentifier("database1", "table1");
+ private final ThreePartHierarchicalTableIdentifier tableIdentifier2 =
+ new ThreePartHierarchicalTableIdentifier("database2", "table2");
+ private final ThreePartHierarchicalTableIdentifier tableIdentifier3 =
+ new ThreePartHierarchicalTableIdentifier("database3", "table3");
+ private final ThreePartHierarchicalTableIdentifier tableIdentifier4 =
+ new ThreePartHierarchicalTableIdentifier("database4", "table4");
+
+ @Mock TABLE mockTable;
+ private final InternalTable internalTable =
+ InternalTable.builder()
+ .readSchema(InternalSchema.builder().name("test_schema").build())
+ .partitioningFields(
+ Collections.singletonList(
+ InternalPartitionField.builder()
+ .sourceField(InternalField.builder().name("partition_field").build())
+ .transformType(PartitionTransformType.VALUE)
+ .build()))
+ .latestCommitTime(Instant.now().minus(10, ChronoUnit.MINUTES))
+ .basePath("/tmp/test")
+ .build();
+
+ @Test
+ void testSyncTable() {
+ when(mockClient1.hasDatabase(tableIdentifier1)).thenReturn(false);
+ when(mockClient2.hasDatabase(tableIdentifier2)).thenReturn(true);
+ when(mockClient3.hasDatabase(tableIdentifier3)).thenReturn(true);
+ when(mockClient4.hasDatabase(tableIdentifier4))
+ .thenThrow(new UnsupportedOperationException("No catalog impl"));
+
+ when(mockClient1.getTable(tableIdentifier1)).thenReturn(mockTable);
+ when(mockClient2.getTable(tableIdentifier2)).thenReturn(null);
+ when(mockClient3.getTable(tableIdentifier3)).thenReturn(mockTable);
+
+ when(mockClient1.getStorageLocation(any())).thenReturn("/tmp/test_changed");
+ when(mockClient2.getStorageLocation(any())).thenReturn("/tmp/test");
+ when(mockClient3.getStorageLocation(any())).thenReturn("/tmp/test");
+
+ when(mockClient4.getCatalogId()).thenReturn("catalogId4");
+
+ Map<CatalogTableIdentifier, CatalogSyncClient> catalogSyncClients =
+ ImmutableMap.of(
+ tableIdentifier1, mockClient1,
+ tableIdentifier2, mockClient2,
+ tableIdentifier3, mockClient3,
+ tableIdentifier4, mockClient4);
+
+ List<SyncResult.CatalogSyncStatus> results =
+ CatalogSync.getInstance()
+ .syncTable(catalogSyncClients, internalTable)
+ .getCatalogSyncStatusList();
+ List<SyncResult.CatalogSyncStatus> errorStatus =
+ results.stream()
+ .filter(status ->
status.getStatusCode().equals(SyncResult.SyncStatusCode.ERROR))
+ .collect(Collectors.toList());
+ assertEquals(SyncResult.SyncStatusCode.ERROR,
errorStatus.get(0).getStatusCode());
+ assertEquals(
+ 3,
+ results.stream()
+ .map(SyncResult.CatalogSyncStatus::getStatusCode)
+ .filter(statusCode ->
statusCode.equals(SyncResult.SyncStatusCode.SUCCESS))
+ .count());
+
+ verify(mockClient1, times(1)).createDatabase(tableIdentifier1);
+ verify(mockClient1, times(1)).createOrReplaceTable(internalTable,
tableIdentifier1);
+ verify(mockClient2, times(1)).createTable(eq(internalTable),
eq(tableIdentifier2));
+ verify(mockClient3, times(1)).refreshTable(eq(internalTable), any(),
eq(tableIdentifier3));
+ }
+}
diff --git
a/xtable-api/src/test/java/org/apache/xtable/spi/sync/TestCatalogUtils.java
b/xtable-api/src/test/java/org/apache/xtable/spi/sync/TestCatalogUtils.java
new file mode 100644
index 00000000..69575e8e
--- /dev/null
+++ b/xtable-api/src/test/java/org/apache/xtable/spi/sync/TestCatalogUtils.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.spi.sync;
+
+import static org.apache.xtable.spi.sync.CatalogUtils.hasStorageDescriptorLocationChanged;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.util.stream.Stream;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import org.apache.xtable.model.exception.CatalogRefreshException;
+
+public class TestCatalogUtils {
+
+ static Stream<Arguments> storageLocationTestArgs() {
+ return Stream.of(
+ Arguments.of("s3://bucket/table/v1", "s3://bucket/table/v2", true),
+ Arguments.of("s3://bucket/table1/v1", "s3://bucket/table2/v1", true),
+ Arguments.of("file:///var/lib/bucket/table/v1",
"file:///var/lib/bucket/table/v2/", true),
+ Arguments.of("s3://bucket/table/v1", "s3://bucket/table/v1", false),
+ Arguments.of("s3a://bucket/table/v1", "s3://bucket/table/v1/", false),
+ Arguments.of("s3://bucket/table/v1", "s3a://bucket/table/v1", false),
+ Arguments.of("s3://bucket/table/v1/", "s3a://bucket/table/v1", false),
+ Arguments.of("/var/lib/bucket/table/v1", "/var/lib/bucket/table/v1/",
false),
+ Arguments.of("file:///var/lib/bucket/table/v1",
"file:///var/lib/bucket/table/v1/", false));
+ }
+
+ static Stream<Arguments> storageLocationTestArgsException() {
+ return Stream.of(
+ Arguments.of(
+ "s3://bucket/table/v1",
+ "gs://bucket/table/v1",
+ new CatalogRefreshException(
+ "Storage scheme has changed for table catalogStorageDescriptorUri s3://bucket/table/v1 basePathUri gs://bucket/table/v1")));
+ }
+
+ @ParameterizedTest
+ @MethodSource("storageLocationTestArgs")
+ void testHasStorageLocationChanged(String storageLocation, String basePath,
boolean expected) {
+ assertEquals(expected,
hasStorageDescriptorLocationChanged(storageLocation, basePath));
+ }
+
+ @ParameterizedTest
+ @MethodSource("storageLocationTestArgsException")
+ void testHasStorageLocationChangedException(
+ String storageLocation, String basePath, Exception exception) {
+ assertThrows(
+ exception.getClass(),
+ () -> hasStorageDescriptorLocationChanged(storageLocation, basePath),
+ exception.getMessage());
+ }
+}
diff --git
a/xtable-api/src/test/java/org/apache/xtable/spi/sync/TestTableFormatSync.java
b/xtable-api/src/test/java/org/apache/xtable/spi/sync/TestTableFormatSync.java
index 2a9e0588..39480f8b 100644
---
a/xtable-api/src/test/java/org/apache/xtable/spi/sync/TestTableFormatSync.java
+++
b/xtable-api/src/test/java/org/apache/xtable/spi/sync/TestTableFormatSync.java
@@ -88,7 +88,7 @@ public class TestTableFormatSync {
assertEquals(2, result.size());
SyncResult successResult = result.get(TableFormat.DELTA);
- assertEquals(SyncResult.SyncStatus.SUCCESS, successResult.getStatus());
+ assertEquals(SyncResult.SyncStatus.SUCCESS,
successResult.getTableFormatSyncStatus());
assertEquals(SyncMode.FULL, successResult.getMode());
assertEquals(startingTableState.getLatestCommitTime(),
successResult.getLastInstantSynced());
assertSyncResultTimes(successResult, start);
@@ -99,11 +99,14 @@ public class TestTableFormatSync {
assertEquals(
SyncResult.SyncStatus.builder()
.statusCode(SyncResult.SyncStatusCode.ERROR)
- .errorMessage("Failure")
- .errorDescription("Failed to sync FULL")
- .canRetryOnFailure(true)
+ .errorDetails(
+ SyncResult.ErrorDetails.builder()
+ .errorMessage("Failure")
+ .errorDescription("Failed to sync FULL")
+ .canRetryOnFailure(true)
+ .build())
.build(),
- failureResult.getStatus());
+ failureResult.getTableFormatSyncStatus());
verifyBaseConversionTargetCalls(
mockConversionTarget2, startingTableState, pendingCommitInstants);
@@ -168,7 +171,8 @@ public class TestTableFormatSync {
assertEquals(
tableChanges.get(0).getTableAsOfChange().getLatestCommitTime(),
partialSuccessResults.get(0).getLastInstantSynced());
- assertEquals(SyncResult.SyncStatus.SUCCESS,
partialSuccessResults.get(0).getStatus());
+ assertEquals(
+ SyncResult.SyncStatus.SUCCESS,
partialSuccessResults.get(0).getTableFormatSyncStatus());
assertSyncResultTimes(partialSuccessResults.get(0), start);
assertEquals(SyncMode.INCREMENTAL, partialSuccessResults.get(1).getMode());
@@ -176,11 +180,14 @@ public class TestTableFormatSync {
assertEquals(
SyncResult.SyncStatus.builder()
.statusCode(SyncResult.SyncStatusCode.ERROR)
- .errorMessage("Failure")
- .errorDescription("Failed to sync INCREMENTAL")
- .canRetryOnFailure(true)
+ .errorDetails(
+ SyncResult.ErrorDetails.builder()
+ .errorMessage("Failure")
+ .errorDescription("Failed to sync INCREMENTAL")
+ .canRetryOnFailure(true)
+ .build())
.build(),
- partialSuccessResults.get(1).getStatus());
+ partialSuccessResults.get(1).getTableFormatSyncStatus());
// Assert that all 3 changes are properly synced to the other format
List<SyncResult> successResults = result.get(TableFormat.DELTA);
@@ -190,7 +197,7 @@ public class TestTableFormatSync {
assertEquals(
tableChanges.get(i).getTableAsOfChange().getLatestCommitTime(),
successResults.get(i).getLastInstantSynced());
- assertEquals(SyncResult.SyncStatus.SUCCESS,
successResults.get(i).getStatus());
+ assertEquals(SyncResult.SyncStatus.SUCCESS,
successResults.get(i).getTableFormatSyncStatus());
assertSyncResultTimes(successResults.get(i), start);
}
@@ -257,7 +264,8 @@ public class TestTableFormatSync {
assertEquals(2, conversionTarget1Results.size());
for (SyncResult conversionTarget1Result : conversionTarget1Results) {
assertEquals(SyncMode.INCREMENTAL, conversionTarget1Result.getMode());
- assertEquals(SyncResult.SyncStatus.SUCCESS,
conversionTarget1Result.getStatus());
+ assertEquals(
+ SyncResult.SyncStatus.SUCCESS,
conversionTarget1Result.getTableFormatSyncStatus());
assertSyncResultTimes(conversionTarget1Result, start);
}
assertEquals(
@@ -275,7 +283,9 @@ public class TestTableFormatSync {
assertEquals(
tableChanges.get(i + 1).getTableAsOfChange().getLatestCommitTime(),
conversionTarget2Results.get(i).getLastInstantSynced());
- assertEquals(SyncResult.SyncStatus.SUCCESS,
conversionTarget2Results.get(i).getStatus());
+ assertEquals(
+ SyncResult.SyncStatus.SUCCESS,
+ conversionTarget2Results.get(i).getTableFormatSyncStatus());
assertSyncResultTimes(conversionTarget2Results.get(i), start);
}
@@ -330,7 +340,7 @@ public class TestTableFormatSync {
conversionTarget2Results.forEach(
syncResult -> {
assertEquals(SyncMode.INCREMENTAL, syncResult.getMode());
- assertEquals(SyncResult.SyncStatus.SUCCESS, syncResult.getStatus());
+ assertEquals(SyncResult.SyncStatus.SUCCESS,
syncResult.getTableFormatSyncStatus());
assertSyncResultTimes(syncResult, start);
});
diff --git
a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java
b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java
index bc5f5e02..222652a6 100644
---
a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java
+++
b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java
@@ -85,7 +85,7 @@ public class ConversionController {
if (config.getTargetTables() == null ||
config.getTargetTables().isEmpty()) {
throw new IllegalArgumentException("Please provide at-least one format to sync");
}
- config = ConversionUtils.normalizeTargetPaths(config);
+
try (ConversionSource<COMMIT> conversionSource =
conversionSourceProvider.getConversionSourceInstance(config.getSourceTable())) {
ExtractFromSource<COMMIT> source =
ExtractFromSource.of(conversionSource);
@@ -143,7 +143,7 @@ public class ConversionController {
private static String getFormatsWithStatusCode(
Map<String, SyncResult> syncResultsMerged, SyncResult.SyncStatusCode
statusCode) {
return syncResultsMerged.entrySet().stream()
- .filter(entry -> entry.getValue().getStatus().getStatusCode() ==
statusCode)
+ .filter(entry ->
entry.getValue().getTableFormatSyncStatus().getStatusCode() == statusCode)
.map(Map.Entry::getKey)
.collect(Collectors.joining(","));
}
diff --git
a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionUtils.java
b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionUtils.java
deleted file mode 100644
index fdeedc9b..00000000
---
a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionUtils.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.xtable.conversion;
-
-import java.util.List;
-import java.util.stream.Collectors;
-
-import org.apache.xtable.model.storage.TableFormat;
-
-public class ConversionUtils {
-
- /**
- * Few table formats need the metadata to be located at the root level of the data files. Eg: An
- * iceberg table generated through spark will have two directories basePath/data and
- * basePath/metadata For synchronising the iceberg metadata to hudi and delta, they need to be
- * present in basePath/data/.hoodie and basePath/data/_delta_log.
- *
- * @param config conversion config for synchronizing source and target tables
- * @return updated table config.
- */
- public static ConversionConfig normalizeTargetPaths(ConversionConfig config)
{
- if
(!config.getSourceTable().getDataPath().equals(config.getSourceTable().getBasePath())
- &&
config.getSourceTable().getFormatName().equals(TableFormat.ICEBERG)) {
- List<TargetTable> updatedTargetTables =
- config.getTargetTables().stream()
- .filter(
- targetTable ->
- targetTable.getFormatName().equals(TableFormat.HUDI)
- ||
targetTable.getFormatName().equals(TableFormat.DELTA))
- .map(
- targetTable ->
- targetTable.toBuilder()
- .basePath(config.getSourceTable().getDataPath())
- .build())
- .collect(Collectors.toList());
- return new ConversionConfig(
- config.getSourceTable(), updatedTargetTables, config.getSyncMode());
- }
- return config;
- }
-}
diff --git
a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java
b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java
index a5937b02..140eb8ad 100644
---
a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java
+++
b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java
@@ -83,6 +83,13 @@ public class DeltaConversionSource implements
ConversionSource<Long> {
return tableExtractor.table(deltaLog, tableName, version);
}
+ @Override
+ public InternalTable getCurrentTable() {
+ DeltaLog deltaLog = DeltaLog.forTable(sparkSession, basePath);
+ Snapshot snapshot = deltaLog.snapshot();
+ return getTable(snapshot.version());
+ }
+
@Override
public InternalSnapshot getCurrentSnapshot() {
DeltaLog deltaLog = DeltaLog.forTable(sparkSession, basePath);
diff --git
a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSource.java
b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSource.java
index 1b1d0bf3..02423c2d 100644
--- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSource.java
+++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSource.java
@@ -76,6 +76,19 @@ public class HudiConversionSource implements
ConversionSource<HoodieInstant> {
return tableExtractor.table(metaClient, commit);
}
+ @Override
+ public InternalTable getCurrentTable() {
+ HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
+ HoodieTimeline completedTimeline =
activeTimeline.filterCompletedInstants();
+ // get latest commit
+ HoodieInstant latestCommit =
+ completedTimeline
+ .lastInstant()
+ .orElseThrow(
+ () -> new ReadException("Unable to read latest commit from Hudi source table"));
+ return getTable(latestCommit);
+ }
+
@Override
public InternalSnapshot getCurrentSnapshot() {
HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
diff --git
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java
b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java
index f96ec714..c84fb196 100644
---
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java
+++
b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java
@@ -131,6 +131,13 @@ public class IcebergConversionSource implements
ConversionSource<Snapshot> {
.build();
}
+ @Override
+ public InternalTable getCurrentTable() {
+ Table iceTable = getSourceTable();
+ Snapshot currentSnapshot = iceTable.currentSnapshot();
+ return getTable(currentSnapshot);
+ }
+
@Override
public InternalSnapshot getCurrentSnapshot() {
Table iceTable = getSourceTable();
diff --git
a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java
b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java
index b5ffcdf1..3d539766 100644
--- a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java
+++ b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java
@@ -1013,7 +1013,8 @@ public class ITConversionController {
TargetTable.builder()
.name(tableName)
.formatName(formatName)
- .basePath(table.getBasePath())
+ // set the metadata path to the data path as the default (required by Hudi)
+ .basePath(table.getDataPath())
.metadataRetention(metadataRetention)
.build())
.collect(Collectors.toList());
diff --git
a/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java
b/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java
index 652bbe42..caba8046 100644
---
a/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java
+++
b/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java
@@ -392,7 +392,7 @@ public class TestConversionController {
return SyncResult.builder()
.mode(syncMode)
.lastInstantSynced(lastSyncedInstant)
- .status(SyncResult.SyncStatus.SUCCESS)
+ .tableFormatSyncStatus(SyncResult.SyncStatus.SUCCESS)
.build();
}
diff --git
a/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionUtils.java
b/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionUtils.java
deleted file mode 100644
index b1044039..00000000
---
a/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionUtils.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.xtable.conversion;
-
-import static org.junit.jupiter.api.Assertions.*;
-
-import java.util.Arrays;
-
-import org.junit.jupiter.api.Test;
-
-import org.apache.xtable.model.storage.TableFormat;
-import org.apache.xtable.model.sync.SyncMode;
-
-class TestConversionUtils {
-
- @Test
- void testNormalizeTargetPaths() {
- ConversionConfig config =
- ConversionConfig.builder()
- .sourceTable(
- SourceTable.builder()
- .name("table_name")
- .formatName(TableFormat.ICEBERG)
- .basePath("/tmp/basePath")
- .dataPath("/tmp/basePath/data")
- .build())
- .syncMode(SyncMode.FULL)
- .targetTables(
- Arrays.asList(
- TargetTable.builder()
- .name("table_name")
- .basePath("/tmp/basePath")
- .formatName(TableFormat.DELTA)
- .build(),
- TargetTable.builder()
- .name("table_name")
- .basePath("/tmp/basePath")
- .formatName(TableFormat.HUDI)
- .build()))
- .build();
- ConversionConfig expectedNormalizedConfig =
- ConversionConfig.builder()
- .sourceTable(
- SourceTable.builder()
- .name("table_name")
- .formatName(TableFormat.ICEBERG)
- .basePath("/tmp/basePath")
- .dataPath("/tmp/basePath/data")
- .build())
- .syncMode(SyncMode.FULL)
- .targetTables(
- Arrays.asList(
- TargetTable.builder()
- .name("table_name")
- .basePath("/tmp/basePath/data")
- .formatName(TableFormat.DELTA)
- .build(),
- TargetTable.builder()
- .name("table_name")
- .basePath("/tmp/basePath/data")
- .formatName(TableFormat.HUDI)
- .build()))
- .build();
- ConversionConfig actualConfig =
ConversionUtils.normalizeTargetPaths(config);
- assertEquals(expectedNormalizedConfig, actualConfig);
- }
-
- @Test
- void testNormalizeTargetPathsNoOp() {
- ConversionConfig config =
- ConversionConfig.builder()
- .sourceTable(
- SourceTable.builder()
- .name("table_name")
- .formatName(TableFormat.HUDI)
- .basePath("/tmp/basePath")
- .build())
- .syncMode(SyncMode.FULL)
- .targetTables(
- Arrays.asList(
- TargetTable.builder()
- .name("table_name")
- .basePath("/tmp/basePath")
- .formatName(TableFormat.ICEBERG)
- .build(),
- TargetTable.builder()
- .name("table_name")
- .basePath("/tmp/basePath")
- .formatName(TableFormat.DELTA)
- .build()))
- .build();
- ConversionConfig actualConfig =
ConversionUtils.normalizeTargetPaths(config);
- assertEquals(config, actualConfig);
- }
-}
diff --git
a/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaConversionTargetSource.java
b/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaConversionTargetSource.java
index 8fcf0753..ca8bc3fa 100644
---
a/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaConversionTargetSource.java
+++
b/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaConversionTargetSource.java
@@ -18,6 +18,7 @@
package org.apache.xtable.delta;
+import static org.apache.xtable.testutil.ITTestUtils.validateTable;
import static org.junit.jupiter.api.Assertions.*;
import java.net.URI;
@@ -208,6 +209,44 @@ public class ITDeltaConversionTargetSource {
snapshot.getPartitionedDataFiles().get(0));
}
+ @Test
+ void getCurrentTableTest() {
+ // Table name
+ final String tableName = GenericTable.getTableName();
+ final Path basePath = tempDir.resolve(tableName);
+ // Create table with a single row using Spark
+ sparkSession.sql(
+ "CREATE TABLE `"
+ + tableName
+ + "` USING DELTA LOCATION '"
+ + basePath
+ + "' AS SELECT * FROM VALUES (1, 2)");
+ // Create Delta source
+ SourceTable tableConfig =
+ SourceTable.builder()
+ .name(tableName)
+ .basePath(basePath.toString())
+ .formatName(TableFormat.DELTA)
+ .build();
+ DeltaConversionSource conversionSource =
+ conversionSourceProvider.getConversionSourceInstance(tableConfig);
+ // Get current table
+ InternalTable internalTable = conversionSource.getCurrentTable();
+ List<InternalField> fields = Arrays.asList(COL1_INT_FIELD, COL2_INT_FIELD);
+ validateTable(
+ internalTable,
+ tableName,
+ TableFormat.DELTA,
+ InternalSchema.builder()
+ .name("struct")
+ .dataType(InternalType.RECORD)
+ .fields(fields)
+ .build(),
+ DataLayoutStrategy.FLAT,
+ "file:" + basePath,
+ Collections.emptyList());
+ }
+
@Test
void getCurrentSnapshotPartitionedTest() throws URISyntaxException {
// Table name
@@ -660,22 +699,6 @@ public class ITDeltaConversionTargetSource {
assertEquals(PartitionTransformType.YEAR,
partitionField.getTransformType());
}
- private static void validateTable(
- InternalTable internalTable,
- String tableName,
- String tableFormat,
- InternalSchema readSchema,
- DataLayoutStrategy dataLayoutStrategy,
- String basePath,
- List<InternalPartitionField> partitioningFields) {
- Assertions.assertEquals(tableName, internalTable.getName());
- Assertions.assertEquals(tableFormat, internalTable.getTableFormat());
- Assertions.assertEquals(readSchema, internalTable.getReadSchema());
- Assertions.assertEquals(dataLayoutStrategy,
internalTable.getLayoutStrategy());
- Assertions.assertEquals(basePath, internalTable.getBasePath());
- Assertions.assertEquals(partitioningFields,
internalTable.getPartitioningFields());
- }
-
private void validatePartitionDataFiles(
PartitionFileGroup expectedPartitionFiles, PartitionFileGroup
actualPartitionFiles)
throws URISyntaxException {
diff --git
a/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSourceSource.java
b/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSourceSource.java
index c074bc23..3debf904 100644
---
a/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSourceSource.java
+++
b/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSourceSource.java
@@ -19,6 +19,7 @@
package org.apache.xtable.hudi;
import static java.util.stream.Collectors.groupingBy;
+import static org.apache.xtable.testutil.ITTestUtils.validateTable;
import static org.junit.jupiter.api.Assertions.*;
import java.nio.file.Path;
@@ -26,6 +27,7 @@ import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -38,6 +40,7 @@ import java.util.stream.Stream;
import lombok.Builder;
import lombok.Value;
+import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
@@ -65,7 +68,13 @@ import org.apache.xtable.ValidationTestHelper;
import org.apache.xtable.model.CommitsBacklog;
import org.apache.xtable.model.InstantsForIncrementalSync;
import org.apache.xtable.model.InternalSnapshot;
+import org.apache.xtable.model.InternalTable;
import org.apache.xtable.model.TableChange;
+import org.apache.xtable.model.schema.InternalField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.schema.InternalType;
+import org.apache.xtable.model.storage.DataLayoutStrategy;
+import org.apache.xtable.model.storage.TableFormat;
/**
* A suite of functional tests that the extraction from Hudi to Intermediate representation works.
@@ -98,6 +107,122 @@ public class ITHudiConversionSourceSource {
}
}
+ @Test
+ void getCurrentTableTest() {
+ String tableName = GenericTable.getTableName();
+ Path basePath = tempDir.resolve(tableName);
+ HudiTestUtil.PartitionConfig partitionConfig = HudiTestUtil.PartitionConfig.of(null, null);
+ Schema schema =
+ Schema.createRecord(
+ "testCurrentTable",
+ null,
+ "hudi",
+ false,
+ Arrays.asList(
+ new Schema.Field("field1", Schema.create(Schema.Type.STRING)),
+ new Schema.Field("field2", Schema.create(Schema.Type.STRING))));
+ try (TestJavaHudiTable table =
+ TestJavaHudiTable.withSchema(
+ tableName,
+ tempDir,
+ HudiTestUtil.PartitionConfig.of(null, null).getHudiConfig(),
+ HoodieTableType.MERGE_ON_READ,
+ schema)) {
+ table.insertRecords(5, Collections.emptyList(), false);
+ HudiConversionSource hudiClient =
+ getHudiSourceClient(
+ CONFIGURATION, table.getBasePath(), partitionConfig.getXTableConfig());
+ InternalTable internalTable = hudiClient.getCurrentTable();
+ InternalSchema internalSchema =
+ InternalSchema.builder()
+ .name("testCurrentTable")
+ .dataType(InternalType.RECORD)
+ .isNullable(false)
+ .fields(
+ Arrays.asList(
+ InternalField.builder()
+ .name("_hoodie_commit_time")
+ .schema(
+ InternalSchema.builder()
+ .name("string")
+ .dataType(InternalType.STRING)
+ .isNullable(true)
+ .build())
+ .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+ .build(),
+ InternalField.builder()
+ .name("_hoodie_commit_seqno")
+ .schema(
+ InternalSchema.builder()
+ .name("string")
+ .dataType(InternalType.STRING)
+ .isNullable(true)
+ .build())
+ .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+ .build(),
+ InternalField.builder()
+ .name("_hoodie_record_key")
+ .schema(
+ InternalSchema.builder()
+ .name("string")
+ .dataType(InternalType.STRING)
+ .isNullable(true)
+ .build())
+ .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+ .build(),
+ InternalField.builder()
+ .name("_hoodie_partition_path")
+ .schema(
+ InternalSchema.builder()
+ .name("string")
+ .dataType(InternalType.STRING)
+ .isNullable(true)
+ .build())
+ .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+ .build(),
+ InternalField.builder()
+ .name("_hoodie_file_name")
+ .schema(
+ InternalSchema.builder()
+ .name("string")
+ .dataType(InternalType.STRING)
+ .isNullable(true)
+ .build())
+ .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+ .build(),
+ InternalField.builder()
+ .name("field1")
+ .schema(
+ InternalSchema.builder()
+ .name("string")
+ .dataType(InternalType.STRING)
+ .isNullable(false)
+ .build())
+ .defaultValue(null)
+ .build(),
+ InternalField.builder()
+ .name("field2")
+ .schema(
+ InternalSchema.builder()
+ .name("string")
+ .dataType(InternalType.STRING)
+ .isNullable(false)
+ .build())
+ .defaultValue(null)
+ .build()))
+ .recordKeyFields(Collections.singletonList(null))
+ .build();
+ validateTable(
+ internalTable,
+ tableName,
+ TableFormat.HUDI,
+ internalSchema,
+ DataLayoutStrategy.FLAT,
+ "file:" + basePath + "_v1",
+ Collections.emptyList());
+ }
+ }
+
@ParameterizedTest
@MethodSource("testsForAllTableTypesAndPartitions")
public void insertAndUpsertData(
diff --git a/xtable-core/src/test/java/org/apache/xtable/iceberg/ITIcebergConversionTargetSource.java b/xtable-core/src/test/java/org/apache/xtable/iceberg/ITIcebergConversionTargetSource.java
index 3f20ac9d..acd88688 100644
--- a/xtable-core/src/test/java/org/apache/xtable/iceberg/ITIcebergConversionTargetSource.java
+++ b/xtable-core/src/test/java/org/apache/xtable/iceberg/ITIcebergConversionTargetSource.java
@@ -21,6 +21,7 @@ package org.apache.xtable.iceberg;
import static org.apache.xtable.GenericTable.getTableName;
import static org.apache.xtable.ValidationTestHelper.validateSnapshot;
import static org.apache.xtable.ValidationTestHelper.validateTableChanges;
+import static org.apache.xtable.testutil.ITTestUtils.validateTable;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -29,6 +30,8 @@ import java.nio.file.Path;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -49,9 +52,14 @@ import org.apache.xtable.conversion.SourceTable;
import org.apache.xtable.model.CommitsBacklog;
import org.apache.xtable.model.InstantsForIncrementalSync;
import org.apache.xtable.model.InternalSnapshot;
+import org.apache.xtable.model.InternalTable;
import org.apache.xtable.model.TableChange;
+import org.apache.xtable.model.schema.InternalField;
import org.apache.xtable.model.schema.InternalPartitionField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.schema.InternalType;
import org.apache.xtable.model.schema.PartitionTransformType;
+import org.apache.xtable.model.storage.DataLayoutStrategy;
import org.apache.xtable.model.storage.TableFormat;
public class ITIcebergConversionTargetSource {
@@ -66,6 +74,67 @@ public class ITIcebergConversionTargetSource {
sourceProvider.init(hadoopConf);
}
+ @Test
+ void getCurrentTableTest() {
+ String tableName = getTableName();
+ try (TestIcebergTable testIcebergTable =
+ new TestIcebergTable(
+ tableName,
+ tempDir,
+ hadoopConf,
+ "field1",
+ Collections.singletonList(null),
+ TestIcebergDataHelper.SchemaType.BASIC)) {
+ testIcebergTable.insertRows(50);
+ SourceTable tableConfig =
+ SourceTable.builder()
+ .name(testIcebergTable.getTableName())
+ .basePath(testIcebergTable.getBasePath())
+ .formatName(TableFormat.ICEBERG)
+ .build();
+ IcebergConversionSource conversionSource =
+ sourceProvider.getConversionSourceInstance(tableConfig);
+ InternalTable internalTable = conversionSource.getCurrentTable();
+ InternalSchema internalSchema =
+ InternalSchema.builder()
+ .name("record")
+ .dataType(InternalType.RECORD)
+ .fields(
+ Arrays.asList(
+ InternalField.builder()
+ .name("field1")
+ .fieldId(1)
+ .schema(
+ InternalSchema.builder()
+ .name("string")
+ .dataType(InternalType.STRING)
+ .isNullable(true)
+ .build())
+ .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+ .build(),
+ InternalField.builder()
+ .name("field2")
+ .fieldId(2)
+ .schema(
+ InternalSchema.builder()
+ .name("string")
+ .dataType(InternalType.STRING)
+ .isNullable(true)
+ .build())
+ .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+ .build()))
+ .build();
+ validateTable(
+ internalTable,
+ testIcebergTable.getBasePath(),
+ TableFormat.ICEBERG,
+ internalSchema,
+ DataLayoutStrategy.FLAT,
+ testIcebergTable.getBasePath(),
+ Collections.emptyList());
+ }
+ }
+
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testInsertsUpsertsAndDeletes(boolean isPartitioned) {
diff --git a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergDataHelper.java b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergDataHelper.java
index 1d10fe7a..d90ba169 100644
--- a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergDataHelper.java
+++ b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergDataHelper.java
@@ -56,6 +56,10 @@ import org.apache.iceberg.types.Types.NestedField;
@Value
public class TestIcebergDataHelper {
private static final Random RANDOM = new Random();
+ private static final List<Types.NestedField> BASIC_FIELDS =
+ Arrays.asList(
+ NestedField.optional(1, "field1", Types.StringType.get()),
+ NestedField.optional(2, "field2", Types.StringType.get()));
private static final List<Types.NestedField> COMMON_FIELDS =
Arrays.asList(
NestedField.optional(1, "id", Types.StringType.get()),
@@ -114,6 +118,7 @@ public class TestIcebergDataHelper {
private static final Schema SCHEMA_WITH_UUID_COLUMN =
new Schema(
Stream.concat(COMMON_FIELDS.stream(), UUID_FIELDS.stream()).collect(Collectors.toList()));
+ private static final Schema BASIC_SCHEMA = new Schema(BASIC_FIELDS);
private static final OffsetDateTime EPOCH =
Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();
@@ -122,6 +127,7 @@ public class TestIcebergDataHelper {
List<String> partitionFieldNames;
public static enum SchemaType {
+ BASIC,
COMMON,
COMMON_WITH_ADDITIONAL_COLUMNS,
COMMON_WITH_UUID_COLUMN,
@@ -145,6 +151,8 @@ public class TestIcebergDataHelper {
return SCHEMA_WITH_ADDITIONAL_COLUMNS;
case COMMON_WITH_UUID_COLUMN:
return SCHEMA_WITH_UUID_COLUMN;
+ case BASIC:
+ return BASIC_SCHEMA;
default:
throw new IllegalArgumentException("Unknown schema type: " + schemaType);
}
diff --git a/xtable-core/src/test/java/org/apache/xtable/testutil/ITTestUtils.java b/xtable-core/src/test/java/org/apache/xtable/testutil/ITTestUtils.java
new file mode 100644
index 00000000..281e61fe
--- /dev/null
+++ b/xtable-core/src/test/java/org/apache/xtable/testutil/ITTestUtils.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.testutil;
+
+import java.util.List;
+
+import org.junit.jupiter.api.Assertions;
+
+import org.apache.xtable.model.InternalTable;
+import org.apache.xtable.model.schema.InternalPartitionField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.storage.DataLayoutStrategy;
+
+public class ITTestUtils {
+
+ public static void validateTable(
+ InternalTable internalTable,
+ String tableName,
+ String tableFormat,
+ InternalSchema readSchema,
+ DataLayoutStrategy dataLayoutStrategy,
+ String basePath,
+ List<InternalPartitionField> partitioningFields) {
+ Assertions.assertEquals(tableName, internalTable.getName());
+ Assertions.assertEquals(tableFormat, internalTable.getTableFormat());
+ Assertions.assertEquals(readSchema, internalTable.getReadSchema());
+ Assertions.assertEquals(dataLayoutStrategy, internalTable.getLayoutStrategy());
+ Assertions.assertEquals(basePath, internalTable.getBasePath());
+ Assertions.assertEquals(partitioningFields, internalTable.getPartitioningFields());
+ }
+}
diff --git a/xtable-hudi-support/xtable-hudi-support-extensions/src/main/java/org/apache/xtable/hudi/sync/XTableSyncTool.java b/xtable-hudi-support/xtable-hudi-support-extensions/src/main/java/org/apache/xtable/hudi/sync/XTableSyncTool.java
index a9653eb5..7491cec4 100644
--- a/xtable-hudi-support/xtable-hudi-support-extensions/src/main/java/org/apache/xtable/hudi/sync/XTableSyncTool.java
+++ b/xtable-hudi-support/xtable-hudi-support-extensions/src/main/java/org/apache/xtable/hudi/sync/XTableSyncTool.java
@@ -106,7 +106,7 @@ public class XTableSyncTool extends HoodieSyncTool {
results.entrySet().stream()
.filter(
entry ->
- entry.getValue().getStatus().getStatusCode()
+ entry.getValue().getTableFormatSyncStatus().getStatusCode()
!= SyncResult.SyncStatusCode.SUCCESS)
.map(Map.Entry::getKey)
.collect(Collectors.joining(","));