pvary commented on code in PR #13831:
URL: https://github.com/apache/iceberg/pull/13831#discussion_r2298142793


##########
flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDeleteFilesProcessor.java:
##########
@@ -74,27 +79,183 @@ void testDeleteMissingFile() throws Exception {
     Path dummyFile =
         FileSystems.getDefault().getPath(table.location().substring(5), 
DUMMY_FILE_NAME);
 
-    deleteFile(tableLoader(), dummyFile.toString());
+    deleteFile(tableLoader(), dummyFile.toString(), true /* expectSuccess */);
 
     assertThat(listFiles(table)).isEqualTo(TABLE_FILES);
   }
 
   @Test
   void testInvalidURIScheme() throws Exception {
-    deleteFile(tableLoader(), "wrong://");
+    deleteFile(tableLoader(), "wrong://", false /* expectSuccess */);
 
     assertThat(listFiles(table)).isEqualTo(TABLE_FILES);
   }
 
-  private void deleteFile(TableLoader tableLoader, String fileName) throws 
Exception {
-    tableLoader().open();
+  @Test
+  void testDeleteNonExistentFile() throws Exception {
+    String nonexistentFile = "nonexistentFile.txt";
+
+    deleteFile(tableLoader(), nonexistentFile, true /* expectSuccess */);
+
+    assertThat(listFiles(table)).isEqualTo(TABLE_FILES);
+  }
+
+  @Test
+  void testDelete10MBFile() throws Exception {
+    // Simulate a large file (e.g., 10MB file)
+    String largeFileName = "largeFile.txt";
+    Path largeFile = Path.of(tablePath(table).toString(), largeFileName);
+
+    // Write a 10 MB file to disk to simulate a large file in the filesystem
+    byte[] largeData = new byte[1024 * 1024 * 10]; // 10 MB
+    Files.write(largeFile, largeData);
+
+    // Verify that the file was created
+    Set<String> files = listFiles(table);
+    assertThat(files).contains(largeFileName);
+
+    // Use the DeleteFilesProcessor to delete the large file
+    deleteFile(tableLoader(), largeFile.toString(), true /* expectSuccess */);
+
+    // Verify that the large file has been deleted
+    files = listFiles(table);
+    assertThat(files).doesNotContain(largeFileName);
+  }
+
+  @Test
+  void testBatchDelete() throws Exception {
+    // Simulate adding multiple files
+    Set<String> filesToDelete = Sets.newHashSet(TABLE_FILES);
+    filesToDelete.add("file1.txt");
+    filesToDelete.add("file2.txt");
+
+    // Use a smaller batch size to trigger batch deletion logic
+    DeleteFilesProcessor deleteFilesProcessor =
+        new DeleteFilesProcessor(table, DUMMY_TASK_NAME, 0, 2);
     try (OneInputStreamOperatorTestHarness<String, Void> testHarness =
-        new OneInputStreamOperatorTestHarness<>(
-            new DeleteFilesProcessor(table, DUMMY_TASK_NAME, 0, 10), 
StringSerializer.INSTANCE)) {
+        new OneInputStreamOperatorTestHarness<>(deleteFilesProcessor, 
StringSerializer.INSTANCE)) {
       testHarness.open();
-      testHarness.processElement(fileName, System.currentTimeMillis());
+
+      for (String file : filesToDelete) {
+        testHarness.processElement(file, System.currentTimeMillis());
+      }
+
+      testHarness.processWatermark(EVENT_TIME);
+      testHarness.endInput();
+
+      // Verify that files are deleted
+      assertThat(listFiles(table)).isEqualTo(TABLE_FILES);
+      assertThat(deleteFilesProcessor.getSucceededCounter().getCount())
+          .isEqualTo(filesToDelete.size());
+      
assertThat(deleteFilesProcessor.getFailedCounter().getCount()).isEqualTo(0);
+      
assertThat(deleteFilesProcessor.getDeleteFileTimeMsHistogram().getStatistics().getMean())
+          .isGreaterThan(0);
+    } finally {
+      deleteFilesProcessor.close();
+    }
+  }
+
+  @Test
+  void testConcurrentDelete() throws Exception {
+    Path root = tablePath(table);
+
+    // Generate 30 test files: delete-0.txt ... delete-29.txt
+    Set<String> targets = Sets.newHashSet();
+    for (int i = 0; i < 30; i++) {
+      targets.add("delete-" + i + ".txt");
+    }
+
+    for (String f : targets) {
+      Files.write(root.resolve(f), f.getBytes(StandardCharsets.UTF_8));
+    }
+    assertThat(listFiles(table)).containsAll(targets);
+
+    DeleteFilesProcessor p1 = new DeleteFilesProcessor(table, DUMMY_TASK_NAME 
+ "-p1", 0, 2);
+    DeleteFilesProcessor p2 = new DeleteFilesProcessor(table, DUMMY_TASK_NAME 
+ "-p2", 0, 2);
+
+    // Two processors that will try to delete the same files concurrently
+    try (OneInputStreamOperatorTestHarness<String, Void> h1 =
+            new OneInputStreamOperatorTestHarness<>(p1, 
StringSerializer.INSTANCE);
+        OneInputStreamOperatorTestHarness<String, Void> h2 =
+            new OneInputStreamOperatorTestHarness<>(p2, 
StringSerializer.INSTANCE)) {
+      h1.open();
+      h2.open();
+
+      // One barrier per file: ensures p1 and p2 try to delete the same file 
at the same time
+      Map<String, CyclicBarrier> barriers = Maps.newHashMap();
+      targets.forEach(f -> barriers.put(f, new CyclicBarrier(2)));
+
+      long ts = System.currentTimeMillis();
+
+      Thread t1 =
+          new Thread(
+              () -> {
+                try {
+                  for (String f : targets) {
+                    barriers.get(f).await(2, TimeUnit.SECONDS);
+                    h1.processElement(f, ts);
+                  }
+                  h1.processWatermark(EVENT_TIME);
+                  h1.endInput();
+                } catch (Exception ignored) {
+                }
+              },
+              "deleter-p1");

Review Comment:
   Do we have to do these in threads? Could we just advance the processors in a 
loop?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org

Reply via email to