This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-0.6 in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
commit 8378a1777134695fc91d2501cf6f0568f5a4ab28 Author: Jingsong Lee <[email protected]> AuthorDate: Wed Jan 10 13:54:55 2024 +0800 [core] Fix index file checkFilesExistence in TableCommitImpl (#2668) --- .../apache/paimon/table/sink/TableCommitImpl.java | 4 +++- .../apache/paimon/table/sink/TableCommitTest.java | 22 ++++++++++++++++++---- 2 files changed, 21 insertions(+), 5 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java index 83914744b..bfc52ec5e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java @@ -37,6 +37,7 @@ import org.apache.paimon.utils.ExecutorThreadFactory; import org.apache.paimon.utils.FileUtils; import org.apache.paimon.utils.IOUtils; import org.apache.paimon.utils.Pair; +import org.apache.paimon.utils.PathFactory; import org.apache.paimon.shade.guava30.com.google.common.util.concurrent.MoreExecutors; @@ -252,6 +253,7 @@ public class TableCommitImpl implements InnerTableCommit { private void checkFilesExistence(List<ManifestCommittable> committables) { List<Path> files = new ArrayList<>(); Map<Pair<BinaryRow, Integer>, DataFilePathFactory> factoryMap = new HashMap<>(); + PathFactory indexFileFactory = commit.pathFactory().indexFileFactory(); for (ManifestCommittable committable : committables) { for (CommitMessage message : committable.fileCommittables()) { CommitMessageImpl msg = (CommitMessageImpl) message; @@ -270,7 +272,7 @@ public class TableCommitImpl implements InnerTableCommit { msg.compactIncrement().compactAfter().forEach(collector); msg.indexIncrement().newIndexFiles().stream() .map(IndexFileMeta::fileName) - .map(pathFactory::toPath) + .map(indexFileFactory::toPath) .forEach(files::add); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java index cba062894..362b40301 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java @@ -67,19 +67,28 @@ public class TableCommitTest { private static final Map<String, Set<Long>> commitCallbackResult = new ConcurrentHashMap<>(); @Test - public void testCommitCallbackWithFailure() throws Exception { + public void testCommitCallbackWithFailureFixedBucket() throws Exception { + innerTestCommitCallbackWithFailure(1); + } + + @Test + public void testCommitCallbackWithFailureDynamicBucket() throws Exception { + innerTestCommitCallbackWithFailure(-1); + } + + private void innerTestCommitCallbackWithFailure(int bucket) throws Exception { int numIdentifiers = 30; String testId = UUID.randomUUID().toString(); commitCallbackResult.put(testId, new HashSet<>()); try { - testCommitCallbackWithFailureImpl(numIdentifiers, testId); + testCommitCallbackWithFailureImpl(bucket, numIdentifiers, testId); } finally { commitCallbackResult.remove(testId); } } - private void testCommitCallbackWithFailureImpl(int numIdentifiers, String testId) + private void testCommitCallbackWithFailureImpl(int bucket, int numIdentifiers, String testId) throws Exception { String failingName = UUID.randomUUID().toString(); // no failure when creating table and writing data @@ -94,6 +103,7 @@ public class TableCommitTest { Options conf = new Options(); conf.set(CoreOptions.PATH, path); + conf.set(CoreOptions.BUCKET, bucket); conf.set(CoreOptions.COMMIT_CALLBACKS, TestCommitCallback.class.getName()); conf.set( CoreOptions.COMMIT_CALLBACK_PARAM @@ -121,7 +131,11 @@ public class TableCommitTest { StreamTableWrite write = table.newWrite(commitUser); Map<Long, List<CommitMessage>> commitMessages = new HashMap<>(); for (int i = 0; i < numIdentifiers; i++) { - write.write(GenericRow.of(i, i * 1000L)); + if (bucket == -1) { + write.write(GenericRow.of(i, i * 1000L), 0); + } else { + write.write(GenericRow.of(i, i * 1000L)); + } commitMessages.put((long) i, write.prepareCommit(true, i)); } write.close();
