This is an automated email from the ASF dual-hosted git repository.
kipk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 4b639f6b75 [GOBBLIN-2159] Support partition level copy in
Iceberg-Distcp (#4058)
4b639f6b75 is described below
commit 4b639f6b75ff716f8f2903678c6bf89e0b7a1690
Author: Vivek Rai <[email protected]>
AuthorDate: Thu Oct 24 00:17:01 2024 +0530
[GOBBLIN-2159] Support partition level copy in Iceberg-Distcp (#4058)
---
.../copy/iceberg/BaseIcebergCatalog.java | 14 +-
.../management/copy/iceberg/IcebergCatalog.java | 4 +-
.../copy/iceberg/IcebergDatasetFinder.java | 9 +-
.../copy/iceberg/IcebergHiveCatalog.java | 6 +
.../iceberg/IcebergOverwritePartitionsStep.java | 164 ++++++++++++
.../copy/iceberg/IcebergPartitionDataset.java | 233 ++++++++++++++++
.../iceberg/IcebergPartitionDatasetFinder.java | 64 +++++
.../data/management/copy/iceberg/IcebergTable.java | 85 +++++-
...MatchesAnyPropNamePartitionFilterPredicate.java | 67 +++++
.../IcebergPartitionFilterPredicateUtil.java | 73 ++++++
.../copy/iceberg/IcebergDatasetTest.java | 45 ++--
.../IcebergOverwritePartitionsStepTest.java | 156 +++++++++++
.../copy/iceberg/IcebergPartitionDatasetTest.java | 292 +++++++++++++++++++++
.../management/copy/iceberg/IcebergTableTest.java | 131 ++++++++-
...hesAnyPropNamePartitionFilterPredicateTest.java | 60 +++++
.../IcebergPartitionFilterPredicateUtilTest.java | 106 ++++++++
16 files changed, 1475 insertions(+), 34 deletions(-)
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/BaseIcebergCatalog.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/BaseIcebergCatalog.java
index 9e2ae53b99..b16e1aaa72 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/BaseIcebergCatalog.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/BaseIcebergCatalog.java
@@ -21,10 +21,12 @@ import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.Table;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.gobblin.dataset.DatasetConstants;
+import org.apache.iceberg.exceptions.NoSuchTableException;
/**
* Base implementation of {@link IcebergCatalog} to access {@link
IcebergTable} and the
@@ -41,9 +43,15 @@ public abstract class BaseIcebergCatalog implements
IcebergCatalog {
}
@Override
- public IcebergTable openTable(String dbName, String tableName) {
+ public IcebergTable openTable(String dbName, String tableName) throws
IcebergTable.TableNotFoundException {
TableIdentifier tableId = TableIdentifier.of(dbName, tableName);
- return new IcebergTable(tableId, calcDatasetDescriptorName(tableId),
getDatasetDescriptorPlatform(), createTableOperations(tableId),
this.getCatalogUri());
+ try {
+ return new IcebergTable(tableId, calcDatasetDescriptorName(tableId),
getDatasetDescriptorPlatform(),
+ createTableOperations(tableId), this.getCatalogUri(),
loadTableInstance(tableId));
+ } catch (NoSuchTableException ex) {
+ // defend against `org.apache.iceberg.catalog.Catalog::loadTable`
throwing inside some `@Override` of `loadTableInstance`
+ throw new IcebergTable.TableNotFoundException(tableId);
+ }
}
protected Catalog createCompanionCatalog(Map<String, String> properties,
Configuration configuration) {
@@ -67,4 +75,6 @@ public abstract class BaseIcebergCatalog implements
IcebergCatalog {
}
protected abstract TableOperations createTableOperations(TableIdentifier
tableId);
+
+ protected abstract Table loadTableInstance(TableIdentifier tableId);
}
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java
index 68e9bb31c6..05ddaf9c52 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java
@@ -29,10 +29,10 @@ import org.apache.iceberg.catalog.TableIdentifier;
public interface IcebergCatalog {
/** @return table identified by `dbName` and `tableName` */
- IcebergTable openTable(String dbName, String tableName);
+ IcebergTable openTable(String dbName, String tableName) throws
IcebergTable.TableNotFoundException;
/** @return table identified by `tableId` */
- default IcebergTable openTable(TableIdentifier tableId) {
+ default IcebergTable openTable(TableIdentifier tableId) throws
IcebergTable.TableNotFoundException {
// CHALLENGE: clearly better to implement in the reverse direction -
`openTable(String, String)` in terms of `openTable(TableIdentifier)` -
// but challenging to do at this point, with multiple derived classes
already "in the wild" that implement `openTable(String, String)`
return openTable(tableId.namespace().toString(), tableId.name());
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java
index f6668f5d18..e6afe37877 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java
@@ -85,7 +85,7 @@ public class IcebergDatasetFinder implements
IterableDatasetFinder<IcebergDatase
}
protected final FileSystem sourceFs;
- private final Properties properties;
+ protected final Properties properties;
/**
* Finds all {@link IcebergDataset}s in the file system using the Iceberg
Catalog.
@@ -153,7 +153,7 @@ public class IcebergDatasetFinder implements
IterableDatasetFinder<IcebergDatase
IcebergTable destIcebergTable =
destinationIcebergCatalog.openTable(destDbName, destTableName);
// TODO: Rethink strategy to enforce dest iceberg table
Preconditions.checkArgument(destinationIcebergCatalog.tableAlreadyExists(destIcebergTable),
String.format("Missing Destination Iceberg Table: {%s}.{%s}", destDbName,
destTableName));
- return new IcebergDataset(srcIcebergTable, destIcebergTable, properties,
fs, getConfigShouldCopyMetadataPath(properties));
+ return createSpecificDataset(srcIcebergTable, destIcebergTable,
properties, fs, getConfigShouldCopyMetadataPath(properties));
}
protected static IcebergCatalog createIcebergCatalog(Properties properties,
CatalogLocation location) throws IOException {
@@ -165,6 +165,11 @@ public class IcebergDatasetFinder implements
IterableDatasetFinder<IcebergDatase
return IcebergCatalogFactory.create(icebergCatalogClassName,
catalogProperties, configuration);
}
+ protected IcebergDataset createSpecificDataset(IcebergTable srcIcebergTable,
IcebergTable destIcebergTable, Properties properties, FileSystem fs, boolean
shouldIncludeMetadataPath)
+ throws IOException {
+ return new IcebergDataset(srcIcebergTable, destIcebergTable, properties,
fs, shouldIncludeMetadataPath);
+ }
+
protected static boolean getConfigShouldCopyMetadataPath(Properties
properties) {
return
Boolean.valueOf(properties.getProperty(ICEBERG_DATASET_SHOULD_COPY_METADATA_PATH,
DEFAULT_ICEBERG_DATASET_SHOULD_COPY_METADATA_PATH));
}
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergHiveCatalog.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergHiveCatalog.java
index af541a79a5..27ea723df5 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergHiveCatalog.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergHiveCatalog.java
@@ -21,6 +21,7 @@ import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.iceberg.Table;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hive.HiveCatalog;
@@ -61,4 +62,9 @@ public class IcebergHiveCatalog extends BaseIcebergCatalog {
public boolean tableAlreadyExists(IcebergTable icebergTable) {
return hc.tableExists(icebergTable.getTableId());
}
+
+ @Override
+ protected Table loadTableInstance(TableIdentifier tableId) {
+ return hc.loadTable(tableId);
+ }
}
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStep.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStep.java
new file mode 100644
index 0000000000..dffcbccb27
--- /dev/null
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStep.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.util.SerializationUtil;
+
+import com.github.rholder.retry.Attempt;
+import com.github.rholder.retry.RetryException;
+import com.github.rholder.retry.RetryListener;
+import com.github.rholder.retry.Retryer;
+import com.google.common.collect.ImmutableMap;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.commit.CommitStep;
+import org.apache.gobblin.util.retry.RetryerFactory;
+
+import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_INTERVAL_MS;
+import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_TIMES;
+import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_TYPE;
+import static org.apache.gobblin.util.retry.RetryerFactory.RetryType;
+
+/**
+ * Commit step for overwriting partitions in an Iceberg table.
+ * <p>
+ * This class implements the {@link CommitStep} interface and provides
functionality to overwrite
+ * partitions in the destination Iceberg table using serialized data files.
+ * </p>
+ */
+@Slf4j
+public class IcebergOverwritePartitionsStep implements CommitStep {
+ private final String destTableIdStr;
+ private final Properties properties;
+ private final byte[] serializedDataFiles;
+ private final String partitionColName;
+ private final String partitionValue;
+ public static final String OVERWRITE_PARTITIONS_RETRYER_CONFIG_PREFIX =
IcebergDatasetFinder.ICEBERG_DATASET_PREFIX +
+ ".catalog.overwrite.partitions.retries";
+ private static final Config RETRYER_FALLBACK_CONFIG =
ConfigFactory.parseMap(ImmutableMap.of(
+ RETRY_INTERVAL_MS, TimeUnit.SECONDS.toMillis(3L),
+ RETRY_TIMES, 3,
+ RETRY_TYPE, RetryType.FIXED_ATTEMPT.name()));
+
+ /**
+ * Constructs an {@code IcebergOverwritePartitionsStep} with the specified
parameters.
+ *
+ * @param destTableIdStr the identifier of the destination table as a string
+ * @param partitionColName the name of the partition column whose partition is
to be overwritten
+ * @param partitionValue the value of the partition column identifying the
partition to overwrite
+ * @param serializedDataFiles [from List<DataFile>] the serialized data
files to be used for overwriting partitions
+ * @param properties the properties containing configuration
+ */
+ public IcebergOverwritePartitionsStep(String destTableIdStr, String
partitionColName, String partitionValue, byte[] serializedDataFiles, Properties
properties) {
+ this.destTableIdStr = destTableIdStr;
+ this.partitionColName = partitionColName;
+ this.partitionValue = partitionValue;
+ this.serializedDataFiles = serializedDataFiles;
+ this.properties = properties;
+ }
+
+ @Override
+ public boolean isCompleted() {
+ return false;
+ }
+
+ /**
+ * Executes the partition replacement in the destination Iceberg table.
+ * Also, has a retry mechanism as done in {@link
IcebergRegisterStep#execute()}
+ *
+ * @throws IOException if an I/O error occurs during execution
+ */
+ @Override
+ public void execute() throws IOException {
+ // Unlike IcebergRegisterStep::execute, which validates dest table
metadata has not changed between copy entity
+ // generation and the post-copy commit, do no such validation here, so
dest table writes may continue throughout
+ // our copying. any new data written in the meanwhile to THE SAME
partitions we are about to overwrite will be
+ // clobbered and replaced by the copy entities from our execution.
+ IcebergTable destTable =
createDestinationCatalog().openTable(TableIdentifier.parse(this.destTableIdStr));
+ List<DataFile> dataFiles =
SerializationUtil.deserializeFromBytes(this.serializedDataFiles);
+ try {
+ log.info("~{}~ Starting partition overwrite - partition: {}; value: {};
numDataFiles: {}; path[0]: {}",
+ this.destTableIdStr,
+ this.partitionColName,
+ this.partitionValue,
+ dataFiles.size(),
+ dataFiles.get(0).path()
+ );
+ Retryer<Void> overwritePartitionsRetryer =
createOverwritePartitionsRetryer();
+ overwritePartitionsRetryer.call(() -> {
+ destTable.overwritePartition(dataFiles, this.partitionColName,
this.partitionValue);
+ return null;
+ });
+ log.info("~{}~ Successful partition overwrite - partition: {}; value:
{}",
+ this.destTableIdStr,
+ this.partitionColName,
+ this.partitionValue
+ );
+ } catch (ExecutionException executionException) {
+ String msg = String.format("~%s~ Failed to overwrite partitions",
this.destTableIdStr);
+ log.error(msg, executionException);
+ throw new RuntimeException(msg, executionException.getCause());
+ } catch (RetryException retryException) {
+ String interruptedNote = Thread.currentThread().isInterrupted() ? "...
then interrupted" : "";
+ String msg = String.format("~%s~ Failure attempting to overwrite
partition [num failures: %d] %s",
+ this.destTableIdStr,
+ retryException.getNumberOfFailedAttempts(),
+ interruptedNote);
+ Throwable informativeException =
retryException.getLastFailedAttempt().hasException()
+ ? retryException.getLastFailedAttempt().getExceptionCause()
+ : retryException;
+ log.error(msg, informativeException);
+ throw new RuntimeException(msg, informativeException);
+ }
+ }
+
+ protected IcebergCatalog createDestinationCatalog() throws IOException {
+ return IcebergDatasetFinder.createIcebergCatalog(this.properties,
IcebergDatasetFinder.CatalogLocation.DESTINATION);
+ }
+
+ private Retryer<Void> createOverwritePartitionsRetryer() {
+ Config config = ConfigFactory.parseProperties(this.properties);
+ Config retryerOverridesConfig =
config.hasPath(IcebergOverwritePartitionsStep.OVERWRITE_PARTITIONS_RETRYER_CONFIG_PREFIX)
+ ?
config.getConfig(IcebergOverwritePartitionsStep.OVERWRITE_PARTITIONS_RETRYER_CONFIG_PREFIX)
+ : ConfigFactory.empty();
+
+ return
RetryerFactory.newInstance(retryerOverridesConfig.withFallback(RETRYER_FALLBACK_CONFIG),
Optional.of(new RetryListener() {
+ @Override
+ public <V> void onRetry(Attempt<V> attempt) {
+ if (attempt.hasException()) {
+ String msg = String.format("~%s~ Exception while overwriting
partitions [attempt: %d; elapsed: %s]",
+ destTableIdStr,
+ attempt.getAttemptNumber(),
+
Duration.ofMillis(attempt.getDelaySinceFirstAttempt()).toString());
+ log.warn(msg, attempt.getExceptionCause());
+ }
+ }
+ }));
+ }
+}
\ No newline at end of file
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java
new file mode 100644
index 0000000000..42582f09e3
--- /dev/null
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.util.SerializationUtil;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.ImmutableList;
+import com.google.common.base.Preconditions;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.data.management.copy.CopyEntity;
+import org.apache.gobblin.data.management.copy.CopyableFile;
+import org.apache.gobblin.data.management.copy.entities.PostPublishStep;
+import org.apache.gobblin.data.management.copy.CopyableDataset;
+import org.apache.gobblin.util.function.CheckedExceptionFunction;
+import
org.apache.gobblin.data.management.copy.iceberg.predicates.IcebergMatchesAnyPropNamePartitionFilterPredicate;
+import
org.apache.gobblin.data.management.copy.iceberg.predicates.IcebergPartitionFilterPredicateUtil;
+
+/**
+ * Iceberg Partition dataset implementing {@link CopyableDataset}
+ * <p>
+ * This class extends {@link IcebergDataset} and provides functionality to
filter partitions
+ * and generate copy entities for partition based data movement.
+ * </p>
+ */
+@Slf4j
+public class IcebergPartitionDataset extends IcebergDataset {
+ // Currently hardcoded these transforms here but eventually it will depend
on filter predicate implementation and can
+ // be moved to a common place or inside each filter predicate.
+ private static final List<String> supportedTransforms =
ImmutableList.of("identity", "truncate");
+ private final Predicate<StructLike> partitionFilterPredicate;
+ private final String partitionColumnName;
+ private final String partitionColValue;
+
+ public IcebergPartitionDataset(IcebergTable srcIcebergTable, IcebergTable
destIcebergTable, Properties properties,
+ FileSystem sourceFs, boolean shouldIncludeMetadataPath, String
partitionColumnName, String partitionColValue)
+ throws IOException {
+ super(srcIcebergTable, destIcebergTable, properties, sourceFs,
shouldIncludeMetadataPath);
+ this.partitionColumnName = partitionColumnName;
+ this.partitionColValue = partitionColValue;
+ this.partitionFilterPredicate = createPartitionFilterPredicate();
+ }
+
+ /**
+ * Generates copy entities for partition based data movement.
+ * It finds files specific to the partition and creates destination data
files based on the source data files.
+ * Also updates the destination data files with the destination table write data
location and adds a UUID to the file path
+ * to avoid conflicts.
+ *
+ * @param targetFs the target file system
+ * @param copyConfig the copy configuration
+ * @return a collection of copy entities
+ * @throws IOException if an I/O error occurs
+ */
+ @Override
+ Collection<CopyEntity> generateCopyEntities(FileSystem targetFs,
CopyConfiguration copyConfig) throws IOException {
+ // TODO: Refactor the IcebergDataset::generateCopyEntities to avoid code
duplication
+ // Differences are getting data files, copying ancestor permission and
adding post publish steps
+ String fileSet = this.getFileSetId();
+ List<CopyEntity> copyEntities = Lists.newArrayList();
+ IcebergTable srcIcebergTable = getSrcIcebergTable();
+ List<DataFile> srcDataFiles =
srcIcebergTable.getPartitionSpecificDataFiles(this.partitionFilterPredicate);
+ Map<Path, DataFile> destDataFileBySrcPath =
calcDestDataFileBySrcPath(srcDataFiles);
+ Configuration defaultHadoopConfiguration = new Configuration();
+
+ for (Map.Entry<Path, FileStatus> entry :
calcSrcFileStatusByDestFilePath(destDataFileBySrcPath).entrySet()) {
+ Path destPath = entry.getKey();
+ FileStatus srcFileStatus = entry.getValue();
+ // TODO: should be the same FS each time; try creating once, reusing
thereafter, to not recreate wastefully
+ FileSystem actualSourceFs =
getSourceFileSystemFromFileStatus(srcFileStatus, defaultHadoopConfiguration);
+
+ CopyableFile fileEntity = CopyableFile.fromOriginAndDestination(
+ actualSourceFs, srcFileStatus, targetFs.makeQualified(destPath),
copyConfig)
+ .fileSet(fileSet)
+ .datasetOutputPath(targetFs.getUri().getPath())
+ .build();
+
+ fileEntity.setSourceData(getSourceDataset(this.sourceFs));
+ fileEntity.setDestinationData(getDestinationDataset(targetFs));
+ copyEntities.add(fileEntity);
+ }
+
+ // Adding this check to avoid adding post publish step when there are no
files to copy.
+ List<DataFile> destDataFiles = new
ArrayList<>(destDataFileBySrcPath.values());
+ if (CollectionUtils.isNotEmpty(destDataFiles)) {
+ copyEntities.add(createOverwritePostPublishStep(destDataFiles));
+ }
+
+ log.info("~{}~ generated {} copy entities", fileSet, copyEntities.size());
+ return copyEntities;
+ }
+
+ private Map<Path, DataFile> calcDestDataFileBySrcPath(List<DataFile>
srcDataFiles)
+ throws IcebergTable.TableNotFoundException {
+ String fileSet = this.getFileSetId();
+ Map<Path, DataFile> destDataFileBySrcPath = Maps.newHashMap();
+ if (srcDataFiles.isEmpty()) {
+ log.warn("~{}~ found no data files for partition col : {} with partition
value : {} to copy", fileSet,
+ this.partitionColumnName, this.partitionColValue);
+ return destDataFileBySrcPath;
+ }
+ TableMetadata srcTableMetadata =
getSrcIcebergTable().accessTableMetadata();
+ TableMetadata destTableMetadata =
getDestIcebergTable().accessTableMetadata();
+ PartitionSpec partitionSpec = destTableMetadata.spec();
+ // tableMetadata.property(TableProperties.WRITE_DATA_LOCATION, "") returns
null if the property is not set and
+ // doesn't respect passed default value, so to avoid NPE in .replace() we
are setting it to empty string.
+ String srcWriteDataLocation =
Optional.ofNullable(srcTableMetadata.property(TableProperties.WRITE_DATA_LOCATION,
+ "")).orElse("");
+ String destWriteDataLocation =
Optional.ofNullable(destTableMetadata.property(TableProperties.WRITE_DATA_LOCATION,
+ "")).orElse("");
+ if (StringUtils.isEmpty(srcWriteDataLocation) ||
StringUtils.isEmpty(destWriteDataLocation)) {
+ log.warn(
+ "~{}~ Either source or destination table does not have write data
location : source table write data location : {} , destination table write data
location : {}",
+ fileSet,
+ srcWriteDataLocation,
+ destWriteDataLocation
+ );
+ }
+ srcDataFiles.forEach(dataFile -> {
+ String srcFilePath = dataFile.path().toString();
+ Path updatedDestFilePath = relocateDestPath(srcFilePath,
srcWriteDataLocation, destWriteDataLocation);
+ log.debug("~{}~ Path changed from Src : {} to Dest : {}", fileSet,
srcFilePath, updatedDestFilePath);
+ destDataFileBySrcPath.put(new Path(srcFilePath),
DataFiles.builder(partitionSpec)
+ .copy(dataFile)
+ .withPath(updatedDestFilePath.toString())
+ .build());
+ });
+ log.info("~{}~ created {} destination data files", fileSet,
destDataFileBySrcPath.size());
+ return destDataFileBySrcPath;
+ }
+
+ private Path relocateDestPath(String curPathStr, String prefixToBeReplaced,
String prefixToReplaceWith) {
+ String updPathStr = curPathStr.replace(prefixToBeReplaced,
prefixToReplaceWith);
+ return addUUIDToPath(updPathStr);
+ }
+
+ private Path addUUIDToPath(String filePathStr) {
+ Path filePath = new Path(filePathStr);
+ String fileDir = filePath.getParent().toString();
+ String fileName = filePath.getName();
+ String newFileName = String.join("-",UUID.randomUUID().toString(),
fileName);
+ return new Path(fileDir, newFileName);
+ }
+
+ private Map<Path, FileStatus> calcSrcFileStatusByDestFilePath(Map<Path,
DataFile> destDataFileBySrcPath)
+ throws IOException {
+ Function<Path, FileStatus> getFileStatus =
CheckedExceptionFunction.wrapToTunneled(this.sourceFs::getFileStatus);
+ Map<Path, FileStatus> srcFileStatusByDestFilePath = Maps.newHashMap();
+ try {
+ srcFileStatusByDestFilePath = destDataFileBySrcPath.entrySet()
+ .stream()
+ .collect(Collectors.toMap(entry -> new
Path(entry.getValue().path().toString()),
+ entry -> getFileStatus.apply(entry.getKey())));
+ } catch (CheckedExceptionFunction.WrappedIOException wrapper) {
+ wrapper.rethrowWrapped();
+ }
+ return srcFileStatusByDestFilePath;
+ }
+
+ private PostPublishStep createOverwritePostPublishStep(List<DataFile>
destDataFiles) {
+ byte[] serializedDataFiles =
SerializationUtil.serializeToBytes(destDataFiles);
+
+ IcebergOverwritePartitionsStep icebergOverwritePartitionStep = new
IcebergOverwritePartitionsStep(
+ this.getDestIcebergTable().getTableId().toString(),
+ this.partitionColumnName,
+ this.partitionColValue,
+ serializedDataFiles,
+ this.properties
+ );
+
+ return new PostPublishStep(this.getFileSetId(), Maps.newHashMap(),
icebergOverwritePartitionStep, 0);
+ }
+
+ private Predicate<StructLike> createPartitionFilterPredicate() throws
IOException {
+ //TODO: Refactor it later using factory or other way to support different
types of filter predicate
+ // Also take into consideration creation of Expression Filter to be used
in overwrite api
+ TableMetadata srcTableMetadata =
getSrcIcebergTable().accessTableMetadata();
+ Optional<Integer> partitionColumnIndexOpt =
IcebergPartitionFilterPredicateUtil.getPartitionColumnIndex(
+ this.partitionColumnName,
+ srcTableMetadata,
+ supportedTransforms
+ );
+ Preconditions.checkArgument(partitionColumnIndexOpt.isPresent(),
String.format(
+ "Partition column %s not found in table %s",
+ this.partitionColumnName, this.getFileSetId()));
+ int partitionColumnIndex = partitionColumnIndexOpt.get();
+ return new
IcebergMatchesAnyPropNamePartitionFilterPredicate(partitionColumnIndex,
this.partitionColValue);
+ }
+
+}
\ No newline at end of file
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java
new file mode 100644
index 0000000000..581a265e38
--- /dev/null
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.FileSystem;
+
+import com.google.common.base.Preconditions;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Finder class for locating and creating partitioned Iceberg datasets.
+ * <p>
+ * This class extends {@link IcebergDatasetFinder} and provides functionality
to create
+ * {@link IcebergPartitionDataset} instances based on the specified source and
destination Iceberg catalogs.
+ * </p>
+ */
+@Slf4j
+public class IcebergPartitionDatasetFinder extends IcebergDatasetFinder {
+ public static final String ICEBERG_PARTITION_NAME_KEY = "partition.name";
+ public static final String ICEBERG_PARTITION_VALUE_KEY = "partition.value";
+
+ public IcebergPartitionDatasetFinder(FileSystem sourceFs, Properties
properties) {
+ super(sourceFs, properties);
+ }
+
+ @Override
+ protected IcebergDataset createSpecificDataset(IcebergTable srcIcebergTable,
IcebergTable destIcebergTable,
+ Properties properties, FileSystem fs, boolean shouldIncludeMetadataPath)
throws IOException {
+// TODO: Add Validator for source and destination tables later
+
+ String partitionColumnName = getLocationQualifiedProperty(properties,
IcebergDatasetFinder.CatalogLocation.SOURCE,
+ ICEBERG_PARTITION_NAME_KEY);
+ Preconditions.checkArgument(StringUtils.isNotEmpty(partitionColumnName),
+ "Partition column name cannot be empty");
+
+ String partitionColumnValue = getLocationQualifiedProperty(properties,
IcebergDatasetFinder.CatalogLocation.SOURCE,
+ ICEBERG_PARTITION_VALUE_KEY);
+ Preconditions.checkArgument(StringUtils.isNotEmpty(partitionColumnValue),
+ "Partition value cannot be empty");
+
+ return new IcebergPartitionDataset(srcIcebergTable, destIcebergTable,
properties, fs,
+ getConfigShouldCopyMetadataPath(properties), partitionColumnName,
partitionColumnValue);
+ }
+}
\ No newline at end of file
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
index e802e10297..5221007cdc 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
@@ -20,20 +20,29 @@ package org.apache.gobblin.data.management.copy.iceberg;
import java.io.IOException;
import java.net.URI;
import java.time.Instant;
+import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
+import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.iceberg.DataFile;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.ManifestReader;
+import org.apache.iceberg.OverwriteFiles;
import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.FileIO;
import com.google.common.annotations.VisibleForTesting;
@@ -47,6 +56,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.dataset.DatasetConstants;
import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.util.measurement.GrowthMilestoneTracker;
import static
org.apache.gobblin.data.management.copy.iceberg.IcebergSnapshotInfo.ManifestFileInfo;
@@ -77,10 +87,11 @@ public class IcebergTable {
private final String datasetDescriptorPlatform;
private final TableOperations tableOps;
private final String catalogUri;
+ private final Table table;
@VisibleForTesting
- IcebergTable(TableIdentifier tableId, TableOperations tableOps, String
catalogUri) {
- this(tableId, tableId.toString(), DatasetConstants.PLATFORM_ICEBERG,
tableOps, catalogUri);
+ IcebergTable(TableIdentifier tableId, TableOperations tableOps, String
catalogUri, Table table) {
+ this(tableId, tableId.toString(), DatasetConstants.PLATFORM_ICEBERG,
tableOps, catalogUri, table);
}
/** @return metadata info limited to the most recent (current) snapshot */
@@ -217,4 +228,74 @@ public class IcebergTable {
this.tableOps.commit(dstMetadata, srcMetadata);
}
}
+
+ /**
+ * Retrieves a list of data files from the current snapshot that match the
specified partition filter predicate.
+ *
+ * @param icebergPartitionFilterPredicate the predicate to filter partitions
+ * @return a list of data files that match the partition filter predicate
+ * @throws TableNotFoundException if an error occurs while accessing the
table metadata
+ * @throws IOException if an error occurs while reading a manifest file
+ */
+ public List<DataFile> getPartitionSpecificDataFiles(Predicate<StructLike>
icebergPartitionFilterPredicate)
+ throws IOException {
+ TableMetadata tableMetadata = accessTableMetadata();
+ Snapshot currentSnapshot = tableMetadata.currentSnapshot();
+ long currentSnapshotId = currentSnapshot.snapshotId();
+ List<DataFile> knownDataFiles = new ArrayList<>();
+ GrowthMilestoneTracker growthMilestoneTracker = new
GrowthMilestoneTracker();
+ //TODO: Add support for deleteManifests as well later
+ // Currently supporting dataManifests only
+ List<ManifestFile> dataManifestFiles =
currentSnapshot.dataManifests(this.tableOps.io());
+ for (ManifestFile manifestFile : dataManifestFiles) {
+ if (growthMilestoneTracker.isAnotherMilestone(knownDataFiles.size())) {
+ log.info("~{}~ for snapshot '{}' - before manifest-file '{}' '{}'
total known iceberg datafiles", tableId,
+ currentSnapshotId,
+ manifestFile.path(),
+ knownDataFiles.size()
+ );
+ }
+ try (ManifestReader<DataFile> manifestReader =
ManifestFiles.read(manifestFile, this.tableOps.io());
+ CloseableIterator<DataFile> dataFiles = manifestReader.iterator()) {
+ dataFiles.forEachRemaining(dataFile -> {
+ if (icebergPartitionFilterPredicate.test(dataFile.partition())) {
+ knownDataFiles.add(dataFile.copy());
+ }
+ });
+ } catch (IOException e) {
+ String errMsg = String.format("~%s~ for snapshot '%d' - Failed to read
manifest file: %s", tableId,
+ currentSnapshotId, manifestFile.path());
+ log.error(errMsg, e);
+ throw new IOException(errMsg, e);
+ }
+ }
+ return knownDataFiles;
+ }
+
+ /**
+ * Overwrite partition data files in the table for the specified partition
col name & partition value.
+ * <p>
+ * Overwrite partition replaces the partition using the expression filter
provided.
+ * </p>
+ * @param dataFiles the list of data files to replace partitions with
+ * @param partitionColName the partition column name whose data files are to
be replaced
+ * @param partitionValue the partition column value on which data files
will be replaced
+ */
+ protected void overwritePartition(List<DataFile> dataFiles, String
partitionColName, String partitionValue)
+ throws TableNotFoundException {
+ if (dataFiles.isEmpty()) {
+ return;
+ }
+ log.info("~{}~ SnapshotId before overwrite: {}", tableId,
accessTableMetadata().currentSnapshot().snapshotId());
+ OverwriteFiles overwriteFiles = this.table.newOverwrite();
+ overwriteFiles.overwriteByRowFilter(Expressions.equal(partitionColName,
partitionValue));
+ dataFiles.forEach(overwriteFiles::addFile);
+ overwriteFiles.commit();
+ this.tableOps.refresh();
+ // Note : this would only arise in a high-frequency commit scenario, but
there's no guarantee that the current
+ // snapshot is necessarily the one from the commit just before. another
writer could have just raced to commit
+ // in between.
+ log.info("~{}~ SnapshotId after overwrite: {}", tableId,
accessTableMetadata().currentSnapshot().snapshotId());
+ }
+
}
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/predicates/IcebergMatchesAnyPropNamePartitionFilterPredicate.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/predicates/IcebergMatchesAnyPropNamePartitionFilterPredicate.java
new file mode 100644
index 0000000000..ee5d6acb28
--- /dev/null
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/predicates/IcebergMatchesAnyPropNamePartitionFilterPredicate.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg.predicates;
+
+import java.util.Objects;
+import java.util.function.Predicate;
+
+import org.apache.iceberg.StructLike;
+
+/**
+ * Predicate implementation for filtering Iceberg partitions based on a
specified partition value.
+ * <p>
+ * This class filters partitions by checking whether the partition value
matches the specified partition value.
+ */
+public class IcebergMatchesAnyPropNamePartitionFilterPredicate implements
Predicate<StructLike> {
+ private final int partitionColumnIndex;
+ private final String partitionValue;
+
+ /**
+ * Constructs an {@code IcebergMatchesAnyPropNamePartitionFilterPredicate}
with the specified parameters.
+ *
+ * @param partitionColumnIndex the index of the partition column in
partition spec
+ * @param partitionValue the partition value used to match
+ */
+ public IcebergMatchesAnyPropNamePartitionFilterPredicate(int
partitionColumnIndex, String partitionValue) {
+ this.partitionColumnIndex = partitionColumnIndex;
+ this.partitionValue = partitionValue;
+ }
+
+ /**
+ * Check if the partition value matches the specified partition value.
+ *
+ * @param partition the partition to check
+ * @return {@code true} if the partition value matches the specified
partition value, otherwise {@code false}
+ */
+ @Override
+ public boolean test(StructLike partition) {
+ // Just a cautious check to avoid NPE, ideally partition shouldn't be null
if table is partitioned
+ if (Objects.isNull(partition)) {
+ return false;
+ }
+
+ Object partitionVal = partition.get(this.partitionColumnIndex,
Object.class);
+ // Need this check to avoid NPE on partitionVal.toString()
+ if (Objects.isNull(partitionVal)) {
+ return false;
+ }
+
+ return this.partitionValue.equals(partitionVal.toString());
+ }
+}
\ No newline at end of file
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/predicates/IcebergPartitionFilterPredicateUtil.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/predicates/IcebergPartitionFilterPredicateUtil.java
new file mode 100644
index 0000000000..358fc9de1e
--- /dev/null
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/predicates/IcebergPartitionFilterPredicateUtil.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg.predicates;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.TableMetadata;
+
+/**
+ * Utility class for creating and managing partition filter predicates for
Iceberg tables.
+ * <p>
+ * This class provides methods to retrieve the index of a partition column in
the table metadata
+ * and ensures that the partition transform is supported.
+ * </p>
+ * <p>
+ * Note: This class is not meant to be instantiated.
+ * </p>
+ */
+public class IcebergPartitionFilterPredicateUtil {
+ private IcebergPartitionFilterPredicateUtil() {
+ }
+
+ /**
+ * Retrieves the index of the partition column from the partition spec in
the table metadata.
+ *
+ * @param partitionColumnName the name of the partition column to find
+ * @param tableMetadata the metadata of the Iceberg table
+ * @param supportedTransforms a list of supported partition transforms
+ * @return an {@code Optional} containing the partition column index if found, otherwise {@code Optional.empty()}
+ * @throws IOException if the partition transform is not
supported
+ */
+ public static Optional<Integer> getPartitionColumnIndex(
+ String partitionColumnName,
+ TableMetadata tableMetadata,
+ List<String> supportedTransforms
+ ) throws IOException {
+ List<PartitionField> partitionFields = tableMetadata.spec().fields();
+ for (int idx = 0; idx < partitionFields.size(); idx++) {
+ PartitionField partitionField = partitionFields.get(idx);
+ if (partitionField.name().equals(partitionColumnName)) {
+ String transform = partitionField.transform().toString().toLowerCase();
+ if (!supportedTransforms.contains(transform)) {
+ throw new IOException(
+ String.format(" For ~{%s:%d}~ Partition transform %s is not
supported. Supported transforms are %s",
+ partitionColumnName,
+ idx,
+ transform,
+ supportedTransforms));
+ }
+ return Optional.of(idx);
+ }
+ }
+ return Optional.empty();
+ }
+}
\ No newline at end of file
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
index b9babbc888..a446fbb1a7 100644
---
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.catalog.TableIdentifier;
@@ -92,6 +93,7 @@ public class IcebergDatasetTest {
private static final String MANIFEST_PATH_0 = ROOT_PATH +
"metadata/manifest.a";
private static final String MANIFEST_DATA_PATH_0A = ROOT_PATH + "data/p0/a";
private static final String MANIFEST_DATA_PATH_0B = ROOT_PATH + "data/p0/b";
+ private static final String REGISTER_COMMIT_STEP =
IcebergRegisterStep.class.getName();
private static final MockIcebergTable.SnapshotPaths SNAPSHOT_PATHS_0 =
new MockIcebergTable.SnapshotPaths(Optional.of(METADATA_PATH),
MANIFEST_LIST_PATH_0, Arrays.asList(
new IcebergSnapshotInfo.ManifestFileInfo(MANIFEST_PATH_0,
@@ -120,7 +122,10 @@ public class IcebergDatasetTest {
TableIdentifier tableId = TableIdentifier.of(testDbName, testTblName);
String qualifiedTableName = "foo_prefix." + tableId.toString();
String platformName = "Floe";
- IcebergTable table = new IcebergTable(tableId, qualifiedTableName,
platformName, Mockito.mock(TableOperations.class), SRC_CATALOG_URI);
+ IcebergTable table = new IcebergTable(tableId, qualifiedTableName,
platformName,
+ Mockito.mock(TableOperations.class),
+ SRC_CATALOG_URI,
+ Mockito.mock(Table.class));
FileSystem mockFs = Mockito.mock(FileSystem.class);
Mockito.when(mockFs.getUri()).thenReturn(SRC_FS_URI);
DatasetDescriptor expected = new DatasetDescriptor(platformName,
URI.create(SRC_CATALOG_URI), qualifiedTableName);
@@ -428,17 +433,17 @@ public class IcebergDatasetTest {
for (CopyEntity copyEntity : copyEntities) {
String json = copyEntity.toString();
if (isCopyableFile(json)) {
- String filepath =
CopyEntityDeserializer.getFilePathAsStringFromJson(json);
+ String filepath =
CopyEntityDeserializer.getOriginFilePathAsStringFromJson(json);
actual.add(filepath);
} else{
- verifyPostPublishStep(json);
+ verifyPostPublishStep(json, REGISTER_COMMIT_STEP);
}
}
Assert.assertEquals(actual.size(), expected.size(), "Set" +
actual.toString() + " vs Set" + expected.toString());
Assert.assertEqualsNoOrder(actual.toArray(), expected.toArray());
}
- private static boolean isCopyableFile(String json) {
+ public static boolean isCopyableFile(String json) {
String objectType = new Gson().fromJson(json, JsonObject.class)
.getAsJsonPrimitive("object-type")
.getAsString();
@@ -452,14 +457,14 @@ public class IcebergDatasetTest {
List<CopyEntityDeserializer.FileOwnerAndPermissions>
ancestorFileOwnerAndPermissionsList =
CopyEntityDeserializer.getAncestorOwnerAndPermissions(copyEntityJson);
CopyEntityDeserializer.FileOwnerAndPermissions
destinationFileOwnerAndPermissions =
CopyEntityDeserializer.getDestinationOwnerAndPermissions(copyEntityJson);
- Path filePath = new
Path(CopyEntityDeserializer.getFilePathAsStringFromJson(copyEntityJson));
+ Path filePath = new
Path(CopyEntityDeserializer.getOriginFilePathAsStringFromJson(copyEntityJson));
FileStatus fileStatus = expectedPathsAndFileStatuses.get(filePath);
verifyFileStatus(destinationFileOwnerAndPermissions, fileStatus);
// providing path's parent to verify ancestor owner and permissions
verifyAncestorPermissions(ancestorFileOwnerAndPermissionsList,
filePath.getParent(),
expectedPathsAndFileStatuses);
} else {
- verifyPostPublishStep(copyEntityJson);
+ verifyPostPublishStep(copyEntityJson, REGISTER_COMMIT_STEP);
}
}
}
@@ -481,8 +486,7 @@ public class IcebergDatasetTest {
}
}
- private static void verifyPostPublishStep(String json) {
- String expectedCommitStep =
"org.apache.gobblin.data.management.copy.iceberg.IcebergRegisterStep";
+ public static void verifyPostPublishStep(String json, String
expectedCommitStep) {
String actualCommitStep = new Gson().fromJson(json, JsonObject.class)
.getAsJsonObject("object-data").getAsJsonObject("step").getAsJsonPrimitive("object-type").getAsString();
Assert.assertEquals(actualCommitStep, expectedCommitStep);
@@ -582,7 +586,7 @@ public class IcebergDatasetTest {
return fs;
}
- protected static FileStatus createEmptyFileStatus(String pathString)
throws IOException {
+ public static FileStatus createEmptyFileStatus(String pathString) throws
IOException {
Path path = new Path(pathString);
FileStatus fileStatus = new FileStatus();
fileStatus.setPath(path);
@@ -640,7 +644,7 @@ public class IcebergDatasetTest {
}
}
- private static class CopyEntityDeserializer {
+ protected static class CopyEntityDeserializer {
@Data
public static class FileOwnerAndPermissions {
@@ -652,13 +656,20 @@ public class IcebergDatasetTest {
String otherActionPermission = FsAction.valueOf("READ_WRITE").toString();
}
- public static String getFilePathAsStringFromJson(String json) {
- String filepath = new Gson().fromJson(json, JsonObject.class)
- .getAsJsonObject("object-data")
- .getAsJsonObject("origin")
-
.getAsJsonObject("object-data").getAsJsonObject("path").getAsJsonObject("object-data")
-
.getAsJsonObject("uri").getAsJsonPrimitive("object-data").getAsString();
- return filepath;
+ public static String getOriginFilePathAsStringFromJson(String json) {
+ return new Gson().fromJson(json, JsonObject.class)
+ .getAsJsonObject("object-data")
+ .getAsJsonObject("origin")
+
.getAsJsonObject("object-data").getAsJsonObject("path").getAsJsonObject("object-data")
+
.getAsJsonObject("uri").getAsJsonPrimitive("object-data").getAsString();
+ }
+
+ public static String getDestinationFilePathAsStringFromJson(String json) {
+ return new Gson().fromJson(json, JsonObject.class)
+ .getAsJsonObject("object-data")
+ .getAsJsonObject("destination")
+ .getAsJsonObject("object-data")
+
.getAsJsonObject("uri").getAsJsonPrimitive("object-data").getAsString();
}
public static List<FileOwnerAndPermissions>
getAncestorOwnerAndPermissions(String json) {
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStepTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStepTest.java
new file mode 100644
index 0000000000..6e273ca2d6
--- /dev/null
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStepTest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.util.SerializationUtil;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableList;
+
+import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_TIMES;
+
+/** Tests for {@link IcebergOverwritePartitionsStep} */
+public class IcebergOverwritePartitionsStepTest {
+ private final String destTableIdStr = "db.foo";
+ private final String testPartitionColName = "testPartition";
+ private final String testPartitionColValue = "testValue";
+ private IcebergTable mockIcebergTable;
+ private IcebergCatalog mockIcebergCatalog;
+ private Properties mockProperties;
+ private byte[] serializedDummyDataFiles;
+ private IcebergOverwritePartitionsStep spyIcebergOverwritePartitionsStep;
+
+ @BeforeMethod
+ public void setUp() throws IOException {
+ mockIcebergTable = Mockito.mock(IcebergTable.class);
+ mockIcebergCatalog = Mockito.mock(IcebergCatalog.class);
+ mockProperties = new Properties();
+
+ List<DataFile> dummyDataFiles = createDummyDataFiles();
+ serializedDummyDataFiles =
SerializationUtil.serializeToBytes(dummyDataFiles);
+
+ spyIcebergOverwritePartitionsStep = Mockito.spy(new
IcebergOverwritePartitionsStep(destTableIdStr,
+ testPartitionColName, testPartitionColValue, serializedDummyDataFiles,
mockProperties));
+
+
Mockito.when(mockIcebergCatalog.openTable(Mockito.any(TableIdentifier.class))).thenReturn(mockIcebergTable);
+
Mockito.doReturn(mockIcebergCatalog).when(spyIcebergOverwritePartitionsStep).createDestinationCatalog();
+ }
+
+ @Test
+ public void testNeverIsCompleted() {
+ Assert.assertFalse(spyIcebergOverwritePartitionsStep.isCompleted());
+ }
+
+ @Test
+ public void testExecute() {
+ try {
+
Mockito.doNothing().when(mockIcebergTable).overwritePartition(Mockito.anyList(),
Mockito.anyString(),
+ Mockito.anyString());
+ spyIcebergOverwritePartitionsStep.execute();
+ Mockito.verify(mockIcebergTable,
Mockito.times(1)).overwritePartition(Mockito.anyList(),
+ Mockito.anyString(), Mockito.anyString());
+ } catch (IOException e) {
+ Assert.fail(String.format("Unexpected IOException : %s", e));
+ }
+ }
+
+ @Test
+ public void testExecuteWithRetry() {
+ try {
+ // first call throw exception which will be retried and on second call
nothing happens
+ Mockito.doThrow(new
RuntimeException()).doNothing().when(mockIcebergTable).overwritePartition(Mockito.anyList(),
+ Mockito.anyString(), Mockito.anyString());
+ spyIcebergOverwritePartitionsStep.execute();
+ Mockito.verify(mockIcebergTable,
Mockito.times(2)).overwritePartition(Mockito.anyList(),
+ Mockito.anyString(), Mockito.anyString());
+ } catch (IOException e) {
+ Assert.fail(String.format("Unexpected IOException : %s", e));
+ }
+ }
+
+ @Test
+ public void testExecuteWithDefaultRetry() throws
IcebergTable.TableNotFoundException {
+ try {
+ // Always throw exception
+ Mockito.doThrow(new
RuntimeException()).when(mockIcebergTable).overwritePartition(Mockito.anyList(),
+ Mockito.anyString(), Mockito.anyString());
+ spyIcebergOverwritePartitionsStep.execute();
+ } catch (RuntimeException e) {
+ Mockito.verify(mockIcebergTable,
Mockito.times(3)).overwritePartition(Mockito.anyList(),
+ Mockito.anyString(), Mockito.anyString());
+ assertRetryTimes(e, 3);
+ } catch (IOException e) {
+ Assert.fail(String.format("Unexpected IOException : %s", e));
+ }
+ }
+
+ @Test
+ public void testExecuteWithCustomRetryConfig() throws IOException {
+ int retryCount = 7;
+
mockProperties.setProperty(IcebergOverwritePartitionsStep.OVERWRITE_PARTITIONS_RETRYER_CONFIG_PREFIX
+ "." + RETRY_TIMES,
+ Integer.toString(retryCount));
+ spyIcebergOverwritePartitionsStep = Mockito.spy(new
IcebergOverwritePartitionsStep(destTableIdStr,
+ testPartitionColName, testPartitionColValue, serializedDummyDataFiles,
mockProperties));
+
Mockito.when(mockIcebergCatalog.openTable(Mockito.any(TableIdentifier.class))).thenReturn(mockIcebergTable);
+
Mockito.doReturn(mockIcebergCatalog).when(spyIcebergOverwritePartitionsStep).createDestinationCatalog();
+ try {
+ // Always throw exception
+ Mockito.doThrow(new
RuntimeException()).when(mockIcebergTable).overwritePartition(Mockito.anyList(),
+ Mockito.anyString(), Mockito.anyString());
+ spyIcebergOverwritePartitionsStep.execute();
+ } catch (RuntimeException e) {
+ Mockito.verify(mockIcebergTable,
Mockito.times(retryCount)).overwritePartition(Mockito.anyList(),
+ Mockito.anyString(), Mockito.anyString());
+ assertRetryTimes(e, retryCount);
+ } catch (IOException e) {
+ Assert.fail(String.format("Unexpected IOException : %s", e));
+ }
+ }
+
+ private List<DataFile> createDummyDataFiles() {
+ DataFile dataFile1 = DataFiles.builder(PartitionSpec.unpartitioned())
+ .withPath("/path/to/db/foo/data/datafile1.orc")
+ .withFileSizeInBytes(1234)
+ .withRecordCount(100)
+ .build();
+
+ DataFile dataFile2 = DataFiles.builder(PartitionSpec.unpartitioned())
+ .withPath("/path/to/db/foo/data/datafile2.orc")
+ .withFileSizeInBytes(9876)
+ .withRecordCount(50)
+ .build();
+
+ return ImmutableList.of(dataFile1, dataFile2);
+ }
+
+ private void assertRetryTimes(RuntimeException re, Integer retryTimes) {
+ String msg = String.format("~%s~ Failure attempting to overwrite partition
[num failures: %d]", destTableIdStr, retryTimes);
+ Assert.assertTrue(re.getMessage().startsWith(msg), re.getMessage());
+ }
+}
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetTest.java
new file mode 100644
index 0000000000..7e50197285
--- /dev/null
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetTest.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Lists;
+
+import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.data.management.copy.CopyEntity;
+import org.apache.gobblin.data.management.copy.CopyContext;
+import org.apache.gobblin.data.management.copy.PreserveAttributes;
+import
org.apache.gobblin.data.management.copy.iceberg.predicates.IcebergPartitionFilterPredicateUtil;
+import org.apache.gobblin.dataset.DatasetDescriptor;
+
+import static org.mockito.ArgumentMatchers.any;
+
+
+/** Tests for {@link
org.apache.gobblin.data.management.copy.iceberg.IcebergPartitionDataset} */
+public class IcebergPartitionDatasetTest {
+ private IcebergTable srcIcebergTable;
+ private IcebergTable destIcebergTable;
+ private TableMetadata srcTableMetadata;
+ private TableMetadata destTableMetadata;
+ private static FileSystem sourceFs;
+ private static FileSystem targetFs;
+ private IcebergPartitionDataset icebergPartitionDataset;
+ private MockedStatic<IcebergPartitionFilterPredicateUtil>
icebergPartitionFilterPredicateUtil;
+ private static final String SRC_TEST_DB = "srcTestDB";
+ private static final String SRC_TEST_TABLE = "srcTestTable";
+ private static final String SRC_WRITE_LOCATION = SRC_TEST_DB + "/" +
SRC_TEST_TABLE + "/data";
+ private static final String DEST_TEST_DB = "destTestDB";
+ private static final String DEST_TEST_TABLE = "destTestTable";
+ private static final String DEST_WRITE_LOCATION = DEST_TEST_DB + "/" +
DEST_TEST_TABLE + "/data";
+ private static final String TEST_ICEBERG_PARTITION_COLUMN_NAME =
"testPartition";
+ private static final String TEST_ICEBERG_PARTITION_COLUMN_VALUE =
"testValue";
+ private static final String OVERWRITE_COMMIT_STEP =
IcebergOverwritePartitionsStep.class.getName();
+ private final Properties copyConfigProperties = new Properties();
+ private final Properties properties = new Properties();
+ private static final URI SRC_FS_URI;
+ private static final URI DEST_FS_URI;
+
+ static {
+ try {
+ SRC_FS_URI = new URI("abc", "the.source.org", "/", null);
+ DEST_FS_URI = new URI("xyz", "the.dest.org", "/", null);
+ } catch (URISyntaxException e) {
+ throw new RuntimeException("should not occur!", e);
+ }
+ }
+
+ @BeforeMethod
+ public void setUp() throws Exception {
+ setupSrcFileSystem();
+ setupDestFileSystem();
+
+ TableIdentifier tableIdentifier = TableIdentifier.of(SRC_TEST_DB,
SRC_TEST_TABLE);
+
+ srcIcebergTable = Mockito.mock(IcebergTable.class);
+ destIcebergTable = Mockito.mock(IcebergTable.class);
+
+ srcTableMetadata = Mockito.mock(TableMetadata.class);
+ destTableMetadata = Mockito.mock(TableMetadata.class);
+
Mockito.when(destTableMetadata.spec()).thenReturn(Mockito.mock(PartitionSpec.class));
+
+ Mockito.when(srcIcebergTable.getTableId()).thenReturn(tableIdentifier);
+ Mockito.when(destIcebergTable.getTableId()).thenReturn(tableIdentifier);
+
Mockito.when(srcIcebergTable.accessTableMetadata()).thenReturn(srcTableMetadata);
+
Mockito.when(destIcebergTable.accessTableMetadata()).thenReturn(destTableMetadata);
+
Mockito.when(srcIcebergTable.getDatasetDescriptor(Mockito.any())).thenReturn(Mockito.mock(DatasetDescriptor.class));
+
Mockito.when(destIcebergTable.getDatasetDescriptor(Mockito.any())).thenReturn(Mockito.mock(DatasetDescriptor.class));
+
+ icebergPartitionFilterPredicateUtil =
Mockito.mockStatic(IcebergPartitionFilterPredicateUtil.class);
+ icebergPartitionFilterPredicateUtil
+ .when(() ->
IcebergPartitionFilterPredicateUtil.getPartitionColumnIndex(Mockito.anyString(),
Mockito.any(), Mockito.any()))
+ .thenReturn(Optional.of(0));
+
+ copyConfigProperties.setProperty("data.publisher.final.dir", "/test");
+ }
+
+ @AfterMethod
+ public void cleanUp() {
+ icebergPartitionFilterPredicateUtil.close();
+ }
+
+ @Test
+ public void testGenerateCopyEntities() throws IOException {
+ List<String> srcFilePaths = new ArrayList<>();
+ srcFilePaths.add(SRC_WRITE_LOCATION + "/file1.orc");
+ Map<String, DataFile> mockDataFilesBySrcPath =
createDataFileMocksBySrcPath(srcFilePaths);
+
Mockito.when(srcIcebergTable.getPartitionSpecificDataFiles(Mockito.any())).thenReturn(
+ new ArrayList<>(mockDataFilesBySrcPath.values()));
+
+ icebergPartitionDataset = new TestIcebergPartitionDataset(srcIcebergTable,
destIcebergTable, properties, sourceFs,
+ true);
+
+ CopyConfiguration copyConfiguration =
+ CopyConfiguration.builder(targetFs,
copyConfigProperties).preserve(PreserveAttributes.fromMnemonicString(""))
+ .copyContext(new CopyContext()).build();
+
+ Collection<CopyEntity> copyEntities =
icebergPartitionDataset.generateCopyEntities(targetFs, copyConfiguration);
+
+ verifyCopyEntities(copyEntities, new
ArrayList<>(mockDataFilesBySrcPath.keySet()), true);
+ }
+
+ @Test
+ public void testGenerateCopyEntitiesWithEmptyDataFiles() throws IOException {
+ List<DataFile> srcDataFiles = Lists.newArrayList();
+
Mockito.when(srcIcebergTable.getPartitionSpecificDataFiles(Mockito.any())).thenReturn(srcDataFiles);
+
+ icebergPartitionDataset = new IcebergPartitionDataset(srcIcebergTable,
destIcebergTable, properties, sourceFs,
+ true, TEST_ICEBERG_PARTITION_COLUMN_NAME,
TEST_ICEBERG_PARTITION_COLUMN_VALUE);
+ Collection<CopyEntity> copyEntities =
icebergPartitionDataset.generateCopyEntities(targetFs,
+ Mockito.mock(CopyConfiguration.class));
+
+ // Since No data files are present, no copy entities should be generated
+ verifyCopyEntities(copyEntities, Collections.emptyList(), true);
+ }
+
+ @Test
+ public void testMultipleCopyEntitiesGenerated() throws IOException {
+ List<String> srcFilePaths = new ArrayList<>();
+ srcFilePaths.add(SRC_WRITE_LOCATION + "/file1.orc");
+ srcFilePaths.add(SRC_WRITE_LOCATION + "/file2.orc");
+ srcFilePaths.add(SRC_WRITE_LOCATION + "/file3.orc");
+ srcFilePaths.add(SRC_WRITE_LOCATION + "/file4.orc");
+ srcFilePaths.add(SRC_WRITE_LOCATION + "/file5.orc");
+
+ Map<String, DataFile> mockDataFilesBySrcPath =
createDataFileMocksBySrcPath(srcFilePaths);
+
Mockito.when(srcIcebergTable.getPartitionSpecificDataFiles(Mockito.any())).thenReturn(
+ new ArrayList<>(mockDataFilesBySrcPath.values()));
+
+ icebergPartitionDataset = new TestIcebergPartitionDataset(srcIcebergTable,
destIcebergTable, properties, sourceFs,
+ true);
+
+ CopyConfiguration copyConfiguration =
+ CopyConfiguration.builder(targetFs,
copyConfigProperties).preserve(PreserveAttributes.fromMnemonicString(""))
+ .copyContext(new CopyContext()).build();
+
+ Collection<CopyEntity> copyEntities =
icebergPartitionDataset.generateCopyEntities(targetFs, copyConfiguration);
+
+ verifyCopyEntities(copyEntities, new
ArrayList<>(mockDataFilesBySrcPath.keySet()), true);
+ }
+
+ @Test
+ public void testWithDifferentSrcAndDestTableWriteLocation() throws
IOException {
+ List<String> srcFilePaths = new ArrayList<>();
+ srcFilePaths.add(SRC_WRITE_LOCATION + "/randomFile--Name.orc");
+
Mockito.when(srcTableMetadata.property(TableProperties.WRITE_DATA_LOCATION,
"")).thenReturn(SRC_WRITE_LOCATION);
+
Mockito.when(destTableMetadata.property(TableProperties.WRITE_DATA_LOCATION,
"")).thenReturn(DEST_WRITE_LOCATION);
+
+ Map<String, DataFile> mockDataFilesBySrcPath =
createDataFileMocksBySrcPath(srcFilePaths);
+
Mockito.when(srcIcebergTable.getPartitionSpecificDataFiles(Mockito.any())).thenReturn(
+ new ArrayList<>(mockDataFilesBySrcPath.values()));
+
+ icebergPartitionDataset = new TestIcebergPartitionDataset(srcIcebergTable,
destIcebergTable, properties, sourceFs,
+ true);
+
+ CopyConfiguration copyConfiguration =
+ CopyConfiguration.builder(targetFs,
copyConfigProperties).preserve(PreserveAttributes.fromMnemonicString(""))
+ .copyContext(new CopyContext()).build();
+
+ List<CopyEntity> copyEntities =
+ (List<CopyEntity>)
icebergPartitionDataset.generateCopyEntities(targetFs, copyConfiguration);
+
+ verifyCopyEntities(copyEntities, new
ArrayList<>(mockDataFilesBySrcPath.keySet()), false);
+ }
+
+ private static void setupSrcFileSystem() throws IOException {
+ sourceFs = Mockito.mock(FileSystem.class);
+ Mockito.when(sourceFs.getUri()).thenReturn(SRC_FS_URI);
+ Mockito.when(sourceFs.makeQualified(any(Path.class)))
+ .thenAnswer(invocation -> invocation.getArgument(0,
Path.class).makeQualified(SRC_FS_URI, new Path("/")));
+
Mockito.when(sourceFs.getFileStatus(any(Path.class))).thenAnswer(invocation -> {
+ Path path = invocation.getArgument(0, Path.class);
+ Path qualifiedPath = sourceFs.makeQualified(path);
+ return
IcebergDatasetTest.MockFileSystemBuilder.createEmptyFileStatus(qualifiedPath.toString());
+ });
+ }
+
+ private static void setupDestFileSystem() throws IOException {
+ targetFs = Mockito.mock(FileSystem.class);
+ Mockito.when(targetFs.getUri()).thenReturn(DEST_FS_URI);
+ Mockito.when(targetFs.makeQualified(any(Path.class)))
+ .thenAnswer(invocation -> invocation.getArgument(0,
Path.class).makeQualified(DEST_FS_URI, new Path("/")));
+ // Since we are adding UUID to the file name for every file while creating
destination path,
+ // so return file not found exception if trying to find file status on
destination file system
+ Mockito.when(targetFs.getFileStatus(any(Path.class))).thenThrow(new
FileNotFoundException());
+ }
+
+ private static Map<String, DataFile>
createDataFileMocksBySrcPath(List<String> srcFilePaths) throws IOException {
+ Map<String, DataFile> dataFileMocksBySrcPath = new HashMap<>();
+ for (String srcFilePath : srcFilePaths) {
+ DataFile dataFile = Mockito.mock(DataFile.class);
+ Path dataFilePath = new Path(srcFilePath);
+ String qualifiedPath = sourceFs.makeQualified(dataFilePath).toString();
+ Mockito.when(dataFile.path()).thenReturn(dataFilePath.toString());
+
Mockito.when(sourceFs.getFileStatus(Mockito.eq(dataFilePath))).thenReturn(
+
IcebergDatasetTest.MockFileSystemBuilder.createEmptyFileStatus(qualifiedPath));
+ dataFileMocksBySrcPath.put(qualifiedPath, dataFile);
+ }
+ return dataFileMocksBySrcPath;
+ }
+
+ private static void verifyCopyEntities(Collection<CopyEntity> copyEntities,
List<String> expectedSrcFilePaths,
+ boolean sameSrcAndDestWriteLocation) {
+ List<String> actualSrcFilePaths = new ArrayList<>();
+ String srcWriteLocationStart = SRC_FS_URI + SRC_WRITE_LOCATION;
+ String destWriteLocationStart = DEST_FS_URI + (sameSrcAndDestWriteLocation
? SRC_WRITE_LOCATION : DEST_WRITE_LOCATION);
+ String srcErrorMsg = String.format("Source Location should start with %s",
srcWriteLocationStart);
+ String destErrorMsg = String.format("Destination Location should start
with %s", destWriteLocationStart);
+ for (CopyEntity copyEntity : copyEntities) {
+ String json = copyEntity.toString();
+ if (IcebergDatasetTest.isCopyableFile(json)) {
+ String originFilepath =
IcebergDatasetTest.CopyEntityDeserializer.getOriginFilePathAsStringFromJson(json);
+ actualSrcFilePaths.add(originFilepath);
+ String destFilepath =
IcebergDatasetTest.CopyEntityDeserializer.getDestinationFilePathAsStringFromJson(json);
+ Assert.assertTrue(originFilepath.startsWith(srcWriteLocationStart),
srcErrorMsg);
+ Assert.assertTrue(destFilepath.startsWith(destWriteLocationStart),
destErrorMsg);
+ String originFileName =
originFilepath.substring(srcWriteLocationStart.length() + 1);
+ String destFileName =
destFilepath.substring(destWriteLocationStart.length() + 1);
+ Assert.assertTrue(destFileName.endsWith(originFileName), "Incorrect
file name in destination path");
+ Assert.assertTrue(destFileName.length() > originFileName.length() + 1,
+ "Destination file name should be longer than source file name as
UUID is appended");
+ } else{
+ IcebergDatasetTest.verifyPostPublishStep(json, OVERWRITE_COMMIT_STEP);
+ }
+ }
+ Assert.assertEquals(actualSrcFilePaths.size(), expectedSrcFilePaths.size(),
+ "Set" + actualSrcFilePaths + " vs Set" + expectedSrcFilePaths);
+ Assert.assertEqualsNoOrder(actualSrcFilePaths.toArray(),
expectedSrcFilePaths.toArray());
+ }
+
+ /**
+ * See {@link
org.apache.gobblin.data.management.copy.iceberg.IcebergDatasetTest.TrickIcebergDataset}
+ * */
+ protected static class TestIcebergPartitionDataset extends
IcebergPartitionDataset {
+
+ public TestIcebergPartitionDataset(IcebergTable srcIcebergTable,
IcebergTable destIcebergTable,
+ Properties properties, FileSystem sourceFs, boolean
shouldIncludeMetadataPath) throws IOException {
+ super(srcIcebergTable, destIcebergTable, properties, sourceFs,
shouldIncludeMetadataPath,
+ TEST_ICEBERG_PARTITION_COLUMN_NAME,
TEST_ICEBERG_PARTITION_COLUMN_VALUE);
+ }
+
+ @Override
+ protected FileSystem getSourceFileSystemFromFileStatus(FileStatus
fileStatus, Configuration hadoopConfig) {
+ return this.sourceFs;
+ }
+ }
+}
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java
index a1a29444ed..63aa27221b 100644
---
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java
@@ -21,11 +21,13 @@ import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@@ -33,8 +35,11 @@ import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionData;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.avro.AvroSchemaUtil;
@@ -61,7 +66,7 @@ public class IcebergTableTest extends HiveMetastoreTest {
.fields()
.name("id")
.type()
- .longType()
+ .stringType()
.noDefault()
.endRecord();
protected static final Schema icebergSchema =
AvroSchemaUtil.toIceberg(avroDataSchema);
@@ -71,6 +76,7 @@ public class IcebergTableTest extends HiveMetastoreTest {
private final String dbName = "myicebergdb";
private final String tableName = "justtesting";
+ private final String destTableName = "destTable";
private TableIdentifier tableId;
private Table table;
private String catalogUri;
@@ -85,7 +91,7 @@ public class IcebergTableTest extends HiveMetastoreTest {
@BeforeMethod
public void setUpEachTest() {
tableId = TableIdentifier.of(dbName, tableName);
- table = catalog.createTable(tableId, icebergSchema);
+ table = catalog.createTable(tableId, icebergSchema, icebergPartitionSpec);
catalogUri = catalog.getConf().get(CatalogProperties.URI);
metadataBasePath = calcMetadataBasePath(tableId);
}
@@ -106,7 +112,8 @@ public class IcebergTableTest extends HiveMetastoreTest {
);
initializeSnapshots(table, perSnapshotFilesets);
- IcebergSnapshotInfo snapshotInfo = new IcebergTable(tableId,
catalog.newTableOps(tableId), catalogUri).getCurrentSnapshotInfo();
+ IcebergSnapshotInfo snapshotInfo = new IcebergTable(tableId,
catalog.newTableOps(tableId), catalogUri,
+ catalog.loadTable(tableId)).getCurrentSnapshotInfo();
verifySnapshotInfo(snapshotInfo, perSnapshotFilesets,
perSnapshotFilesets.size());
}
@@ -114,7 +121,8 @@ public class IcebergTableTest extends HiveMetastoreTest {
@Test(expectedExceptions = IcebergTable.TableNotFoundException.class)
public void testGetCurrentSnapshotInfoOnBogusTable() throws IOException {
TableIdentifier bogusTableId = TableIdentifier.of(dbName, tableName +
"_BOGUS");
- IcebergSnapshotInfo snapshotInfo = new IcebergTable(bogusTableId,
catalog.newTableOps(bogusTableId), catalogUri).getCurrentSnapshotInfo();
+ IcebergSnapshotInfo snapshotInfo = new IcebergTable(bogusTableId,
catalog.newTableOps(bogusTableId), catalogUri,
+ null).getCurrentSnapshotInfo();
Assert.fail("expected an exception when using table ID '" + bogusTableId +
"'");
}
@@ -129,7 +137,8 @@ public class IcebergTableTest extends HiveMetastoreTest {
);
initializeSnapshots(table, perSnapshotFilesets);
- List<IcebergSnapshotInfo> snapshotInfos = Lists.newArrayList(new
IcebergTable(tableId, catalog.newTableOps(tableId),
catalogUri).getAllSnapshotInfosIterator());
+ List<IcebergSnapshotInfo> snapshotInfos = Lists.newArrayList(new
IcebergTable(tableId, catalog.newTableOps(tableId),
+ catalogUri, catalog.loadTable(tableId)).getAllSnapshotInfosIterator());
Assert.assertEquals(snapshotInfos.size(), perSnapshotFilesets.size(), "num
snapshots");
for (int i = 0; i < snapshotInfos.size(); ++i) {
@@ -149,7 +158,8 @@ public class IcebergTableTest extends HiveMetastoreTest {
);
initializeSnapshots(table, perSnapshotFilesets);
- List<IcebergSnapshotInfo> snapshotInfos = Lists.newArrayList(new
IcebergTable(tableId, catalog.newTableOps(tableId),
catalogUri).getIncrementalSnapshotInfosIterator());
+ List<IcebergSnapshotInfo> snapshotInfos = Lists.newArrayList(new
IcebergTable(tableId, catalog.newTableOps(tableId),
+ catalogUri,
catalog.loadTable(tableId)).getIncrementalSnapshotInfosIterator());
Assert.assertEquals(snapshotInfos.size(), perSnapshotFilesets.size(), "num
snapshots");
for (int i = 0; i < snapshotInfos.size(); ++i) {
@@ -169,7 +179,8 @@ public class IcebergTableTest extends HiveMetastoreTest {
);
initializeSnapshots(table, perSnapshotFilesets);
- List<IcebergSnapshotInfo> snapshotInfos = Lists.newArrayList(new
IcebergTable(tableId, catalog.newTableOps(tableId),
catalogUri).getIncrementalSnapshotInfosIterator());
+ List<IcebergSnapshotInfo> snapshotInfos = Lists.newArrayList(new
IcebergTable(tableId, catalog.newTableOps(tableId),
+ catalogUri,
catalog.loadTable(tableId)).getIncrementalSnapshotInfosIterator());
Assert.assertEquals(snapshotInfos.size(), perSnapshotFilesets.size(), "num
snapshots");
for (int i = 0; i < snapshotInfos.size(); ++i) {
@@ -197,10 +208,11 @@ public class IcebergTableTest extends HiveMetastoreTest {
// Expect existing property values to be deleted if it does not exist on
the source
destTableProperties.put("deletedTableProperty",
"deletedTablePropertyValue");
- TableIdentifier destTableId = TableIdentifier.of(dbName, "destTable");
+ TableIdentifier destTableId = TableIdentifier.of(dbName, destTableName);
catalog.createTable(destTableId, icebergSchema, null, destTableProperties);
- IcebergTable destIcebergTable = new IcebergTable(destTableId,
catalog.newTableOps(destTableId), catalogUri);
+ IcebergTable destIcebergTable = new IcebergTable(destTableId,
catalog.newTableOps(destTableId), catalogUri,
+ catalog.loadTable(destTableId));
// Mock a source table with the same table UUID copying new properties
TableMetadata newSourceTableProperties =
destIcebergTable.accessTableMetadata().replaceProperties(srcTableProperties);
@@ -209,6 +221,86 @@ public class IcebergTableTest extends HiveMetastoreTest {
Assert.assertEquals(destIcebergTable.accessTableMetadata().properties().get("newKey"),
"newValue");
Assert.assertEquals(destIcebergTable.accessTableMetadata().properties().get("testKey"),
"testValueNew");
Assert.assertNull(destIcebergTable.accessTableMetadata().properties().get("deletedTableProperty"));
+
+ catalog.dropTable(destTableId);
+ }
+
+ /** Verify that getPartitionSpecificDataFiles return datafiles belonging to
the partition defined by predicate */
+ @Test
+ public void testGetPartitionSpecificDataFiles() throws IOException {
+ // Note - any specific file path format is not mandatory to be mapped to
specific partition
+ List<String> paths = Arrays.asList(
+ "/path/tableName/data/id=1/file1.orc",
+ "/path/tableName/data/file3.orc",
+ "/path/tableName/data/id=2/file5.orc",
+ "/path/tableName/data/file4.orc",
+ "/path/tableName/data/id=3/file2.orc"
+ );
+ // Using the schema defined in start of this class
+ PartitionData partitionData = new
PartitionData(icebergPartitionSpec.partitionType());
+ partitionData.set(0, "1");
+
+ addPartitionDataFiles(table,
createDataFiles(paths.stream().collect(Collectors.toMap(Function.identity(), v
-> partitionData))));
+
+ IcebergTable icebergTable = new IcebergTable(tableId,
+ catalog.newTableOps(tableId),
+ catalogUri,
+ catalog.loadTable(tableId));
+ // Using AlwaysTrue & AlwaysFalse Predicate to avoid mocking of predicate
class
+ Predicate<StructLike> alwaysTruePredicate = partition -> true;
+ Predicate<StructLike> alwaysFalsePredicate = partition -> false;
+
Assert.assertEquals(icebergTable.getPartitionSpecificDataFiles(alwaysTruePredicate).size(),
5);
+
Assert.assertEquals(icebergTable.getPartitionSpecificDataFiles(alwaysFalsePredicate).size(),
0);
+ }
+
+ /** Verify that overwritePartition replace data files belonging to given
partition col and value */
+ @Test
+ public void testOverwritePartition() throws IOException {
+ // Note - any specific file path format is not mandatory to be mapped to
specific partition
+ List<String> paths = Arrays.asList(
+ "/path/tableName/data/id=1/file1.orc",
+ "/path/tableName/data/file2.orc"
+ );
+ // Using the schema defined in start of this class
+ PartitionData partition1Data = new
PartitionData(icebergPartitionSpec.partitionType());
+ partition1Data.set(0, "1");
+
+ addPartitionDataFiles(table,
createDataFiles(paths.stream().collect(Collectors.toMap(Function.identity(), v
-> partition1Data))));
+
+ IcebergTable icebergTable = new IcebergTable(tableId,
+ catalog.newTableOps(tableId),
+ catalogUri,
+ catalog.loadTable(tableId));
+
+ verifyAnyOrder(paths,
icebergTable.getCurrentSnapshotInfo().getAllDataFilePaths(), "data filepaths
should match");
+
+ List<String> paths2 = Arrays.asList(
+ "/path/tableName/data/file3.orc",
+ "/path/tableName/data/id=2/file4.orc"
+ );
+ // Using the schema defined in start of this class
+ PartitionData partition2Data = new
PartitionData(icebergPartitionSpec.partitionType());
+ partition2Data.set(0, "2");
+
+ List<DataFile> partition2DataFiles =
createDataFiles(paths2.stream().collect(Collectors.toMap(Function.identity(), v
-> partition2Data)));
+ // here, since partition data with value 2 doesn't exist yet,
+ // we expect it to get added to the table, w/o changing or deleting any
other partitions
+ icebergTable.overwritePartition(partition2DataFiles, "id", "2");
+ List<String> expectedPaths2 = new ArrayList<>(paths);
+ expectedPaths2.addAll(paths2);
+ verifyAnyOrder(expectedPaths2,
icebergTable.getCurrentSnapshotInfo().getAllDataFilePaths(), "data filepaths
should match");
+
+ List<String> paths3 = Arrays.asList(
+ "/path/tableName/data/id=2/file5.orc",
+ "/path/tableName/data/file6.orc"
+ );
+ // Reusing same partition data to create data file with different paths
+ List<DataFile> partition1NewDataFiles =
createDataFiles(paths3.stream().collect(Collectors.toMap(Function.identity(), v
-> partition1Data)));
+ // here, since partition data with value 1 already exists, we expect it to
get updated in the table with newer path
+ icebergTable.overwritePartition(partition1NewDataFiles, "id", "1");
+ List<String> expectedPaths3 = new ArrayList<>(paths2);
+ expectedPaths3.addAll(paths3);
+ verifyAnyOrder(expectedPaths3,
icebergTable.getCurrentSnapshotInfo().getAllDataFilePaths(), "data filepaths
should match");
}
/** full validation for a particular {@link IcebergSnapshotInfo} */
@@ -333,4 +425,25 @@ public class IcebergTableTest extends HiveMetastoreTest {
protected static <T, C extends Collection<T>> List<T> flatten(Collection<C>
cc) {
return cc.stream().flatMap(x -> x.stream()).collect(Collectors.toList());
}
+
+ private static void addPartitionDataFiles(Table table, List<DataFile>
dataFiles) {
+ dataFiles.forEach(dataFile ->
table.newAppend().appendFile(dataFile).commit());
+ }
+
+ private static List<DataFile> createDataFiles(Map<String, PartitionData>
pathWithPartitionData) {
+ return pathWithPartitionData.entrySet().stream()
+ .map(e -> createDataFileWithPartition(e.getKey(), e.getValue()))
+ .collect(Collectors.toList());
+ }
+
+ private static DataFile createDataFileWithPartition(String path,
PartitionData partitionData) {
+ return DataFiles.builder(icebergPartitionSpec)
+ .withPath(path)
+ .withFileSizeInBytes(8)
+ .withRecordCount(1)
+ .withPartition(partitionData)
+ .withFormat(FileFormat.ORC)
+ .build();
+ }
+
}
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/predicates/IcebergMatchesAnyPropNamePartitionFilterPredicateTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/predicates/IcebergMatchesAnyPropNamePartitionFilterPredicateTest.java
new file mode 100644
index 0000000000..4eb16500e6
--- /dev/null
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/predicates/IcebergMatchesAnyPropNamePartitionFilterPredicateTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg.predicates;
+
+import org.apache.iceberg.StructLike;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/** Tests for {@link
org.apache.gobblin.data.management.copy.iceberg.predicates.IcebergMatchesAnyPropNamePartitionFilterPredicate}
*/
+public class IcebergMatchesAnyPropNamePartitionFilterPredicateTest {
+ private static final String TEST_PARTITION_VALUE_1 = "value1";
+ private IcebergMatchesAnyPropNamePartitionFilterPredicate predicate;
+
+ @BeforeMethod
+ public void setup() {
+ predicate = new IcebergMatchesAnyPropNamePartitionFilterPredicate(0,
TEST_PARTITION_VALUE_1);
+ }
+
+ @Test
+ public void testPartitionValueNULL() {
+ // Just mocking, so that the partition value is NULL
+ Assert.assertFalse(predicate.test(Mockito.mock(StructLike.class)));
+ }
+
+ @Test
+ public void testWhenPartitionIsNull() {
+ Assert.assertFalse(predicate.test(null));
+ }
+
+ @Test
+ public void testPartitionValueMatch() {
+ StructLike mockPartition = Mockito.mock(StructLike.class);
+ Mockito.when(mockPartition.get(Mockito.anyInt(),
Mockito.eq(Object.class))).thenReturn("value1");
+ Assert.assertTrue(predicate.test(mockPartition));
+ }
+
+ @Test
+ public void testPartitionValueDoesNotMatch() {
+ StructLike mockPartition = Mockito.mock(StructLike.class);
+ Mockito.when(mockPartition.get(Mockito.anyInt(),
Mockito.eq(Object.class))).thenReturn("<invalid_value>");
+ Assert.assertFalse(predicate.test(mockPartition));
+ }
+}
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/predicates/IcebergPartitionFilterPredicateUtilTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/predicates/IcebergPartitionFilterPredicateUtilTest.java
new file mode 100644
index 0000000000..9743b5ab62
--- /dev/null
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/predicates/IcebergPartitionFilterPredicateUtilTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg.predicates;
+
+import java.io.IOException;
+import java.util.List;
+
+import java.util.Optional;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.transforms.Transform;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableList;
+
+/** Tests for {@link
org.apache.gobblin.data.management.copy.iceberg.predicates.IcebergPartitionFilterPredicateUtil}
*/
+public class IcebergPartitionFilterPredicateUtilTest {
+ private static TableMetadata mockTableMetadata;
+ private final List<String> supportedTransforms =
ImmutableList.of("supported1", "supported2");
+
+ @Test
+ public void testPartitionTransformNotSupported() {
+ setupMockData("col1", "unsupported");
+ IOException exception = Assert.expectThrows(IOException.class, () -> {
+ IcebergPartitionFilterPredicateUtil.getPartitionColumnIndex("col1",
mockTableMetadata, supportedTransforms);
+ });
+ Assert.assertTrue(exception.getMessage().contains("Partition transform
unsupported is not supported. Supported transforms are [supported1,
supported2]"));
+ }
+
+ @Test
+ public void testPartitionTransformSupported() throws IOException {
+ setupMockData("col1", "supported1");
+ int result =
+ IcebergPartitionFilterPredicateUtil.getPartitionColumnIndex("col1",
mockTableMetadata, supportedTransforms)
+ .get();
+ Assert.assertEquals(result, 0);
+ }
+
+ @Test
+ public void testPartitionColumnNotFound() throws IOException {
+ setupMockData("col", "supported1");
+ Optional<Integer> result =
IcebergPartitionFilterPredicateUtil.getPartitionColumnIndex("col2",
+ mockTableMetadata, supportedTransforms);
+ Assert.assertFalse(result.isPresent());
+ }
+
+ @Test
+ public void testPartitionColumnFoundIndex1() throws IOException {
+ mockTableMetadata = Mockito.mock(TableMetadata.class);
+ PartitionSpec mockPartitionSpec = Mockito.mock(PartitionSpec.class);
+ PartitionField mockPartitionField1 = Mockito.mock(PartitionField.class);
+ PartitionField mockPartitionField2 = Mockito.mock(PartitionField.class);
+ Transform mockTransform1 = Mockito.mock(Transform.class);
+ Transform mockTransform2 = Mockito.mock(Transform.class);
+
+ List<PartitionField> partitionFields =
ImmutableList.of(mockPartitionField1, mockPartitionField2);
+
+ Mockito.when(mockTableMetadata.spec()).thenReturn(mockPartitionSpec);
+ Mockito.when(mockPartitionSpec.fields()).thenReturn(partitionFields);
+ Mockito.when(mockPartitionField1.name()).thenReturn("col1");
+ Mockito.when(mockPartitionField1.transform()).thenReturn(mockTransform1);
+ Mockito.when(mockTransform1.toString()).thenReturn("supported1");
+ Mockito.when(mockPartitionField2.name()).thenReturn("col2");
+ Mockito.when(mockPartitionField2.transform()).thenReturn(mockTransform2);
+ Mockito.when(mockTransform2.toString()).thenReturn("supported2");
+
+ int result =
+ IcebergPartitionFilterPredicateUtil.getPartitionColumnIndex("col2",
mockTableMetadata, supportedTransforms)
+ .get();
+ Assert.assertEquals(result, 1);
+ }
+
+ private static void setupMockData(String name, String transform) {
+ mockTableMetadata = Mockito.mock(TableMetadata.class);
+
+ PartitionSpec mockPartitionSpec = Mockito.mock(PartitionSpec.class);
+ PartitionField mockPartitionField = Mockito.mock(PartitionField.class);
+ Transform mockTransform = Mockito.mock(Transform.class);
+
+ List<PartitionField> partitionFields =
ImmutableList.of(mockPartitionField);
+
+ Mockito.when(mockTableMetadata.spec()).thenReturn(mockPartitionSpec);
+ Mockito.when(mockPartitionSpec.fields()).thenReturn(partitionFields);
+ Mockito.when(mockPartitionField.name()).thenReturn(name);
+ Mockito.when(mockPartitionField.transform()).thenReturn(mockTransform);
+ Mockito.when(mockTransform.toString()).thenReturn(transform);
+ }
+}
\ No newline at end of file