This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.6
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git

commit 8378a1777134695fc91d2501cf6f0568f5a4ab28
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Jan 10 13:54:55 2024 +0800

    [core] Fix index file checkFilesExistence in TableCommitImpl (#2668)
---
 .../apache/paimon/table/sink/TableCommitImpl.java  |  4 +++-
 .../apache/paimon/table/sink/TableCommitTest.java  | 22 ++++++++++++++++++----
 2 files changed, 21 insertions(+), 5 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
index 83914744b..bfc52ec5e 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
@@ -37,6 +37,7 @@ import org.apache.paimon.utils.ExecutorThreadFactory;
 import org.apache.paimon.utils.FileUtils;
 import org.apache.paimon.utils.IOUtils;
 import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.PathFactory;
 
 import 
org.apache.paimon.shade.guava30.com.google.common.util.concurrent.MoreExecutors;
 
@@ -252,6 +253,7 @@ public class TableCommitImpl implements InnerTableCommit {
     private void checkFilesExistence(List<ManifestCommittable> committables) {
         List<Path> files = new ArrayList<>();
         Map<Pair<BinaryRow, Integer>, DataFilePathFactory> factoryMap = new 
HashMap<>();
+        PathFactory indexFileFactory = commit.pathFactory().indexFileFactory();
         for (ManifestCommittable committable : committables) {
             for (CommitMessage message : committable.fileCommittables()) {
                 CommitMessageImpl msg = (CommitMessageImpl) message;
@@ -270,7 +272,7 @@ public class TableCommitImpl implements InnerTableCommit {
                 msg.compactIncrement().compactAfter().forEach(collector);
                 msg.indexIncrement().newIndexFiles().stream()
                         .map(IndexFileMeta::fileName)
-                        .map(pathFactory::toPath)
+                        .map(indexFileFactory::toPath)
                         .forEach(files::add);
             }
         }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java 
b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
index cba062894..362b40301 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
@@ -67,19 +67,28 @@ public class TableCommitTest {
     private static final Map<String, Set<Long>> commitCallbackResult = new 
ConcurrentHashMap<>();
 
     @Test
-    public void testCommitCallbackWithFailure() throws Exception {
+    public void testCommitCallbackWithFailureFixedBucket() throws Exception {
+        innerTestCommitCallbackWithFailure(1);
+    }
+
+    @Test
+    public void testCommitCallbackWithFailureDynamicBucket() throws Exception {
+        innerTestCommitCallbackWithFailure(-1);
+    }
+
+    private void innerTestCommitCallbackWithFailure(int bucket) throws 
Exception {
         int numIdentifiers = 30;
         String testId = UUID.randomUUID().toString();
         commitCallbackResult.put(testId, new HashSet<>());
 
         try {
-            testCommitCallbackWithFailureImpl(numIdentifiers, testId);
+            testCommitCallbackWithFailureImpl(bucket, numIdentifiers, testId);
         } finally {
             commitCallbackResult.remove(testId);
         }
     }
 
-    private void testCommitCallbackWithFailureImpl(int numIdentifiers, String 
testId)
+    private void testCommitCallbackWithFailureImpl(int bucket, int 
numIdentifiers, String testId)
             throws Exception {
         String failingName = UUID.randomUUID().toString();
         // no failure when creating table and writing data
@@ -94,6 +103,7 @@ public class TableCommitTest {
 
         Options conf = new Options();
         conf.set(CoreOptions.PATH, path);
+        conf.set(CoreOptions.BUCKET, bucket);
         conf.set(CoreOptions.COMMIT_CALLBACKS, 
TestCommitCallback.class.getName());
         conf.set(
                 CoreOptions.COMMIT_CALLBACK_PARAM
@@ -121,7 +131,11 @@ public class TableCommitTest {
         StreamTableWrite write = table.newWrite(commitUser);
         Map<Long, List<CommitMessage>> commitMessages = new HashMap<>();
         for (int i = 0; i < numIdentifiers; i++) {
-            write.write(GenericRow.of(i, i * 1000L));
+            if (bucket == -1) {
+                write.write(GenericRow.of(i, i * 1000L), 0);
+            } else {
+                write.write(GenericRow.of(i, i * 1000L));
+            }
             commitMessages.put((long) i, write.prepareCommit(true, i));
         }
         write.close();

Reply via email to