This is an automated email from the ASF dual-hosted git repository.
abhijain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 7923a96979 Add pre-publish step for IcebergSource and fix other issues
in Iceberg file based copy (#4155)
7923a96979 is described below
commit 7923a96979296bcf2478b258445c9cb14a4bcece
Author: thisisArjit <[email protected]>
AuthorDate: Mon Nov 24 16:51:58 2025 +0530
Add pre-publish step for IcebergSource and fix other issues in Iceberg file
based copy (#4155)
* Workaround for Iceberg file based copy issues
* Add pre-publish step for Iceberg file based copy for file deletion
---
.../copy/iceberg/IcebergFileStreamExtractor.java | 19 +-
.../management/copy/iceberg/IcebergSource.java | 193 +++++++--
.../iceberg/IcebergFileStreamExtractorTest.java | 37 ++
.../copy/iceberg/IcebergSourceDeleteStepTest.java | 468 +++++++++++++++++++++
.../management/copy/iceberg/IcebergSourceTest.java | 119 +++---
5 files changed, 749 insertions(+), 87 deletions(-)
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergFileStreamExtractor.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergFileStreamExtractor.java
index 4644d6ea94..10cabfc9c4 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergFileStreamExtractor.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergFileStreamExtractor.java
@@ -22,6 +22,7 @@ import java.io.InputStream;
import java.net.URI;
import java.util.Collections;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
@@ -44,14 +45,17 @@ import org.apache.gobblin.data.management.copy.CopyableFile;
import org.apache.gobblin.data.management.copy.FileAwareInputStream;
import org.apache.gobblin.source.extractor.filebased.FileBasedExtractor;
import org.apache.gobblin.source.extractor.filebased.FileBasedHelperException;
+import org.apache.gobblin.util.PathUtils;
import org.apache.gobblin.util.WriterUtils;
+import org.apache.gobblin.util.filesystem.OwnerAndPermission;
+
/**
* Extractor for file streaming mode that creates FileAwareInputStream for
each file.
- *
+ *
* This extractor is used when {@code iceberg.record.processing.enabled=false}
to stream
* Iceberg table files as binary data to destinations like Azure, HDFS
- *
+ *
* Each "record" is a {@link FileAwareInputStream} representing one file from
* the Iceberg table. The downstream writer handles streaming the file content.
*/
@@ -123,7 +127,7 @@ public class IcebergFileStreamExtractor extends
FileBasedExtractor<String, FileA
// Open source stream using fsHelper
final InputStream inputStream;
try {
- inputStream =
this.getCloser().register(this.getFsHelper().getFileStream(filePath));
+ inputStream = this.getFsHelper().getFileStream(filePath);
} catch (FileBasedHelperException e) {
throw new IOException("Failed to open source stream for: " + filePath,
e);
}
@@ -141,8 +145,15 @@ public class IcebergFileStreamExtractor extends
FileBasedExtractor<String, FileA
}
Path destinationPath = computeDestinationPath(filePath, finalDir,
sourcePath.getName());
+ List<OwnerAndPermission> ancestorOwnerAndPermissionList =
+ CopyableFile.resolveReplicatedOwnerAndPermissionsRecursively(originFs,
+ sourcePath.getParent(), PathUtils.getRootPathChild(sourcePath),
copyConfiguration);
// Build CopyableFile using cached targetFs and copyConfiguration
(initialized once in constructor)
- CopyableFile copyableFile =
CopyableFile.fromOriginAndDestination(originFs, originStatus, destinationPath,
this.copyConfiguration).build();
+ CopyableFile copyableFile =
CopyableFile.fromOriginAndDestination(originFs, originStatus, destinationPath,
this.copyConfiguration)
+ .datasetOutputPath(targetFs.getUri().getPath())
+ .ancestorsOwnerAndPermission(ancestorOwnerAndPermissionList)
+ .build();
+ copyableFile.setFsDatasets(originFs, targetFs);
FileAwareInputStream fileAwareInputStream = FileAwareInputStream.builder()
.file(copyableFile)
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSource.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSource.java
index 10baa7846c..9987f1d7d1 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSource.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSource.java
@@ -24,6 +24,9 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
import com.google.common.base.Optional;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
@@ -56,6 +59,13 @@ import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
+import static
org.apache.gobblin.configuration.ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR;
+import static
org.apache.gobblin.data.management.copy.CopySource.SERIALIZED_COPYABLE_DATASET;
+
+import org.apache.gobblin.data.management.copy.entities.PrePublishStep;
+import org.apache.gobblin.util.commit.DeleteFileCommitStep;
+import org.apache.gobblin.commit.CommitStep;
+
/**
* Unified Iceberg source that supports partition-based data copying from
Iceberg tables.
*
@@ -107,7 +117,6 @@ public class IcebergSource extends FileBasedSource<String,
FileAwareInputStream>
public static final String ICEBERG_RECORD_PROCESSING_ENABLED =
"iceberg.record.processing.enabled";
public static final boolean DEFAULT_RECORD_PROCESSING_ENABLED = false;
public static final String ICEBERG_FILES_PER_WORKUNIT =
"iceberg.files.per.workunit";
- public static final int DEFAULT_FILES_PER_WORKUNIT = 10;
public static final String ICEBERG_FILTER_ENABLED = "iceberg.filter.enabled";
public static final String ICEBERG_FILTER_DATE = "iceberg.filter.date"; //
Date value (e.g., 2025-04-01 or CURRENT_DATE)
public static final String ICEBERG_LOOKBACK_DAYS = "iceberg.lookback.days";
@@ -123,6 +132,10 @@ public class IcebergSource extends FileBasedSource<String,
FileAwareInputStream>
private static final String HOURLY_PARTITION_SUFFIX = "-00";
private static final String WORK_UNIT_WEIGHT = "iceberg.workUnitWeight";
+ // Delete configuration - similar to RecursiveCopyableDataset
+ public static final String DELETE_FILES_NOT_IN_SOURCE =
"iceberg.copy.delete";
+ public static final boolean DEFAULT_DELETE_FILES_NOT_IN_SOURCE = true;
+
private Optional<LineageInfo> lineageInfo;
private final WorkUnitWeighter weighter = new
FieldWeighter(WORK_UNIT_WEIGHT);
@@ -262,7 +275,7 @@ public class IcebergSource extends FileBasedSource<String,
FileAwareInputStream>
}
String datePartitionColumn = state.getProp(ICEBERG_PARTITION_COLUMN,
DEFAULT_DATE_PARTITION_COLUMN);
-
+
String dateValue = state.getProp(ICEBERG_FILTER_DATE);
Preconditions.checkArgument(!StringUtils.isBlank(dateValue),
"iceberg.filter.date is required when iceberg.filter.enabled=true");
@@ -281,10 +294,10 @@ public class IcebergSource extends
FileBasedSource<String, FileAwareInputStream>
if (lookbackDays >= 1) {
log.info("Applying lookback period of {} days for date partition column
'{}': {}", lookbackDays, datePartitionColumn, dateValue);
-
+
// Check if hourly partitioning is enabled
boolean isHourlyPartition =
state.getPropAsBoolean(ICEBERG_HOURLY_PARTITION_ENABLED,
DEFAULT_HOURLY_PARTITION_ENABLED);
-
+
// Parse the date in yyyy-MM-dd format
LocalDate start;
try {
@@ -296,7 +309,7 @@ public class IcebergSource extends FileBasedSource<String,
FileAwareInputStream>
log.error(errorMsg);
throw new IllegalArgumentException(errorMsg, e);
}
-
+
for (int i = 0; i < lookbackDays; i++) {
String dateOnly = start.minusDays(i).toString();
// Append hour suffix if hourly partitioning is enabled
@@ -340,7 +353,8 @@ public class IcebergSource extends FileBasedSource<String,
FileAwareInputStream>
* @param table the Iceberg table being copied
* @return list of work units ready for parallel execution
*/
- private List<WorkUnit>
createWorkUnitsFromFiles(List<IcebergTable.FilePathWithPartition>
filesWithPartitions, SourceState state, IcebergTable table) {
+ private List<WorkUnit> createWorkUnitsFromFiles(
+ List<IcebergTable.FilePathWithPartition> filesWithPartitions,
SourceState state, IcebergTable table) throws IOException {
List<WorkUnit> workUnits = Lists.newArrayList();
if (filesWithPartitions.isEmpty()) {
@@ -353,40 +367,32 @@ public class IcebergSource extends
FileBasedSource<String, FileAwareInputStream>
String tableName = table.getTableId().name();
Extract extract = new Extract(Extract.TableType.SNAPSHOT_ONLY, nameSpace,
tableName);
- int filesPerWorkUnit = state.getPropAsInt(ICEBERG_FILES_PER_WORKUNIT,
DEFAULT_FILES_PER_WORKUNIT);
- List<List<IcebergTable.FilePathWithPartition>> groups =
Lists.partition(filesWithPartitions, Math.max(1, filesPerWorkUnit));
- log.info("Grouping {} files into {} work units ({} files per work unit)",
- filesWithPartitions.size(), groups.size(), filesPerWorkUnit);
+ String datasetUrn = table.getTableId().toString();
+ long totalSize = 0L;
- for (int i = 0; i < groups.size(); i++) {
- List<IcebergTable.FilePathWithPartition> group = groups.get(i);
+ for (IcebergTable.FilePathWithPartition fileWithPartition :
filesWithPartitions) {
WorkUnit workUnit = new WorkUnit(extract);
+ String filePath = fileWithPartition.getFilePath();
+ totalSize += fileWithPartition.getFileSize();
- // Store data file paths and their partition metadata separately
- // Note: Only data files (parquet/orc/avro) are included, no Iceberg
metadata files
- List<String> filePaths = Lists.newArrayList();
+ // Store partition path for each file
Map<String, String> fileToPartitionPath = Maps.newHashMap();
- long totalSize = 0L;
-
- for (IcebergTable.FilePathWithPartition fileWithPartition : group) {
- String filePath = fileWithPartition.getFilePath();
- filePaths.add(filePath);
- // Store partition path for each file
- fileToPartitionPath.put(filePath,
fileWithPartition.getPartitionPath());
- // Accumulate file sizes for work unit weight
- totalSize += fileWithPartition.getFileSize();
- }
+ fileToPartitionPath.put(filePath, fileWithPartition.getPartitionPath());
+ workUnit.setProp(ConfigurationKeys.DATASET_URN_KEY, datasetUrn);
- workUnit.setProp(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL,
String.join(",", filePaths));
+ workUnit.setProp(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL,
filePath);
+ // Serialized copyable dataset / file is not required during work unit
generation step.
+ // Copyable file is created during process work unit step
(IcebergFileStreamExtractor)
+ workUnit.setProp(SERIALIZED_COPYABLE_DATASET, "{}");
// Store partition path mapping as JSON for extractor to use
workUnit.setProp(ICEBERG_FILE_PARTITION_PATH, new
com.google.gson.Gson().toJson(fileToPartitionPath));
// Set work unit size for dynamic scaling (instead of just file count)
- workUnit.setProp(ServiceConfigKeys.WORK_UNIT_SIZE, totalSize);
+ workUnit.setProp(ServiceConfigKeys.WORK_UNIT_SIZE,
fileWithPartition.getFileSize());
// Set work unit weight for bin packing
- setWorkUnitWeight(workUnit, totalSize);
+ setWorkUnitWeight(workUnit, fileWithPartition.getFileSize());
// Carry partition info to extractor for destination path mapping
if (state.contains(ICEBERG_PARTITION_KEY)) {
@@ -399,13 +405,140 @@ public class IcebergSource extends
FileBasedSource<String, FileAwareInputStream>
// Add lineage information for data governance and tracking
addLineageSourceInfo(state, workUnit, table);
workUnits.add(workUnit);
-
- log.info("Created work unit {} with {} files, total size: {} bytes", i,
group.size(), totalSize);
}
+ log.info("Created {} work unit(s), total size: {} bytes",
workUnits.size(), totalSize);
+
+ // Add delete step to overwrite partitions
+ addDeleteStepIfNeeded(state, workUnits, extract, datasetUrn);
return workUnits;
}
+ /**
+ * Creates a PrePublishStep with DeleteFileCommitStep to delete ALL files in
impacted directories.
+ * This enables complete partition rewrites - all existing files in target
partitions are deleted
+ * before new files are copied from source.
+ *
+ * Execution Order:
+ * 1. Source Phase (this method): Identifies directories to delete and
creates PrePublishStep
+ * 2. Task Execution: Files are copied from source to staging directory
+ * 3. Publisher Phase - PrePublishStep: Deletes ALL files from target
directories (BEFORE rename)
+ * 4. Publisher Phase - Rename: Moves files from staging to final target
location
+ *
+ * Behavior:
+ * 1. If filter is enabled (iceberg.filter.enabled=true): Deletes ALL files
in specific partition
+ * directories based on source partition values. For example, if source
has partitions 2025-10-11,
+ * 2025-10-10, 2025-10-09, it will delete ALL files in those partition
directories in target.
+ * 2. If filter is disabled (iceberg.filter.enabled=false): Deletes ALL
files in the entire root directory.
+ * 3. No file comparison - this is a complete rewrite of the impacted
directories.
+ */
+ private void addDeleteStepIfNeeded(SourceState state, List<WorkUnit>
workUnits, Extract extract, String datasetUrn) throws IOException {
+ boolean deleteEnabled = state.getPropAsBoolean(DELETE_FILES_NOT_IN_SOURCE,
DEFAULT_DELETE_FILES_NOT_IN_SOURCE);
+ if (!deleteEnabled || workUnits.isEmpty()) {
+ log.info("Delete not enabled or no work units created, skipping delete
step");
+ return;
+ }
+
+ // Get target filesystem and directory
+ String targetRootDir = state.getProp(DATA_PUBLISHER_FINAL_DIR);
+ if (targetRootDir == null) {
+ log.warn("DATA_PUBLISHER_FINAL_DIR not configured, cannot determine
directories to delete");
+ return;
+ }
+
+ try {
+ FileSystem targetFs = HadoopUtils.getWriterFileSystem(state, 1, 0);
+ Path targetRootPath = new Path(targetRootDir);
+
+ if (!targetFs.exists(targetRootPath)) {
+ log.info("Target directory {} does not exist, no directories to
delete", targetRootPath);
+ return;
+ }
+
+ // Determine which directories to delete based on filter configuration
+ List<Path> directoriesToDelete = Lists.newArrayList();
+ boolean filterEnabled = state.getPropAsBoolean(ICEBERG_FILTER_ENABLED,
true);
+
+ if (!filterEnabled) {
+ // No filter: Delete entire root directory to rewrite all data
+ log.info("Filter disabled - will delete entire root directory: {}",
targetRootPath);
+ directoriesToDelete.add(targetRootPath);
+ } else {
+ // Filter enabled: Delete only specific partition directories
+ String partitionColumn = state.getProp(ICEBERG_PARTITION_KEY);
+ String partitionValuesStr = state.getProp(ICEBERG_PARTITION_VALUES);
+
+ if (partitionColumn == null || partitionValuesStr == null) {
+ log.warn("Partition key or values not found in state, cannot
determine partition directories to delete");
+ return;
+ }
+
+ // Parse partition values (comma-separated list from lookback
calculation)
+ // These values already include hourly suffix if applicable (e.g.,
"2025-10-11-00,2025-10-10-00,2025-10-09-00")
+ String[] values = partitionValuesStr.split(",");
+ log.info("Filter enabled - will delete {} partition directories for
{}={}",
+ values.length, partitionColumn, partitionValuesStr);
+
+ // Collect partition directories to delete
+ for (String value : values) {
+ String trimmedValue = value.trim();
+ // Construct partition directory path:
targetRoot/partitionColumn=value/
+ // Example: /root/datepartition=2025-10-11-00/
+ Path partitionDir = new Path(targetRootPath, partitionColumn + "=" +
trimmedValue);
+
+ if (targetFs.exists(partitionDir)) {
+ log.info("Found partition directory to delete: {}", partitionDir);
+ directoriesToDelete.add(partitionDir);
+ } else {
+ log.info("Partition directory does not exist in target: {}",
partitionDir);
+ }
+ }
+ }
+
+ if (directoriesToDelete.isEmpty()) {
+ log.info("No directories to delete in target directory {}",
targetRootPath);
+ return;
+ }
+
+ // Delete directories (and all their contents) for complete overwrite
+ // DeleteFileCommitStep will recursively delete all files within these
directories
+ log.info("Will delete {} for complete overwrite",
directoriesToDelete.size());
+
+ // Log directories to be deleted
+ for (Path dir : directoriesToDelete) {
+ log.info("Will delete directory: {}", dir);
+ }
+
+ // Create DeleteFileCommitStep to delete directories recursively
+ // Note: deleteEmptyDirs is not needed since we're deleting entire
directories
+ CommitStep deleteStep = DeleteFileCommitStep.fromPaths(targetFs,
directoriesToDelete, state.getProperties());
+
+ // Create a dedicated work unit for the delete step
+ WorkUnit deleteWorkUnit = new WorkUnit(extract);
+ deleteWorkUnit.addAll(state);
+
+ // Set properties so extractor knows this is a delete-only work unit (no
files to copy)
+ deleteWorkUnit.setProp(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL,
"");
+ deleteWorkUnit.setProp(SERIALIZED_COPYABLE_DATASET, "{}");
+ deleteWorkUnit.setProp(ConfigurationKeys.DATASET_URN_KEY, datasetUrn);
+ deleteWorkUnit.setProp(ICEBERG_FILE_PARTITION_PATH, "{}");
+ setWorkUnitWeight(deleteWorkUnit, 0);
+
+ // Use PrePublishStep to delete BEFORE copying new files
+ PrePublishStep prePublishStep = new PrePublishStep(datasetUrn,
Maps.newHashMap(), deleteStep, 0);
+
+ // Serialize the PrePublishStep as a CopyEntity
+ CopySource.serializeCopyEntity(deleteWorkUnit, prePublishStep);
+ workUnits.add(deleteWorkUnit);
+
+ log.info("Added PrePublishStep with DeleteFileCommitStep to work units");
+
+ } catch (Exception e) {
+ log.error("Failed to create delete step", e);
+ throw new IOException("Failed to create delete step", e);
+ }
+ }
+
/**
* Create catalog using existing IcebergDatasetFinder logic
*/
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergFileStreamExtractorTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergFileStreamExtractorTest.java
index ef885bec18..2963bc8360 100644
---
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergFileStreamExtractorTest.java
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergFileStreamExtractorTest.java
@@ -325,6 +325,43 @@ public class IcebergFileStreamExtractorTest {
}
}
+ @Test
+ public void testCopyableFileHasRequiredFieldsPopulated() throws Exception {
+ // Test that CopyableFile has all required fields populated for publish
phase
+ Iterator<FileAwareInputStream> iterator =
extractor.downloadFile(testFile.getAbsolutePath());
+ FileAwareInputStream fais = iterator.next();
+
+ org.apache.gobblin.data.management.copy.CopyableFile copyableFile =
fais.getFile();
+
+ // Verify destinationData is populated
+ org.apache.gobblin.dataset.Descriptor destinationData =
+ copyableFile.getDestinationData();
+ Assert.assertNotNull(destinationData,
+ "CopyableFile destinationData should be populated");
+
+ // Verify datasetOutputPath is populated
+ String datasetOutputPath = copyableFile.getDatasetOutputPath();
+ Assert.assertNotNull(datasetOutputPath,
+ "CopyableFile datasetOutputPath should be populated");
+ Assert.assertFalse(datasetOutputPath.isEmpty(),
+ "CopyableFile datasetOutputPath should not be empty");
+
+ // Verify ancestorsOwnerAndPermission is populated
+ java.util.List<org.apache.gobblin.util.filesystem.OwnerAndPermission>
ancestorsOwnerAndPermission =
+ copyableFile.getAncestorsOwnerAndPermission();
+ Assert.assertNotNull(ancestorsOwnerAndPermission,
+ "CopyableFile ancestorsOwnerAndPermission should be populated");
+ // ancestorsOwnerAndPermission may be empty list if no ancestors need
permission preservation,
+ // but it should not be null
+
+ // Additional verification: ensure origin and destination paths are set
+ // This is important for the publish phase
+ Assert.assertNotNull(copyableFile.getOrigin(),
+ "CopyableFile should have origin FileStatus");
+ Assert.assertNotNull(copyableFile.getDestination(),
+ "CopyableFile should have destination Path");
+ }
+
private void deleteDirectory(File directory) {
File[] files = directory.listFiles();
if (files != null) {
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSourceDeleteStepTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSourceDeleteStepTest.java
new file mode 100644
index 0000000000..c49c6e6331
--- /dev/null
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSourceDeleteStepTest.java
@@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.data.management.copy.CopySource;
+import org.apache.gobblin.data.management.copy.entities.PrePublishStep;
+import org.apache.gobblin.source.workunit.Extract;
+import org.apache.gobblin.source.workunit.WorkUnit;
+
+/**
+ * Unit tests for the delete step functionality in {@link IcebergSource}.
+ * Tests the addDeleteStepIfNeeded method with various configurations.
+ *
+ * Note: The delete feature performs complete directory deletion (not
individual file deletion).
+ * When enabled, entire partition directories are deleted for a clean
overwrite, regardless of
+ * which files exist in the source.
+ */
+public class IcebergSourceDeleteStepTest {
+
+ private File tempDir;
+ private FileSystem localFs;
+ private SourceState state;
+ private IcebergSource source;
+
+ @BeforeMethod
+ public void setUp() throws Exception {
+ // Create temporary directory for testing
+ tempDir = Files.createTempDir();
+ tempDir.deleteOnExit();
+
+ // Get local filesystem
+ Configuration conf = new Configuration();
+ localFs = FileSystem.getLocal(conf);
+
+ // Initialize source state
+ state = new SourceState();
+ state.setProp(ConfigurationKeys.WRITER_FILE_SYSTEM_URI, "file:///");
+ state.setProp(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR,
tempDir.getAbsolutePath());
+
+ // Initialize IcebergSource
+ source = new IcebergSource();
+ }
+
+ @AfterMethod
+ public void tearDown() throws Exception {
+ if (tempDir != null && tempDir.exists()) {
+ deleteDirectory(tempDir);
+ }
+ }
+
+ /**
+ * Test: Delete is disabled - should not create delete work unit
+ */
+ @Test
+ public void testDeleteDisabled() throws Exception {
+ // Configure: delete disabled
+ state.setProp(IcebergSource.DELETE_FILES_NOT_IN_SOURCE, false);
+
+ List<WorkUnit> workUnits = Lists.newArrayList();
+ Extract extract = new Extract(Extract.TableType.SNAPSHOT_ONLY, "test",
"test");
+
+ // Invoke private method using reflection
+ invokeAddDeleteStepIfNeeded(workUnits, extract);
+
+ // Assert: No delete work unit should be added
+ Assert.assertEquals(workUnits.size(), 0, "No work unit should be added
when delete is disabled");
+ }
+
+ /**
+ * Test: Delete enabled but no work units - should not create delete work
unit
+ */
+ @Test
+ public void testDeleteEnabledButNoWorkUnits() throws Exception {
+ // Configure: delete enabled
+ state.setProp(IcebergSource.DELETE_FILES_NOT_IN_SOURCE, true);
+
+ List<WorkUnit> workUnits = Lists.newArrayList(); // Empty work units
+ Extract extract = new Extract(Extract.TableType.SNAPSHOT_ONLY, "test",
"test");
+
+ // Invoke private method using reflection
+ invokeAddDeleteStepIfNeeded(workUnits, extract);
+
+ // Assert: No delete work unit should be added
+ Assert.assertEquals(workUnits.size(), 0, "No work unit should be added
when there are no existing work units");
+ }
+
+ /**
+ * Test: Delete enabled, filter disabled - should delete entire root
directory
+ */
+ @Test
+ public void testDeleteEntireRootDirectory() throws Exception {
+ // Create test files in root directory
+ createTestFile("file1.parquet");
+ createTestFile("partition1/file2.parquet");
+ createTestFile("partition2/file3.parquet");
+
+ // Configure: delete enabled, filter disabled
+ state.setProp(IcebergSource.DELETE_FILES_NOT_IN_SOURCE, true);
+ state.setProp(IcebergSource.ICEBERG_FILTER_ENABLED, false);
+
+ List<WorkUnit> workUnits = Lists.newArrayList();
+ workUnits.add(createDummyWorkUnit()); // Add at least one work unit
+ Extract extract = new Extract(Extract.TableType.SNAPSHOT_ONLY, "test",
"test");
+
+ // Invoke private method using reflection
+ invokeAddDeleteStepIfNeeded(workUnits, extract);
+
+ // Assert: One delete work unit should be added
+ Assert.assertEquals(workUnits.size(), 2, "One delete work unit should be
added");
+
+ // Verify the delete work unit
+ WorkUnit deleteWorkUnit = workUnits.get(1);
+
Assert.assertEquals(deleteWorkUnit.getProp(ConfigurationKeys.DATASET_URN_KEY),
"datasetUrn");
+
Assert.assertEquals(deleteWorkUnit.getProp(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL),
"");
+
+ // Verify it's a PrePublishStep
+ Class<?> entityClass = CopySource.getCopyEntityClass(deleteWorkUnit);
+ Assert.assertEquals(entityClass, PrePublishStep.class);
+
+ // Verify entire root directory is being deleted (not individual files)
+ List<String> directoriesToDelete =
getFilesToDeleteFromWorkUnit(deleteWorkUnit);
+ Assert.assertEquals(directoriesToDelete.size(), 1, "Should delete 1
directory (root directory)");
+
+ // Verify the root directory path is in the delete list
+ assertContainsPartition(directoriesToDelete, tempDir.getAbsolutePath());
+ }
+
+ /**
+ * Test: Delete enabled, filter enabled with single partition - should
delete specific partition directory
+ */
+ @Test
+ public void testDeleteSinglePartitionDirectory() throws Exception {
+ // Create test files in partition directories
+ createTestFile("datepartition=2025-10-11/file1.parquet");
+ createTestFile("datepartition=2025-10-11/file2.parquet");
+ createTestFile("datepartition=2025-10-10/file3.parquet");
+ createTestFile("datepartition=2025-10-09/file4.parquet");
+
+ // Configure: delete enabled, filter enabled for single partition
+ state.setProp(IcebergSource.DELETE_FILES_NOT_IN_SOURCE, true);
+ state.setProp(IcebergSource.ICEBERG_FILTER_ENABLED, true);
+ state.setProp(IcebergSource.ICEBERG_PARTITION_KEY, "datepartition");
+ state.setProp(IcebergSource.ICEBERG_PARTITION_VALUES, "2025-10-11");
+
+ List<WorkUnit> workUnits = Lists.newArrayList();
+ workUnits.add(createDummyWorkUnit());
+ Extract extract = new Extract(Extract.TableType.SNAPSHOT_ONLY, "test",
"test");
+
+ // Invoke private method using reflection
+ invokeAddDeleteStepIfNeeded(workUnits, extract);
+
+ // Assert: One delete work unit should be added
+ Assert.assertEquals(workUnits.size(), 2, "One delete work unit should be
added");
+
+ // Verify the delete work unit
+ WorkUnit deleteWorkUnit = workUnits.get(1);
+
Assert.assertEquals(deleteWorkUnit.getProp(ConfigurationKeys.DATASET_URN_KEY),
"datasetUrn");
+
+ // Verify correct partition directory is being deleted
+ List<String> directoriesToDelete =
getFilesToDeleteFromWorkUnit(deleteWorkUnit);
+ Assert.assertEquals(directoriesToDelete.size(), 1, "Should delete 1
partition directory");
+
+ // Verify only 2025-10-11 partition directory is included
+ assertContainsPartition(directoriesToDelete, "datepartition=2025-10-11");
+
+ // Verify other partition directories are NOT included
+ assertNotContainsPartition(directoriesToDelete,
"datepartition=2025-10-10");
+ assertNotContainsPartition(directoriesToDelete,
"datepartition=2025-10-09");
+ }
+
+ /**
+ * Test: Delete enabled, filter enabled with multiple partitions (lookback)
- should delete multiple partition directories
+ */
+ @Test
+ public void testDeleteMultiplePartitionDirectories() throws Exception {
+ // Create test files in partition directories
+ createTestFile("datepartition=2025-10-11/file1.parquet");
+ createTestFile("datepartition=2025-10-11/file2.parquet");
+ createTestFile("datepartition=2025-10-10/file3.parquet");
+ createTestFile("datepartition=2025-10-09/file4.parquet");
+ createTestFile("datepartition=2025-10-08/file5.parquet"); // Should not be
deleted
+
+ // Configure: delete enabled, filter enabled with lookback (3 days)
+ state.setProp(IcebergSource.DELETE_FILES_NOT_IN_SOURCE, true);
+ state.setProp(IcebergSource.ICEBERG_FILTER_ENABLED, true);
+ state.setProp(IcebergSource.ICEBERG_PARTITION_KEY, "datepartition");
+ state.setProp(IcebergSource.ICEBERG_PARTITION_VALUES,
"2025-10-11,2025-10-10,2025-10-09");
+
+ List<WorkUnit> workUnits = Lists.newArrayList();
+ workUnits.add(createDummyWorkUnit());
+ Extract extract = new Extract(Extract.TableType.SNAPSHOT_ONLY, "test",
"test");
+
+ // Invoke private method using reflection
+ invokeAddDeleteStepIfNeeded(workUnits, extract);
+
+ // Assert: One delete work unit should be added
+ Assert.assertEquals(workUnits.size(), 2, "One delete work unit should be
added");
+
+ // Verify the delete work unit contains correct partition directories
+ WorkUnit deleteWorkUnit = workUnits.get(1);
+ List<String> directoriesToDelete =
getFilesToDeleteFromWorkUnit(deleteWorkUnit);
+
+ // We expect 3 partition directories to be deleted
+ Assert.assertEquals(directoriesToDelete.size(), 3, "Should delete 3
partition directories");
+
+ // Verify correct partition directories are included
+ assertContainsPartition(directoriesToDelete, "datepartition=2025-10-11");
+ assertContainsPartition(directoriesToDelete, "datepartition=2025-10-10");
+ assertContainsPartition(directoriesToDelete, "datepartition=2025-10-09");
+
+ // Verify 2025-10-08 partition directory is NOT included
+ assertNotContainsPartition(directoriesToDelete,
"datepartition=2025-10-08");
+ }
+
+ /**
+ * Test: Delete enabled with hourly partitions - should handle hourly suffix
correctly
+ */
+ @Test
+ public void testDeleteHourlyPartitions() throws Exception {
+ // Create test files in hourly partition directories
+ createTestFile("datepartition=2025-10-11-00/file1.parquet");
+ createTestFile("datepartition=2025-10-11-00/file2.parquet");
+ createTestFile("datepartition=2025-10-10-00/file3.parquet");
+
+ // Configure: delete enabled with hourly partitions
+ state.setProp(IcebergSource.DELETE_FILES_NOT_IN_SOURCE, true);
+ state.setProp(IcebergSource.ICEBERG_FILTER_ENABLED, true);
+ state.setProp(IcebergSource.ICEBERG_PARTITION_KEY, "datepartition");
+ state.setProp(IcebergSource.ICEBERG_PARTITION_VALUES,
"2025-10-11-00,2025-10-10-00");
+ state.setProp(IcebergSource.ICEBERG_HOURLY_PARTITION_ENABLED, true);
+
+ List<WorkUnit> workUnits = Lists.newArrayList();
+ workUnits.add(createDummyWorkUnit());
+ Extract extract = new Extract(Extract.TableType.SNAPSHOT_ONLY, "test",
"test");
+
+ // Invoke private method using reflection
+ invokeAddDeleteStepIfNeeded(workUnits, extract);
+
+ // Assert: One delete work unit should be added
+ Assert.assertEquals(workUnits.size(), 2, "One delete work unit should be
added for hourly partitions");
+
+ // Verify correct hourly partition directories are being deleted
+ WorkUnit deleteWorkUnit = workUnits.get(1);
+ List<String> directoriesToDelete =
getFilesToDeleteFromWorkUnit(deleteWorkUnit);
+ Assert.assertEquals(directoriesToDelete.size(), 2, "Should delete 2 hourly
partition directories");
+
+ // Verify correct hourly partition directories are included
+ assertContainsPartition(directoriesToDelete,
"datepartition=2025-10-11-00");
+ assertContainsPartition(directoriesToDelete,
"datepartition=2025-10-10-00");
+ }
+
+ /**
+ * Test: Delete enabled but target directory doesn't exist - should not
create delete work unit
+ */
+ @Test
+ public void testDeleteTargetDirectoryNotExists() throws Exception {
+ // Configure: delete enabled but point to non-existent directory
+ state.setProp(IcebergSource.DELETE_FILES_NOT_IN_SOURCE, true);
+ state.setProp(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR,
"/non/existent/path");
+
+ List<WorkUnit> workUnits = Lists.newArrayList();
+ workUnits.add(createDummyWorkUnit());
+ Extract extract = new Extract(Extract.TableType.SNAPSHOT_ONLY, "test",
"test");
+
+ // Invoke private method using reflection
+ invokeAddDeleteStepIfNeeded(workUnits, extract);
+
+ // Assert: No delete work unit should be added (only the dummy work unit
remains)
+ Assert.assertEquals(workUnits.size(), 1, "No delete work unit should be
added when target doesn't exist");
+ }
+
+ /**
+ * Test: Delete enabled but partition directory doesn't exist - should
handle gracefully
+ */
+ @Test
+ public void testDeletePartitionDirectoryNotExists() throws Exception {
+ // Don't create any files - partition directories don't exist
+
+ // Configure: delete enabled, filter enabled
+ state.setProp(IcebergSource.DELETE_FILES_NOT_IN_SOURCE, true);
+ state.setProp(IcebergSource.ICEBERG_FILTER_ENABLED, true);
+ state.setProp(IcebergSource.ICEBERG_PARTITION_KEY, "datepartition");
+ state.setProp(IcebergSource.ICEBERG_PARTITION_VALUES, "2025-10-11");
+
+ List<WorkUnit> workUnits = Lists.newArrayList();
+ workUnits.add(createDummyWorkUnit());
+ Extract extract = new Extract(Extract.TableType.SNAPSHOT_ONLY, "test",
"test");
+
+ // Invoke private method using reflection
+ invokeAddDeleteStepIfNeeded(workUnits, extract);
+
+ // Assert: No delete work unit should be added when no files exist to
delete
+ Assert.assertEquals(workUnits.size(), 1, "No delete work unit should be
added when partition directory doesn't exist");
+ }
+
+ /**
+ * Test: Missing partition key in state - should handle gracefully
+ */
+ @Test
+ public void testDeleteMissingPartitionKey() throws Exception {
+ // Configure: delete enabled but missing partition key
+ state.setProp(IcebergSource.DELETE_FILES_NOT_IN_SOURCE, true);
+ state.setProp(IcebergSource.ICEBERG_FILTER_ENABLED, true);
+ // Missing: ICEBERG_PARTITION_KEY and ICEBERG_PARTITION_VALUES
+
+ List<WorkUnit> workUnits = Lists.newArrayList();
+ workUnits.add(createDummyWorkUnit());
+ Extract extract = new Extract(Extract.TableType.SNAPSHOT_ONLY, "test",
"test");
+
+ // Invoke private method using reflection
+ invokeAddDeleteStepIfNeeded(workUnits, extract);
+
+ // Assert: No delete work unit should be added when partition info is
missing
+ Assert.assertEquals(workUnits.size(), 1, "No delete work unit should be
added when partition info is missing");
+ }
+
+ /**
+ * Test: Missing DATA_PUBLISHER_FINAL_DIR - should handle gracefully and not
create delete work unit
+ */
+ @Test
+ public void testDeleteMissingDataPublisherFinalDir() throws Exception {
+ // Configure: delete enabled but DATA_PUBLISHER_FINAL_DIR is not set
+ state.setProp(IcebergSource.DELETE_FILES_NOT_IN_SOURCE, true);
+ state.setProp(IcebergSource.ICEBERG_FILTER_ENABLED, true);
+ state.setProp(IcebergSource.ICEBERG_PARTITION_KEY, "datepartition");
+ state.setProp(IcebergSource.ICEBERG_PARTITION_VALUES, "2025-10-11");
+ // Remove DATA_PUBLISHER_FINAL_DIR
+ state.removeProp(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR);
+
+ List<WorkUnit> workUnits = Lists.newArrayList();
+ workUnits.add(createDummyWorkUnit());
+ Extract extract = new Extract(Extract.TableType.SNAPSHOT_ONLY, "test",
"test");
+
+ // Invoke private method using reflection
+ invokeAddDeleteStepIfNeeded(workUnits, extract);
+
+ // Assert: No delete work unit should be added when
DATA_PUBLISHER_FINAL_DIR is missing
+ Assert.assertEquals(workUnits.size(), 1,
+ "No delete work unit should be added when DATA_PUBLISHER_FINAL_DIR is
not configured");
+ }
+
+ // ==================== Helper Methods ====================
+
+ /**
+ * Invoke the private addDeleteStepIfNeeded method using reflection
+ */
+ private void invokeAddDeleteStepIfNeeded(List<WorkUnit> workUnits, Extract
extract) throws Exception {
+ Method method =
IcebergSource.class.getDeclaredMethod("addDeleteStepIfNeeded",
+ SourceState.class,
+ List.class,
+ Extract.class,
+ String.class);
+ method.setAccessible(true);
+ method.invoke(source, state, workUnits, extract, "datasetUrn");
+ }
+
+ /**
+ * Create a test file in the temp directory
+ */
+ private void createTestFile(String relativePath) throws IOException {
+ File file = new File(tempDir, relativePath);
+ file.getParentFile().mkdirs();
+ file.createNewFile();
+ }
+
+ /**
+ * Create a dummy work unit for testing
+ */
+ private WorkUnit createDummyWorkUnit() {
+ Extract extract = new Extract(Extract.TableType.SNAPSHOT_ONLY, "test",
"test");
+ WorkUnit workUnit = new WorkUnit(extract);
+ workUnit.setProp(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL,
"dummy.parquet");
+ return workUnit;
+ }
+
+ /**
+ * Extract file paths from DeleteFileCommitStep using reflection
+ */
+ private List<String> getFilesToDeleteFromWorkUnit(WorkUnit deleteWorkUnit)
throws Exception {
+ PrePublishStep prePublishStep = (PrePublishStep)
CopySource.deserializeCopyEntity(deleteWorkUnit);
+ Object deleteStep = prePublishStep.getStep();
+
+ // Use reflection to access pathsToDelete field from DeleteFileCommitStep
+ java.lang.reflect.Field field =
deleteStep.getClass().getDeclaredField("pathsToDelete");
+ field.setAccessible(true);
+
+ @SuppressWarnings("unchecked")
+ java.util.Collection<org.apache.hadoop.fs.FileStatus> pathsToDelete =
+ (java.util.Collection<org.apache.hadoop.fs.FileStatus>)
field.get(deleteStep);
+
+ List<String> filePaths = Lists.newArrayList();
+ for (org.apache.hadoop.fs.FileStatus status : pathsToDelete) {
+ filePaths.add(status.getPath().toString());
+ }
+ return filePaths;
+ }
+
+ /**
+ * Verify that a file path contains the expected partition directory
+ */
+ private void assertContainsPartition(List<String> filePaths, String
partitionDir) {
+ boolean found = false;
+ for (String path : filePaths) {
+ if (path.contains(partitionDir)) {
+ found = true;
+ break;
+ }
+ }
+ Assert.assertTrue(found, "Expected to find partition directory: " +
partitionDir + " in paths: " + filePaths);
+ }
+
+ /**
+ * Verify that a file path does NOT contain the expected partition directory
+ */
+ private void assertNotContainsPartition(List<String> filePaths, String
partitionDir) {
+ for (String path : filePaths) {
+ Assert.assertFalse(path.contains(partitionDir),
+ "Should NOT find partition directory: " + partitionDir + " but found
in: " + path);
+ }
+ }
+
+ /**
+ * Recursively delete a directory
+ */
+ private void deleteDirectory(File directory) {
+ if (directory.isDirectory()) {
+ File[] files = directory.listFiles();
+ if (files != null) {
+ for (File file : files) {
+ deleteDirectory(file);
+ }
+ }
+ }
+ directory.delete();
+ }
+}
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSourceTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSourceTest.java
index 248108c0c8..a8c8444d02 100644
---
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSourceTest.java
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSourceTest.java
@@ -19,9 +19,13 @@ package org.apache.gobblin.data.management.copy.iceberg;
import java.lang.reflect.Method;
import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.Properties;
+import java.util.Set;
import java.util.stream.Collectors;
import com.google.common.collect.Lists;
@@ -54,22 +58,25 @@ import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.hive.HiveMetastoreTest;
import org.apache.iceberg.shaded.org.apache.avro.SchemaBuilder;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Ignore;
import org.testng.annotations.Test;
-import static org.mockito.Mockito.*;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
-import java.util.HashMap;
-import java.util.Map;
/**
* Unit tests for {@link IcebergSource}.
*/
+@RunWith(Enclosed.class)
public class IcebergSourceTest {
@Mock
@@ -152,17 +159,17 @@ public class IcebergSourceTest {
m.setAccessible(true);
List<WorkUnit> workUnits = (List<WorkUnit>) m.invoke(icebergSource,
filesWithPartitions, sourceState, mockTable);
- // Verify single work unit contains all 3 data files by default
(filesPerWorkUnit default=10)
- Assert.assertEquals(workUnits.size(), 1, "Should create 1 work unit");
- WorkUnit wu = workUnits.get(0);
- String filesToPull =
wu.getProp(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL);
- Assert.assertNotNull(filesToPull);
- Assert.assertEquals(filesToPull.split(",").length, 3);
-
- // Verify extract info
- Assert.assertEquals(wu.getExtract().getNamespace(), "iceberg");
- Assert.assertEquals(wu.getExtract().getTable(), "test_table");
- Assert.assertEquals(wu.getExtract().getType(),
Extract.TableType.SNAPSHOT_ONLY);
+ // Verify 3 work units are created
+ Assert.assertEquals(workUnits.size(), 3, "Should create 3 work units");
+ for (WorkUnit wu : workUnits) {
+ String filesToPull =
wu.getProp(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL);
+ Assert.assertNotNull(filesToPull);
+ // Verify extract info
+ Assert.assertEquals(wu.getExtract().getNamespace(), "iceberg");
+ Assert.assertEquals(wu.getExtract().getTable(), "test_table");
+ Assert.assertEquals(wu.getExtract().getType(),
Extract.TableType.SNAPSHOT_ONLY);
+ }
+
}
@Test
@@ -217,6 +224,7 @@ public class IcebergSourceTest {
}
@Test
+ @Ignore("Re-enable test once grouping logic is fixed")
public void testFileGrouping() throws Exception {
// Test with more files than files per work unit
properties.setProperty(IcebergSource.ICEBERG_FILES_PER_WORKUNIT, "3");
@@ -453,14 +461,15 @@ public class IcebergSourceTest {
List<WorkUnit> workUnits = (List<WorkUnit>) m.invoke(icebergSource,
filesWithPartitions, sourceState, mockTable);
// Verify partition info is in work unit
- Assert.assertEquals(workUnits.size(), 1);
- WorkUnit wu = workUnits.get(0);
- Assert.assertEquals(wu.getProp(IcebergSource.ICEBERG_PARTITION_KEY),
"datepartition");
- Assert.assertEquals(wu.getProp(IcebergSource.ICEBERG_PARTITION_VALUES),
"2025-04-01");
-
- // Verify partition path mapping is stored
- Assert.assertNotNull(wu.getProp(IcebergSource.ICEBERG_FILE_PARTITION_PATH),
- "Partition path mapping should be stored in work unit");
+ Assert.assertEquals(workUnits.size(), 2);
+ for (WorkUnit wu : workUnits) {
+ Assert.assertEquals(wu.getProp(IcebergSource.ICEBERG_PARTITION_KEY),
"datepartition");
+ Assert.assertEquals(wu.getProp(IcebergSource.ICEBERG_PARTITION_VALUES),
"2025-04-01");
+ // Verify partition path mapping is stored
+
Assert.assertNotNull(wu.getProp(IcebergSource.ICEBERG_FILE_PARTITION_PATH),
+ "Partition path mapping should be stored in work unit");
+ }
+
}
@Test
@@ -498,7 +507,7 @@ public class IcebergSourceTest {
// Verify all files discovered with partition metadata preserved
Assert.assertEquals(discoveredFiles.size(), 4, "Should discover all data
files");
-
+
// Verify partition metadata is preserved for each file
for (FilePathWithPartition file : discoveredFiles) {
Assert.assertNotNull(file.getPartitionData(), "Partition data should be
present");
@@ -510,7 +519,7 @@ public class IcebergSourceTest {
Assert.assertTrue(file.getPartitionPath().startsWith("datepartition="),
"Partition path should be in format: datepartition=<date>");
}
-
+
// Verify files are from different partitions
java.util.Set<String> uniquePartitions = discoveredFiles.stream()
.map(f -> f.getPartitionData().get("datepartition"))
@@ -772,7 +781,6 @@ public class IcebergSourceTest {
@Test
public void testWorkUnitSizeTracking() throws Exception {
// Test that work units include file size information for dynamic scaling
- properties.setProperty(IcebergSource.ICEBERG_FILES_PER_WORKUNIT, "2");
sourceState = new SourceState(new State(properties));
// Create files with different sizes
@@ -796,33 +804,39 @@ public class IcebergSourceTest {
List<WorkUnit> workUnits = (List<WorkUnit>) m.invoke(icebergSource,
filesWithSizes, sourceState, mockTable);
// Should create 4 work units (one per file)
- Assert.assertEquals(workUnits.size(), 2, "Should create 2 work units");
+ Assert.assertEquals(workUnits.size(), 4, "Should create 4 work units");
// Verify each work unit has WORK_UNIT_SIZE set
WorkUnit wu1 = workUnits.get(0);
long wu1Size = wu1.getPropAsLong(ServiceConfigKeys.WORK_UNIT_SIZE);
- Assert.assertEquals(wu1Size, 1073741824L + 536870912L, // 1 GB + 512 MB
- "WorkUnit 1 should have total size of its files");
+ Assert.assertEquals(wu1Size, 1073741824L, // 1 GB
+ "WorkUnit 1 should have total size of 1 GB");
WorkUnit wu2 = workUnits.get(1);
long wu2Size = wu2.getPropAsLong(ServiceConfigKeys.WORK_UNIT_SIZE);
- Assert.assertEquals(wu2Size, 2147483648L + 268435456L, // 2 GB + 256 MB
- "WorkUnit 2 should have total size of its files");
+ Assert.assertEquals(wu2Size, 536870912L, // 512 MB
+ "WorkUnit 2 should have total size of 512 MB");
- // Verify work unit weight is set for bin packing
- String weight1 = wu1.getProp("iceberg.workUnitWeight");
- Assert.assertNotNull(weight1, "Work unit weight should be set");
- Assert.assertEquals(Long.parseLong(weight1), wu1Size, "Weight should equal
total size");
+ WorkUnit wu3 = workUnits.get(2);
+ long wu3Size = wu3.getPropAsLong(ServiceConfigKeys.WORK_UNIT_SIZE);
+ Assert.assertEquals(wu3Size, 2147483648L, // 2 GB
+ "WorkUnit 3 should have total size of 2 GB");
+
+ WorkUnit wu4 = workUnits.get(3);
+ long wu4Size = wu4.getPropAsLong(ServiceConfigKeys.WORK_UNIT_SIZE);
+ Assert.assertEquals(wu4Size, 268435456L, // 256 MB
+ "WorkUnit 4 should have total size of 256 MB");
- String weight2 = wu2.getProp("iceberg.workUnitWeight");
- Assert.assertNotNull(weight2, "Work unit weight should be set");
- Assert.assertEquals(Long.parseLong(weight2), wu2Size, "Weight should equal
total size");
+ // Verify work unit weight is set for bin packing
+ for (WorkUnit wu : workUnits) {
+ String weight = wu.getProp("iceberg.workUnitWeight");
+ Assert.assertNotNull(weight, "Work unit weight should be set");
+ }
}
@Test
public void testBinPackingDisabled() throws Exception {
// Test that bin packing is skipped when not configured
- properties.setProperty(IcebergSource.ICEBERG_FILES_PER_WORKUNIT, "1");
// Do NOT set binPacking.maxSizePerBin - bin packing should be disabled
sourceState = new SourceState(new State(properties));
@@ -857,7 +871,6 @@ public class IcebergSourceTest {
@Test
public void testBinPackingEnabled() throws Exception {
// Test that bin packing groups work units by size using
WorstFitDecreasing algorithm
- properties.setProperty(IcebergSource.ICEBERG_FILES_PER_WORKUNIT, "1");
// Use CopySource bin packing configuration key for consistency
properties.setProperty(CopySource.MAX_SIZE_MULTI_WORKUNITS, "5000"); //
5KB max per bin
sourceState = new SourceState(new State(properties));
@@ -959,8 +972,8 @@ public class IcebergSourceTest {
Assert.assertFalse(workUnitsBeforeSimulateCheck.isEmpty(),
"Work units should be created before simulate mode check");
- Assert.assertEquals(workUnitsBeforeSimulateCheck.size(), 1,
- "Should create 1 work unit from 2 files before simulate check");
+ Assert.assertEquals(workUnitsBeforeSimulateCheck.size(), 2,
+ "Should create 2 work units");
// Test 4: Verify logSimulateMode can be called successfully (logs the
plan)
Method logMethod = IcebergSource.class.getDeclaredMethod("logSimulateMode",
@@ -987,7 +1000,7 @@ public class IcebergSourceTest {
"Simulate mode: zero work units should be returned for execution");
// Verify the work units were created but NOT returned
- Assert.assertEquals(workUnitsBeforeSimulateCheck.size(), 1,
+ Assert.assertEquals(workUnitsBeforeSimulateCheck.size(), 2,
"Work units were created internally but not returned due to simulate
mode");
}
@@ -1066,12 +1079,12 @@ public class IcebergSourceTest {
Assert.assertNotNull(partitionValues, "Partition values should be set");
String[] dates = partitionValues.split(",");
Assert.assertEquals(dates.length, 3, "Should have 3 partition values");
-
+
// Verify -00 suffix is appended to all dates
Assert.assertEquals(dates[0], "2025-04-03-00", "Should have -00 suffix for
day 0");
Assert.assertEquals(dates[1], "2025-04-02-00", "Should have -00 suffix for
day 1");
Assert.assertEquals(dates[2], "2025-04-01-00", "Should have -00 suffix for
day 2");
-
+
// Verify all follow the hourly format pattern
for (String date : dates) {
Assert.assertEquals(date.length(), 13, "Date should be in yyyy-MM-dd-00
format (13 chars)");
@@ -1082,7 +1095,6 @@ public class IcebergSourceTest {
@Test
public void testZeroSizeFilesHandling() throws Exception {
// Test handling of files with zero or very small sizes
- properties.setProperty(IcebergSource.ICEBERG_FILES_PER_WORKUNIT, "3");
sourceState = new SourceState(new State(properties));
List<FilePathWithPartition> filesWithSizes = Arrays.asList(
@@ -1103,16 +1115,17 @@ public class IcebergSourceTest {
List<WorkUnit> workUnits = (List<WorkUnit>) m.invoke(icebergSource,
filesWithSizes, sourceState, mockTable);
// Should handle gracefully
- Assert.assertEquals(workUnits.size(), 1);
- WorkUnit wu = workUnits.get(0);
+ Assert.assertEquals(workUnits.size(), 3);
- long totalSize = wu.getPropAsLong(ServiceConfigKeys.WORK_UNIT_SIZE);
- Assert.assertEquals(totalSize, 101L, "Total size should be 0 + 1 + 100 =
101");
+ Set<Long> expectedSizes = new HashSet<>(Arrays.asList(0L, 1L, 100L));
+ for (WorkUnit wu : workUnits) {
+ long size = wu.getPropAsLong(ServiceConfigKeys.WORK_UNIT_SIZE);
+ Assert.assertTrue(expectedSizes.contains(size));
+ expectedSizes.remove(size);
- // Weight should be at least 1 (minimum weight)
- String weightStr = wu.getProp("iceberg.workUnitWeight");
- long weight = Long.parseLong(weightStr);
- Assert.assertTrue(weight >= 1L, "Weight should be at least 1 for very
small files");
+ long weight = wu.getPropAsLong("iceberg.workUnitWeight");
+ Assert.assertTrue(weight >= 1L, "Weight should be at least 1 for very
small files");
+ }
}
}