This is an automated email from the ASF dual-hosted git repository.

vivekrai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 9ad7ad22b2 [GOBBLIN-2231] Added extractor for partition-aware file 
copy from Iceberg to any dest (#4154)
9ad7ad22b2 is described below

commit 9ad7ad22b2f59613354db934818f9475b7d47486
Author: Prateek Khandelwal <[email protected]>
AuthorDate: Tue Nov 11 19:50:47 2025 +0530

    [GOBBLIN-2231] Added extractor for partition-aware file copy from Iceberg 
to any dest (#4154)
    
    * Added IcebergFileStreamExtractor for partition-aware file copy from 
Iceberg to any dest
    
    * Address review comments and fix indentation
    
    * Add IcebergFileStreamHelper change
    
    * Fix test failure
---
 .../copy/iceberg/IcebergFileStreamExtractor.java   | 132 +++++++-
 .../copy/iceberg/IcebergFileStreamHelper.java      |  13 +-
 .../iceberg/IcebergFileStreamExtractorTest.java    | 341 +++++++++++++++++++++
 .../management/copy/iceberg/IcebergSourceTest.java |   2 +
 4 files changed, 485 insertions(+), 3 deletions(-)

diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergFileStreamExtractor.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergFileStreamExtractor.java
index e87b187d74..4644d6ea94 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergFileStreamExtractor.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergFileStreamExtractor.java
@@ -18,15 +18,33 @@
 package org.apache.gobblin.data.management.copy.iceberg;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.Collections;
 import java.util.Iterator;
+import java.util.Map;
 
-import org.apache.commons.lang3.NotImplementedException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Optional;
+import com.google.gson.Gson;
+import com.google.gson.JsonSyntaxException;
+import com.google.gson.reflect.TypeToken;
 
 import lombok.extern.slf4j.Slf4j;
 
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.data.management.copy.CopyableFile;
 import org.apache.gobblin.data.management.copy.FileAwareInputStream;
 import org.apache.gobblin.source.extractor.filebased.FileBasedExtractor;
+import org.apache.gobblin.source.extractor.filebased.FileBasedHelperException;
+import org.apache.gobblin.util.WriterUtils;
 
 /**
  * Extractor for file streaming mode that creates FileAwareInputStream for 
each file.
@@ -40,8 +58,39 @@ import 
org.apache.gobblin.source.extractor.filebased.FileBasedExtractor;
 @Slf4j
 public class IcebergFileStreamExtractor extends FileBasedExtractor<String, 
FileAwareInputStream> {
 
+  private final Map<String, String> fileToPartitionPathMap;
+  private final Gson gson = new Gson();
+  private final FileSystem targetFs;
+  private final CopyConfiguration copyConfiguration;
+
   public IcebergFileStreamExtractor(WorkUnitState workUnitState) throws 
IOException {
     super(workUnitState, new IcebergFileStreamHelper(workUnitState));
+
+    // Initialize target FileSystem and CopyConfiguration once
+    String writerFsUri = 
workUnitState.getProp(ConfigurationKeys.WRITER_FILE_SYSTEM_URI, 
ConfigurationKeys.LOCAL_FS_URI);
+    Configuration writerConf = WriterUtils.getFsConfiguration(workUnitState);
+    this.targetFs = FileSystem.get(URI.create(writerFsUri), writerConf);
+    this.copyConfiguration = CopyConfiguration.builder(this.targetFs, 
workUnitState.getProperties()).build();
+
+    // Load partition path mapping from work unit (set by IcebergSource)
+    String partitionPathJson = 
workUnitState.getProp(IcebergSource.ICEBERG_FILE_PARTITION_PATH);
+    if (!StringUtils.isBlank(partitionPathJson)) {
+      try {
+        this.fileToPartitionPathMap = gson.fromJson(partitionPathJson,
+            new TypeToken<Map<String, String>>() {}.getType());
+        log.info("Loaded partition path mapping for {} files", 
fileToPartitionPathMap.size());
+      } catch (JsonSyntaxException e) {
+        String errorMsg = String.format("Failed to parse partition path 
mapping from work unit. "
+            + "Expected valid JSON map, got: '%s'. Error: %s",
+            partitionPathJson.length() > 200 ? partitionPathJson.substring(0, 
200) + "..." : partitionPathJson,
+            e.getMessage());
+        log.error(errorMsg, e);
+        throw new IOException(errorMsg, e);
+      }
+    } else {
+      this.fileToPartitionPathMap = Collections.emptyMap();
+      log.info("No partition path mapping found in work unit");
+    }
   }
 
   @Override
@@ -50,9 +99,88 @@ public class IcebergFileStreamExtractor extends 
FileBasedExtractor<String, FileA
     return "FileAwareInputStream";
   }
 
+  /**
+   * Downloads a file and wraps it in a {@link FileAwareInputStream} for 
streaming to the destination.
+   *
+   * <p>This method performs the following operations:
+   * <ol>
+   *   <li>Opens an input stream for the source file using {@link 
IcebergFileStreamHelper}</li>
+   *   <li>Retrieves source file metadata (FileStatus) from the source 
filesystem</li>
+   *   <li>Computes the destination path, which may be partition-aware based 
on work unit metadata</li>
+   *   <li>Builds a {@link CopyableFile} containing both source and 
destination metadata</li>
+   *   <li>Wraps the input stream and metadata in a {@link 
FileAwareInputStream}</li>
+   * </ol>
+   *
+   * @param filePath the absolute path to the source file to download
+   * @return an iterator containing a single {@link FileAwareInputStream} 
wrapping the file
+   * @throws IOException if the file cannot be opened, file metadata cannot be 
retrieved,
+   *                     or destination path computation fails
+   */
   @Override
   public Iterator<FileAwareInputStream> downloadFile(String filePath) throws 
IOException {
-    throw new NotImplementedException("Not yet implemented");
+    log.info("Preparing FileAwareInputStream for file: {}", filePath);
+
+    // Open source stream using fsHelper
+    final InputStream inputStream;
+    try {
+      inputStream = 
this.getCloser().register(this.getFsHelper().getFileStream(filePath));
+    } catch (FileBasedHelperException e) {
+      throw new IOException("Failed to open source stream for: " + filePath, 
e);
+    }
+
+    // Get source file metadata using fsHelper's FileSystem
+    Path sourcePath = new Path(filePath);
+    FileSystem originFs = ((IcebergFileStreamHelper) 
this.getFsHelper()).getFileSystemForPath(sourcePath);
+    FileStatus originStatus = originFs.getFileStatus(sourcePath);
+
+    // Compute partition-aware destination path
+    String finalDir = 
this.workUnitState.getProp(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR);
+    if (StringUtils.isBlank(finalDir)) {
+      throw new IOException("Required configuration '" + 
ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR + "' is not set. "
+          + "Cannot determine destination path for file: " + filePath);
+    }
+    Path destinationPath = computeDestinationPath(filePath, finalDir, 
sourcePath.getName());
+
+    // Build CopyableFile using cached targetFs and copyConfiguration 
(initialized once in constructor)
+    CopyableFile copyableFile = 
CopyableFile.fromOriginAndDestination(originFs, originStatus, destinationPath, 
this.copyConfiguration).build();
+
+    FileAwareInputStream fileAwareInputStream = FileAwareInputStream.builder()
+        .file(copyableFile)
+        .inputStream(inputStream)
+        .split(Optional.absent())
+        .build();
+
+    return Collections.singletonList(fileAwareInputStream).iterator();
+  }
+
+  /**
+   * Compute destination path with partition awareness.
+   *
+   * <p>If partition metadata is available for this file, the destination path 
will include
+   * the partition path: {@code <finalDir>/<partitionPath>/<filename>}</p>
+   *
+   * <p>Otherwise, the file is placed directly under finalDir: {@code 
<finalDir>/<filename>}</p>
+   *
+   * @param sourceFilePath the source file path
+   * @param finalDir the final directory from configuration
+   * @param fileName the file name
+   * @return the computed destination path
+   */
+  private Path computeDestinationPath(String sourceFilePath, String finalDir, 
String fileName) {
+    String partitionPath = fileToPartitionPathMap.get(sourceFilePath);
+
+    if (!StringUtils.isBlank(partitionPath)) {
+      // Partition-aware path: <finalDir>/<partitionPath>/<filename>
+      // Example: /data/table1/datepartition=2025-04-01/file.orc
+      Path destinationPath = new Path(new Path(finalDir, partitionPath), 
fileName);
+      log.info("Computed partition-aware destination: {} -> {}", 
sourceFilePath, destinationPath);
+      return destinationPath;
+    } else {
+      // No partition info: <finalDir>/<filename>
+      Path destinationPath = new Path(finalDir, fileName);
+      log.info("Computed flat destination (no partition): {} -> {}", 
sourceFilePath, destinationPath);
+      return destinationPath;
+    }
   }
 
 }
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergFileStreamHelper.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergFileStreamHelper.java
index ebe69ef1ce..e3da2b5506 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergFileStreamHelper.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergFileStreamHelper.java
@@ -113,7 +113,18 @@ public class IcebergFileStreamHelper implements 
TimestampAwareFileBasedHelper {
     }
   }
 
-  private FileSystem getFileSystemForPath(Path path) throws IOException {
+  /**
+   * Get FileSystem for a given path, reusing the default FileSystem when 
possible.
+   * 
+   * <p>This method avoids creating unnecessary FileSystem instances by 
reusing the
+   * default FileSystem for paths with the same scheme. Only creates a new 
FileSystem
+   * if the path uses a different scheme (e.g., reading from HDFS while 
default is local).
+   * 
+   * @param path the path to get FileSystem for
+   * @return FileSystem instance (either the default or a new one for 
cross-scheme access)
+   * @throws IOException if FileSystem cannot be created
+   */
+  public FileSystem getFileSystemForPath(Path path) throws IOException {
     // If path has a different scheme than the default FileSystem, get 
scheme-specific FS
     if (path.toUri().getScheme() != null &&
       !path.toUri().getScheme().equals(fileSystem.getUri().getScheme())) {
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergFileStreamExtractorTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergFileStreamExtractorTest.java
new file mode 100644
index 0000000000..ef885bec18
--- /dev/null
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergFileStreamExtractorTest.java
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Properties;
+
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.data.management.copy.FileAwareInputStream;
+import org.apache.gobblin.source.workunit.Extract;
+import org.apache.gobblin.source.workunit.WorkUnit;
+
+/**
+ * Tests for {@link IcebergFileStreamExtractor}.
+ */
+public class IcebergFileStreamExtractorTest {
+
+  private File tempDir;
+  private File testFile;
+  private IcebergFileStreamExtractor extractor;
+  private WorkUnitState workUnitState;
+
+  @BeforeMethod
+  public void setUp() throws Exception {
+    // Create temp directory and test file
+    tempDir = new File(System.getProperty("java.io.tmpdir"), 
"iceberg-extractor-test-" + System.currentTimeMillis());
+    tempDir.mkdirs();
+
+    testFile = new File(tempDir, "test-data.parquet");
+    try (FileOutputStream fos = new FileOutputStream(testFile)) {
+      fos.write("Test data content for streaming".getBytes());
+    }
+
+    // Set up work unit state
+    Properties properties = new Properties();
+    properties.setProperty(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL, 
testFile.getAbsolutePath());
+    properties.setProperty(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR, 
tempDir.getAbsolutePath() + "/final");
+    properties.setProperty(ConfigurationKeys.WRITER_FILE_SYSTEM_URI, 
"file:///");
+    properties.setProperty(ConfigurationKeys.WRITER_STAGING_DIR, 
tempDir.getAbsolutePath() + "/staging");
+    properties.setProperty(ConfigurationKeys.WRITER_OUTPUT_DIR, 
tempDir.getAbsolutePath() + "/output");
+
+    WorkUnit workUnit = WorkUnit.create(
+      new Extract(Extract.TableType.SNAPSHOT_ONLY, "test_namespace", 
"test_table")
+    );
+    workUnitState = new WorkUnitState(workUnit, new 
org.apache.gobblin.configuration.State(properties));
+
+    // Initialize extractor
+    extractor = new IcebergFileStreamExtractor(workUnitState);
+  }
+
+  @AfterMethod
+  public void tearDown() throws Exception {
+    if (extractor != null) {
+      extractor.close();
+    }
+    // Clean up temp files
+    if (tempDir != null && tempDir.exists()) {
+      deleteDirectory(tempDir);
+    }
+  }
+
+  @Test
+  public void testGetSchema() throws Exception {
+    String schema = extractor.getSchema();
+    Assert.assertEquals(schema, "FileAwareInputStream",
+        "Schema should be FileAwareInputStream for file streaming mode");
+  }
+
+  @Test
+  public void testDownloadFile() throws Exception {
+    // Test downloading a file
+    Iterator<FileAwareInputStream> iterator = 
extractor.downloadFile(testFile.getAbsolutePath());
+
+    Assert.assertTrue(iterator.hasNext(), "Should return at least one 
FileAwareInputStream");
+
+    FileAwareInputStream fais = iterator.next();
+    Assert.assertNotNull(fais, "FileAwareInputStream should not be null");
+    Assert.assertNotNull(fais.getFile(), "CopyableFile should not be null");
+    Assert.assertNotNull(fais.getInputStream(), "InputStream should not be 
null");
+
+    // Verify no more items
+    Assert.assertFalse(iterator.hasNext(), "Should only return one 
FileAwareInputStream per file");
+  }
+
+  @Test
+  public void testDownloadFileWithCopyableFileMetadata() throws Exception {
+    Iterator<FileAwareInputStream> iterator = 
extractor.downloadFile(testFile.getAbsolutePath());
+    FileAwareInputStream fais = iterator.next();
+
+    // Verify CopyableFile has correct metadata
+    Assert.assertNotNull(fais.getFile().getOrigin(), "Origin FileStatus should 
be set");
+    Assert.assertNotNull(fais.getFile().getDestination(), "Destination should 
be set");
+
+    // Verify origin path matches source
+    
Assert.assertTrue(fais.getFile().getOrigin().getPath().toString().contains(testFile.getName()),
+        "Origin path should contain test file name");
+
+    // Verify destination path is under final dir
+    String finalDir = 
workUnitState.getProp(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR);
+    
Assert.assertTrue(fais.getFile().getDestination().toString().contains(finalDir),
+        "Destination should be under final dir");
+    
Assert.assertTrue(fais.getFile().getDestination().toString().contains(testFile.getName()),
+        "Destination should contain test file name");
+  }
+
+  @Test
+  public void testDownloadFileStreamIsReadable() throws Exception {
+    Iterator<FileAwareInputStream> iterator = 
extractor.downloadFile(testFile.getAbsolutePath());
+    FileAwareInputStream fais = iterator.next();
+
+    // Verify we can read from the stream
+    byte[] buffer = new byte[1024];
+    int bytesRead = fais.getInputStream().read(buffer);
+    Assert.assertTrue(bytesRead > 0, "Should be able to read bytes from 
stream");
+
+    String content = new String(buffer, 0, bytesRead);
+    Assert.assertTrue(content.contains("Test data content"), "Stream content 
should match file content");
+  }
+
+  @Test(expectedExceptions = IOException.class)
+  public void testDownloadNonExistentFile() throws Exception {
+    // Test error handling for non-existent file
+    extractor.downloadFile("/non/existent/path/file.parquet");
+  }
+
+  @Test
+  public void testDownloadMultipleFiles() throws Exception {
+    // Create second test file
+    File testFile2 = new File(tempDir, "test-data-2.parquet");
+    try (FileOutputStream fos = new FileOutputStream(testFile2)) {
+      fos.write("Second test file content".getBytes());
+    }
+
+    // Download first file
+    Iterator<FileAwareInputStream> iterator1 = 
extractor.downloadFile(testFile.getAbsolutePath());
+    Assert.assertTrue(iterator1.hasNext());
+    FileAwareInputStream fais1 = iterator1.next();
+    Assert.assertNotNull(fais1);
+
+    // Close first extractor and create new one for second file (single file 
per extractor)
+    extractor.close();
+    extractor = new IcebergFileStreamExtractor(workUnitState);
+
+    // Download second file
+    Iterator<FileAwareInputStream> iterator2 = 
extractor.downloadFile(testFile2.getAbsolutePath());
+    Assert.assertTrue(iterator2.hasNext());
+    FileAwareInputStream fais2 = iterator2.next();
+    Assert.assertNotNull(fais2);
+
+    // Verify different files
+    Assert.assertNotEquals(fais1.getFile().getOrigin().getPath().getName(),
+    fais2.getFile().getOrigin().getPath().getName(),
+        "Should process different files");
+  }
+
+  @Test
+  public void testFileMetadataPreservation() throws Exception {
+    Iterator<FileAwareInputStream> iterator = 
extractor.downloadFile(testFile.getAbsolutePath());
+    FileAwareInputStream fais = iterator.next();
+
+    // Verify origin file status is captured
+    Assert.assertTrue(fais.getFile().getOrigin().isFile(),
+        "Origin should be a file, not directory");
+    Assert.assertTrue(fais.getFile().getOrigin().getLen() > 0,
+        "Origin file size should be greater than 0");
+  }
+
+  @Test
+  public void testPartitionAwareDestinationPath() throws Exception {
+    // Test that partition path is correctly included in destination
+    // Set up partition path mapping in work unit
+    String partitionPath = "datepartition=2025-04-01";
+    String fileToPartitionJson = String.format("{\"%s\":\"%s\"}", 
testFile.getAbsolutePath(), partitionPath);
+
+    Properties propsWithPartition = new Properties();
+    propsWithPartition.putAll(workUnitState.getProperties());
+    propsWithPartition.setProperty(IcebergSource.ICEBERG_FILE_PARTITION_PATH, 
fileToPartitionJson);
+
+    WorkUnit workUnit = WorkUnit.create(
+        new Extract(Extract.TableType.SNAPSHOT_ONLY, "test_namespace", 
"test_table"));
+    WorkUnitState wuStateWithPartition = new WorkUnitState(workUnit,
+        new org.apache.gobblin.configuration.State(propsWithPartition));
+
+    // Create new extractor with partition mapping
+    IcebergFileStreamExtractor extractorWithPartition = new 
IcebergFileStreamExtractor(wuStateWithPartition);
+
+    try {
+      Iterator<FileAwareInputStream> iterator = 
extractorWithPartition.downloadFile(testFile.getAbsolutePath());
+      FileAwareInputStream fais = iterator.next();
+
+      // Verify destination includes partition path
+      String destinationPath = fais.getFile().getDestination().toString();
+      Assert.assertTrue(destinationPath.contains(partitionPath),
+          "Destination should contain partition path: " + partitionPath);
+      Assert.assertTrue(destinationPath.contains(testFile.getName()),
+          "Destination should contain file name");
+
+      // Verify path structure: <finalDir>/<partitionPath>/<filename>
+      String expectedPathSubstring = partitionPath + "/" + testFile.getName();
+      Assert.assertTrue(destinationPath.contains(expectedPathSubstring),
+          "Destination should have structure: " + expectedPathSubstring);
+    } finally {
+      extractorWithPartition.close();
+    }
+  }
+
+  @Test
+  public void testMultiplePartitionAwareFiles() throws Exception {
+    // Test multiple files with different partitions
+    File testFile2 = new File(tempDir, "test-data-2.parquet");
+    try (FileOutputStream fos = new FileOutputStream(testFile2)) {
+      fos.write("Second partition data".getBytes());
+    }
+
+    // Set up partition mappings for both files
+    String partition1 = "datepartition=2025-04-01";
+    String partition2 = "datepartition=2025-04-02";
+    String fileToPartitionJson = String.format("{\"%s\":\"%s\", 
\"%s\":\"%s\"}",
+        testFile.getAbsolutePath(), partition1,
+        testFile2.getAbsolutePath(), partition2);
+
+    Properties propsWithPartitions = new Properties();
+    propsWithPartitions.putAll(workUnitState.getProperties());
+    propsWithPartitions.setProperty(IcebergSource.ICEBERG_FILE_PARTITION_PATH, 
fileToPartitionJson);
+
+    WorkUnit workUnit = WorkUnit.create(
+        new Extract(Extract.TableType.SNAPSHOT_ONLY, "test_namespace", 
"test_table"));
+    WorkUnitState wuStateWithPartitions = new WorkUnitState(workUnit,
+        new org.apache.gobblin.configuration.State(propsWithPartitions));
+
+    IcebergFileStreamExtractor extractorWithPartitions = new 
IcebergFileStreamExtractor(wuStateWithPartitions);
+
+    try {
+      // Process first file
+      Iterator<FileAwareInputStream> iterator1 = 
extractorWithPartitions.downloadFile(testFile.getAbsolutePath());
+      FileAwareInputStream fais1 = iterator1.next();
+      
Assert.assertTrue(fais1.getFile().getDestination().toString().contains(partition1),
+      "First file should map to partition1");
+
+      // Close and create new extractor for second file
+      extractorWithPartitions.close();
+      extractorWithPartitions = new 
IcebergFileStreamExtractor(wuStateWithPartitions);
+
+      // Process second file
+      Iterator<FileAwareInputStream> iterator2 = 
extractorWithPartitions.downloadFile(testFile2.getAbsolutePath());
+      FileAwareInputStream fais2 = iterator2.next();
+      
Assert.assertTrue(fais2.getFile().getDestination().toString().contains(partition2),
+          "Second file should map to partition2");
+    } finally {
+      extractorWithPartitions.close();
+    }
+  }
+
+  @Test
+  public void testNoPartitionMappingFallback() throws Exception {
+    // Test that when no partition mapping is provided, files go directly 
under finalDir
+    // (extractor was already created without partition mapping in setUp)
+    Iterator<FileAwareInputStream> iterator = 
extractor.downloadFile(testFile.getAbsolutePath());
+    FileAwareInputStream fais = iterator.next();
+
+    String destinationPath = fais.getFile().getDestination().toString();
+    String finalDir = 
workUnitState.getProp(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR);
+
+    // Should be: <finalDir>/<filename> (no partition subdirectory)
+    String expectedPath = finalDir + "/" + testFile.getName();
+    Assert.assertEquals(destinationPath, expectedPath,
+        "Without partition mapping, destination should be directly under 
finalDir");
+  }
+
+  @Test(expectedExceptions = IOException.class)
+  public void testMalformedPartitionJsonThrowsException() throws Exception {
+    // Test that malformed JSON in partition path mapping throws a clear 
exception
+    Properties propsWithMalformedJson = new Properties();
+    propsWithMalformedJson.putAll(workUnitState.getProperties());
+    
propsWithMalformedJson.setProperty(IcebergSource.ICEBERG_FILE_PARTITION_PATH, 
"{invalid json missing quote");
+
+    WorkUnit workUnit = WorkUnit.create(
+        new Extract(Extract.TableType.SNAPSHOT_ONLY, "test_namespace", 
"test_table")
+    );
+    WorkUnitState wuStateWithMalformedJson = new WorkUnitState(workUnit,
+        new org.apache.gobblin.configuration.State(propsWithMalformedJson));
+
+    // Should throw IOException wrapping JsonSyntaxException with clear error 
message
+    IcebergFileStreamExtractor testExtractor = null;
+    try {
+      testExtractor = new IcebergFileStreamExtractor(wuStateWithMalformedJson);
+      Assert.fail("Should throw IOException for malformed partition JSON");
+    } catch (IOException e) {
+      // Verify error message is informative
+      Assert.assertTrue(e.getMessage().contains("Failed to parse partition 
path mapping"),
+          "Error message should indicate JSON parsing failure");
+      Assert.assertTrue(e.getMessage().contains("invalid json missing quote"),
+          "Error message should include the malformed JSON snippet");
+      Assert.assertTrue(e.getCause() instanceof 
com.google.gson.JsonSyntaxException,
+          "Root cause should be JsonSyntaxException");
+      throw e; // Re-throw for @Test(expectedExceptions)
+    } finally {
+      if (testExtractor != null) {
+        testExtractor.close();
+      }
+    }
+  }
+
+  private void deleteDirectory(File directory) {
+    File[] files = directory.listFiles();
+    if (files != null) {
+      for (File file : files) {
+        if (file.isDirectory()) {
+          deleteDirectory(file);
+        } else {
+          file.delete();
+        }
+      }
+    }
+    directory.delete();
+  }
+}
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSourceTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSourceTest.java
index b4e6bc7eae..248108c0c8 100644
--- 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSourceTest.java
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSourceTest.java
@@ -172,6 +172,8 @@ public class IcebergSourceTest {
     State jobState = new State(properties);
     WorkUnitState workUnitState = new WorkUnitState(dummyWu, jobState);
     workUnitState.setProp(IcebergSource.ICEBERG_RECORD_PROCESSING_ENABLED, 
"false");
+    // IcebergFileStreamExtractor requires data.publisher.final.dir for 
CopyConfiguration
+    workUnitState.setProp(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR, 
"/tmp/test-destination");
 
     Extractor<String, FileAwareInputStream> extractor = 
icebergSource.getExtractor(workUnitState);
     // Verify correct extractor type

Reply via email to