This is an automated email from the ASF dual-hosted git repository.
timbrown pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git
The following commit(s) were added to refs/heads/main by this push:
new e162b3bd Refactor config classes
e162b3bd is described below
commit e162b3bd717af6945dad56b931c6679c4fbaaada
Author: Timothy Brown <[email protected]>
AuthorDate: Sun Aug 18 19:59:44 2024 -0500
Refactor config classes
---
.../{PerTableConfig.java => ConversionConfig.java} | 41 +--
.../apache/xtable/conversion/ExternalTable.java | 77 +++++
.../{HudiSourceConfig.java => SourceTable.java} | 27 +-
.../{PerTableConfig.java => TargetTable.java} | 48 ++--
.../apache/xtable/spi/sync/ConversionTarget.java | 4 +-
.../xtable/conversion/TestExternalTable.java | 56 ++++
.../apache/xtable/conversion/TestSourceTable.java} | 48 ++--
.../apache/xtable/conversion/TestTargetTable.java} | 30 +-
.../xtable/conversion/ConversionController.java | 25 +-
.../conversion/ConversionSourceProvider.java | 13 +-
.../xtable/conversion/ConversionTargetFactory.java | 16 +-
.../xtable/conversion/PerTableConfigImpl.java | 142 ----------
.../delta/DeltaConversionSourceProvider.java | 10 +-
.../apache/xtable/delta/DeltaConversionTarget.java | 28 +-
.../ConfigurationBasedPartitionSpecExtractor.java | 4 +-
.../xtable/hudi/HudiConversionSourceProvider.java | 11 +-
.../apache/xtable/hudi/HudiConversionTarget.java | 20 +-
...SourceConfigImpl.java => HudiSourceConfig.java} | 32 ++-
.../xtable/iceberg/IcebergConversionSource.java | 33 ++-
.../iceberg/IcebergConversionSourceProvider.java | 4 +-
.../xtable/iceberg/IcebergConversionTarget.java | 22 +-
.../org/apache/xtable/ITConversionController.java | 310 +++++++++++----------
.../xtable/conversion/TestConversionConfig.java | 66 +++++
.../conversion/TestConversionController.java | 106 ++++---
.../conversion/TestConversionTargetFactory.java | 37 +--
.../xtable/conversion/TestPerTableConfig.java | 118 --------
.../delta/ITDeltaConversionTargetSource.java | 95 ++++---
.../org/apache/xtable/delta/TestDeltaSync.java | 14 +-
.../xtable/hudi/ITHudiConversionSourceSource.java | 2 +-
.../xtable/hudi/ITHudiConversionSourceTarget.java | 13 +-
.../iceberg/ITIcebergConversionTargetSource.java | 57 ++--
.../iceberg/TestIcebergConversionTargetSource.java | 22 +-
.../org/apache/xtable/iceberg/TestIcebergSync.java | 14 +-
.../java/org/apache/xtable/loadtest/LoadTest.java | 76 +++--
.../apache/xtable/hudi/sync/XTableSyncConfig.java | 4 +-
.../apache/xtable/hudi/sync/XTableSyncTool.java | 59 ++--
.../xtable/hudi/sync/TestXTableSyncTool.java | 2 +-
.../java/org/apache/xtable/utilities/RunSync.java | 64 +++--
38 files changed, 924 insertions(+), 826 deletions(-)
diff --git a/xtable-api/src/main/java/org/apache/xtable/conversion/PerTableConfig.java b/xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java
similarity index 54%
copy from xtable-api/src/main/java/org/apache/xtable/conversion/PerTableConfig.java
copy to xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java
index 612fc9b3..73e2628d 100644
--- a/xtable-api/src/main/java/org/apache/xtable/conversion/PerTableConfig.java
+++ b/xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java
@@ -20,24 +20,31 @@ package org.apache.xtable.conversion;
import java.util.List;
-import org.apache.xtable.model.sync.SyncMode;
-
-public interface PerTableConfig {
- int getTargetMetadataRetentionInHours();
-
- String getTableBasePath();
-
- String getTableDataPath();
+import lombok.Builder;
+import lombok.NonNull;
+import lombok.Value;
- String getTableName();
+import com.google.common.base.Preconditions;
- HudiSourceConfig getHudiSourceConfig();
-
- List<String> getTargetTableFormats();
-
- SyncMode getSyncMode();
-
- String[] getNamespace();
+import org.apache.xtable.model.sync.SyncMode;
- CatalogConfig getIcebergCatalogConfig();
+@Value
+public class ConversionConfig {
+ // The source of the sync
+ @NonNull SourceTable sourceTable;
+ // One or more targets to sync the table metadata to
+ List<TargetTable> targetTables;
+ // The mode, incremental or snapshot
+ SyncMode syncMode;
+
+ @Builder
+ ConversionConfig(
+ @NonNull SourceTable sourceTable, List<TargetTable> targetTables, SyncMode syncMode) {
+ this.sourceTable = sourceTable;
+ this.targetTables = targetTables;
+ Preconditions.checkArgument(
+ targetTables != null && !targetTables.isEmpty(),
+ "Please provide at-least one format to sync");
+ this.syncMode = syncMode == null ? SyncMode.INCREMENTAL : syncMode;
+ }
}
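For reference, a minimal sketch of how the new builder-based configuration composes; the table name and paths below are illustrative only, not taken from this commit:

  import java.util.Collections;

  import org.apache.xtable.conversion.ConversionConfig;
  import org.apache.xtable.conversion.SourceTable;
  import org.apache.xtable.conversion.TargetTable;
  import org.apache.xtable.model.storage.TableFormat;
  import org.apache.xtable.model.sync.SyncMode;

  public class ConversionConfigExample {
    public static ConversionConfig exampleConfig() {
      // Source table whose metadata will be read (hypothetical path).
      SourceTable source =
          SourceTable.builder()
              .name("my_table")
              .formatName(TableFormat.HUDI)
              .basePath("s3://bucket/my_table")
              .build();
      // Target format to sync the metadata to; metadataRetention defaults to 7 days.
      TargetTable target =
          TargetTable.builder()
              .name("my_table")
              .formatName(TableFormat.ICEBERG)
              .basePath("s3://bucket/my_table")
              .build();
      // syncMode is optional and defaults to INCREMENTAL; at least one target is required.
      return ConversionConfig.builder()
          .sourceTable(source)
          .targetTables(Collections.singletonList(target))
          .syncMode(SyncMode.INCREMENTAL)
          .build();
    }
  }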
diff --git a/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalTable.java b/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalTable.java
new file mode 100644
index 00000000..939c59c0
--- /dev/null
+++ b/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalTable.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.conversion;
+
+import java.util.Properties;
+
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.NonNull;
+
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Preconditions;
+
+/** Defines a reference to a table in a particular format. */
+@Getter
+@EqualsAndHashCode
+class ExternalTable {
+ /** The name of the table. */
+ protected final @NonNull String name;
+ /** The format of the table (e.g. DELTA, ICEBERG, HUDI) */
+ protected final @NonNull String formatName;
+ /** The path to the root of the table or the metadata directory depending on the format */
+ protected final @NonNull String basePath;
+ /** Optional namespace for the table */
+ protected final String[] namespace;
+ /** The configuration for interacting with the catalog that manages this table */
+ protected final CatalogConfig catalogConfig;
+
+ /** Optional, additional properties that can be used to define interactions with the table */
+ protected final Properties additionalProperties;
+
+ ExternalTable(
+ @NonNull String name,
+ @NonNull String formatName,
+ @NonNull String basePath,
+ String[] namespace,
+ CatalogConfig catalogConfig,
+ Properties additionalProperties) {
+ this.name = name;
+ this.formatName = formatName;
+ this.basePath = sanitizeBasePath(basePath);
+ this.namespace = namespace;
+ this.catalogConfig = catalogConfig;
+ this.additionalProperties = additionalProperties;
+ }
+
+ protected String sanitizeBasePath(String tableBasePath) {
+ Path path = new Path(tableBasePath);
+ Preconditions.checkArgument(path.isAbsolute(), "Table base path must be absolute");
+ if (path.isAbsoluteAndSchemeAuthorityNull()) {
+ // assume this is local file system and append scheme
+ return "file://" + path;
+ } else if (path.toUri().getScheme().equals("file")) {
+ // add extra slashes
+ return "file://" + path.toUri().getPath();
+ } else {
+ return path.toString();
+ }
+ }
+}
diff --git a/xtable-api/src/main/java/org/apache/xtable/conversion/HudiSourceConfig.java b/xtable-api/src/main/java/org/apache/xtable/conversion/SourceTable.java
similarity index 53%
rename from xtable-api/src/main/java/org/apache/xtable/conversion/HudiSourceConfig.java
rename to xtable-api/src/main/java/org/apache/xtable/conversion/SourceTable.java
index 6edf356f..b37e1c1e 100644
--- a/xtable-api/src/main/java/org/apache/xtable/conversion/HudiSourceConfig.java
+++ b/xtable-api/src/main/java/org/apache/xtable/conversion/SourceTable.java
@@ -18,10 +18,29 @@
package org.apache.xtable.conversion;
-import org.apache.xtable.spi.extractor.SourcePartitionSpecExtractor;
+import java.util.Properties;
-public interface HudiSourceConfig {
- String getPartitionSpecExtractorClass();
+import lombok.Builder;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.NonNull;
- SourcePartitionSpecExtractor loadSourcePartitionSpecExtractor();
+@EqualsAndHashCode(callSuper = true)
+@Getter
+public class SourceTable extends ExternalTable {
+ /** The path to the data files, defaults to the metadataPath */
+ @NonNull private final String dataPath;
+
+ @Builder(toBuilder = true)
+ public SourceTable(
+ String name,
+ String formatName,
+ String basePath,
+ String dataPath,
+ String[] namespace,
+ CatalogConfig catalogConfig,
+ Properties additionalProperties) {
+ super(name, formatName, basePath, namespace, catalogConfig, additionalProperties);
+ this.dataPath = dataPath == null ? this.getBasePath() : sanitizeBasePath(dataPath);
+ }
}
diff --git a/xtable-api/src/main/java/org/apache/xtable/conversion/PerTableConfig.java b/xtable-api/src/main/java/org/apache/xtable/conversion/TargetTable.java
similarity index 52%
copy from xtable-api/src/main/java/org/apache/xtable/conversion/PerTableConfig.java
copy to xtable-api/src/main/java/org/apache/xtable/conversion/TargetTable.java
index 612fc9b3..6256da2c 100644
--- a/xtable-api/src/main/java/org/apache/xtable/conversion/PerTableConfig.java
+++ b/xtable-api/src/main/java/org/apache/xtable/conversion/TargetTable.java
@@ -18,26 +18,30 @@
package org.apache.xtable.conversion;
-import java.util.List;
-
-import org.apache.xtable.model.sync.SyncMode;
-
-public interface PerTableConfig {
- int getTargetMetadataRetentionInHours();
-
- String getTableBasePath();
-
- String getTableDataPath();
-
- String getTableName();
-
- HudiSourceConfig getHudiSourceConfig();
-
- List<String> getTargetTableFormats();
-
- SyncMode getSyncMode();
-
- String[] getNamespace();
-
- CatalogConfig getIcebergCatalogConfig();
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.Properties;
+
+import lombok.Builder;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+
+@Getter
+@EqualsAndHashCode(callSuper = true)
+public class TargetTable extends ExternalTable {
+ private final Duration metadataRetention;
+
+ @Builder(toBuilder = true)
+ public TargetTable(
+ String name,
+ String formatName,
+ String basePath,
+ String[] namespace,
+ CatalogConfig catalogConfig,
+ Duration metadataRetention,
+ Properties additionalProperties) {
+ super(name, formatName, basePath, namespace, catalogConfig, additionalProperties);
+ this.metadataRetention =
+ metadataRetention == null ? Duration.of(7, ChronoUnit.DAYS) : metadataRetention;
+ }
}
diff --git a/xtable-api/src/main/java/org/apache/xtable/spi/sync/ConversionTarget.java b/xtable-api/src/main/java/org/apache/xtable/spi/sync/ConversionTarget.java
index a0df756a..736f49e4 100644
--- a/xtable-api/src/main/java/org/apache/xtable/spi/sync/ConversionTarget.java
+++ b/xtable-api/src/main/java/org/apache/xtable/spi/sync/ConversionTarget.java
@@ -23,7 +23,7 @@ import java.util.Optional;
import org.apache.hadoop.conf.Configuration;
-import org.apache.xtable.conversion.PerTableConfig;
+import org.apache.xtable.conversion.TargetTable;
import org.apache.xtable.model.InternalTable;
import org.apache.xtable.model.metadata.TableSyncMetadata;
import org.apache.xtable.model.schema.InternalPartitionField;
@@ -89,5 +89,5 @@ public interface ConversionTarget {
String getTableFormat();
/** Initializes the client with provided configuration */
- void init(PerTableConfig perTableConfig, Configuration configuration);
+ void init(TargetTable targetTable, Configuration configuration);
}
diff --git a/xtable-api/src/test/java/org/apache/xtable/conversion/TestExternalTable.java b/xtable-api/src/test/java/org/apache/xtable/conversion/TestExternalTable.java
new file mode 100644
index 00000000..5422b0a7
--- /dev/null
+++ b/xtable-api/src/test/java/org/apache/xtable/conversion/TestExternalTable.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.conversion;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import org.junit.jupiter.api.Test;
+
+public class TestExternalTable {
+ @Test
+ void sanitizePath() {
+ ExternalTable tooManySlashes =
+ new ExternalTable("name", "hudi", "s3://bucket//path", null, null, null);
+ assertEquals("s3://bucket/path", tooManySlashes.getBasePath());
+
+ ExternalTable localFilePath =
+ new ExternalTable("name", "hudi", "/local/data//path", null, null, null);
+ assertEquals("file:///local/data/path", localFilePath.getBasePath());
+
+ ExternalTable properLocalFilePath =
+ new ExternalTable("name", "hudi", "file:///local/data//path", null, null, null);
+ assertEquals("file:///local/data/path", properLocalFilePath.getBasePath());
+ }
+
+ @Test
+ void errorIfRequiredArgsNotSet() {
+ assertThrows(
+ NullPointerException.class,
+ () -> new ExternalTable("name", "hudi", null, null, null, null));
+
+ assertThrows(
+ NullPointerException.class,
+ () -> new ExternalTable("name", null, "file://bucket/path", null,
null, null));
+
+ assertThrows(
+ NullPointerException.class,
+ () -> new ExternalTable(null, "hudi", "file://bucket/path", null,
null, null));
+ }
+}
diff --git a/xtable-api/src/main/java/org/apache/xtable/conversion/PerTableConfig.java b/xtable-api/src/test/java/org/apache/xtable/conversion/TestSourceTable.java
similarity index 51%
copy from xtable-api/src/main/java/org/apache/xtable/conversion/PerTableConfig.java
copy to xtable-api/src/test/java/org/apache/xtable/conversion/TestSourceTable.java
index 612fc9b3..05effaf9 100644
--- a/xtable-api/src/main/java/org/apache/xtable/conversion/PerTableConfig.java
+++ b/xtable-api/src/test/java/org/apache/xtable/conversion/TestSourceTable.java
@@ -18,26 +18,30 @@
package org.apache.xtable.conversion;
-import java.util.List;
-
-import org.apache.xtable.model.sync.SyncMode;
-
-public interface PerTableConfig {
- int getTargetMetadataRetentionInHours();
-
- String getTableBasePath();
-
- String getTableDataPath();
-
- String getTableName();
-
- HudiSourceConfig getHudiSourceConfig();
-
- List<String> getTargetTableFormats();
-
- SyncMode getSyncMode();
-
- String[] getNamespace();
-
- CatalogConfig getIcebergCatalogConfig();
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import org.junit.jupiter.api.Test;
+
+class TestSourceTable {
+ @Test
+ void dataPathDefaultsToMetadataPath() {
+ String basePath = "file:///path/to/table";
+ SourceTable sourceTable =
+ SourceTable.builder().name("name").formatName("hudi").basePath(basePath).build();
+ assertEquals(basePath, sourceTable.getDataPath());
+ }
+
+ @Test
+ void dataPathIsSanitized() {
+ String basePath = "file:///path/to/table";
+ String dataPath = "file:///path/to/table//data";
+ SourceTable sourceTable =
+ SourceTable.builder()
+ .name("name")
+ .formatName("hudi")
+ .basePath(basePath)
+ .dataPath(dataPath)
+ .build();
+ assertEquals("file:///path/to/table/data", sourceTable.getDataPath());
+ }
}
diff --git a/xtable-api/src/main/java/org/apache/xtable/conversion/PerTableConfig.java b/xtable-api/src/test/java/org/apache/xtable/conversion/TestTargetTable.java
similarity index 65%
rename from xtable-api/src/main/java/org/apache/xtable/conversion/PerTableConfig.java
rename to xtable-api/src/test/java/org/apache/xtable/conversion/TestTargetTable.java
index 612fc9b3..9faa22c8 100644
--- a/xtable-api/src/main/java/org/apache/xtable/conversion/PerTableConfig.java
+++ b/xtable-api/src/test/java/org/apache/xtable/conversion/TestTargetTable.java
@@ -18,26 +18,18 @@
package org.apache.xtable.conversion;
-import java.util.List;
+import static org.junit.jupiter.api.Assertions.assertEquals;
-import org.apache.xtable.model.sync.SyncMode;
+import java.time.Duration;
-public interface PerTableConfig {
- int getTargetMetadataRetentionInHours();
+import org.junit.jupiter.api.Test;
- String getTableBasePath();
-
- String getTableDataPath();
-
- String getTableName();
-
- HudiSourceConfig getHudiSourceConfig();
-
- List<String> getTargetTableFormats();
-
- SyncMode getSyncMode();
-
- String[] getNamespace();
-
- CatalogConfig getIcebergCatalogConfig();
+class TestTargetTable {
+ @Test
+ void retentionDefaultsToSevenDays() {
+ String basePath = "file:///path/to/table";
+ TargetTable targetTable =
+ TargetTable.builder().name("name").formatName("hudi").basePath(basePath).build();
+ assertEquals(Duration.ofDays(7), targetTable.getMetadataRetention());
+ }
}
diff --git a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java
index 34968733..dc665969 100644
--- a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java
+++ b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java
@@ -26,7 +26,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -52,7 +51,7 @@ import org.apache.xtable.spi.sync.TableFormatSync;
/**
 * Responsible for completing the entire lifecycle of the sync process given {@link
- * PerTableConfigImpl}. This is done in three steps,
+ * ConversionConfig}. This is done in three steps,
*
* <ul>
 * <li>1. Extracting snapshot {@link InternalSnapshot} from the source table format.
@@ -72,33 +71,31 @@ public class ConversionController {
}
/**
- * Runs a sync for the given source table configuration in PerTableConfig.
+ * Runs a sync for the given source table configuration in ConversionConfig.
*
 * @param config A per table level config containing tableBasePath, partitionFieldSpecConfig,
 * targetTableFormats and syncMode
 * @param conversionSourceProvider A provider for the {@link ConversionSource} instance, {@link
- * ConversionSourceProvider#init(Configuration, Map)} must be called before calling this
- * method.
+ * ConversionSourceProvider#init(Configuration)} must be called before calling this method.
 * @return Returns a map containing the table format, and it's sync result. Run sync for a table
 * with the provided per table level configuration.
*/
 public <COMMIT> Map<String, SyncResult> sync(
- PerTableConfig config, ConversionSourceProvider<COMMIT> conversionSourceProvider) {
- if (config.getTargetTableFormats() == null || config.getTargetTableFormats().isEmpty()) {
+ ConversionConfig config, ConversionSourceProvider<COMMIT> conversionSourceProvider) {
+ if (config.getTargetTables() == null || config.getTargetTables().isEmpty()) {
 throw new IllegalArgumentException("Please provide at-least one format to sync");
 }
 try (ConversionSource<COMMIT> conversionSource =
- conversionSourceProvider.getConversionSourceInstance(config)) {
+ conversionSourceProvider.getConversionSourceInstance(config.getSourceTable())) {
 ExtractFromSource<COMMIT> source = ExtractFromSource.of(conversionSource);
Map<String, ConversionTarget> conversionTargetByFormat =
- config.getTargetTableFormats().stream()
+ config.getTargetTables().stream()
.collect(
Collectors.toMap(
- Function.identity(),
- tableFormat ->
- conversionTargetFactory.createForFormat(tableFormat, config, conf)));
+ TargetTable::getFormatName,
+ targetTable -> conversionTargetFactory.createForFormat(targetTable, conf)));
// State for each TableFormat
Map<String, Optional<TableSyncMetadata>> lastSyncMetadataByFormat =
conversionTargetByFormat.entrySet().stream()
@@ -152,11 +149,11 @@ public class ConversionController {
}
private <COMMIT> Map<String, ConversionTarget> getFormatsToSyncIncrementally(
- PerTableConfig perTableConfig,
+ ConversionConfig conversionConfig,
Map<String, ConversionTarget> conversionTargetByFormat,
Map<String, Optional<TableSyncMetadata>> lastSyncMetadataByFormat,
ConversionSource<COMMIT> conversionSource) {
- if (perTableConfig.getSyncMode() == SyncMode.FULL) {
+ if (conversionConfig.getSyncMode() == SyncMode.FULL) {
// Full sync requested by config, hence no incremental sync.
return Collections.emptyMap();
}
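A sketch of driving a sync end to end with the reworked controller API, assuming a ConversionConfig built as in the earlier example and a Hudi source; providers no longer take a source configuration map:

  import java.util.Map;

  import org.apache.hadoop.conf.Configuration;

  import org.apache.xtable.conversion.ConversionConfig;
  import org.apache.xtable.conversion.ConversionController;
  import org.apache.xtable.hudi.HudiConversionSourceProvider;
  import org.apache.xtable.model.sync.SyncResult;

  public class SyncExample {
    public static Map<String, SyncResult> runSync(ConversionConfig conversionConfig) {
      Configuration hadoopConf = new Configuration();
      // Providers are now initialized with only the Hadoop configuration.
      HudiConversionSourceProvider sourceProvider = new HudiConversionSourceProvider();
      sourceProvider.init(hadoopConf);
      ConversionController controller = new ConversionController(hadoopConf);
      // The result map is keyed by the target table format name.
      return controller.sync(conversionConfig, sourceProvider);
    }
  }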
diff --git a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionSourceProvider.java b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionSourceProvider.java
index 09cad6db..ccd6fde7 100644
--- a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionSourceProvider.java
+++ b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionSourceProvider.java
@@ -18,8 +18,6 @@
package org.apache.xtable.conversion;
-import java.util.Map;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.xtable.spi.extractor.ConversionSource;
@@ -33,16 +31,9 @@ public abstract class ConversionSourceProvider<COMMIT> {
/** The Hadoop configuration to use when reading from the source table. */
protected Configuration hadoopConf;
- /** The configuration for the source. */
- protected Map<String, String> sourceConf;
-
- /** The configuration for the table to read from. */
- protected PerTableConfig sourceTableConfig;
-
/** Initializes the provider various source specific configurations. */
- public void init(Configuration hadoopConf, Map<String, String> sourceConf) {
+ public void init(Configuration hadoopConf) {
this.hadoopConf = hadoopConf;
- this.sourceConf = sourceConf;
}
/**
@@ -54,5 +45,5 @@ public abstract class ConversionSourceProvider<COMMIT> {
* @return the conversion source
*/
public abstract ConversionSource<COMMIT> getConversionSourceInstance(
- PerTableConfig sourceTableConfig);
+ SourceTable sourceTableConfig);
}
diff --git a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionTargetFactory.java b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionTargetFactory.java
index 68e85b4d..8e355897 100644
--- a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionTargetFactory.java
+++ b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionTargetFactory.java
@@ -38,19 +38,17 @@ public class ConversionTargetFactory {
/**
 * Create a fully initialized instance of the ConversionTarget represented by the given Table
- * Format name. Initialization is done with the config provided through PerTableConfig and
+ * Format name. Initialization is done with the config provided through TargetTable and
* Configuration params.
*
- * @param tableFormat
- * @param perTableConfig
- * @param configuration
- * @return
+ * @param targetTable the spec of the target
+ * @param configuration hadoop configuration
+ * @return an initialized {@link ConversionTarget}
*/
- public ConversionTarget createForFormat(
- String tableFormat, PerTableConfig perTableConfig, Configuration configuration) {
- ConversionTarget conversionTarget = createConversionTargetForName(tableFormat);
+ public ConversionTarget createForFormat(TargetTable targetTable, Configuration configuration) {
+ ConversionTarget conversionTarget = createConversionTargetForName(targetTable.getFormatName());
- conversionTarget.init(perTableConfig, configuration);
+ conversionTarget.init(targetTable, configuration);
return conversionTarget;
}
diff --git a/xtable-core/src/main/java/org/apache/xtable/conversion/PerTableConfigImpl.java b/xtable-core/src/main/java/org/apache/xtable/conversion/PerTableConfigImpl.java
deleted file mode 100644
index f34402b1..00000000
--- a/xtable-core/src/main/java/org/apache/xtable/conversion/PerTableConfigImpl.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.xtable.conversion;
-
-import java.util.List;
-
-import javax.annotation.Nonnull;
-
-import lombok.Builder;
-import lombok.NonNull;
-import lombok.Value;
-
-import org.apache.hadoop.fs.Path;
-
-import com.google.common.base.Preconditions;
-
-import org.apache.xtable.hudi.HudiSourceConfigImpl;
-import org.apache.xtable.iceberg.IcebergCatalogConfig;
-import org.apache.xtable.model.sync.SyncMode;
-
-/** Represents input configuration to the sync process. */
-@Value
-public class PerTableConfigImpl implements PerTableConfig {
- /** table base path in local file system or HDFS or object stores like S3, GCS etc. */
- @Nonnull String tableBasePath;
- /** the base path for the data folder, defaults to the tableBasePath if not specified */
- @Nonnull String tableDataPath;
-
- /** The name of the table */
- @Nonnull String tableName;
-
- /** The namespace of the table (optional) */
- String[] namespace;
-
- /**
- * HudiSourceConfig is a config that allows us to infer partition values for hoodie source tables.
- * If the table is not partitioned, leave it blank. If it is partitioned, you can specify a spec
- * with a comma separated list with format path:type:format.
- *
- * <p><ui>
- * <li>partitionSpecExtractorClass: class to extract partition fields from the given
- * spec.ConfigurationBasedPartitionSpecExtractor is the default class
- * <li>partitionFieldSpecConfig: path:type:format spec to infer partition values </ui>
- *
- * <ul>
- * <li>path: is a dot separated path to the partition field
- * <li>type: describes how the partition value was generated from the column value
- * <ul>
- * <li>VALUE: an identity transform of field value to partition value
- * <li>YEAR: data is partitioned by a field representing a date and year granularity
- * is used
- * <li>MONTH: same as YEAR but with month granularity
- * <li>DAY: same as YEAR but with day granularity
- * <li>HOUR: same as YEAR but with hour granularity
- * </ul>
- * <li>format: if your partition type is YEAR, MONTH, DAY, or HOUR specify the format for
- * the date string as it appears in your file paths
- * </ul>
- */
- @Nonnull HudiSourceConfigImpl hudiSourceConfig;
-
- /** List of table formats to sync. */
- @Nonnull List<String> targetTableFormats;
-
- /** Configuration options for integrating with an existing Iceberg Catalog (optional) */
- IcebergCatalogConfig icebergCatalogConfig;
-
- /**
- * Mode of a sync. FULL is only supported right now.
- *
- * <ul>
- * <li>FULL: Full sync will create a checkpoint of ALL the files relevant at a certain point in
- * time
- * <li>INCREMENTAL: Incremental will sync differential structures to bring the table state from
- * and to points in the timeline
- * </ul>
- */
- @Nonnull SyncMode syncMode;
-
- /**
- * The retention for metadata or versions of the table in the target systems to bound the size of
- * any metadata tracked in the target system. Specified in hours.
- */
- int targetMetadataRetentionInHours;
-
- @Builder
- PerTableConfigImpl(
- @NonNull String tableBasePath,
- String tableDataPath,
- @NonNull String tableName,
- String[] namespace,
- HudiSourceConfigImpl hudiSourceConfig,
- @NonNull List<String> targetTableFormats,
- IcebergCatalogConfig icebergCatalogConfig,
- SyncMode syncMode,
- Integer targetMetadataRetentionInHours) {
- // sanitize source path
- this.tableBasePath = sanitizeBasePath(tableBasePath);
- this.tableDataPath = tableDataPath == null ? tableBasePath : sanitizeBasePath(tableDataPath);
- this.tableName = tableName;
- this.namespace = namespace;
- this.hudiSourceConfig =
- hudiSourceConfig == null ? HudiSourceConfigImpl.builder().build() : hudiSourceConfig;
- Preconditions.checkArgument(
- targetTableFormats.size() > 0, "Please provide at-least one format to sync");
- this.targetTableFormats = targetTableFormats;
- this.icebergCatalogConfig = icebergCatalogConfig;
- this.syncMode = syncMode == null ? SyncMode.INCREMENTAL : syncMode;
- this.targetMetadataRetentionInHours =
- targetMetadataRetentionInHours == null ? 24 * 7 : targetMetadataRetentionInHours;
- }
-
- private String sanitizeBasePath(String tableBasePath) {
- Path path = new Path(tableBasePath);
- Preconditions.checkArgument(path.isAbsolute(), "Table base path must be absolute");
- if (path.isAbsoluteAndSchemeAuthorityNull()) {
- // assume this is local file system and append scheme
- return "file://" + path;
- } else if (path.toUri().getScheme().equals("file")) {
- // add extra slashes
- return "file://" + path.toUri().getPath();
- } else {
- return path.toString();
- }
- }
-}
diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSourceProvider.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSourceProvider.java
index f42425db..045e2b72 100644
--- a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSourceProvider.java
+++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSourceProvider.java
@@ -23,18 +23,18 @@ import org.apache.spark.sql.SparkSession;
import io.delta.tables.DeltaTable;
import org.apache.xtable.conversion.ConversionSourceProvider;
-import org.apache.xtable.conversion.PerTableConfig;
+import org.apache.xtable.conversion.SourceTable;
 /** A concrete implementation of {@link ConversionSourceProvider} for Delta Lake table format. */
 public class DeltaConversionSourceProvider extends ConversionSourceProvider<Long> {
 @Override
- public DeltaConversionSource getConversionSourceInstance(PerTableConfig perTableConfig) {
+ public DeltaConversionSource getConversionSourceInstance(SourceTable sourceTable) {
 SparkSession sparkSession = DeltaConversionUtils.buildSparkSession(hadoopConf);
- DeltaTable deltaTable = DeltaTable.forPath(sparkSession, perTableConfig.getTableBasePath());
+ DeltaTable deltaTable = DeltaTable.forPath(sparkSession, sourceTable.getBasePath());
return DeltaConversionSource.builder()
.sparkSession(sparkSession)
- .tableName(perTableConfig.getTableName())
- .basePath(perTableConfig.getTableBasePath())
+ .tableName(sourceTable.getName())
+ .basePath(sourceTable.getBasePath())
.deltaTable(deltaTable)
.deltaLog(deltaTable.deltaLog())
.build();
diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java
index 120ee0e0..b34fa449 100644
--- a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java
+++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java
@@ -54,7 +54,7 @@ import scala.collection.Seq;
import com.google.common.annotations.VisibleForTesting;
-import org.apache.xtable.conversion.PerTableConfig;
+import org.apache.xtable.conversion.TargetTable;
import org.apache.xtable.exception.NotSupportedException;
import org.apache.xtable.model.InternalTable;
import org.apache.xtable.model.metadata.TableSyncMetadata;
@@ -76,16 +76,16 @@ public class DeltaConversionTarget implements ConversionTarget {
private DeltaDataFileUpdatesExtractor dataFileUpdatesExtractor;
private String tableName;
- private int logRetentionInHours;
+ private long logRetentionInHours;
private TransactionState transactionState;
public DeltaConversionTarget() {}
- public DeltaConversionTarget(PerTableConfig perTableConfig, SparkSession sparkSession) {
+ public DeltaConversionTarget(TargetTable targetTable, SparkSession sparkSession) {
this(
- perTableConfig.getTableDataPath(),
- perTableConfig.getTableName(),
- perTableConfig.getTargetMetadataRetentionInHours(),
+ targetTable.getBasePath(),
+ targetTable.getName(),
+ targetTable.getMetadataRetention().toHours(),
sparkSession,
DeltaSchemaExtractor.getInstance(),
DeltaPartitionExtractor.getInstance(),
@@ -96,7 +96,7 @@ public class DeltaConversionTarget implements ConversionTarget {
DeltaConversionTarget(
String tableDataPath,
String tableName,
- int logRetentionInHours,
+ long logRetentionInHours,
SparkSession sparkSession,
DeltaSchemaExtractor schemaExtractor,
DeltaPartitionExtractor partitionExtractor,
@@ -115,7 +115,7 @@ public class DeltaConversionTarget implements ConversionTarget {
private void _init(
String tableDataPath,
String tableName,
- int logRetentionInHours,
+ long logRetentionInHours,
SparkSession sparkSession,
DeltaSchemaExtractor schemaExtractor,
DeltaPartitionExtractor partitionExtractor,
@@ -134,13 +134,13 @@ public class DeltaConversionTarget implements ConversionTarget {
 }
 @Override
- public void init(PerTableConfig perTableConfig, Configuration configuration) {
+ public void init(TargetTable targetTable, Configuration configuration) {
 SparkSession sparkSession = DeltaConversionUtils.buildSparkSession(configuration);
_init(
- perTableConfig.getTableDataPath(),
- perTableConfig.getTableName(),
- perTableConfig.getTargetMetadataRetentionInHours(),
+ targetTable.getBasePath(),
+ targetTable.getName(),
+ targetTable.getMetadataRetention().toHours(),
sparkSession,
DeltaSchemaExtractor.getInstance(),
DeltaPartitionExtractor.getInstance(),
@@ -221,7 +221,7 @@ public class DeltaConversionTarget implements ConversionTarget {
private final OptimisticTransaction transaction;
private final Instant commitTime;
private final DeltaLog deltaLog;
- private final int retentionInHours;
+ private final long retentionInHours;
@Getter private final List<String> partitionColumns;
private final String tableName;
@Getter private StructType latestSchema;
@@ -230,7 +230,7 @@ public class DeltaConversionTarget implements ConversionTarget {
 @Setter private Seq<Action> actions;
 private TransactionState(
- DeltaLog deltaLog, String tableName, Instant latestCommitTime, int retentionInHours) {
+ DeltaLog deltaLog, String tableName, Instant latestCommitTime, long retentionInHours) {
this.deltaLog = deltaLog;
this.transaction = deltaLog.startTransaction();
this.latestSchema = deltaLog.snapshot().schema();
diff --git a/xtable-core/src/main/java/org/apache/xtable/hudi/ConfigurationBasedPartitionSpecExtractor.java b/xtable-core/src/main/java/org/apache/xtable/hudi/ConfigurationBasedPartitionSpecExtractor.java
index e3659f19..7bc41a10 100644
--- a/xtable-core/src/main/java/org/apache/xtable/hudi/ConfigurationBasedPartitionSpecExtractor.java
+++ b/xtable-core/src/main/java/org/apache/xtable/hudi/ConfigurationBasedPartitionSpecExtractor.java
@@ -36,13 +36,13 @@ import org.apache.xtable.schema.SchemaFieldFinder;
*/
@AllArgsConstructor
 public class ConfigurationBasedPartitionSpecExtractor implements HudiSourcePartitionSpecExtractor {
- private final HudiSourceConfigImpl config;
+ private final HudiSourceConfig config;
@Override
public List<InternalPartitionField> spec(InternalSchema tableSchema) {
List<InternalPartitionField> partitionFields =
new ArrayList<>(config.getPartitionFieldSpecs().size());
- for (HudiSourceConfigImpl.PartitionFieldSpec fieldSpec : config.getPartitionFieldSpecs()) {
+ for (HudiSourceConfig.PartitionFieldSpec fieldSpec : config.getPartitionFieldSpecs()) {
InternalField sourceField =
SchemaFieldFinder.getInstance()
.findFieldByPath(tableSchema, fieldSpec.getSourceFieldPath());
diff --git a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSourceProvider.java b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSourceProvider.java
index f6e5e28d..0ddbbcb7 100644
--- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSourceProvider.java
+++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSourceProvider.java
@@ -25,19 +25,18 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.xtable.conversion.ConversionSourceProvider;
-import org.apache.xtable.conversion.PerTableConfig;
+import org.apache.xtable.conversion.SourceTable;
 /** A concrete implementation of {@link ConversionSourceProvider} for Hudi table format. */
 @Log4j2
 public class HudiConversionSourceProvider extends ConversionSourceProvider<HoodieInstant> {
@Override
- public HudiConversionSource getConversionSourceInstance(PerTableConfig sourceTableConfig) {
- this.sourceTableConfig = sourceTableConfig;
+ public HudiConversionSource getConversionSourceInstance(SourceTable sourceTable) {
HoodieTableMetaClient metaClient =
HoodieTableMetaClient.builder()
.setConf(hadoopConf)
- .setBasePath(this.sourceTableConfig.getTableBasePath())
+ .setBasePath(sourceTable.getBasePath())
.setLoadActiveTimelineOnLoad(true)
.build();
 if (!metaClient.getTableConfig().getTableType().equals(HoodieTableType.COPY_ON_WRITE)) {
@@ -45,8 +44,8 @@ public class HudiConversionSourceProvider extends ConversionSourceProvider<Hoodi
}
final HudiSourcePartitionSpecExtractor sourcePartitionSpecExtractor =
- (HudiSourcePartitionSpecExtractor)
- sourceTableConfig.getHudiSourceConfig().loadSourcePartitionSpecExtractor();
+ HudiSourceConfig.fromProperties(sourceTable.getAdditionalProperties())
+ .loadSourcePartitionSpecExtractor();
return new HudiConversionSource(metaClient, sourcePartitionSpecExtractor);
}
diff --git a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionTarget.java b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionTarget.java
index b1a0bc91..c3ef6f92 100644
--- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionTarget.java
+++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionTarget.java
@@ -78,7 +78,7 @@ import org.apache.hudi.table.action.clean.CleanPlanner;
import com.google.common.annotations.VisibleForTesting;
import org.apache.xtable.avro.AvroSchemaConverter;
-import org.apache.xtable.conversion.PerTableConfig;
+import org.apache.xtable.conversion.TargetTable;
import org.apache.xtable.exception.NotSupportedException;
import org.apache.xtable.exception.ReadException;
import org.apache.xtable.exception.UpdateException;
@@ -108,16 +108,15 @@ public class HudiConversionTarget implements ConversionTarget {
@VisibleForTesting
HudiConversionTarget(
- PerTableConfig perTableConfig,
+ TargetTable targetTable,
Configuration configuration,
int maxNumDeltaCommitsBeforeCompaction) {
this(
- perTableConfig.getTableDataPath(),
- perTableConfig.getTargetMetadataRetentionInHours(),
+ targetTable.getBasePath(),
+ (int) targetTable.getMetadataRetention().toHours(),
maxNumDeltaCommitsBeforeCompaction,
BaseFileUpdatesExtractor.of(
- new HoodieJavaEngineContext(configuration),
- new CachingPath(perTableConfig.getTableDataPath())),
+ new HoodieJavaEngineContext(configuration), new CachingPath(targetTable.getBasePath())),
AvroSchemaConverter.getInstance(),
HudiTableManager.of(configuration),
CommitState::new);
@@ -163,14 +162,13 @@ public class HudiConversionTarget implements ConversionTarget {
 }
 @Override
- public void init(PerTableConfig perTableConfig, Configuration configuration) {
+ public void init(TargetTable targetTable, Configuration configuration) {
_init(
- perTableConfig.getTableDataPath(),
- perTableConfig.getTargetMetadataRetentionInHours(),
+ targetTable.getBasePath(),
+ (int) targetTable.getMetadataRetention().toHours(),
HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.defaultValue(),
BaseFileUpdatesExtractor.of(
- new HoodieJavaEngineContext(configuration),
- new CachingPath(perTableConfig.getTableDataPath())),
+ new HoodieJavaEngineContext(configuration), new CachingPath(targetTable.getBasePath())),
AvroSchemaConverter.getInstance(),
HudiTableManager.of(configuration),
CommitState::new);
diff --git a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiSourceConfigImpl.java b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiSourceConfig.java
similarity index 68%
rename from xtable-core/src/main/java/org/apache/xtable/hudi/HudiSourceConfigImpl.java
rename to xtable-core/src/main/java/org/apache/xtable/hudi/HudiSourceConfig.java
rename to xtable-core/src/main/java/org/apache/xtable/hudi/HudiSourceConfig.java
index 0fa8b529..606b9281 100644
--- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiSourceConfigImpl.java
+++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiSourceConfig.java
@@ -22,29 +22,41 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
+import java.util.Properties;
-import lombok.Builder;
import lombok.Value;
import com.google.common.base.Preconditions;
-import org.apache.xtable.conversion.HudiSourceConfig;
import org.apache.xtable.model.schema.PartitionTransformType;
import org.apache.xtable.reflection.ReflectionUtils;
/** Configuration of Hudi source format for the sync process. */
@Value
-public class HudiSourceConfigImpl implements HudiSourceConfig {
+public class HudiSourceConfig {
+ public static final String PARTITION_SPEC_EXTRACTOR_CLASS =
+ "xtable.hudi.source.partition_spec_extractor_class";
+ public static final String PARTITION_FIELD_SPEC_CONFIG =
+ "xtable.hudi.source.partition_field_spec_config";
+
String partitionSpecExtractorClass;
List<PartitionFieldSpec> partitionFieldSpecs;
- @Builder
- public HudiSourceConfigImpl(String partitionSpecExtractorClass, String partitionFieldSpecConfig) {
- this.partitionSpecExtractorClass =
- partitionSpecExtractorClass == null
- ? ConfigurationBasedPartitionSpecExtractor.class.getName()
- : partitionSpecExtractorClass;
- this.partitionFieldSpecs = parsePartitionFieldSpecs(partitionFieldSpecConfig);
+ public static HudiSourceConfig fromPartitionFieldSpecConfig(String partitionFieldSpecConfig) {
+ return new HudiSourceConfig(
+ ConfigurationBasedPartitionSpecExtractor.class.getName(),
+ parsePartitionFieldSpecs(partitionFieldSpecConfig));
+ }
+
+ public static HudiSourceConfig fromProperties(Properties properties) {
+ String partitionSpecExtractorClass =
+ properties.getProperty(
+ PARTITION_SPEC_EXTRACTOR_CLASS,
+ ConfigurationBasedPartitionSpecExtractor.class.getName());
+ String partitionFieldSpecString = properties.getProperty(PARTITION_FIELD_SPEC_CONFIG);
+ List<PartitionFieldSpec> partitionFieldSpecs =
+ parsePartitionFieldSpecs(partitionFieldSpecString);
+ return new HudiSourceConfig(partitionSpecExtractorClass, partitionFieldSpecs);
}
@Value
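A short sketch of wiring a partitioned Hudi source through the new property keys; the partition column name and date format below are made up for illustration:

  import java.util.Properties;

  import org.apache.xtable.conversion.SourceTable;
  import org.apache.xtable.hudi.HudiSourceConfig;
  import org.apache.xtable.model.storage.TableFormat;

  public class HudiSourcePropertiesExample {
    public static SourceTable partitionedHudiSource() {
      // Replaces HudiSourceConfigImpl's builder: the "path:type:format" partition spec
      // now travels as additional properties on the SourceTable.
      Properties props = new Properties();
      // Hypothetical partition column and date format.
      props.setProperty(HudiSourceConfig.PARTITION_FIELD_SPEC_CONFIG, "created_at:DAY:yyyy-MM-dd");
      return SourceTable.builder()
          .name("my_table")
          .formatName(TableFormat.HUDI)
          .basePath("s3://bucket/my_table")
          .additionalProperties(props)
          .build();
    }
  }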
diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java
index 7b26ee86..f96ec714 100644
--- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java
+++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java
@@ -20,23 +20,35 @@ package org.apache.xtable.iceberg;
import java.io.IOException;
import java.time.Instant;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
-import lombok.*;
+import lombok.AccessLevel;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.NonNull;
import lombok.extern.log4j.Log4j2;
import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.*;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableScan;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
-import org.apache.xtable.conversion.PerTableConfig;
+import org.apache.xtable.conversion.SourceTable;
import org.apache.xtable.exception.ReadException;
import org.apache.xtable.model.CommitsBacklog;
import org.apache.xtable.model.InstantsForIncrementalSync;
@@ -46,15 +58,18 @@ import org.apache.xtable.model.TableChange;
import org.apache.xtable.model.schema.InternalPartitionField;
import org.apache.xtable.model.schema.InternalSchema;
import org.apache.xtable.model.stat.PartitionValue;
-import org.apache.xtable.model.storage.*;
+import org.apache.xtable.model.storage.DataFilesDiff;
+import org.apache.xtable.model.storage.DataLayoutStrategy;
import org.apache.xtable.model.storage.InternalDataFile;
+import org.apache.xtable.model.storage.PartitionFileGroup;
+import org.apache.xtable.model.storage.TableFormat;
import org.apache.xtable.spi.extractor.ConversionSource;
@Log4j2
@Builder
public class IcebergConversionSource implements ConversionSource<Snapshot> {
@NonNull private final Configuration hadoopConf;
- @NonNull private final PerTableConfig sourceTableConfig;
+ @NonNull private final SourceTable sourceTableConfig;
@Getter(lazy = true, value = AccessLevel.PACKAGE)
private final Table sourceTable = initSourceTable();
@@ -73,15 +88,15 @@ public class IcebergConversionSource implements ConversionSource<Snapshot> {
private Table initSourceTable() {
IcebergTableManager tableManager = IcebergTableManager.of(hadoopConf);
String[] namespace = sourceTableConfig.getNamespace();
- String tableName = sourceTableConfig.getTableName();
+ String tableName = sourceTableConfig.getName();
TableIdentifier tableIdentifier =
namespace == null
? TableIdentifier.of(tableName)
: TableIdentifier.of(Namespace.of(namespace), tableName);
return tableManager.getTable(
- (IcebergCatalogConfig) sourceTableConfig.getIcebergCatalogConfig(),
+ (IcebergCatalogConfig) sourceTableConfig.getCatalogConfig(),
tableIdentifier,
- sourceTableConfig.getTableBasePath());
+ sourceTableConfig.getBasePath());
}
private FileIO initTableOps() {
diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSourceProvider.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSourceProvider.java
index 9a2bb840..449ebe5d 100644
--- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSourceProvider.java
+++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSourceProvider.java
@@ -21,12 +21,12 @@ package org.apache.xtable.iceberg;
import org.apache.iceberg.Snapshot;
import org.apache.xtable.conversion.ConversionSourceProvider;
-import org.apache.xtable.conversion.PerTableConfig;
+import org.apache.xtable.conversion.SourceTable;
 /** A concrete implementation of {@link ConversionSourceProvider} for Hudi table format. */
 public class IcebergConversionSourceProvider extends ConversionSourceProvider<Snapshot> {
 @Override
- public IcebergConversionSource getConversionSourceInstance(PerTableConfig sourceTableConfig) {
+ public IcebergConversionSource getConversionSourceInstance(SourceTable sourceTableConfig) {
return IcebergConversionSource.builder()
.sourceTableConfig(sourceTableConfig)
.hadoopConf(hadoopConf)
diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java
index 72125cb0..ecdbfa26 100644
--- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java
+++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java
@@ -39,7 +39,7 @@ import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.NotFoundException;
-import org.apache.xtable.conversion.PerTableConfig;
+import org.apache.xtable.conversion.TargetTable;
import org.apache.xtable.model.InternalTable;
import org.apache.xtable.model.metadata.TableSyncMetadata;
import org.apache.xtable.model.schema.InternalPartitionField;
@@ -70,7 +70,7 @@ public class IcebergConversionTarget implements ConversionTarget {
public IcebergConversionTarget() {}
IcebergConversionTarget(
- PerTableConfig perTableConfig,
+ TargetTable targetTable,
Configuration configuration,
IcebergSchemaExtractor schemaExtractor,
IcebergSchemaSync schemaSync,
@@ -79,7 +79,7 @@ public class IcebergConversionTarget implements ConversionTarget {
IcebergDataFileUpdatesSync dataFileUpdatesExtractor,
IcebergTableManager tableManager) {
_init(
- perTableConfig,
+ targetTable,
configuration,
schemaExtractor,
schemaSync,
@@ -90,7 +90,7 @@ public class IcebergConversionTarget implements ConversionTarget {
}
private void _init(
- PerTableConfig perTableConfig,
+ TargetTable targetTable,
Configuration configuration,
IcebergSchemaExtractor schemaExtractor,
IcebergSchemaSync schemaSync,
@@ -103,17 +103,17 @@ public class IcebergConversionTarget implements ConversionTarget {
this.partitionSpecExtractor = partitionSpecExtractor;
this.partitionSpecSync = partitionSpecSync;
this.dataFileUpdatesExtractor = dataFileUpdatesExtractor;
- String tableName = perTableConfig.getTableName();
- this.basePath = perTableConfig.getTableBasePath();
+ String tableName = targetTable.getName();
+ this.basePath = targetTable.getBasePath();
this.configuration = configuration;
- this.snapshotRetentionInHours = perTableConfig.getTargetMetadataRetentionInHours();
- String[] namespace = perTableConfig.getNamespace();
+ this.snapshotRetentionInHours = (int) targetTable.getMetadataRetention().toHours();
+ String[] namespace = targetTable.getNamespace();
this.tableIdentifier =
namespace == null
? TableIdentifier.of(tableName)
: TableIdentifier.of(Namespace.of(namespace), tableName);
this.tableManager = tableManager;
- this.catalogConfig = (IcebergCatalogConfig) perTableConfig.getIcebergCatalogConfig();
+ this.catalogConfig = (IcebergCatalogConfig) targetTable.getCatalogConfig();
if (tableManager.tableExists(catalogConfig, tableIdentifier, basePath)) {
// Load the table state if it already exists
@@ -124,9 +124,9 @@ public class IcebergConversionTarget implements ConversionTarget {
}
@Override
- public void init(PerTableConfig perTableConfig, Configuration configuration) {
+ public void init(TargetTable targetTable, Configuration configuration) {
_init(
- perTableConfig,
+ targetTable,
configuration,
IcebergSchemaExtractor.getInstance(),
IcebergSchemaSync.getInstance(),
diff --git a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java
index 44338494..58f0f982 100644
--- a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java
+++ b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java
@@ -19,6 +19,7 @@
package org.apache.xtable;
import static org.apache.xtable.GenericTable.getTableName;
+import static org.apache.xtable.hudi.HudiSourceConfig.PARTITION_FIELD_SPEC_CONFIG;
import static org.apache.xtable.hudi.HudiTestUtil.PartitionConfig;
import static org.apache.xtable.model.storage.TableFormat.DELTA;
import static org.apache.xtable.model.storage.TableFormat.HUDI;
@@ -30,6 +31,7 @@ import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
@@ -41,6 +43,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Properties;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -81,13 +84,13 @@ import org.apache.spark.sql.delta.DeltaLog;
import com.google.common.collect.ImmutableList;
+import org.apache.xtable.conversion.ConversionConfig;
import org.apache.xtable.conversion.ConversionController;
import org.apache.xtable.conversion.ConversionSourceProvider;
-import org.apache.xtable.conversion.PerTableConfig;
-import org.apache.xtable.conversion.PerTableConfigImpl;
+import org.apache.xtable.conversion.SourceTable;
+import org.apache.xtable.conversion.TargetTable;
import org.apache.xtable.delta.DeltaConversionSourceProvider;
import org.apache.xtable.hudi.HudiConversionSourceProvider;
-import org.apache.xtable.hudi.HudiSourceConfigImpl;
import org.apache.xtable.hudi.HudiTestUtil;
import org.apache.xtable.iceberg.IcebergConversionSourceProvider;
import org.apache.xtable.model.storage.TableFormat;
@@ -147,17 +150,17 @@ public class ITConversionController {
if (sourceTableFormat.equalsIgnoreCase(HUDI)) {
ConversionSourceProvider<HoodieInstant> hudiConversionSourceProvider =
new HudiConversionSourceProvider();
- hudiConversionSourceProvider.init(jsc.hadoopConfiguration(), Collections.emptyMap());
+ hudiConversionSourceProvider.init(jsc.hadoopConfiguration());
return hudiConversionSourceProvider;
} else if (sourceTableFormat.equalsIgnoreCase(DELTA)) {
ConversionSourceProvider<Long> deltaConversionSourceProvider =
new DeltaConversionSourceProvider();
- deltaConversionSourceProvider.init(jsc.hadoopConfiguration(), Collections.emptyMap());
+ deltaConversionSourceProvider.init(jsc.hadoopConfiguration());
return deltaConversionSourceProvider;
} else if (sourceTableFormat.equalsIgnoreCase(ICEBERG)) {
ConversionSourceProvider<Snapshot> icebergConversionSourceProvider =
new IcebergConversionSourceProvider();
- icebergConversionSourceProvider.init(jsc.hadoopConfiguration(), Collections.emptyMap());
+ icebergConversionSourceProvider.init(jsc.hadoopConfiguration());
return icebergConversionSourceProvider;
} else {
throw new IllegalArgumentException("Unsupported source format: " +
sourceTableFormat);
@@ -193,27 +196,26 @@ public class ITConversionController {
 tableName, tempDir, sparkSession, jsc, sourceTableFormat, isPartitioned)) {
insertRecords = table.insertRows(100);
- PerTableConfig perTableConfig =
- PerTableConfigImpl.builder()
- .tableName(tableName)
- .targetTableFormats(targetTableFormats)
- .tableBasePath(table.getBasePath())
- .tableDataPath(table.getDataPath())
- .hudiSourceConfig(
- HudiSourceConfigImpl.builder().partitionFieldSpecConfig(partitionConfig).build())
- .syncMode(syncMode)
- .build();
- conversionController.sync(perTableConfig, conversionSourceProvider);
+ ConversionConfig conversionConfig =
+ getTableSyncConfig(
+ sourceTableFormat,
+ syncMode,
+ tableName,
+ table,
+ targetTableFormats,
+ partitionConfig,
+ null);
+ conversionController.sync(conversionConfig, conversionSourceProvider);
 checkDatasetEquivalence(sourceTableFormat, table, targetTableFormats, 100);
// make multiple commits and then sync
table.insertRows(100);
table.upsertRows(insertRecords.subList(0, 20));
- conversionController.sync(perTableConfig, conversionSourceProvider);
+ conversionController.sync(conversionConfig, conversionSourceProvider);
 checkDatasetEquivalence(sourceTableFormat, table, targetTableFormats, 200);
table.deleteRows(insertRecords.subList(30, 50));
- conversionController.sync(perTableConfig, conversionSourceProvider);
+ conversionController.sync(conversionConfig, conversionSourceProvider);
 checkDatasetEquivalence(sourceTableFormat, table, targetTableFormats, 180);
checkDatasetEquivalenceWithFilter(
 sourceTableFormat, table, targetTableFormats, table.getFilterQuery());
@@ -222,39 +224,38 @@ public class ITConversionController {
try (GenericTable tableWithUpdatedSchema =
GenericTable.getInstanceWithAdditionalColumns(
 tableName, tempDir, sparkSession, jsc, sourceTableFormat, isPartitioned)) {
- PerTableConfig perTableConfig =
- PerTableConfigImpl.builder()
- .tableName(tableName)
- .targetTableFormats(targetTableFormats)
- .tableBasePath(tableWithUpdatedSchema.getBasePath())
- .tableDataPath(tableWithUpdatedSchema.getDataPath())
- .hudiSourceConfig(
- HudiSourceConfigImpl.builder().partitionFieldSpecConfig(partitionConfig).build())
- .syncMode(syncMode)
- .build();
+ ConversionConfig conversionConfig =
+ getTableSyncConfig(
+ sourceTableFormat,
+ syncMode,
+ tableName,
+ tableWithUpdatedSchema,
+ targetTableFormats,
+ partitionConfig,
+ null);
 List<Row> insertsAfterSchemaUpdate = tableWithUpdatedSchema.insertRows(100);
tableWithUpdatedSchema.reload();
- conversionController.sync(perTableConfig, conversionSourceProvider);
+ conversionController.sync(conversionConfig, conversionSourceProvider);
checkDatasetEquivalence(sourceTableFormat, tableWithUpdatedSchema, targetTableFormats, 280);
tableWithUpdatedSchema.deleteRows(insertsAfterSchemaUpdate.subList(60, 90));
- conversionController.sync(perTableConfig, conversionSourceProvider);
+ conversionController.sync(conversionConfig, conversionSourceProvider);
checkDatasetEquivalence(sourceTableFormat, tableWithUpdatedSchema, targetTableFormats, 250);
if (isPartitioned) {
// Adds new partition.
tableWithUpdatedSchema.insertRecordsForSpecialPartition(50);
- conversionController.sync(perTableConfig, conversionSourceProvider);
+ conversionController.sync(conversionConfig, conversionSourceProvider);
checkDatasetEquivalence(sourceTableFormat, tableWithUpdatedSchema, targetTableFormats, 300);
// Drops partition.
tableWithUpdatedSchema.deleteSpecialPartition();
- conversionController.sync(perTableConfig, conversionSourceProvider);
+ conversionController.sync(conversionConfig, conversionSourceProvider);
checkDatasetEquivalence(sourceTableFormat, tableWithUpdatedSchema, targetTableFormats, 250);
// Insert records to the dropped partition again.
tableWithUpdatedSchema.insertRecordsForSpecialPartition(50);
- conversionController.sync(perTableConfig, conversionSourceProvider);
+ conversionController.sync(conversionConfig, conversionSourceProvider);
checkDatasetEquivalence(sourceTableFormat, tableWithUpdatedSchema, targetTableFormats, 300);
}
}
@@ -279,24 +280,22 @@ public class ITConversionController {
String commitInstant2 = table.startCommit();
table.insertRecordsWithCommitAlreadyStarted(insertsForCommit2, commitInstant2, true);
- PerTableConfig perTableConfig =
- PerTableConfigImpl.builder()
- .tableName(tableName)
- .targetTableFormats(targetTableFormats)
- .tableBasePath(table.getBasePath())
- .hudiSourceConfig(
- HudiSourceConfigImpl.builder()
- .partitionFieldSpecConfig(partitionConfig.getXTableConfig())
- .build())
- .syncMode(syncMode)
- .build();
+ ConversionConfig conversionConfig =
+ getTableSyncConfig(
+ HUDI,
+ syncMode,
+ tableName,
+ table,
+ targetTableFormats,
+ partitionConfig.getXTableConfig(),
+ null);
ConversionController conversionController =
new ConversionController(jsc.hadoopConfiguration());
- conversionController.sync(perTableConfig, conversionSourceProvider);
+ conversionController.sync(conversionConfig, conversionSourceProvider);
checkDatasetEquivalence(HUDI, table, targetTableFormats, 50);
table.insertRecordsWithCommitAlreadyStarted(insertsForCommit1, commitInstant1, true);
- conversionController.sync(perTableConfig, conversionSourceProvider);
+ conversionController.sync(conversionConfig, conversionSourceProvider);
checkDatasetEquivalence(HUDI, table, targetTableFormats, 100);
}
}
@@ -314,20 +313,18 @@ public class ITConversionController {
tableName, tempDir, jsc, partitionConfig.getHudiConfig(), tableType)) {
List<HoodieRecord<HoodieAvroPayload>> insertedRecords1 = table.insertRecords(50, true);
- PerTableConfig perTableConfig =
- PerTableConfigImpl.builder()
- .tableName(tableName)
- .targetTableFormats(targetTableFormats)
- .tableBasePath(table.getBasePath())
- .hudiSourceConfig(
- HudiSourceConfigImpl.builder()
- .partitionFieldSpecConfig(partitionConfig.getXTableConfig())
- .build())
- .syncMode(syncMode)
- .build();
+ ConversionConfig conversionConfig =
+ getTableSyncConfig(
+ HUDI,
+ syncMode,
+ tableName,
+ table,
+ targetTableFormats,
+ partitionConfig.getXTableConfig(),
+ null);
ConversionController conversionController =
new ConversionController(jsc.hadoopConfiguration());
- conversionController.sync(perTableConfig, conversionSourceProvider);
+ conversionController.sync(conversionConfig, conversionSourceProvider);
checkDatasetEquivalence(HUDI, table, targetTableFormats, 50);
table.deleteRecords(insertedRecords1.subList(0, 20), true);
@@ -335,7 +332,7 @@ public class ITConversionController {
String scheduledCompactionInstant = table.onlyScheduleCompaction();
table.insertRecords(50, true);
- conversionController.sync(perTableConfig, conversionSourceProvider);
+ conversionController.sync(conversionConfig, conversionSourceProvider);
Map<String, String> sourceHudiOptions =
Collections.singletonMap("hoodie.datasource.query.type", "read_optimized");
// Because compaction is not completed yet and read optimized query, there are 100 records.
@@ -343,13 +340,13 @@ public class ITConversionController {
HUDI, table, sourceHudiOptions, targetTableFormats, Collections.emptyMap(), 100);
table.insertRecords(50, true);
- conversionController.sync(perTableConfig, conversionSourceProvider);
+ conversionController.sync(conversionConfig, conversionSourceProvider);
// Because compaction is not completed yet and read optimized query, there are 150 records.
checkDatasetEquivalence(
HUDI, table, sourceHudiOptions, targetTableFormats, Collections.emptyMap(), 150);
table.completeScheduledCompaction(scheduledCompactionInstant);
- conversionController.sync(perTableConfig, conversionSourceProvider);
+ conversionController.sync(conversionConfig, conversionSourceProvider);
checkDatasetEquivalence(HUDI, table, targetTableFormats, 130);
}
}
@@ -362,31 +359,32 @@ public class ITConversionController {
GenericTable.getInstance(tableName, tempDir, sparkSession, jsc, sourceTableFormat, false)) {
table.insertRows(50);
List<String> targetTableFormats = getOtherFormats(sourceTableFormat);
- PerTableConfig perTableConfig =
- PerTableConfigImpl.builder()
- .tableName(tableName)
- .targetTableFormats(targetTableFormats)
- .tableBasePath(table.getBasePath())
- .tableDataPath(table.getDataPath())
- .syncMode(SyncMode.INCREMENTAL)
- .build();
+ ConversionConfig conversionConfig =
+ getTableSyncConfig(
+ sourceTableFormat,
+ SyncMode.INCREMENTAL,
+ tableName,
+ table,
+ targetTableFormats,
+ null,
+ null);
ConversionSourceProvider<?> conversionSourceProvider =
getConversionSourceProvider(sourceTableFormat);
ConversionController conversionController =
new ConversionController(jsc.hadoopConfiguration());
- conversionController.sync(perTableConfig, conversionSourceProvider);
+ conversionController.sync(conversionConfig, conversionSourceProvider);
Instant instantAfterFirstSync = Instant.now();
// sleep before starting the next commit to avoid any rounding issues
Thread.sleep(1000);
table.insertRows(50);
- conversionController.sync(perTableConfig, conversionSourceProvider);
+ conversionController.sync(conversionConfig, conversionSourceProvider);
Instant instantAfterSecondSync = Instant.now();
// sleep before starting the next commit to avoid any rounding issues
Thread.sleep(1000);
table.insertRows(50);
- conversionController.sync(perTableConfig, conversionSourceProvider);
+ conversionController.sync(conversionConfig, conversionSourceProvider);
checkDatasetEquivalence(
sourceTableFormat,
@@ -485,25 +483,22 @@ public class ITConversionController {
GenericTable.getInstance(tableName, tempDir, sparkSession, jsc, sourceTableFormat, true);
}
try (GenericTable tableToClose = table) {
- PerTableConfig perTableConfig =
- PerTableConfigImpl.builder()
- .tableName(tableName)
- .targetTableFormats(targetTableFormats)
- .tableBasePath(tableToClose.getBasePath())
- .tableDataPath(tableToClose.getDataPath())
- .hudiSourceConfig(
- HudiSourceConfigImpl.builder()
- .partitionFieldSpecConfig(xTablePartitionConfig)
- .build())
- .syncMode(SyncMode.INCREMENTAL)
- .build();
+ ConversionConfig conversionConfig =
+ getTableSyncConfig(
+ sourceTableFormat,
+ SyncMode.INCREMENTAL,
+ tableName,
+ table,
+ targetTableFormats,
+ xTablePartitionConfig,
+ null);
tableToClose.insertRows(100);
ConversionController conversionController =
new ConversionController(jsc.hadoopConfiguration());
- conversionController.sync(perTableConfig, conversionSourceProvider);
+ conversionController.sync(conversionConfig, conversionSourceProvider);
// Do a second sync to force the test to read back the metadata it wrote earlier
tableToClose.insertRows(100);
- conversionController.sync(perTableConfig, conversionSourceProvider);
+ conversionController.sync(conversionConfig, conversionSourceProvider);
checkDatasetEquivalenceWithFilter(
sourceTableFormat, tableToClose, targetTableFormats, filter);
@@ -520,33 +515,23 @@ public class ITConversionController {
tableName, tempDir, null, HoodieTableType.COPY_ON_WRITE)) {
table.insertRecords(100, true);
- PerTableConfig perTableConfigIceberg =
- PerTableConfigImpl.builder()
- .tableName(tableName)
- .targetTableFormats(ImmutableList.of(ICEBERG))
- .tableBasePath(table.getBasePath())
- .syncMode(syncMode)
- .build();
-
- PerTableConfig perTableConfigDelta =
- PerTableConfigImpl.builder()
- .tableName(tableName)
- .targetTableFormats(ImmutableList.of(DELTA))
- .tableBasePath(table.getBasePath())
- .syncMode(syncMode)
- .build();
+ ConversionConfig conversionConfigIceberg =
+ getTableSyncConfig(
+ HUDI, syncMode, tableName, table, ImmutableList.of(ICEBERG), null, null);
+ ConversionConfig conversionConfigDelta =
+ getTableSyncConfig(HUDI, syncMode, tableName, table, ImmutableList.of(DELTA), null, null);
ConversionController conversionController =
new ConversionController(jsc.hadoopConfiguration());
- conversionController.sync(perTableConfigIceberg, conversionSourceProvider);
+ conversionController.sync(conversionConfigIceberg, conversionSourceProvider);
checkDatasetEquivalence(HUDI, table, Collections.singletonList(ICEBERG), 100);
- conversionController.sync(perTableConfigDelta, conversionSourceProvider);
+ conversionController.sync(conversionConfigDelta, conversionSourceProvider);
checkDatasetEquivalence(HUDI, table, Collections.singletonList(DELTA), 100);
table.insertRecords(100, true);
- conversionController.sync(perTableConfigIceberg, conversionSourceProvider);
+ conversionController.sync(conversionConfigIceberg, conversionSourceProvider);
checkDatasetEquivalence(HUDI, table, Collections.singletonList(ICEBERG), 200);
- conversionController.sync(perTableConfigDelta, conversionSourceProvider);
+ conversionController.sync(conversionConfigDelta, conversionSourceProvider);
checkDatasetEquivalence(HUDI, table, Collections.singletonList(DELTA), 200);
}
}
@@ -558,21 +543,18 @@ public class ITConversionController {
try (TestJavaHudiTable table =
TestJavaHudiTable.forStandardSchema(
tableName, tempDir, null, HoodieTableType.COPY_ON_WRITE)) {
- PerTableConfig singleTableConfig =
- PerTableConfigImpl.builder()
- .tableName(tableName)
- .targetTableFormats(ImmutableList.of(ICEBERG))
- .tableBasePath(table.getBasePath())
- .syncMode(SyncMode.INCREMENTAL)
- .build();
-
- PerTableConfig dualTableConfig =
- PerTableConfigImpl.builder()
- .tableName(tableName)
- .targetTableFormats(Arrays.asList(ICEBERG, DELTA))
- .tableBasePath(table.getBasePath())
- .syncMode(SyncMode.INCREMENTAL)
- .build();
+ ConversionConfig singleTableConfig =
+ getTableSyncConfig(
+ HUDI, SyncMode.INCREMENTAL, tableName, table, ImmutableList.of(ICEBERG), null, null);
+ ConversionConfig dualTableConfig =
+ getTableSyncConfig(
+ HUDI,
+ SyncMode.INCREMENTAL,
+ tableName,
+ table,
+ Arrays.asList(ICEBERG, DELTA),
+ null,
+ null);
table.insertRecords(50, true);
ConversionController conversionController =
@@ -612,18 +594,20 @@ public class ITConversionController {
table.insertRows(20);
ConversionController conversionController =
new ConversionController(jsc.hadoopConfiguration());
- PerTableConfig perTableConfig =
- PerTableConfigImpl.builder()
- .tableName(tableName)
- .targetTableFormats(Collections.singletonList(ICEBERG))
- .tableBasePath(table.getBasePath())
- .syncMode(SyncMode.INCREMENTAL)
- .build();
- conversionController.sync(perTableConfig, conversionSourceProvider);
+ ConversionConfig conversionConfig =
+ getTableSyncConfig(
+ HUDI,
+ SyncMode.INCREMENTAL,
+ tableName,
+ table,
+ Collections.singletonList(ICEBERG),
+ null,
+ null);
+ conversionController.sync(conversionConfig, conversionSourceProvider);
table.insertRows(10);
- conversionController.sync(perTableConfig, conversionSourceProvider);
+ conversionController.sync(conversionConfig, conversionSourceProvider);
table.insertRows(10);
- conversionController.sync(perTableConfig, conversionSourceProvider);
+ conversionController.sync(conversionConfig, conversionSourceProvider);
// corrupt last two snapshots
Table icebergTable = new HadoopTables(jsc.hadoopConfiguration()).load(table.getBasePath());
long currentSnapshotId = icebergTable.currentSnapshot().snapshotId();
@@ -633,7 +617,7 @@ public class ITConversionController {
Files.delete(
Paths.get(URI.create(icebergTable.snapshot(previousSnapshotId).manifestListLocation())));
table.insertRows(10);
- conversionController.sync(perTableConfig, conversionSourceProvider);
+ conversionController.sync(conversionConfig, conversionSourceProvider);
checkDatasetEquivalence(HUDI, table, Collections.singletonList(ICEBERG), 50);
}
}
@@ -645,18 +629,19 @@ public class ITConversionController {
try (TestJavaHudiTable table =
TestJavaHudiTable.forStandardSchema(
tableName, tempDir, null, HoodieTableType.COPY_ON_WRITE)) {
- PerTableConfig perTableConfig =
- PerTableConfigImpl.builder()
- .tableName(tableName)
- .targetTableFormats(Arrays.asList(ICEBERG, DELTA))
- .tableBasePath(table.getBasePath())
- .syncMode(SyncMode.INCREMENTAL)
- .targetMetadataRetentionInHours(0) // force cleanup
- .build();
+ ConversionConfig conversionConfig =
+ getTableSyncConfig(
+ HUDI,
+ SyncMode.INCREMENTAL,
+ tableName,
+ table,
+ Arrays.asList(ICEBERG, DELTA),
+ null,
+ Duration.ofHours(0)); // force cleanup
ConversionController conversionController =
new ConversionController(jsc.hadoopConfiguration());
table.insertRecords(10, true);
- conversionController.sync(perTableConfig, conversionSourceProvider);
+ conversionController.sync(conversionConfig, conversionSourceProvider);
// later we will ensure we can still read the source table at this instant to ensure that
// neither target cleaned up the underlying parquet files in the table
Instant instantAfterFirstCommit = Instant.now();
@@ -667,7 +652,7 @@ public class ITConversionController {
.forEach(
unused -> {
table.insertRecords(10, true);
- conversionController.sync(perTableConfig, conversionSourceProvider);
+ conversionController.sync(conversionConfig, conversionSourceProvider);
});
// ensure that hudi rows can still be read and underlying files were not removed
List<Row> rows =
@@ -859,4 +844,45 @@ public class ITConversionController {
Optional<String> hudiSourceConfig;
String filter;
}
+
+ private static ConversionConfig getTableSyncConfig(
+ String sourceTableFormat,
+ SyncMode syncMode,
+ String tableName,
+ GenericTable table,
+ List<String> targetTableFormats,
+ String partitionConfig,
+ Duration metadataRetention) {
+ Properties sourceProperties = new Properties();
+ if (partitionConfig != null) {
+ sourceProperties.put(PARTITION_FIELD_SPEC_CONFIG, partitionConfig);
+ }
+ SourceTable sourceTable =
+ SourceTable.builder()
+ .name(tableName)
+ .formatName(sourceTableFormat)
+ .basePath(table.getBasePath())
+ .dataPath(table.getDataPath())
+ .additionalProperties(sourceProperties)
+ .build();
+
+ List<TargetTable> targetTables =
+ targetTableFormats.stream()
+ .map(
+ formatName ->
+ TargetTable.builder()
+ .name(tableName)
+ .formatName(formatName)
+ // set the metadata path to the data path as the default (required by Hudi)
+ .basePath(table.getDataPath())
+ .metadataRetention(metadataRetention)
+ .build())
+ .collect(Collectors.toList());
+
+ return ConversionConfig.builder()
+ .sourceTable(sourceTable)
+ .targetTables(targetTables)
+ .syncMode(syncMode)
+ .build();
+ }
}
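For orientation before the test refactors that follow: a minimal sketch of how the new configuration classes introduced above compose end to end. It only uses builders and calls that appear in this patch; the table name and paths are placeholders, not values from the commit.

import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.xtable.conversion.ConversionConfig;
import org.apache.xtable.conversion.ConversionController;
import org.apache.xtable.conversion.ConversionSourceProvider;
import org.apache.xtable.conversion.SourceTable;
import org.apache.xtable.conversion.TargetTable;
import org.apache.xtable.hudi.HudiConversionSourceProvider;
import org.apache.xtable.model.storage.TableFormat;
import org.apache.xtable.model.sync.SyncMode;

class ConversionConfigSketch {
  static void syncOnce() {
    Configuration hadoopConf = new Configuration();
    // Providers are now initialized with the Hadoop configuration only.
    ConversionSourceProvider<HoodieInstant> provider = new HudiConversionSourceProvider();
    provider.init(hadoopConf);

    // One SourceTable describing the existing table...
    SourceTable sourceTable =
        SourceTable.builder()
            .name("my_table")
            .formatName(TableFormat.HUDI)
            .basePath("s3://bucket/my_table")
            .build();
    // ...and one TargetTable per format to produce.
    List<TargetTable> targetTables =
        Arrays.asList(
            TargetTable.builder()
                .name("my_table")
                .formatName(TableFormat.ICEBERG)
                .basePath("s3://bucket/my_table")
                .build(),
            TargetTable.builder()
                .name("my_table")
                .formatName(TableFormat.DELTA)
                .basePath("s3://bucket/my_table")
                .build());

    ConversionConfig conversionConfig =
        ConversionConfig.builder()
            .sourceTable(sourceTable)
            .targetTables(targetTables)
            .syncMode(SyncMode.INCREMENTAL)
            .build();

    new ConversionController(hadoopConf).sync(conversionConfig, provider);
  }
}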
diff --git
a/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionConfig.java
b/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionConfig.java
new file mode 100644
index 00000000..6e502af0
--- /dev/null
+++
b/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionConfig.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.conversion;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
+
+import java.util.Collections;
+
+import org.junit.jupiter.api.Test;
+
+import org.apache.xtable.model.sync.SyncMode;
+
+class TestConversionConfig {
+
+ @Test
+ void defaultValueSet() {
+ ConversionConfig conversionConfig =
+ ConversionConfig.builder()
+ .sourceTable(mock(SourceTable.class))
+ .targetTables(Collections.singletonList(mock(TargetTable.class)))
+ .build();
+
+ assertEquals(SyncMode.INCREMENTAL, conversionConfig.getSyncMode());
+ }
+
+ @Test
+ void errorIfSourceTableNotSet() {
+ assertThrows(
+ NullPointerException.class,
+ () ->
+ ConversionConfig.builder()
+ .targetTables(Collections.singletonList(mock(TargetTable.class)))
+ .build());
+ }
+
+ @Test
+ void errorIfNoTargetsSet() {
+ Exception thrownException =
+ assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ ConversionConfig.builder()
+ .sourceTable(mock(SourceTable.class))
+ .targetTables(Collections.emptyList())
+ .build());
+ assertEquals("Please provide at-least one format to sync", thrownException.getMessage());
+ }
+}
diff --git
a/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java
b/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java
index 558b8ee5..652bbe42 100644
---
a/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java
+++
b/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java
@@ -18,7 +18,7 @@
package org.apache.xtable.conversion;
-import static org.apache.xtable.GenericTable.getTableName;
+import static org.apache.xtable.model.storage.TableFormat.HUDI;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
@@ -79,13 +79,16 @@ public class TestConversionController {
Map<String, SyncResult> perTableResults = new HashMap<>();
perTableResults.put(TableFormat.ICEBERG, syncResult);
perTableResults.put(TableFormat.DELTA, syncResult);
- PerTableConfig perTableConfig =
- getPerTableConfig(Arrays.asList(TableFormat.ICEBERG, TableFormat.DELTA), syncMode);
- when(mockConversionSourceProvider.getConversionSourceInstance(perTableConfig))
+ ConversionConfig conversionConfig =
+ getTableSyncConfig(Arrays.asList(TableFormat.ICEBERG, TableFormat.DELTA), syncMode);
+ when(mockConversionSourceProvider.getConversionSourceInstance(
+ conversionConfig.getSourceTable()))
.thenReturn(mockConversionSource);
- when(mockConversionTargetFactory.createForFormat(TableFormat.ICEBERG, perTableConfig, mockConf))
+ when(mockConversionTargetFactory.createForFormat(
+ conversionConfig.getTargetTables().get(0), mockConf))
.thenReturn(mockConversionTarget1);
- when(mockConversionTargetFactory.createForFormat(TableFormat.DELTA, perTableConfig, mockConf))
+ when(mockConversionTargetFactory.createForFormat(
+ conversionConfig.getTargetTables().get(1), mockConf))
.thenReturn(mockConversionTarget2);
when(mockConversionSource.getCurrentSnapshot()).thenReturn(internalSnapshot);
when(tableFormatSync.syncSnapshot(
@@ -95,20 +98,23 @@ public class TestConversionController {
ConversionController conversionController =
new ConversionController(mockConf, mockConversionTargetFactory,
tableFormatSync);
Map<String, SyncResult> result =
- conversionController.sync(perTableConfig, mockConversionSourceProvider);
+ conversionController.sync(conversionConfig, mockConversionSourceProvider);
assertEquals(perTableResults, result);
}
@Test
void testAllIncrementalSyncAsPerConfigAndNoFallbackNecessary() {
SyncMode syncMode = SyncMode.INCREMENTAL;
- PerTableConfig perTableConfig =
- getPerTableConfig(Arrays.asList(TableFormat.ICEBERG, TableFormat.DELTA), syncMode);
- when(mockConversionSourceProvider.getConversionSourceInstance(perTableConfig))
+ ConversionConfig conversionConfig =
+ getTableSyncConfig(Arrays.asList(TableFormat.ICEBERG, TableFormat.DELTA), syncMode);
+ when(mockConversionSourceProvider.getConversionSourceInstance(
+ conversionConfig.getSourceTable()))
.thenReturn(mockConversionSource);
- when(mockConversionTargetFactory.createForFormat(TableFormat.ICEBERG, perTableConfig, mockConf))
+ when(mockConversionTargetFactory.createForFormat(
+ conversionConfig.getTargetTables().get(0), mockConf))
.thenReturn(mockConversionTarget1);
- when(mockConversionTargetFactory.createForFormat(TableFormat.DELTA, perTableConfig, mockConf))
+ when(mockConversionTargetFactory.createForFormat(
+ conversionConfig.getTargetTables().get(1), mockConf))
.thenReturn(mockConversionTarget2);
Instant instantAsOfNow = Instant.now();
@@ -178,7 +184,7 @@ public class TestConversionController {
ConversionController conversionController =
new ConversionController(mockConf, mockConversionTargetFactory,
tableFormatSync);
Map<String, SyncResult> result =
- conversionController.sync(perTableConfig, mockConversionSourceProvider);
+ conversionController.sync(conversionConfig, mockConversionSourceProvider);
assertEquals(expectedSyncResult, result);
}
@@ -192,13 +198,16 @@ public class TestConversionController {
Map<String, SyncResult> syncResults = new HashMap<>();
syncResults.put(TableFormat.ICEBERG, syncResult);
syncResults.put(TableFormat.DELTA, syncResult);
- PerTableConfig perTableConfig =
- getPerTableConfig(Arrays.asList(TableFormat.ICEBERG, TableFormat.DELTA), syncMode);
- when(mockConversionSourceProvider.getConversionSourceInstance(perTableConfig))
+ ConversionConfig conversionConfig =
+ getTableSyncConfig(Arrays.asList(TableFormat.ICEBERG, TableFormat.DELTA), syncMode);
+ when(mockConversionSourceProvider.getConversionSourceInstance(
+ conversionConfig.getSourceTable()))
.thenReturn(mockConversionSource);
- when(mockConversionTargetFactory.createForFormat(TableFormat.ICEBERG, perTableConfig, mockConf))
+ when(mockConversionTargetFactory.createForFormat(
+ conversionConfig.getTargetTables().get(0), mockConf))
.thenReturn(mockConversionTarget1);
- when(mockConversionTargetFactory.createForFormat(TableFormat.DELTA, perTableConfig, mockConf))
+ when(mockConversionTargetFactory.createForFormat(
+ conversionConfig.getTargetTables().get(1), mockConf))
.thenReturn(mockConversionTarget2);
Instant instantAsOfNow = Instant.now();
@@ -219,20 +228,23 @@ public class TestConversionController {
ConversionController conversionController =
new ConversionController(mockConf, mockConversionTargetFactory,
tableFormatSync);
Map<String, SyncResult> result =
- conversionController.sync(perTableConfig, mockConversionSourceProvider);
+ conversionController.sync(conversionConfig, mockConversionSourceProvider);
assertEquals(syncResults, result);
}
@Test
void testIncrementalSyncFallbackToSnapshotForOnlySingleFormat() {
SyncMode syncMode = SyncMode.INCREMENTAL;
- PerTableConfig perTableConfig =
- getPerTableConfig(Arrays.asList(TableFormat.ICEBERG, TableFormat.DELTA), syncMode);
- when(mockConversionSourceProvider.getConversionSourceInstance(perTableConfig))
+ ConversionConfig conversionConfig =
+ getTableSyncConfig(Arrays.asList(TableFormat.ICEBERG, TableFormat.DELTA), syncMode);
+ when(mockConversionSourceProvider.getConversionSourceInstance(
+ conversionConfig.getSourceTable()))
.thenReturn(mockConversionSource);
- when(mockConversionTargetFactory.createForFormat(TableFormat.ICEBERG, perTableConfig, mockConf))
+ when(mockConversionTargetFactory.createForFormat(
+ conversionConfig.getTargetTables().get(0), mockConf))
.thenReturn(mockConversionTarget1);
- when(mockConversionTargetFactory.createForFormat(TableFormat.DELTA, perTableConfig, mockConf))
+ when(mockConversionTargetFactory.createForFormat(
+ conversionConfig.getTargetTables().get(1), mockConf))
.thenReturn(mockConversionTarget2);
Instant instantAsOfNow = Instant.now();
@@ -300,20 +312,23 @@ public class TestConversionController {
ConversionController conversionController =
new ConversionController(mockConf, mockConversionTargetFactory,
tableFormatSync);
Map<String, SyncResult> result =
- conversionController.sync(perTableConfig, mockConversionSourceProvider);
+ conversionController.sync(conversionConfig, mockConversionSourceProvider);
assertEquals(expectedSyncResult, result);
}
@Test
void incrementalSyncWithNoPendingInstantsForAllFormats() {
SyncMode syncMode = SyncMode.INCREMENTAL;
- PerTableConfig perTableConfig =
- getPerTableConfig(Arrays.asList(TableFormat.ICEBERG, TableFormat.DELTA), syncMode);
- when(mockConversionSourceProvider.getConversionSourceInstance(perTableConfig))
+ ConversionConfig conversionConfig =
+ getTableSyncConfig(Arrays.asList(TableFormat.ICEBERG, TableFormat.DELTA), syncMode);
+ when(mockConversionSourceProvider.getConversionSourceInstance(
+ conversionConfig.getSourceTable()))
.thenReturn(mockConversionSource);
- when(mockConversionTargetFactory.createForFormat(TableFormat.ICEBERG, perTableConfig, mockConf))
+ when(mockConversionTargetFactory.createForFormat(
+ conversionConfig.getTargetTables().get(0), mockConf))
.thenReturn(mockConversionTarget1);
- when(mockConversionTargetFactory.createForFormat(TableFormat.DELTA, perTableConfig, mockConf))
+ when(mockConversionTargetFactory.createForFormat(
+ conversionConfig.getTargetTables().get(1), mockConf))
.thenReturn(mockConversionTarget2);
Instant instantAsOfNow = Instant.now();
@@ -355,7 +370,7 @@ public class TestConversionController {
ConversionController conversionController =
new ConversionController(mockConf, mockConversionTargetFactory,
tableFormatSync);
Map<String, SyncResult> result =
- conversionController.sync(perTableConfig, mockConversionSourceProvider);
+ conversionController.sync(conversionConfig, mockConversionSourceProvider);
assertEquals(expectedSyncResult, result);
}
@@ -394,14 +409,31 @@ public class TestConversionController {
}
private Instant getInstantAtLastNMinutes(Instant currentInstant, int n) {
- return Instant.now().minus(Duration.ofMinutes(n));
+ return currentInstant.minus(Duration.ofMinutes(n));
}
- private PerTableConfig getPerTableConfig(List<String> targetTableFormats, SyncMode syncMode) {
- return PerTableConfigImpl.builder()
- .tableName(getTableName())
- .tableBasePath("/tmp/doesnt/matter")
- .targetTableFormats(targetTableFormats)
+ private ConversionConfig getTableSyncConfig(List<String> targetTableFormats, SyncMode syncMode) {
+ SourceTable sourceTable =
+ SourceTable.builder()
+ .name("tablename")
+ .formatName(HUDI)
+ .basePath("/tmp/doesnt/matter")
+ .build();
+
+ List<TargetTable> targetTables =
+ targetTableFormats.stream()
+ .map(
+ formatName ->
+ TargetTable.builder()
+ .name("tablename")
+ .formatName(formatName)
+ .basePath("/tmp/doesnt/matter")
+ .build())
+ .collect(Collectors.toList());
+
+ return ConversionConfig.builder()
+ .sourceTable(sourceTable)
+ .targetTables(targetTables)
.syncMode(syncMode)
.build();
}
diff --git
a/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionTargetFactory.java
b/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionTargetFactory.java
index 34d3cbef..0984b42b 100644
---
a/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionTargetFactory.java
+++
b/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionTargetFactory.java
@@ -24,15 +24,11 @@ import static
org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
-import java.util.Arrays;
-import java.util.List;
-
import org.apache.hadoop.conf.Configuration;
import org.junit.jupiter.api.Test;
import org.apache.xtable.exception.NotSupportedException;
import org.apache.xtable.model.storage.TableFormat;
-import org.apache.xtable.model.sync.SyncMode;
import org.apache.xtable.spi.sync.ConversionTarget;
public class TestConversionTargetFactory {
@@ -42,11 +38,10 @@ public class TestConversionTargetFactory {
ConversionTarget tc =
ConversionTargetFactory.getInstance().createConversionTargetForName(TableFormat.DELTA);
assertNotNull(tc);
- PerTableConfig perTableConfig =
- getPerTableConfig(Arrays.asList(TableFormat.DELTA), SyncMode.INCREMENTAL);
+ TargetTable targetTable = getPerTableConfig(TableFormat.DELTA);
Configuration conf = new Configuration();
conf.set("spark.master", "local");
- tc.init(perTableConfig, conf);
+ tc.init(targetTable, conf);
assertEquals(tc.getTableFormat(), TableFormat.DELTA);
}
@@ -55,11 +50,10 @@ public class TestConversionTargetFactory {
ConversionTarget tc =
ConversionTargetFactory.getInstance().createConversionTargetForName(TableFormat.HUDI);
assertNotNull(tc);
- PerTableConfig perTableConfig =
- getPerTableConfig(Arrays.asList(TableFormat.HUDI), SyncMode.INCREMENTAL);
+ TargetTable targetTable = getPerTableConfig(TableFormat.HUDI);
Configuration conf = new Configuration();
conf.setStrings("spark.master", "local");
- tc.init(perTableConfig, conf);
+ tc.init(targetTable, conf);
assertEquals(tc.getTableFormat(), TableFormat.HUDI);
}
@@ -68,11 +62,10 @@ public class TestConversionTargetFactory {
ConversionTarget tc =
ConversionTargetFactory.getInstance().createConversionTargetForName(TableFormat.ICEBERG);
assertNotNull(tc);
- PerTableConfig perTableConfig =
- getPerTableConfig(Arrays.asList(TableFormat.ICEBERG), SyncMode.INCREMENTAL);
+ TargetTable targetTable = getPerTableConfig(TableFormat.ICEBERG);
Configuration conf = new Configuration();
conf.setStrings("spark.master", "local");
- tc.init(perTableConfig, conf);
+ tc.init(targetTable, conf);
assertEquals(tc.getTableFormat(), TableFormat.ICEBERG);
}
@@ -88,22 +81,18 @@ public class TestConversionTargetFactory {
@Test
public void testConversionTargetFromFormatType() {
- PerTableConfig perTableConfig =
- getPerTableConfig(Arrays.asList(TableFormat.DELTA), SyncMode.INCREMENTAL);
+ TargetTable targetTable = getPerTableConfig(TableFormat.DELTA);
Configuration conf = new Configuration();
conf.setStrings("spark.master", "local");
- ConversionTarget tc =
- ConversionTargetFactory.getInstance()
- .createForFormat(TableFormat.DELTA, perTableConfig, conf);
+ ConversionTarget tc = ConversionTargetFactory.getInstance().createForFormat(targetTable, conf);
assertEquals(tc.getTableFormat(), TableFormat.DELTA);
}
- private PerTableConfig getPerTableConfig(List<String> targetTableFormats, SyncMode syncMode) {
- return PerTableConfigImpl.builder()
- .tableName(getTableName())
- .tableBasePath("/tmp/doesnt/matter")
- .targetTableFormats(targetTableFormats)
- .syncMode(syncMode)
+ private TargetTable getPerTableConfig(String tableFormat) {
+ return TargetTable.builder()
+ .name(getTableName())
+ .basePath("/tmp/doesnt/matter")
+ .formatName(tableFormat)
.build();
}
}
diff --git
a/xtable-core/src/test/java/org/apache/xtable/conversion/TestPerTableConfig.java
b/xtable-core/src/test/java/org/apache/xtable/conversion/TestPerTableConfig.java
deleted file mode 100644
index 4f057385..00000000
---
a/xtable-core/src/test/java/org/apache/xtable/conversion/TestPerTableConfig.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.xtable.conversion;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-
-import java.util.Collections;
-
-import org.junit.jupiter.api.Test;
-
-import org.apache.xtable.hudi.HudiSourceConfigImpl;
-import org.apache.xtable.model.storage.TableFormat;
-import org.apache.xtable.model.sync.SyncMode;
-
-class TestPerTableConfig {
-
- @Test
- void sanitizePath() {
- PerTableConfig tooManySlashes =
- PerTableConfigImpl.builder()
- .tableBasePath("s3://bucket//path")
- .tableName("name")
- .targetTableFormats(Collections.singletonList(TableFormat.ICEBERG))
- .build();
- assertEquals("s3://bucket/path", tooManySlashes.getTableBasePath());
-
- PerTableConfig localFilePath =
- PerTableConfigImpl.builder()
- .tableBasePath("/local/data//path")
- .tableName("name")
- .targetTableFormats(Collections.singletonList(TableFormat.ICEBERG))
- .build();
- assertEquals("file:///local/data/path", localFilePath.getTableBasePath());
-
- PerTableConfig properLocalFilePath =
- PerTableConfigImpl.builder()
- .tableBasePath("file:///local/data//path")
- .tableName("name")
- .targetTableFormats(Collections.singletonList(TableFormat.ICEBERG))
- .build();
- assertEquals("file:///local/data/path", properLocalFilePath.getTableBasePath());
- }
-
- @Test
- void defaultValueSet() {
- PerTableConfig perTableConfig =
- PerTableConfigImpl.builder()
- .tableBasePath("file://bucket/path")
- .tableName("name")
- .targetTableFormats(Collections.singletonList(TableFormat.ICEBERG))
- .build();
-
- assertEquals(24 * 7, perTableConfig.getTargetMetadataRetentionInHours());
- assertEquals(SyncMode.INCREMENTAL, perTableConfig.getSyncMode());
- assertEquals(HudiSourceConfigImpl.builder().build(), perTableConfig.getHudiSourceConfig());
- assertNull(perTableConfig.getNamespace());
- assertNull(perTableConfig.getIcebergCatalogConfig());
- }
-
- @Test
- void errorIfRequiredArgsNotSet() {
- assertThrows(
- NullPointerException.class,
- () ->
- PerTableConfigImpl.builder()
- .tableName("name")
- .targetTableFormats(Collections.singletonList(TableFormat.ICEBERG))
- .build());
-
- assertThrows(
- NullPointerException.class,
- () ->
- PerTableConfigImpl.builder()
- .tableBasePath("file://bucket/path")
- .targetTableFormats(Collections.singletonList(TableFormat.ICEBERG))
- .build());
-
- assertThrows(
- NullPointerException.class,
- () ->
- PerTableConfigImpl.builder()
- .tableBasePath("file://bucket/path")
- .tableName("name")
- .build());
- }
-
- @Test
- void errorIfNoTargetsSet() {
- Exception thrownException =
- assertThrows(
- IllegalArgumentException.class,
- () ->
- PerTableConfigImpl.builder()
- .tableName("name")
- .tableBasePath("file://bucket/path")
- .targetTableFormats(Collections.emptyList())
- .build());
- assertEquals("Please provide at-least one format to sync", thrownException.getMessage());
- }
-}
diff --git
a/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaConversionTargetSource.java
b/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaConversionTargetSource.java
index 3e2d61f5..8fcf0753 100644
---
a/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaConversionTargetSource.java
+++
b/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaConversionTargetSource.java
@@ -53,8 +53,7 @@ import org.junit.jupiter.params.provider.MethodSource;
import org.apache.xtable.GenericTable;
import org.apache.xtable.TestSparkDeltaTable;
import org.apache.xtable.ValidationTestHelper;
-import org.apache.xtable.conversion.PerTableConfig;
-import org.apache.xtable.conversion.PerTableConfigImpl;
+import org.apache.xtable.conversion.SourceTable;
import org.apache.xtable.model.CommitsBacklog;
import org.apache.xtable.model.InstantsForIncrementalSync;
import org.apache.xtable.model.InternalSnapshot;
@@ -149,7 +148,7 @@ public class ITDeltaConversionTargetSource {
hadoopConf.set("fs.defaultFS", "file:///");
conversionSourceProvider = new DeltaConversionSourceProvider();
- conversionSourceProvider.init(hadoopConf, null);
+ conversionSourceProvider.init(hadoopConf);
}
@Test
@@ -165,11 +164,11 @@ public class ITDeltaConversionTargetSource {
+ basePath
+ "' AS SELECT * FROM VALUES (1, 2)");
// Create Delta source
- PerTableConfig tableConfig =
- PerTableConfigImpl.builder()
- .tableName(tableName)
- .tableBasePath(basePath.toString())
- .targetTableFormats(Collections.singletonList(TableFormat.ICEBERG))
+ SourceTable tableConfig =
+ SourceTable.builder()
+ .name(tableName)
+ .basePath(basePath.toString())
+ .formatName(TableFormat.DELTA)
.build();
DeltaConversionSource conversionSource =
conversionSourceProvider.getConversionSourceInstance(tableConfig);
@@ -223,11 +222,11 @@ public class ITDeltaConversionTargetSource {
+ basePath
+ "' AS SELECT 'SingleValue' AS part_col, 1 AS col1, 2 AS col2");
// Create Delta source
- PerTableConfig tableConfig =
- PerTableConfigImpl.builder()
- .tableName(tableName)
- .tableBasePath(basePath.toString())
- .targetTableFormats(Collections.singletonList(TableFormat.ICEBERG))
+ SourceTable tableConfig =
+ SourceTable.builder()
+ .name(tableName)
+ .basePath(basePath.toString())
+ .formatName(TableFormat.DELTA)
.build();
DeltaConversionSource conversionSource =
conversionSourceProvider.getConversionSourceInstance(tableConfig);
@@ -311,11 +310,11 @@ public class ITDeltaConversionTargetSource {
+ tableName
+ "` VALUES(1, CAST('2012-02-12 00:12:34' AS TIMESTAMP))");
// Create Delta source
- PerTableConfig tableConfig =
- PerTableConfigImpl.builder()
- .tableName(tableName)
- .tableBasePath(basePath.toString())
- .targetTableFormats(Collections.singletonList(TableFormat.ICEBERG))
+ SourceTable tableConfig =
+ SourceTable.builder()
+ .name(tableName)
+ .basePath(basePath.toString())
+ .formatName(TableFormat.DELTA)
.build();
DeltaConversionSource conversionSource =
conversionSourceProvider.getConversionSourceInstance(tableConfig);
@@ -350,11 +349,11 @@ public class ITDeltaConversionTargetSource {
testSparkDeltaTable.insertRows(50);
allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles());
- PerTableConfig tableConfig =
- PerTableConfigImpl.builder()
- .tableName(testSparkDeltaTable.getTableName())
- .tableBasePath(testSparkDeltaTable.getBasePath())
- .targetTableFormats(Arrays.asList(TableFormat.HUDI, TableFormat.ICEBERG))
+ SourceTable tableConfig =
+ SourceTable.builder()
+ .name(testSparkDeltaTable.getTableName())
+ .basePath(testSparkDeltaTable.getBasePath())
+ .formatName(TableFormat.DELTA)
.build();
DeltaConversionSource conversionSource =
conversionSourceProvider.getConversionSourceInstance(tableConfig);
@@ -390,11 +389,11 @@ public class ITDeltaConversionTargetSource {
// Insert 50 rows to 2018 partition.
List<Row> commit1Rows = testSparkDeltaTable.insertRowsForPartition(50,
2018);
Long timestamp1 = testSparkDeltaTable.getLastCommitTimestamp();
- PerTableConfig tableConfig =
- PerTableConfigImpl.builder()
- .tableName(testSparkDeltaTable.getTableName())
- .tableBasePath(testSparkDeltaTable.getBasePath())
- .targetTableFormats(Arrays.asList(TableFormat.HUDI, TableFormat.ICEBERG))
+ SourceTable tableConfig =
+ SourceTable.builder()
+ .name(testSparkDeltaTable.getTableName())
+ .basePath(testSparkDeltaTable.getBasePath())
+ .formatName(TableFormat.DELTA)
.build();
DeltaConversionSource conversionSource =
conversionSourceProvider.getConversionSourceInstance(tableConfig);
@@ -458,11 +457,11 @@ public class ITDeltaConversionTargetSource {
testSparkDeltaTable.insertRows(50);
allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles());
- PerTableConfig tableConfig =
- PerTableConfigImpl.builder()
- .tableName(testSparkDeltaTable.getTableName())
- .tableBasePath(testSparkDeltaTable.getBasePath())
- .targetTableFormats(Arrays.asList(TableFormat.HUDI, TableFormat.ICEBERG))
+ SourceTable tableConfig =
+ SourceTable.builder()
+ .name(testSparkDeltaTable.getTableName())
+ .basePath(testSparkDeltaTable.getBasePath())
+ .formatName(TableFormat.DELTA)
.build();
DeltaConversionSource conversionSource =
conversionSourceProvider.getConversionSourceInstance(tableConfig);
@@ -506,11 +505,11 @@ public class ITDeltaConversionTargetSource {
testSparkDeltaTable.insertRows(50);
allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles());
- PerTableConfig tableConfig =
- PerTableConfigImpl.builder()
- .tableName(testSparkDeltaTable.getTableName())
- .tableBasePath(testSparkDeltaTable.getBasePath())
- .targetTableFormats(Arrays.asList(TableFormat.HUDI, TableFormat.ICEBERG))
+ SourceTable tableConfig =
+ SourceTable.builder()
+ .name(testSparkDeltaTable.getTableName())
+ .basePath(testSparkDeltaTable.getBasePath())
+ .formatName(TableFormat.DELTA)
.build();
DeltaConversionSource conversionSource =
conversionSourceProvider.getConversionSourceInstance(tableConfig);
@@ -563,11 +562,11 @@ public class ITDeltaConversionTargetSource {
testSparkDeltaTable.insertRowsForPartition(20, partitionValueToDelete);
allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles());
- PerTableConfig tableConfig =
- PerTableConfigImpl.builder()
- .tableName(testSparkDeltaTable.getTableName())
- .tableBasePath(testSparkDeltaTable.getBasePath())
- .targetTableFormats(Arrays.asList(TableFormat.HUDI, TableFormat.ICEBERG))
+ SourceTable tableConfig =
+ SourceTable.builder()
+ .name(testSparkDeltaTable.getTableName())
+ .basePath(testSparkDeltaTable.getBasePath())
+ .formatName(TableFormat.DELTA)
.build();
DeltaConversionSource conversionSource =
conversionSourceProvider.getConversionSourceInstance(tableConfig);
@@ -623,11 +622,11 @@ public class ITDeltaConversionTargetSource {
testSparkDeltaTable.insertRows(50);
allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles());
- PerTableConfig tableConfig =
- PerTableConfigImpl.builder()
- .tableName(testSparkDeltaTable.getTableName())
- .tableBasePath(testSparkDeltaTable.getBasePath())
- .targetTableFormats(Arrays.asList(TableFormat.HUDI, TableFormat.ICEBERG))
+ SourceTable tableConfig =
+ SourceTable.builder()
+ .name(testSparkDeltaTable.getTableName())
+ .basePath(testSparkDeltaTable.getBasePath())
+ .formatName(TableFormat.DELTA)
.build();
DeltaConversionSource conversionSource =
conversionSourceProvider.getConversionSourceInstance(tableConfig);
diff --git
a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSync.java
b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSync.java
index 135ce995..f0f889d2 100644
--- a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSync.java
+++ b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSync.java
@@ -26,7 +26,9 @@ import static
org.junit.jupiter.api.parallel.ExecutionMode.SAME_THREAD;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.time.Duration;
import java.time.Instant;
+import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -76,7 +78,7 @@ import io.delta.standalone.expressions.Literal;
import io.delta.standalone.types.IntegerType;
import io.delta.standalone.types.StringType;
-import org.apache.xtable.conversion.PerTableConfigImpl;
+import org.apache.xtable.conversion.TargetTable;
import org.apache.xtable.model.InternalSnapshot;
import org.apache.xtable.model.InternalTable;
import org.apache.xtable.model.schema.InternalField;
@@ -126,11 +128,11 @@ public class TestDeltaSync {
Files.createDirectories(basePath);
conversionTarget =
new DeltaConversionTarget(
- PerTableConfigImpl.builder()
- .tableName(tableName)
- .tableBasePath(basePath.toString())
- .targetMetadataRetentionInHours(1)
- .targetTableFormats(Collections.singletonList(TableFormat.DELTA))
+ TargetTable.builder()
+ .name(tableName)
+ .basePath(basePath.toString())
+ .metadataRetention(Duration.of(1, ChronoUnit.HOURS))
+ .formatName(TableFormat.DELTA)
.build(),
sparkSession);
}
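A side note on the hunk above: target metadata retention is now expressed as a java.time.Duration on TargetTable instead of an hour count, so Duration.of(1, ChronoUnit.HOURS) here and Duration.ofHours(0) in the controller test earlier are the same kind of value. A one-line illustration, not part of the patch:

import java.time.Duration;
import java.time.temporal.ChronoUnit;

// Both spellings build the same one-hour retention window.
boolean sameRetention = Duration.of(1, ChronoUnit.HOURS).equals(Duration.ofHours(1)); // true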
diff --git
a/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSourceSource.java
b/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSourceSource.java
index 408e4373..c074bc23 100644
---
a/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSourceSource.java
+++
b/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSourceSource.java
@@ -569,7 +569,7 @@ public class ITHudiConversionSourceSource {
.build();
HudiSourcePartitionSpecExtractor partitionSpecExtractor =
new ConfigurationBasedPartitionSpecExtractor(
- HudiSourceConfigImpl.builder().partitionFieldSpecConfig(xTablePartitionConfig).build());
+ HudiSourceConfig.fromPartitionFieldSpecConfig(xTablePartitionConfig));
return new HudiConversionSource(hoodieTableMetaClient, partitionSpecExtractor);
}
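The hunk above also shows where the Hudi partition mapping lives after the refactor: HudiSourceConfig.fromPartitionFieldSpecConfig replaces the old HudiSourceConfigImpl builder, and the integration tests pass the same spec through SourceTable.additionalProperties under PARTITION_FIELD_SPEC_CONFIG. A small sketch of both routes; the spec string and paths are hypothetical, not from this commit:

import java.util.Properties;
import org.apache.xtable.conversion.SourceTable;
import org.apache.xtable.hudi.HudiSourceConfig;
import org.apache.xtable.model.storage.TableFormat;

class HudiPartitionSpecSketch {
  // Route 1: carry the spec on the SourceTable so the Hudi source picks it up.
  static SourceTable sourceWithSpec() {
    Properties props = new Properties();
    props.put(HudiSourceConfig.PARTITION_FIELD_SPEC_CONFIG, "timestamp:YEAR:yyyy");
    return SourceTable.builder()
        .name("my_table")
        .formatName(TableFormat.HUDI)
        .basePath("/tmp/my_table")
        .additionalProperties(props)
        .build();
  }

  // Route 2: build the HudiSourceConfig directly from the same spec string.
  static HudiSourceConfig specDirectly() {
    return HudiSourceConfig.fromPartitionFieldSpecConfig("timestamp:YEAR:yyyy");
  }
}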
diff --git
a/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSourceTarget.java
b/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSourceTarget.java
index 7990bfbf..12885567 100644
---
a/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSourceTarget.java
+++
b/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSourceTarget.java
@@ -24,6 +24,7 @@ import static
org.apache.xtable.hudi.HudiTestUtil.initTableAndGetMetaClient;
import static org.junit.jupiter.api.Assertions.assertEquals;
import java.nio.file.Path;
+import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
@@ -69,7 +70,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.metadata.HoodieBackedTableMetadata;
import org.apache.hudi.metadata.HoodieMetadataFileSystemView;
-import org.apache.xtable.conversion.PerTableConfigImpl;
+import org.apache.xtable.conversion.TargetTable;
import org.apache.xtable.model.InternalTable;
import org.apache.xtable.model.metadata.TableSyncMetadata;
import org.apache.xtable.model.schema.InternalField;
@@ -582,11 +583,11 @@ public class ITHudiConversionSourceTarget {
private HudiConversionTarget getTargetClient() {
return new HudiConversionTarget(
- PerTableConfigImpl.builder()
- .tableBasePath(tableBasePath)
- .targetTableFormats(Collections.singletonList(TableFormat.HUDI))
- .tableName("test_table")
- .targetMetadataRetentionInHours(4)
+ TargetTable.builder()
+ .basePath(tableBasePath)
+ .formatName(TableFormat.HUDI)
+ .name("test_table")
+ .metadataRetention(Duration.of(4, ChronoUnit.HOURS))
.build(),
CONFIGURATION,
3);
diff --git
a/xtable-core/src/test/java/org/apache/xtable/iceberg/ITIcebergConversionTargetSource.java
b/xtable-core/src/test/java/org/apache/xtable/iceberg/ITIcebergConversionTargetSource.java
index 997ab184..3f20ac9d 100644
---
a/xtable-core/src/test/java/org/apache/xtable/iceberg/ITIcebergConversionTargetSource.java
+++
b/xtable-core/src/test/java/org/apache/xtable/iceberg/ITIcebergConversionTargetSource.java
@@ -29,7 +29,6 @@ import java.nio.file.Path;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -46,8 +45,7 @@ import org.apache.iceberg.Snapshot;
import org.apache.iceberg.data.Record;
import org.apache.xtable.TestIcebergTable;
-import org.apache.xtable.conversion.PerTableConfig;
-import org.apache.xtable.conversion.PerTableConfigImpl;
+import org.apache.xtable.conversion.SourceTable;
import org.apache.xtable.model.CommitsBacklog;
import org.apache.xtable.model.InstantsForIncrementalSync;
import org.apache.xtable.model.InternalSnapshot;
@@ -65,7 +63,7 @@ public class ITIcebergConversionTargetSource {
@BeforeEach
void setup() {
sourceProvider = new IcebergConversionSourceProvider();
- sourceProvider.init(hadoopConf, null);
+ sourceProvider.init(hadoopConf);
}
@ParameterizedTest
@@ -97,11 +95,11 @@ public class ITIcebergConversionTargetSource {
testIcebergTable.insertRows(50);
allActiveFiles.add(testIcebergTable.getAllActiveFiles());
- PerTableConfig tableConfig =
- PerTableConfigImpl.builder()
- .tableName(testIcebergTable.getTableName())
- .tableBasePath(testIcebergTable.getBasePath())
- .targetTableFormats(Arrays.asList(TableFormat.HUDI, TableFormat.DELTA))
+ SourceTable tableConfig =
+ SourceTable.builder()
+ .name(testIcebergTable.getTableName())
+ .basePath(testIcebergTable.getBasePath())
+ .formatName(TableFormat.ICEBERG)
.build();
IcebergConversionSource conversionSource =
sourceProvider.getConversionSourceInstance(tableConfig);
@@ -157,11 +155,11 @@ public class ITIcebergConversionTargetSource {
testIcebergTable.insertRecordsForPartition(20, partitionValueToDelete);
allActiveFiles.add(testIcebergTable.getAllActiveFiles());
- PerTableConfig tableConfig =
- PerTableConfigImpl.builder()
- .tableName(testIcebergTable.getTableName())
- .tableBasePath(testIcebergTable.getBasePath())
- .targetTableFormats(Arrays.asList(TableFormat.HUDI, TableFormat.DELTA))
+ SourceTable tableConfig =
+ SourceTable.builder()
+ .name(testIcebergTable.getTableName())
+ .basePath(testIcebergTable.getBasePath())
+ .formatName(TableFormat.ICEBERG)
.build();
IcebergConversionSource conversionSource =
sourceProvider.getConversionSourceInstance(tableConfig);
@@ -217,11 +215,11 @@ public class ITIcebergConversionTargetSource {
testIcebergTable.insertRecordsForPartition(20, partitionValueToDelete);
allActiveFiles.add(testIcebergTable.getAllActiveFiles());
- PerTableConfig tableConfig =
- PerTableConfigImpl.builder()
- .tableName(testIcebergTable.getTableName())
- .tableBasePath(testIcebergTable.getBasePath())
- .targetTableFormats(Arrays.asList(TableFormat.HUDI, TableFormat.DELTA))
+ SourceTable tableConfig =
+ SourceTable.builder()
+ .name(testIcebergTable.getTableName())
+ .basePath(testIcebergTable.getBasePath())
+ .formatName(TableFormat.ICEBERG)
.build();
IcebergConversionSource conversionSource =
sourceProvider.getConversionSourceInstance(tableConfig);
@@ -277,11 +275,11 @@ public class ITIcebergConversionTargetSource {
testIcebergTable.insertRows(50);
allActiveFiles.add(testIcebergTable.getAllActiveFiles());
- PerTableConfig tableConfig =
- PerTableConfigImpl.builder()
- .tableName(testIcebergTable.getTableName())
- .tableBasePath(testIcebergTable.getBasePath())
- .targetTableFormats(Arrays.asList(TableFormat.HUDI, TableFormat.DELTA))
+ SourceTable tableConfig =
+ SourceTable.builder()
+ .name(testIcebergTable.getTableName())
+ .basePath(testIcebergTable.getBasePath())
+ .formatName(TableFormat.ICEBERG)
.build();
IcebergConversionSource conversionSource =
sourceProvider.getConversionSourceInstance(tableConfig);
@@ -318,12 +316,11 @@ public class ITIcebergConversionTargetSource {
// Insert 50 rows to INFO partition.
List<Record> commit1Rows =
testIcebergTable.insertRecordsForPartition(50, "INFO");
Long timestamp1 = testIcebergTable.getLastCommitTimestamp();
- PerTableConfig tableConfig =
- PerTableConfigImpl.builder()
- .tableName(testIcebergTable.getTableName())
- .tableBasePath(testIcebergTable.getBasePath())
- .tableDataPath(testIcebergTable.getDataPath())
- .targetTableFormats(Arrays.asList(TableFormat.HUDI, TableFormat.DELTA))
+ SourceTable tableConfig =
+ SourceTable.builder()
+ .name(testIcebergTable.getTableName())
+ .basePath(testIcebergTable.getBasePath())
+ .formatName(TableFormat.ICEBERG)
.build();
// Upsert all rows inserted before, so all files are replaced.
diff --git
a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionTargetSource.java
b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionTargetSource.java
index d100a313..784d6585 100644
---
a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionTargetSource.java
+++
b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionTargetSource.java
@@ -26,7 +26,6 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Instant;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -48,8 +47,7 @@ import org.apache.iceberg.io.DataWriter;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.types.Types;
-import org.apache.xtable.conversion.PerTableConfig;
-import org.apache.xtable.conversion.PerTableConfigImpl;
+import org.apache.xtable.conversion.SourceTable;
import org.apache.xtable.model.CommitsBacklog;
import org.apache.xtable.model.InstantsForIncrementalSync;
import org.apache.xtable.model.InternalSnapshot;
@@ -77,7 +75,7 @@ class TestIcebergConversionTargetSource {
hadoopConf.set("fs.defaultFS", "file:///");
sourceProvider = new IcebergConversionSourceProvider();
- sourceProvider.init(hadoopConf, null);
+ sourceProvider.init(hadoopConf);
tableManager = IcebergTableManager.of(hadoopConf);
@@ -91,7 +89,7 @@ class TestIcebergConversionTargetSource {
@Test
void getTableTest(@TempDir Path workingDir) throws IOException {
Table catalogSales = createTestTableWithData(workingDir.toString());
- PerTableConfig sourceTableConfig = getPerTableConfig(catalogSales);
+ SourceTable sourceTableConfig = getPerTableConfig(catalogSales);
IcebergConversionSource conversionSource =
sourceProvider.getConversionSourceInstance(sourceTableConfig);
@@ -123,7 +121,7 @@ class TestIcebergConversionTargetSource {
Table catalogSales = createTestTableWithData(workingDir.toString());
Snapshot iceCurrentSnapshot = catalogSales.currentSnapshot();
- PerTableConfig sourceTableConfig = getPerTableConfig(catalogSales);
+ SourceTable sourceTableConfig = getPerTableConfig(catalogSales);
IcebergDataFileExtractor spyDataFileExtractor =
spy(IcebergDataFileExtractor.builder().build());
IcebergPartitionValueConverter spyPartitionConverter =
@@ -384,7 +382,7 @@ class TestIcebergConversionTargetSource {
}
private IcebergConversionSource getIcebergConversionSource(Table
catalogSales) {
- PerTableConfig tableConfig = getPerTableConfig(catalogSales);
+ SourceTable tableConfig = getPerTableConfig(catalogSales);
return IcebergConversionSource.builder()
.hadoopConf(hadoopConf)
@@ -392,11 +390,11 @@ class TestIcebergConversionTargetSource {
.build();
}
- private static PerTableConfig getPerTableConfig(Table catalogSales) {
- return PerTableConfigImpl.builder()
- .tableName(catalogSales.name())
- .tableBasePath(catalogSales.location())
- .targetTableFormats(Collections.singletonList(TableFormat.DELTA))
+ private static SourceTable getPerTableConfig(Table catalogSales) {
+ return SourceTable.builder()
+ .name(catalogSales.name())
+ .basePath(catalogSales.location())
+ .formatName(TableFormat.ICEBERG)
.build();
}
diff --git
a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java
b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java
index 45140b44..bd36dde9 100644
--- a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java
+++ b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java
@@ -37,7 +37,9 @@ import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.time.Duration;
import java.time.Instant;
+import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -81,7 +83,7 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import org.apache.xtable.ITConversionController;
-import org.apache.xtable.conversion.PerTableConfigImpl;
+import org.apache.xtable.conversion.TargetTable;
import org.apache.xtable.model.InternalSnapshot;
import org.apache.xtable.model.InternalTable;
import org.apache.xtable.model.metadata.TableSyncMetadata;
@@ -189,11 +191,11 @@ public class TestIcebergSync {
private IcebergConversionTarget getConversionTarget() {
return new IcebergConversionTarget(
- PerTableConfigImpl.builder()
- .tableBasePath(basePath.toString())
- .tableName(tableName)
- .targetMetadataRetentionInHours(1)
- .targetTableFormats(Collections.singletonList(TableFormat.ICEBERG))
+ TargetTable.builder()
+ .basePath(basePath.toString())
+ .name(tableName)
+ .metadataRetention(Duration.of(1, ChronoUnit.HOURS))
+ .formatName(TableFormat.ICEBERG)
.build(),
CONFIGURATION,
mockSchemaExtractor,
diff --git a/xtable-core/src/test/java/org/apache/xtable/loadtest/LoadTest.java
b/xtable-core/src/test/java/org/apache/xtable/loadtest/LoadTest.java
index 8b1a24e0..341b2cb0 100644
--- a/xtable-core/src/test/java/org/apache/xtable/loadtest/LoadTest.java
+++ b/xtable-core/src/test/java/org/apache/xtable/loadtest/LoadTest.java
@@ -18,9 +18,12 @@
package org.apache.xtable.loadtest;
+import static org.apache.xtable.hudi.HudiSourceConfig.PARTITION_FIELD_SPEC_CONFIG;
+
import java.nio.file.Path;
import java.util.Arrays;
-import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -34,11 +37,13 @@ import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.config.HoodieArchivalConfig;
+import org.apache.xtable.GenericTable;
import org.apache.xtable.TestJavaHudiTable;
+import org.apache.xtable.conversion.ConversionConfig;
import org.apache.xtable.conversion.ConversionController;
import org.apache.xtable.conversion.ConversionSourceProvider;
-import org.apache.xtable.conversion.PerTableConfig;
-import org.apache.xtable.conversion.PerTableConfigImpl;
+import org.apache.xtable.conversion.SourceTable;
+import org.apache.xtable.conversion.TargetTable;
import org.apache.xtable.hudi.HudiConversionSourceProvider;
import org.apache.xtable.model.storage.TableFormat;
import org.apache.xtable.model.sync.SyncMode;
@@ -56,7 +61,7 @@ public class LoadTest {
@BeforeEach
public void setup() {
hudiConversionSourceProvider = new HudiConversionSourceProvider();
- hudiConversionSourceProvider.init(CONFIGURATION, Collections.emptyMap());
+ hudiConversionSourceProvider.init(CONFIGURATION);
}
@Test
@@ -75,16 +80,15 @@ public class LoadTest {
.collect(Collectors.toList()),
false);
}
- PerTableConfig perTableConfig =
- PerTableConfigImpl.builder()
- .tableName(tableName)
- .targetTableFormats(Arrays.asList(TableFormat.ICEBERG, TableFormat.DELTA))
- .tableBasePath(table.getBasePath())
- .syncMode(SyncMode.FULL)
- .build();
+ ConversionConfig conversionConfig =
+ getTableSyncConfig(
+ SyncMode.FULL,
+ tableName,
+ table,
+ Arrays.asList(TableFormat.ICEBERG, TableFormat.DELTA));
ConversionController conversionController = new ConversionController(CONFIGURATION);
long start = System.currentTimeMillis();
- conversionController.sync(perTableConfig, hudiConversionSourceProvider);
+ conversionController.sync(conversionConfig, hudiConversionSourceProvider);
long end = System.currentTimeMillis();
System.out.println("Full sync took " + (end - start) + "ms");
}
@@ -104,16 +108,15 @@ public class LoadTest {
TestJavaHudiTable.forStandardSchema(
tableName, tempDir, "level:SIMPLE", HoodieTableType.COPY_ON_WRITE,
archivalConfig)) {
table.insertRecords(1, "partition0", false);
- PerTableConfig perTableConfig =
- PerTableConfigImpl.builder()
- .tableName(tableName)
- .targetTableFormats(Arrays.asList(TableFormat.ICEBERG, TableFormat.DELTA))
- .tableBasePath(table.getBasePath())
- .syncMode(SyncMode.INCREMENTAL)
- .build();
+ ConversionConfig conversionConfig =
+ getTableSyncConfig(
+ SyncMode.INCREMENTAL,
+ tableName,
+ table,
+ Arrays.asList(TableFormat.ICEBERG, TableFormat.DELTA));
// sync once to establish first commit
ConversionController conversionController = new ConversionController(CONFIGURATION);
- conversionController.sync(perTableConfig, hudiConversionSourceProvider);
+ conversionController.sync(conversionConfig, hudiConversionSourceProvider);
for (int i = 0; i < numCommits; i++) {
table.insertRecords(
1,
@@ -124,9 +127,40 @@ public class LoadTest {
}
long start = System.currentTimeMillis();
- conversionController.sync(perTableConfig, hudiConversionSourceProvider);
+ conversionController.sync(conversionConfig, hudiConversionSourceProvider);
long end = System.currentTimeMillis();
System.out.println("Incremental sync took " + (end - start) + "ms");
}
}
+
+ private static ConversionConfig getTableSyncConfig(
+ SyncMode syncMode, String tableName, GenericTable table, List<String> targetTableFormats) {
+ Properties sourceProperties = new Properties();
+ sourceProperties.put(PARTITION_FIELD_SPEC_CONFIG, "level:VALUE");
+ SourceTable sourceTable =
+ SourceTable.builder()
+ .name(tableName)
+ .formatName(TableFormat.HUDI)
+ .basePath(table.getBasePath())
+ .dataPath(table.getDataPath())
+ .additionalProperties(sourceProperties)
+ .build();
+
+ List<TargetTable> targetTables =
+ targetTableFormats.stream()
+ .map(
+ formatName ->
+ TargetTable.builder()
+ .name(tableName)
+ .formatName(formatName)
+ .basePath(table.getBasePath())
+ .build())
+ .collect(Collectors.toList());
+
+ return ConversionConfig.builder()
+ .sourceTable(sourceTable)
+ .targetTables(targetTables)
+ .syncMode(syncMode)
+ .build();
+ }
}
diff --git a/xtable-hudi-support/xtable-hudi-support-extensions/src/main/java/org/apache/xtable/hudi/sync/XTableSyncConfig.java b/xtable-hudi-support/xtable-hudi-support-extensions/src/main/java/org/apache/xtable/hudi/sync/XTableSyncConfig.java
index 67b00417..4d878cea 100644
--- a/xtable-hudi-support/xtable-hudi-support-extensions/src/main/java/org/apache/xtable/hudi/sync/XTableSyncConfig.java
+++ b/xtable-hudi-support/xtable-hudi-support-extensions/src/main/java/org/apache/xtable/hudi/sync/XTableSyncConfig.java
@@ -26,12 +26,12 @@ import org.apache.hudi.sync.common.HoodieSyncConfig;
public class XTableSyncConfig extends HoodieSyncConfig implements Serializable {
- public static final ConfigProperty<String> ONE_TABLE_FORMATS =
+ public static final ConfigProperty<String> XTABLE_FORMATS =
ConfigProperty.key("hoodie.xtable.formats.to.sync")
.defaultValue("DELTA,ICEBERG")
.withDocumentation("Comma separated list of formats to sync.");
- public static final ConfigProperty<Integer> ONE_TABLE_TARGET_METADATA_RETENTION_HOURS =
+ public static final ConfigProperty<Integer> XTABLE_TARGET_METADATA_RETENTION_HOURS =
ConfigProperty.key("hoodie.xtable.target.metadata.retention.hr")
.defaultValue(24 * 7)
.withDocumentation("Retention in hours for metadata in target
table.");
diff --git a/xtable-hudi-support/xtable-hudi-support-extensions/src/main/java/org/apache/xtable/hudi/sync/XTableSyncTool.java b/xtable-hudi-support/xtable-hudi-support-extensions/src/main/java/org/apache/xtable/hudi/sync/XTableSyncTool.java
index 49a7a743..a9653eb5 100644
--- a/xtable-hudi-support/xtable-hudi-support-extensions/src/main/java/org/apache/xtable/hudi/sync/XTableSyncTool.java
+++ b/xtable-hudi-support/xtable-hudi-support-extensions/src/main/java/org/apache/xtable/hudi/sync/XTableSyncTool.java
@@ -18,8 +18,11 @@
package org.apache.xtable.hudi.sync;
+import static org.apache.xtable.hudi.HudiSourceConfig.PARTITION_FIELD_SPEC_CONFIG;
+import static org.apache.xtable.model.storage.TableFormat.HUDI;
+
+import java.time.Duration;
import java.util.Arrays;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -34,11 +37,11 @@ import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.sync.common.HoodieSyncConfig;
import org.apache.hudi.sync.common.HoodieSyncTool;
+import org.apache.xtable.conversion.ConversionConfig;
import org.apache.xtable.conversion.ConversionController;
-import org.apache.xtable.conversion.PerTableConfig;
-import org.apache.xtable.conversion.PerTableConfigImpl;
+import org.apache.xtable.conversion.SourceTable;
+import org.apache.xtable.conversion.TargetTable;
import org.apache.xtable.hudi.HudiConversionSourceProvider;
-import org.apache.xtable.hudi.HudiSourceConfigImpl;
import org.apache.xtable.model.schema.PartitionTransformType;
import org.apache.xtable.model.sync.SyncMode;
import org.apache.xtable.model.sync.SyncResult;
@@ -55,39 +58,57 @@ public class XTableSyncTool extends HoodieSyncTool {
super(props, hadoopConf);
this.config = new XTableSyncConfig(props);
this.hudiConversionSourceProvider = new HudiConversionSourceProvider();
- hudiConversionSourceProvider.init(hadoopConf, Collections.emptyMap());
+ hudiConversionSourceProvider.init(hadoopConf);
}
@Override
public void syncHoodieTable() {
List<String> formatsToSync =
- Arrays.stream(config.getString(XTableSyncConfig.ONE_TABLE_FORMATS).split(","))
+ Arrays.stream(config.getString(XTableSyncConfig.XTABLE_FORMATS).split(","))
.map(format -> format.toUpperCase())
.collect(Collectors.toList());
String basePath = config.getString(HoodieSyncConfig.META_SYNC_BASE_PATH);
String tableName = config.getString(HoodieTableConfig.HOODIE_TABLE_NAME_KEY);
- PerTableConfig perTableConfig =
- PerTableConfigImpl.builder()
- .tableName(tableName)
- .tableBasePath(basePath)
- .targetTableFormats(formatsToSync)
- .hudiSourceConfig(
- HudiSourceConfigImpl.builder()
- .partitionFieldSpecConfig(getPartitionSpecConfig())
- .build())
+ Properties sourceProperties = new Properties();
+ sourceProperties.put(PARTITION_FIELD_SPEC_CONFIG, getPartitionSpecConfig());
+ SourceTable sourceTable =
+ SourceTable.builder()
+ .name(tableName)
+ .formatName(HUDI)
+ .basePath(basePath)
+ .additionalProperties(sourceProperties)
+ .build();
+ Duration metadataRetention =
+ config.contains(XTableSyncConfig.XTABLE_TARGET_METADATA_RETENTION_HOURS)
+ ? Duration.ofHours(
+ config.getInt(XTableSyncConfig.XTABLE_TARGET_METADATA_RETENTION_HOURS))
+ : null;
+ List<TargetTable> targetTables =
+ formatsToSync.stream()
+ .map(
+ format ->
+ TargetTable.builder()
+ .basePath(basePath)
+ .metadataRetention(metadataRetention)
+ .formatName(format)
+ .name(tableName)
+ .build())
+ .collect(Collectors.toList());
+ ConversionConfig conversionConfig =
+ ConversionConfig.builder()
+ .sourceTable(sourceTable)
+ .targetTables(targetTables)
.syncMode(SyncMode.INCREMENTAL)
- .targetMetadataRetentionInHours(
- config.getInt(XTableSyncConfig.ONE_TABLE_TARGET_METADATA_RETENTION_HOURS))
.build();
Map<String, SyncResult> results =
- new ConversionController(hadoopConf).sync(perTableConfig, hudiConversionSourceProvider);
+ new ConversionController(hadoopConf).sync(conversionConfig, hudiConversionSourceProvider);
String failingFormats =
results.entrySet().stream()
.filter(
entry ->
entry.getValue().getStatus().getStatusCode()
!= SyncResult.SyncStatusCode.SUCCESS)
- .map(entry -> entry.getKey().toString())
+ .map(Map.Entry::getKey)
.collect(Collectors.joining(","));
if (!failingFormats.isEmpty()) {
throw new HoodieException("Unable to sync to InternalTable for formats:
" + failingFormats);
diff --git a/xtable-hudi-support/xtable-hudi-support-extensions/src/test/java/org/apache/xtable/hudi/sync/TestXTableSyncTool.java b/xtable-hudi-support/xtable-hudi-support-extensions/src/test/java/org/apache/xtable/hudi/sync/TestXTableSyncTool.java
index d44465c8..4024674b 100644
--- a/xtable-hudi-support/xtable-hudi-support-extensions/src/test/java/org/apache/xtable/hudi/sync/TestXTableSyncTool.java
+++ b/xtable-hudi-support/xtable-hudi-support-extensions/src/test/java/org/apache/xtable/hudi/sync/TestXTableSyncTool.java
@@ -98,7 +98,7 @@ public class TestXTableSyncTool {
writeBasicHudiTable(path, options);
Properties properties = new Properties();
- properties.put(XTableSyncConfig.ONE_TABLE_FORMATS.key(), "iceberg,DELTA");
+ properties.put(XTableSyncConfig.XTABLE_FORMATS.key(), "iceberg,DELTA");
properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), partitionPath);
properties.put(HoodieSyncConfig.META_SYNC_BASE_PATH.key(), path);
properties.putAll(options);
diff --git a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java
index 7fb65c42..c84753de 100644
--- a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java
+++ b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java
@@ -25,6 +25,8 @@ import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
import lombok.Data;
import lombok.extern.log4j.Log4j2;
@@ -44,17 +46,16 @@ import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.xtable.conversion.ConversionConfig;
import org.apache.xtable.conversion.ConversionController;
import org.apache.xtable.conversion.ConversionSourceProvider;
-import org.apache.xtable.conversion.PerTableConfig;
-import org.apache.xtable.conversion.PerTableConfigImpl;
-import org.apache.xtable.hudi.ConfigurationBasedPartitionSpecExtractor;
-import org.apache.xtable.hudi.HudiSourceConfigImpl;
+import org.apache.xtable.conversion.SourceTable;
+import org.apache.xtable.conversion.TargetTable;
+import org.apache.xtable.hudi.HudiSourceConfig;
import org.apache.xtable.iceberg.IcebergCatalogConfig;
import org.apache.xtable.model.storage.TableFormat;
import org.apache.xtable.model.sync.SyncMode;
import org.apache.xtable.reflection.ReflectionUtils;
-import org.apache.xtable.utilities.RunSync.TableFormatConverters.ConversionConfig;
/**
* Provides a standalone runner for the sync process. See README.md for more details on how to run
@@ -129,7 +130,7 @@ public class RunSync {
String sourceFormat = datasetConfig.sourceFormat;
customConfig = getCustomConfigurations(cmd, CONVERTERS_CONFIG_PATH);
TableFormatConverters tableFormatConverters = loadTableFormatConversionConfigs(customConfig);
- ConversionConfig sourceConversionConfig =
+ TableFormatConverters.ConversionConfig sourceConversionConfig =
tableFormatConverters.getTableFormatConverters().get(sourceFormat);
if (sourceConversionConfig == null) {
throw new IllegalArgumentException(
@@ -140,7 +141,7 @@ public class RunSync {
String sourceProviderClass = sourceConversionConfig.conversionSourceProviderClass;
ConversionSourceProvider<?> conversionSourceProvider =
ReflectionUtils.createInstanceOfClass(sourceProviderClass);
- conversionSourceProvider.init(hadoopConf, sourceConversionConfig.configuration);
+ conversionSourceProvider.init(hadoopConf);
List<String> tableFormatList = datasetConfig.getTargetFormats();
ConversionController conversionController = new ConversionController(hadoopConf);
@@ -149,24 +150,45 @@ public class RunSync {
"Running sync for basePath {} for following table formats {}",
table.getTableBasePath(),
tableFormatList);
- PerTableConfig config =
- PerTableConfigImpl.builder()
- .tableBasePath(table.getTableBasePath())
- .tableName(table.getTableName())
+ Properties sourceProperties = new Properties();
+ if (table.getPartitionSpec() != null) {
+ sourceProperties.put(
+ HudiSourceConfig.PARTITION_FIELD_SPEC_CONFIG, table.getPartitionSpec());
+ }
+ SourceTable sourceTable =
+ SourceTable.builder()
+ .name(table.getTableName())
+ .basePath(table.getTableBasePath())
.namespace(table.getNamespace() == null ? null : table.getNamespace().split("\\."))
- .tableDataPath(table.getTableDataPath())
- .icebergCatalogConfig(icebergCatalogConfig)
- .hudiSourceConfig(
- HudiSourceConfigImpl.builder()
- .partitionSpecExtractorClass(
- ConfigurationBasedPartitionSpecExtractor.class.getName())
- .partitionFieldSpecConfig(table.getPartitionSpec())
- .build())
- .targetTableFormats(tableFormatList)
+ .dataPath(table.getTableDataPath())
+ .catalogConfig(icebergCatalogConfig)
+ .additionalProperties(sourceProperties)
+ .formatName(sourceFormat)
+ .build();
+ List<TargetTable> targetTables =
+ tableFormatList.stream()
+ .map(
+ tableFormat ->
+ TargetTable.builder()
+ .name(table.getTableName())
+ .basePath(table.getTableBasePath())
+ .namespace(
+ table.getNamespace() == null
+ ? null
+ : table.getNamespace().split("\\."))
+ .catalogConfig(icebergCatalogConfig)
+ .formatName(tableFormat)
+ .build())
+ .collect(Collectors.toList());
+
+ ConversionConfig conversionConfig =
+ ConversionConfig.builder()
+ .sourceTable(sourceTable)
+ .targetTables(targetTables)
.syncMode(SyncMode.INCREMENTAL)
.build();
try {
- conversionController.sync(config, conversionSourceProvider);
+ conversionController.sync(conversionConfig, conversionSourceProvider);
} catch (Exception e) {
log.error(String.format("Error running sync for %s",
table.getTableBasePath()), e);
}