This is an automated email from the ASF dual-hosted git repository.
vivekrai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 9ad7ad22b2 [GOBBLIN-2231] Added extractor for partition-aware file copy from Iceberg to any dest (#4154)
9ad7ad22b2 is described below
commit 9ad7ad22b2f59613354db934818f9475b7d47486
Author: Prateek Khandelwal <[email protected]>
AuthorDate: Tue Nov 11 19:50:47 2025 +0530
[GOBBLIN-2231] Added extractor for partition-aware file copy from Iceberg to any dest (#4154)
* Added IcebergFileStreamExtractor for partition-aware file copy from Iceberg to any dest
* Address review comments and fix indentation
* Add IcebergFileStreamHelper change
* Fix test failure
---
.../copy/iceberg/IcebergFileStreamExtractor.java | 132 +++++++-
.../copy/iceberg/IcebergFileStreamHelper.java | 13 +-
.../iceberg/IcebergFileStreamExtractorTest.java | 341 +++++++++++++++++++++
.../management/copy/iceberg/IcebergSourceTest.java | 2 +
4 files changed, 485 insertions(+), 3 deletions(-)
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergFileStreamExtractor.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergFileStreamExtractor.java
index e87b187d74..4644d6ea94 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergFileStreamExtractor.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergFileStreamExtractor.java
@@ -18,15 +18,33 @@
package org.apache.gobblin.data.management.copy.iceberg;
import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.Collections;
import java.util.Iterator;
+import java.util.Map;
-import org.apache.commons.lang3.NotImplementedException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Optional;
+import com.google.gson.Gson;
+import com.google.gson.JsonSyntaxException;
+import com.google.gson.reflect.TypeToken;
import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.data.management.copy.CopyableFile;
import org.apache.gobblin.data.management.copy.FileAwareInputStream;
import org.apache.gobblin.source.extractor.filebased.FileBasedExtractor;
+import org.apache.gobblin.source.extractor.filebased.FileBasedHelperException;
+import org.apache.gobblin.util.WriterUtils;
/**
* Extractor for file streaming mode that creates FileAwareInputStream for
each file.
@@ -40,8 +58,39 @@ import
org.apache.gobblin.source.extractor.filebased.FileBasedExtractor;
@Slf4j
public class IcebergFileStreamExtractor extends FileBasedExtractor<String,
FileAwareInputStream> {
+  /** Source file path -> partition path (relative to the publisher final dir). Never null; read-only. */
+  private final Map<String, String> fileToPartitionPathMap;
+  private final Gson gson = new Gson();
+  /** Writer-side FileSystem, resolved once in the constructor and reused for every emitted file. */
+  private final FileSystem targetFs;
+  private final CopyConfiguration copyConfiguration;
+
+  /** Maximum number of characters of a malformed partition-path payload echoed into error messages. */
+  private static final int MAX_JSON_SNIPPET_LENGTH = 200;
+
+  /**
+   * Creates an extractor that streams Iceberg data files as {@link FileAwareInputStream}s.
+   *
+   * <p>Resolves the writer {@link FileSystem} from {@link ConfigurationKeys#WRITER_FILE_SYSTEM_URI}
+   * (falling back to {@link ConfigurationKeys#LOCAL_FS_URI}), builds the {@link CopyConfiguration}, and
+   * loads the optional file-to-partition-path JSON mapping stored under
+   * {@link IcebergSource#ICEBERG_FILE_PARTITION_PATH}. A missing, blank, or JSON {@code null} mapping
+   * yields an empty map.
+   *
+   * @param workUnitState the work unit state for this extraction
+   * @throws IOException if the writer FileSystem cannot be obtained, or the partition mapping is not valid JSON
+   */
   public IcebergFileStreamExtractor(WorkUnitState workUnitState) throws IOException {
     super(workUnitState, new IcebergFileStreamHelper(workUnitState));
+
+    // Initialize target FileSystem and CopyConfiguration once
+    String writerFsUri = workUnitState.getProp(ConfigurationKeys.WRITER_FILE_SYSTEM_URI, ConfigurationKeys.LOCAL_FS_URI);
+    Configuration writerConf = WriterUtils.getFsConfiguration(workUnitState);
+    this.targetFs = FileSystem.get(URI.create(writerFsUri), writerConf);
+    this.copyConfiguration = CopyConfiguration.builder(this.targetFs, workUnitState.getProperties()).build();
+
+    // Load partition path mapping from work unit (set by IcebergSource)
+    String partitionPathJson = workUnitState.getProp(IcebergSource.ICEBERG_FILE_PARTITION_PATH);
+    if (StringUtils.isBlank(partitionPathJson)) {
+      this.fileToPartitionPathMap = Collections.emptyMap();
+      log.info("No partition path mapping found in work unit");
+    } else {
+      Map<String, String> parsed;
+      try {
+        parsed = gson.fromJson(partitionPathJson, new TypeToken<Map<String, String>>() {}.getType());
+      } catch (JsonSyntaxException e) {
+        String errorMsg = String.format("Failed to parse partition path mapping from work unit. "
+            + "Expected valid JSON map, got: '%s'. Error: %s",
+            partitionPathJson.length() > MAX_JSON_SNIPPET_LENGTH
+                ? partitionPathJson.substring(0, MAX_JSON_SNIPPET_LENGTH) + "..."
+                : partitionPathJson,
+            e.getMessage());
+        log.error(errorMsg, e);
+        throw new IOException(errorMsg, e);
+      }
+      // Gson returns null (not an empty map) for the JSON literal "null"; without this guard the
+      // size() call below would throw a NullPointerException.
+      this.fileToPartitionPathMap = parsed == null
+          ? Collections.<String, String>emptyMap()
+          : Collections.unmodifiableMap(parsed);
+      log.info("Loaded partition path mapping for {} files", this.fileToPartitionPathMap.size());
+    }
   }
@Override
@@ -50,9 +99,88 @@ public class IcebergFileStreamExtractor extends
FileBasedExtractor<String, FileA
return "FileAwareInputStream";
}
+  /**
+   * Downloads a file and wraps it in a {@link FileAwareInputStream} for streaming to the destination.
+   *
+   * <p>This method performs the following operations:
+   * <ol>
+   *   <li>Validates that {@link ConfigurationKeys#DATA_PUBLISHER_FINAL_DIR} is set, before any source I/O</li>
+   *   <li>Opens an input stream for the source file using {@link IcebergFileStreamHelper}. The stream is
+   *       registered with this extractor's closer, so it stays open for downstream consumers and is closed
+   *       when the extractor is closed.</li>
+   *   <li>Retrieves source file metadata (FileStatus) from the source filesystem</li>
+   *   <li>Computes the destination path, which may be partition-aware based on work unit metadata</li>
+   *   <li>Builds a {@link CopyableFile} containing both source and destination metadata</li>
+   *   <li>Wraps the input stream and metadata in a {@link FileAwareInputStream}</li>
+   * </ol>
+   *
+   * @param filePath the absolute path to the source file to download
+   * @return an iterator containing a single {@link FileAwareInputStream} wrapping the file
+   * @throws IOException if the final directory is not configured, the file cannot be opened, file metadata
+   *     cannot be retrieved, or destination path computation fails
+   */
   @Override
   public Iterator<FileAwareInputStream> downloadFile(String filePath) throws IOException {
-    throw new NotImplementedException("Not yet implemented");
+    log.info("Preparing FileAwareInputStream for file: {}", filePath);
+
+    // Validate the destination root first, so a misconfigured work unit fails fast
+    // instead of opening (and holding until close) a source stream it can never use.
+    String finalDir = this.workUnitState.getProp(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR);
+    if (StringUtils.isBlank(finalDir)) {
+      throw new IOException("Required configuration '" + ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR + "' is not set. "
+          + "Cannot determine destination path for file: " + filePath);
+    }
+
+    // Open source stream using fsHelper; the closer releases it when this extractor is closed
+    final InputStream inputStream;
+    try {
+      inputStream = this.getCloser().register(this.getFsHelper().getFileStream(filePath));
+    } catch (FileBasedHelperException e) {
+      throw new IOException("Failed to open source stream for: " + filePath, e);
+    }
+
+    // Get source file metadata using fsHelper's FileSystem
+    Path sourcePath = new Path(filePath);
+    FileSystem originFs = ((IcebergFileStreamHelper) this.getFsHelper()).getFileSystemForPath(sourcePath);
+    FileStatus originStatus = originFs.getFileStatus(sourcePath);
+
+    // Compute partition-aware destination path
+    Path destinationPath = computeDestinationPath(filePath, finalDir, sourcePath.getName());
+
+    // Build CopyableFile using cached targetFs and copyConfiguration (initialized once in constructor)
+    CopyableFile copyableFile =
+        CopyableFile.fromOriginAndDestination(originFs, originStatus, destinationPath, this.copyConfiguration).build();
+
+    FileAwareInputStream fileAwareInputStream = FileAwareInputStream.builder()
+        .file(copyableFile)
+        .inputStream(inputStream)
+        .split(Optional.absent())
+        .build();
+
+    return Collections.singletonList(fileAwareInputStream).iterator();
+  }
+
+  /**
+   * Compute destination path with partition awareness.
+   *
+   * <p>If partition metadata is available for this file, the destination path will include
+   * the partition path: {@code <finalDir>/<partitionPath>/<filename>}</p>
+   *
+   * <p>Otherwise (no entry, or a blank entry), the file is placed directly under finalDir:
+   * {@code <finalDir>/<filename>}</p>
+   *
+   * <p>The partition path is always treated as relative to finalDir. Leading slashes are stripped,
+   * because Hadoop resolves an absolute child against its parent by discarding the parent. Any
+   * {@code ..} segment is rejected, because URI resolution would normalize it and move the file
+   * outside finalDir.</p>
+   *
+   * @param sourceFilePath the source file path (key into the partition mapping)
+   * @param finalDir the final directory from configuration
+   * @param fileName the file name
+   * @return the computed destination path
+   * @throws IOException if the mapped partition path contains a {@code ..} segment
+   */
+  private Path computeDestinationPath(String sourceFilePath, String finalDir, String fileName) throws IOException {
+    // stripStart(null, ...) returns null, which isBlank() treats as "no partition"
+    String partitionPath = StringUtils.stripStart(fileToPartitionPathMap.get(sourceFilePath), "/");
+
+    if (StringUtils.isBlank(partitionPath)) {
+      // No partition info: <finalDir>/<filename>
+      Path destinationPath = new Path(finalDir, fileName);
+      log.info("Computed flat destination (no partition): {} -> {}", sourceFilePath, destinationPath);
+      return destinationPath;
+    }
+
+    for (String segment : partitionPath.split("/")) {
+      if ("..".equals(segment)) {
+        throw new IOException(String.format(
+            "Refusing partition path '%s' for file %s: '..' segments would escape the final directory %s",
+            partitionPath, sourceFilePath, finalDir));
+      }
+    }
+
+    // Partition-aware path: <finalDir>/<partitionPath>/<filename>
+    // Example: /data/table1/datepartition=2025-04-01/file.orc
+    Path destinationPath = new Path(new Path(finalDir, partitionPath), fileName);
+    log.info("Computed partition-aware destination: {} -> {}", sourceFilePath, destinationPath);
+    return destinationPath;
   }
}
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergFileStreamHelper.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergFileStreamHelper.java
index ebe69ef1ce..e3da2b5506 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergFileStreamHelper.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergFileStreamHelper.java
@@ -113,7 +113,18 @@ public class IcebergFileStreamHelper implements
TimestampAwareFileBasedHelper {
}
}
- private FileSystem getFileSystemForPath(Path path) throws IOException {
+ /**
+ * Get FileSystem for a given path, reusing the default FileSystem when
possible.
+ *
+ * <p>This method avoids creating unnecessary FileSystem instances by
reusing the
+ * default FileSystem for paths with the same scheme. Only creates a new
FileSystem
+ * if the path uses a different scheme (e.g., reading from HDFS while
default is local).
+ *
+ * @param path the path to get FileSystem for
+ * @return FileSystem instance (either the default or a new one for
cross-scheme access)
+ * @throws IOException if FileSystem cannot be created
+ */
+ public FileSystem getFileSystemForPath(Path path) throws IOException {
// If path has a different scheme than the default FileSystem, get
scheme-specific FS
if (path.toUri().getScheme() != null &&
!path.toUri().getScheme().equals(fileSystem.getUri().getScheme())) {
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergFileStreamExtractorTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergFileStreamExtractorTest.java
new file mode 100644
index 0000000000..ef885bec18
--- /dev/null
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergFileStreamExtractorTest.java
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Properties;
+
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.data.management.copy.FileAwareInputStream;
+import org.apache.gobblin.source.workunit.Extract;
+import org.apache.gobblin.source.workunit.WorkUnit;
+
+/**
+ * Tests for {@link IcebergFileStreamExtractor}.
+ *
+ * <p>Each test method gets a fresh, uniquely named local temp directory holding a small source file.
+ * The work unit's publisher final dir points at {@code <tempDir>/final}; the tests inspect the
+ * destinations computed by the extractor.
+ */
+public class IcebergFileStreamExtractorTest {
+
+  private File tempDir;
+  private File testFile;
+  private IcebergFileStreamExtractor extractor;
+  private WorkUnitState workUnitState;
+
+  @BeforeMethod
+  public void setUp() throws Exception {
+    // createTempDirectory picks a unique name atomically, unlike a timestamp-based name + mkdirs()
+    tempDir = java.nio.file.Files.createTempDirectory("iceberg-extractor-test-").toFile();
+
+    testFile = new File(tempDir, "test-data.parquet");
+    writeFile(testFile, "Test data content for streaming");
+
+    // Set up work unit state
+    Properties properties = new Properties();
+    properties.setProperty(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL, testFile.getAbsolutePath());
+    properties.setProperty(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR, tempDir.getAbsolutePath() + "/final");
+    properties.setProperty(ConfigurationKeys.WRITER_FILE_SYSTEM_URI, "file:///");
+    properties.setProperty(ConfigurationKeys.WRITER_STAGING_DIR, tempDir.getAbsolutePath() + "/staging");
+    properties.setProperty(ConfigurationKeys.WRITER_OUTPUT_DIR, tempDir.getAbsolutePath() + "/output");
+    workUnitState = newWorkUnitState(properties);
+
+    // Initialize extractor
+    extractor = new IcebergFileStreamExtractor(workUnitState);
+  }
+
+  @AfterMethod
+  public void tearDown() throws Exception {
+    if (extractor != null) {
+      extractor.close();
+    }
+    // Clean up temp files
+    if (tempDir != null && tempDir.exists()) {
+      deleteDirectory(tempDir);
+    }
+  }
+
+  @Test
+  public void testGetSchema() throws Exception {
+    String schema = extractor.getSchema();
+    Assert.assertEquals(schema, "FileAwareInputStream",
+        "Schema should be FileAwareInputStream for file streaming mode");
+  }
+
+  @Test
+  public void testDownloadFile() throws Exception {
+    Iterator<FileAwareInputStream> iterator = extractor.downloadFile(testFile.getAbsolutePath());
+
+    Assert.assertTrue(iterator.hasNext(), "Should return at least one FileAwareInputStream");
+
+    FileAwareInputStream fais = iterator.next();
+    Assert.assertNotNull(fais, "FileAwareInputStream should not be null");
+    Assert.assertNotNull(fais.getFile(), "CopyableFile should not be null");
+    Assert.assertNotNull(fais.getInputStream(), "InputStream should not be null");
+
+    // Verify no more items
+    Assert.assertFalse(iterator.hasNext(), "Should only return one FileAwareInputStream per file");
+  }
+
+  @Test
+  public void testDownloadFileWithCopyableFileMetadata() throws Exception {
+    FileAwareInputStream fais = extractor.downloadFile(testFile.getAbsolutePath()).next();
+
+    // Verify CopyableFile has correct metadata
+    Assert.assertNotNull(fais.getFile().getOrigin(), "Origin FileStatus should be set");
+    Assert.assertNotNull(fais.getFile().getDestination(), "Destination should be set");
+
+    // Verify origin path matches source
+    Assert.assertTrue(fais.getFile().getOrigin().getPath().toString().contains(testFile.getName()),
+        "Origin path should contain test file name");
+
+    // Verify destination path is under final dir
+    String finalDir = workUnitState.getProp(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR);
+    String destination = fais.getFile().getDestination().toString();
+    Assert.assertTrue(destination.contains(finalDir), "Destination should be under final dir");
+    Assert.assertTrue(destination.contains(testFile.getName()), "Destination should contain test file name");
+  }
+
+  @Test
+  public void testDownloadFileStreamIsReadable() throws Exception {
+    FileAwareInputStream fais = extractor.downloadFile(testFile.getAbsolutePath()).next();
+
+    // Verify we can read from the stream
+    byte[] buffer = new byte[1024];
+    int bytesRead = fais.getInputStream().read(buffer);
+    Assert.assertTrue(bytesRead > 0, "Should be able to read bytes from stream");
+
+    String content = new String(buffer, 0, bytesRead, java.nio.charset.StandardCharsets.UTF_8);
+    Assert.assertTrue(content.contains("Test data content"), "Stream content should match file content");
+  }
+
+  @Test(expectedExceptions = IOException.class)
+  public void testDownloadNonExistentFile() throws Exception {
+    // Test error handling for non-existent file
+    extractor.downloadFile("/non/existent/path/file.parquet");
+  }
+
+  @Test
+  public void testDownloadMultipleFiles() throws Exception {
+    File testFile2 = new File(tempDir, "test-data-2.parquet");
+    writeFile(testFile2, "Second test file content");
+
+    // Download first file
+    Iterator<FileAwareInputStream> iterator1 = extractor.downloadFile(testFile.getAbsolutePath());
+    Assert.assertTrue(iterator1.hasNext());
+    FileAwareInputStream fais1 = iterator1.next();
+    Assert.assertNotNull(fais1);
+
+    // Close first extractor and create new one for second file (single file per extractor);
+    // tearDown closes the replacement through the same field.
+    extractor.close();
+    extractor = new IcebergFileStreamExtractor(workUnitState);
+
+    // Download second file
+    Iterator<FileAwareInputStream> iterator2 = extractor.downloadFile(testFile2.getAbsolutePath());
+    Assert.assertTrue(iterator2.hasNext());
+    FileAwareInputStream fais2 = iterator2.next();
+    Assert.assertNotNull(fais2);
+
+    // Verify different files
+    Assert.assertNotEquals(fais1.getFile().getOrigin().getPath().getName(),
+        fais2.getFile().getOrigin().getPath().getName(),
+        "Should process different files");
+  }
+
+  @Test
+  public void testFileMetadataPreservation() throws Exception {
+    FileAwareInputStream fais = extractor.downloadFile(testFile.getAbsolutePath()).next();
+
+    // Verify origin file status is captured
+    Assert.assertTrue(fais.getFile().getOrigin().isFile(), "Origin should be a file, not directory");
+    Assert.assertTrue(fais.getFile().getOrigin().getLen() > 0, "Origin file size should be greater than 0");
+  }
+
+  @Test
+  public void testPartitionAwareDestinationPath() throws Exception {
+    // Test that partition path is correctly included in destination
+    String partitionPath = "datepartition=2025-04-01";
+    WorkUnitState wuStateWithPartition = workUnitStateWith(IcebergSource.ICEBERG_FILE_PARTITION_PATH,
+        toPartitionJson(testFile.getAbsolutePath(), partitionPath));
+
+    // Create new extractor with partition mapping
+    IcebergFileStreamExtractor extractorWithPartition = new IcebergFileStreamExtractor(wuStateWithPartition);
+    try {
+      FileAwareInputStream fais = extractorWithPartition.downloadFile(testFile.getAbsolutePath()).next();
+
+      // Verify destination includes partition path
+      String destinationPath = fais.getFile().getDestination().toString();
+      Assert.assertTrue(destinationPath.contains(partitionPath),
+          "Destination should contain partition path: " + partitionPath);
+      Assert.assertTrue(destinationPath.contains(testFile.getName()), "Destination should contain file name");
+
+      // Verify path structure: <finalDir>/<partitionPath>/<filename>
+      String expectedPathSubstring = partitionPath + "/" + testFile.getName();
+      Assert.assertTrue(destinationPath.contains(expectedPathSubstring),
+          "Destination should have structure: " + expectedPathSubstring);
+    } finally {
+      extractorWithPartition.close();
+    }
+  }
+
+  @Test
+  public void testMultiplePartitionAwareFiles() throws Exception {
+    // Test multiple files with different partitions
+    File testFile2 = new File(tempDir, "test-data-2.parquet");
+    writeFile(testFile2, "Second partition data");
+
+    String partition1 = "datepartition=2025-04-01";
+    String partition2 = "datepartition=2025-04-02";
+    WorkUnitState wuStateWithPartitions = workUnitStateWith(IcebergSource.ICEBERG_FILE_PARTITION_PATH,
+        toPartitionJson(testFile.getAbsolutePath(), partition1, testFile2.getAbsolutePath(), partition2));
+
+    // One extractor per file, each closed by its own finally block
+    IcebergFileStreamExtractor firstExtractor = new IcebergFileStreamExtractor(wuStateWithPartitions);
+    try {
+      FileAwareInputStream fais1 = firstExtractor.downloadFile(testFile.getAbsolutePath()).next();
+      Assert.assertTrue(fais1.getFile().getDestination().toString().contains(partition1),
+          "First file should map to partition1");
+    } finally {
+      firstExtractor.close();
+    }
+
+    IcebergFileStreamExtractor secondExtractor = new IcebergFileStreamExtractor(wuStateWithPartitions);
+    try {
+      FileAwareInputStream fais2 = secondExtractor.downloadFile(testFile2.getAbsolutePath()).next();
+      Assert.assertTrue(fais2.getFile().getDestination().toString().contains(partition2),
+          "Second file should map to partition2");
+    } finally {
+      secondExtractor.close();
+    }
+  }
+
+  @Test
+  public void testNoPartitionMappingFallback() throws Exception {
+    // Without partition mapping (the setUp extractor), files go directly under finalDir
+    FileAwareInputStream fais = extractor.downloadFile(testFile.getAbsolutePath()).next();
+
+    String destinationPath = fais.getFile().getDestination().toString();
+    String finalDir = workUnitState.getProp(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR);
+
+    // Should be: <finalDir>/<filename> (no partition subdirectory)
+    String expectedPath = finalDir + "/" + testFile.getName();
+    Assert.assertEquals(destinationPath, expectedPath,
+        "Without partition mapping, destination should be directly under finalDir");
+  }
+
+  @Test(expectedExceptions = IOException.class)
+  public void testMalformedPartitionJsonThrowsException() throws Exception {
+    // Test that malformed JSON in partition path mapping throws a clear exception
+    WorkUnitState wuStateWithMalformedJson =
+        workUnitStateWith(IcebergSource.ICEBERG_FILE_PARTITION_PATH, "{invalid json missing quote");
+
+    // Should throw IOException wrapping JsonSyntaxException with clear error message
+    IcebergFileStreamExtractor testExtractor = null;
+    try {
+      testExtractor = new IcebergFileStreamExtractor(wuStateWithMalformedJson);
+      Assert.fail("Should throw IOException for malformed partition JSON");
+    } catch (IOException e) {
+      // Verify error message is informative
+      Assert.assertTrue(e.getMessage().contains("Failed to parse partition path mapping"),
+          "Error message should indicate JSON parsing failure");
+      Assert.assertTrue(e.getMessage().contains("invalid json missing quote"),
+          "Error message should include the malformed JSON snippet");
+      Assert.assertTrue(e.getCause() instanceof com.google.gson.JsonSyntaxException,
+          "Root cause should be JsonSyntaxException");
+      throw e; // Re-throw for @Test(expectedExceptions)
+    } finally {
+      if (testExtractor != null) {
+        testExtractor.close();
+      }
+    }
+  }
+
+  /** Builds a work unit state for the fixed test extract from the given properties. */
+  private static WorkUnitState newWorkUnitState(Properties properties) {
+    WorkUnit workUnit = WorkUnit.create(new Extract(Extract.TableType.SNAPSHOT_ONLY, "test_namespace", "test_table"));
+    return new WorkUnitState(workUnit, new org.apache.gobblin.configuration.State(properties));
+  }
+
+  /** Copies the setUp work unit state's properties and overrides a single key. */
+  private WorkUnitState workUnitStateWith(String key, String value) {
+    Properties properties = new Properties();
+    properties.putAll(workUnitState.getProperties());
+    properties.setProperty(key, value);
+    return newWorkUnitState(properties);
+  }
+
+  /**
+   * Serializes alternating (filePath, partitionPath) arguments into the JSON map the extractor expects.
+   * Gson is used rather than String.format so paths needing escaping (e.g. Windows backslashes) stay valid JSON.
+   */
+  private static String toPartitionJson(String... fileAndPartitionPairs) {
+    java.util.Map<String, String> mapping = new java.util.LinkedHashMap<>();
+    for (int i = 0; i + 1 < fileAndPartitionPairs.length; i += 2) {
+      mapping.put(fileAndPartitionPairs[i], fileAndPartitionPairs[i + 1]);
+    }
+    return new com.google.gson.Gson().toJson(mapping);
+  }
+
+  /** Writes {@code content} to {@code file} as UTF-8, independent of the platform default charset. */
+  private static void writeFile(File file, String content) throws IOException {
+    java.nio.file.Files.write(file.toPath(), content.getBytes(java.nio.charset.StandardCharsets.UTF_8));
+  }
+
+  private void deleteDirectory(File directory) {
+    File[] files = directory.listFiles();
+    if (files != null) {
+      for (File file : files) {
+        if (file.isDirectory()) {
+          deleteDirectory(file);
+        } else {
+          file.delete();
+        }
+      }
+    }
+    directory.delete();
+  }
+}
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSourceTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSourceTest.java
index b4e6bc7eae..248108c0c8 100644
---
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSourceTest.java
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSourceTest.java
@@ -172,6 +172,8 @@ public class IcebergSourceTest {
State jobState = new State(properties);
WorkUnitState workUnitState = new WorkUnitState(dummyWu, jobState);
workUnitState.setProp(IcebergSource.ICEBERG_RECORD_PROCESSING_ENABLED,
"false");
+ // IcebergFileStreamExtractor requires data.publisher.final.dir for
CopyConfiguration
+ workUnitState.setProp(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR,
"/tmp/test-destination");
Extractor<String, FileAwareInputStream> extractor =
icebergSource.getExtractor(workUnitState);
// Verify correct extractor type