phet commented on code in PR #4058:
URL: https://github.com/apache/gobblin/pull/4058#discussion_r1809524465


##########
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java:
##########
@@ -333,4 +348,119 @@ protected static <T> void verifyAnyOrder(Collection<T> 
actual, Collection<T> exp
   protected static <T, C extends Collection<T>> List<T> flatten(Collection<C> 
cc) {
     return cc.stream().flatMap(x -> x.stream()).collect(Collectors.toList());
   }
+
+  @Test
+  public void testGetPartitionSpecificDataFiles() throws IOException {
+    TableIdentifier testTableId = TableIdentifier.of(dbName, 
"testTableForPartitionSpecificDataFiles");
+    Table testTable = catalog.createTable(testTableId, icebergSchema, 
icebergPartitionSpec);
+
+    List<String> paths = Arrays.asList(
+        "/path/tableName/data/id=1/file1.orc",
+        "/path/tableName/data/id=1/file3.orc",
+        "/path/tableName/data/id=1/file5.orc",
+        "/path/tableName/data/id=1/file4.orc",
+        "/path/tableName/data/id=1/file2.orc"
+    );
+    // Using the schema defined at the start of this class
+    PartitionData partitionData = new 
PartitionData(icebergPartitionSpec.partitionType());
+    partitionData.set(0, "1");
+    List<PartitionData> partitionDataList = Collections.nCopies(paths.size(), 
partitionData);
+
+    addPartitionDataFiles(testTable, paths, partitionDataList);
+
+    IcebergTable icebergTable = new IcebergTable(testTableId,
+        catalog.newTableOps(testTableId),
+        catalogUri,
+        catalog.loadTable(testTableId));
+    // Using AlwaysTrue & AlwaysFalse Predicate to avoid mocking of predicate 
class
+    Predicate<StructLike> alwaysTruePredicate = partition -> true;
+    Predicate<StructLike> alwaysFalsePredicate = partition -> false;
+    
Assert.assertEquals(icebergTable.getPartitionSpecificDataFiles(alwaysTruePredicate).size(),
 5);
+    
Assert.assertEquals(icebergTable.getPartitionSpecificDataFiles(alwaysFalsePredicate).size(),
 0);
+
+    catalog.dropTable(testTableId);
+  }
+
+  @Test
+  public void testOverwritePartition() throws IOException {
+    TableIdentifier overwritePartitionTestTableId = TableIdentifier.of(dbName, 
"testTableForOverwritePartition");
+    Table testTable = catalog.createTable(overwritePartitionTestTableId, 
icebergSchema, icebergPartitionSpec);
+
+    List<String> paths = Arrays.asList(
+        "/path/tableName/data/id=1/file1.orc",
+        "/path/tableName/data/id=1/file2.orc"
+    );
+    // Using the schema defined at the start of this class
+    PartitionData partitionData = new 
PartitionData(icebergPartitionSpec.partitionType());
+    partitionData.set(0, "1");
+    PartitionData partitionData2 = new 
PartitionData(icebergPartitionSpec.partitionType());
+    partitionData2.set(0, "1");
+    List<PartitionData> partitionDataList = Arrays.asList(partitionData, 
partitionData2);
+
+    addPartitionDataFiles(testTable, paths, partitionDataList);
+
+    IcebergTable icebergTable = new IcebergTable(overwritePartitionTestTableId,
+        catalog.newTableOps(overwritePartitionTestTableId),
+        catalogUri,
+        catalog.loadTable(overwritePartitionTestTableId));
+
+    verifyAnyOrder(paths, 
icebergTable.getCurrentSnapshotInfo().getAllDataFilePaths(), "data filepaths 
should match");
+
+    List<String> paths2 = Arrays.asList(
+        "/path/tableName/data/id=2/file3.orc",
+        "/path/tableName/data/id=2/file4.orc"
+    );
+    // Using the schema defined at the start of this class
+    PartitionData partitionData3 = new 
PartitionData(icebergPartitionSpec.partitionType());
+    partitionData3.set(0, "2");
+    PartitionData partitionData4 = new 
PartitionData(icebergPartitionSpec.partitionType());
+    partitionData4.set(0, "2");
+    List<PartitionData> partitionDataList2 = Arrays.asList(partitionData3, 
partitionData4);
+
+    List<DataFile> dataFiles2 = getDataFiles(paths2, partitionDataList2);
+    // here, since partition data with value 2 doesn't exist yet, we expect it 
to get added to the table
+    icebergTable.overwritePartition(dataFiles2, "id", "2");
+    List<String> expectedPaths2 = new ArrayList<>(paths);
+    expectedPaths2.addAll(paths2);
+    verifyAnyOrder(expectedPaths2, 
icebergTable.getCurrentSnapshotInfo().getAllDataFilePaths(), "data filepaths 
should match");
+
+    List<String> paths3 = Arrays.asList(
+      "/path/tableName/data/id=1/file5.orc",
+      "/path/tableName/data/id=1/file6.orc"
+    );
+    // Reusing the same partition data to create data files with different paths
+    List<DataFile> dataFiles3 = getDataFiles(paths3, partitionDataList);
+    // here, since partition data with value 1 already exists, we expect it to 
get updated in the table with newer path
+    icebergTable.overwritePartition(dataFiles3, "id", "1");
+    List<String> expectedPaths3 = new ArrayList<>(paths2);
+    expectedPaths3.addAll(paths3);
+    verifyAnyOrder(expectedPaths3, 
icebergTable.getCurrentSnapshotInfo().getAllDataFilePaths(), "data filepaths 
should match");
+
+    catalog.dropTable(overwritePartitionTestTableId);
+  }
+
+  private static void addPartitionDataFiles(Table table, List<String> paths, 
List<PartitionData> partitionDataList) {
+    Assert.assertEquals(paths.size(), partitionDataList.size());
+    getDataFiles(paths, partitionDataList).forEach(dataFile -> 
table.newAppend().appendFile(dataFile).commit());
+  }

Review Comment:
   debatable whether there's a need for this abstraction, as it's just a `.forEach` 
call.  the confusing params might initially trick us into believing it's doing more 
than that.
   
   the impl should be:
   ```
   void addPartitionDataFiles(Table table, List<DataFile> dataFiles) {
     dataFiles.forEach(df -> table.newAppend().appendFile(df).commit());
   }
   ```
   
   then call it as:
   ```
   addPartitionDataFiles(table, createDataFiles(partitionDataByPath))
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to