[ 
https://issues.apache.org/jira/browse/GOBBLIN-2159?focusedWorklogId=939320&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-939320
 ]

ASF GitHub Bot logged work on GOBBLIN-2159:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 21/Oct/24 21:55
            Start Date: 21/Oct/24 21:55
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #4058:
URL: https://github.com/apache/gobblin/pull/4058#discussion_r1809524465


##########
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java:
##########
@@ -333,4 +348,119 @@ protected static <T> void verifyAnyOrder(Collection<T> 
actual, Collection<T> exp
   protected static <T, C extends Collection<T>> List<T> flatten(Collection<C> 
cc) {
     return cc.stream().flatMap(x -> x.stream()).collect(Collectors.toList());
   }
+
+  @Test
+  public void testGetPartitionSpecificDataFiles() throws IOException {
+    TableIdentifier testTableId = TableIdentifier.of(dbName, 
"testTableForPartitionSpecificDataFiles");
+    Table testTable = catalog.createTable(testTableId, icebergSchema, 
icebergPartitionSpec);
+
+    List<String> paths = Arrays.asList(
+        "/path/tableName/data/id=1/file1.orc",
+        "/path/tableName/data/id=1/file3.orc",
+        "/path/tableName/data/id=1/file5.orc",
+        "/path/tableName/data/id=1/file4.orc",
+        "/path/tableName/data/id=1/file2.orc"
+    );
+    // Using the schema defined in start of this class
+    PartitionData partitionData = new 
PartitionData(icebergPartitionSpec.partitionType());
+    partitionData.set(0, "1");
+    List<PartitionData> partitionDataList = Collections.nCopies(paths.size(), 
partitionData);
+
+    addPartitionDataFiles(testTable, paths, partitionDataList);
+
+    IcebergTable icebergTable = new IcebergTable(testTableId,
+        catalog.newTableOps(testTableId),
+        catalogUri,
+        catalog.loadTable(testTableId));
+    // Using AlwaysTrue & AlwaysFalse Predicate to avoid mocking of predicate 
class
+    Predicate<StructLike> alwaysTruePredicate = partition -> true;
+    Predicate<StructLike> alwaysFalsePredicate = partition -> false;
+    
Assert.assertEquals(icebergTable.getPartitionSpecificDataFiles(alwaysTruePredicate).size(),
 5);
+    
Assert.assertEquals(icebergTable.getPartitionSpecificDataFiles(alwaysFalsePredicate).size(),
 0);
+
+    catalog.dropTable(testTableId);
+  }
+
+  @Test
+  public void testOverwritePartition() throws IOException {
+    TableIdentifier overwritePartitionTestTableId = TableIdentifier.of(dbName, 
"testTableForOverwritePartition");
+    Table testTable = catalog.createTable(overwritePartitionTestTableId, 
icebergSchema, icebergPartitionSpec);
+
+    List<String> paths = Arrays.asList(
+        "/path/tableName/data/id=1/file1.orc",
+        "/path/tableName/data/id=1/file2.orc"
+    );
+    // Using the schema defined in start of this class
+    PartitionData partitionData = new 
PartitionData(icebergPartitionSpec.partitionType());
+    partitionData.set(0, "1");
+    PartitionData partitionData2 = new 
PartitionData(icebergPartitionSpec.partitionType());
+    partitionData2.set(0, "1");
+    List<PartitionData> partitionDataList = Arrays.asList(partitionData, 
partitionData2);
+
+    addPartitionDataFiles(testTable, paths, partitionDataList);
+
+    IcebergTable icebergTable = new IcebergTable(overwritePartitionTestTableId,
+        catalog.newTableOps(overwritePartitionTestTableId),
+        catalogUri,
+        catalog.loadTable(overwritePartitionTestTableId));
+
+    verifyAnyOrder(paths, 
icebergTable.getCurrentSnapshotInfo().getAllDataFilePaths(), "data filepaths 
should match");
+
+    List<String> paths2 = Arrays.asList(
+        "/path/tableName/data/id=2/file3.orc",
+        "/path/tableName/data/id=2/file4.orc"
+    );
+    // Using the schema defined in start of this class
+    PartitionData partitionData3 = new 
PartitionData(icebergPartitionSpec.partitionType());
+    partitionData3.set(0, "2");
+    PartitionData partitionData4 = new 
PartitionData(icebergPartitionSpec.partitionType());
+    partitionData4.set(0, "2");
+    List<PartitionData> partitionDataList2 = Arrays.asList(partitionData3, 
partitionData4);
+
+    List<DataFile> dataFiles2 = getDataFiles(paths2, partitionDataList2);
+    // here, since partition data with value 2 doesn't exist yet, we expect it 
to get added to the table
+    icebergTable.overwritePartition(dataFiles2, "id", "2");
+    List<String> expectedPaths2 = new ArrayList<>(paths);
+    expectedPaths2.addAll(paths2);
+    verifyAnyOrder(expectedPaths2, 
icebergTable.getCurrentSnapshotInfo().getAllDataFilePaths(), "data filepaths 
should match");
+
+    List<String> paths3 = Arrays.asList(
+      "/path/tableName/data/id=1/file5.orc",
+      "/path/tableName/data/id=1/file6.orc"
+    );
+    // Reusing same partition dats to create data file with different paths
+    List<DataFile> dataFiles3 = getDataFiles(paths3, partitionDataList);
+    // here, since partition data with value 1 already exists, we expect it to 
get updated in the table with newer path
+    icebergTable.overwritePartition(dataFiles3, "id", "1");
+    List<String> expectedPaths3 = new ArrayList<>(paths2);
+    expectedPaths3.addAll(paths3);
+    verifyAnyOrder(expectedPaths3, 
icebergTable.getCurrentSnapshotInfo().getAllDataFilePaths(), "data filepaths 
should match");
+
+    catalog.dropTable(overwritePartitionTestTableId);
+  }
+
+  private static void addPartitionDataFiles(Table table, List<String> paths, 
List<PartitionData> partitionDataList) {
+    Assert.assertEquals(paths.size(), partitionDataList.size());
+    getDataFiles(paths, partitionDataList).forEach(dataFile -> 
table.newAppend().appendFile(dataFile).commit());
+  }

Review Comment:
   debatable whether a need for this abstraction, as it's just a `.forEach` 
call.  the confusing params might initially trick us to believe it's doing more 
than that.
   
   the impl should be:
   ```
   void addPartitionDataFiles(Table table, List<DataFile> dataFiles) {
     dataFiles.forEach(df -> table.newAppend().appendFile(df).commit());
   }
   ```
   
   then call it as:
   ```
   addPartitionDataFiles(table, createDataFiles(partitionDataByPath))
   ```





Issue Time Tracking
-------------------

    Worklog Id:     (was: 939320)
    Time Spent: 7h 10m  (was: 7h)

> Support Partition Based Copy in Iceberg Distcp
> ----------------------------------------------
>
>                 Key: GOBBLIN-2159
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2159
>             Project: Apache Gobblin
>          Issue Type: Task
>            Reporter: Vivek Rai
>            Priority: Major
>          Time Spent: 7h 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to