This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.3
in repository https://gitbox.apache.org/repos/asf/paimon.git
commit 95c540a6e486e34841444e98728119f30f5314e5
Author: yuzelin <33053040+yuze...@users.noreply.github.com>
AuthorDate: Wed Sep 17 16:32:52 2025 +0800

    [append] Fix error of compaction append table with deletion vectors (#6258)
---
 .../apache/paimon/append/AppendCompactTask.java    |  20 +-
 .../org/apache/paimon/TestAppendFileStore.java     |   4 +
 .../paimon/append/AppendCompactTaskTest.java       | 214 +++++++++++++++++++++
 3 files changed, 230 insertions(+), 8 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendCompactTask.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendCompactTask.java
index af8997f981..b3b9a0cb17 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/AppendCompactTask.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendCompactTask.java
@@ -38,7 +38,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
-import java.util.stream.Collectors;
 
 import static org.apache.paimon.table.BucketMode.UNAWARE_BUCKET;
 
@@ -91,13 +90,18 @@ public class AppendCompactTask {
             compactBefore.forEach(
                     f -> dvIndexFileMaintainer.notifyRemovedDeletionVector(f.fileName()));
             List<IndexManifestEntry> indexEntries = dvIndexFileMaintainer.persist();
-            Preconditions.checkArgument(
-                    indexEntries.stream().noneMatch(i -> i.kind() == FileKind.ADD));
-            List<IndexFileMeta> removed =
-                    indexEntries.stream()
-                            .map(IndexManifestEntry::indexFile)
-                            .collect(Collectors.toList());
-            indexIncrement = new IndexIncrement(Collections.emptyList(), removed);
+            // If the compact task didn't compact all files, the remaining deletion files
+            // will be written into new deletion files.
+            List<IndexFileMeta> newIndexFiles = new ArrayList<>();
+            List<IndexFileMeta> deletedIndexFiles = new ArrayList<>();
+            for (IndexManifestEntry entry : indexEntries) {
+                if (entry.kind() == FileKind.ADD) {
+                    newIndexFiles.add(entry.indexFile());
+                } else {
+                    deletedIndexFiles.add(entry.indexFile());
+                }
+            }
+            indexIncrement = new IndexIncrement(newIndexFiles, deletedIndexFiles);
         } else {
             compactAfter.addAll(
                     write.compactRewrite(partition, UNAWARE_BUCKET, null, compactBefore));
diff --git a/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java b/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java
index 9caa379884..60a00ab567 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java
@@ -93,6 +93,10 @@ public class TestAppendFileStore extends AppendOnlyFileStore {
         return this.fileIO;
     }
 
+    public TableSchema schema() {
+        return schema;
+    }
+
     public FileStoreCommitImpl newCommit() {
         return super.newCommit(commitUser, null);
     }
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/AppendCompactTaskTest.java b/paimon-core/src/test/java/org/apache/paimon/append/AppendCompactTaskTest.java
new file mode 100644
index 0000000000..e03dd2cee2
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/append/AppendCompactTaskTest.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.append;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.TestAppendFileStore;
+import org.apache.paimon.TestKeyValueGenerator;
+import org.apache.paimon.compact.CompactManager;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
+import org.apache.paimon.deletionvectors.DeletionVector;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.FileIOFinder;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.IndexIncrement;
+import org.apache.paimon.operation.BaseAppendFileStoreWrite;
+import org.apache.paimon.operation.FileStoreScan;
+import org.apache.paimon.operation.RawFileSplitRead;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.SchemaUtils;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.TraceableFileIO;
+
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Function;
+
+import static org.apache.paimon.io.DataFileTestUtils.newFile;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link AppendCompactTask}. */
+public class AppendCompactTaskTest {
+
+    @TempDir java.nio.file.Path tempDir;
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testAppendCompactionWithDeletionVectors(boolean compactBeforeAllFiles)
+            throws Exception {
+        TestAppendFileStore store =
+                createAppendStore(
+                        tempDir,
+                        Collections.singletonMap(
+                                CoreOptions.DELETION_VECTORS_ENABLED.key(), "true"));
+
+        // Create deletion vectors for two files
+        // Each file has some deleted rows
+        Map<String, List<Integer>> dvs = new HashMap<>();
+        dvs.put("data-0.orc", Arrays.asList(1, 3, 5));
+        dvs.put("data-1.orc", Arrays.asList(2, 4, 6));
+
+        // Write deletion vectors for all files to simulate existing deletion vectors
+        CommitMessageImpl commitMessage = store.writeDVIndexFiles(BinaryRow.EMPTY_ROW, 0, dvs);
+        store.commit(commitMessage);
+
+        List<DataFileMeta> allFiles =
+                Arrays.asList(
+                        newFile("data-0.orc", 0, 0, 100, 100),
+                        newFile("data-1.orc", 0, 101, 200, 200));
+
+        List<DataFileMeta> beforeFiles =
+                compactBeforeAllFiles ? allFiles : Collections.singletonList(allFiles.get(0));
+        AppendCompactTask compactTask = new AppendCompactTask(BinaryRow.EMPTY_ROW, beforeFiles);
+
+        FileStoreTable table =
+                FileStoreTableFactory.create(
+                        store.fileIO(), store.options().path(), store.schema());
+        BaseAppendFileStoreWrite write = new NoopAppendWrite(store);
+
+        CommitMessageImpl compactMessage =
+                (CommitMessageImpl) compactTask.doCompact(table, write);
+
+        IndexIncrement indexIncrement = compactMessage.indexIncrement();
+        assertThat(indexIncrement.deletedIndexFiles()).isNotEmpty();
+        if (compactBeforeAllFiles) {
+            assertThat(indexIncrement.newIndexFiles()).isEmpty();
+        } else {
+            assertThat(indexIncrement.newIndexFiles()).isNotEmpty();
+        }
+    }
+
+    private static class NoopAppendWrite extends BaseAppendFileStoreWrite {
+
+        public NoopAppendWrite(TestAppendFileStore store) {
+            this(
+                    store.fileIO(),
+                    store.newRead(),
+                    0L,
+                    store.schema().logicalRowType(),
+                    store.schema().logicalPartitionType(),
+                    store.pathFactory(),
+                    store.snapshotManager(),
+                    store.newScan(),
+                    store.options(),
+                    BucketedDvMaintainer.factory(store.newIndexFileHandler()),
+                    "test");
+        }
+
+        private NoopAppendWrite(
+                FileIO fileIO,
+                RawFileSplitRead readForCompact,
+                long schemaId,
+                RowType rowType,
+                RowType partitionType,
+                FileStorePathFactory pathFactory,
+                SnapshotManager snapshotManager,
+                FileStoreScan scan,
+                CoreOptions options,
+                @Nullable BucketedDvMaintainer.Factory dvMaintainerFactory,
+                String tableName) {
+            super(
+                    fileIO,
+                    readForCompact,
+                    schemaId,
+                    rowType,
+                    partitionType,
+                    pathFactory,
+                    snapshotManager,
+                    scan,
+                    options,
+                    dvMaintainerFactory,
+                    tableName);
+        }
+
+        @Override
+        public List<DataFileMeta> compactRewrite(
+                BinaryRow partition,
+                int bucket,
+                @Nullable Function<String, DeletionVector> dvFactory,
+                List<DataFileMeta> toCompact)
+                throws Exception {
+            return Collections.emptyList();
+        }
+
+        @Override
+        protected CompactManager getCompactManager(
+                BinaryRow partition,
+                int bucket,
+                List<DataFileMeta> restoredFiles,
+                ExecutorService compactExecutor,
+                @Nullable BucketedDvMaintainer dvMaintainer) {
+            return null;
+        }
+
+        @Override
+        protected Function<WriterContainer<InternalRow>, Boolean> createWriterCleanChecker() {
+            return null;
+        }
+    }
+
+    private TestAppendFileStore createAppendStore(
+            java.nio.file.Path tempDir, Map<String, String> dynamicOptions) throws Exception {
+        String root = TraceableFileIO.SCHEME + "://" + tempDir.toString();
+        Path path = new Path(tempDir.toUri());
+        FileIO fileIO = FileIOFinder.find(new Path(root));
+        SchemaManager schemaManage = new SchemaManager(new LocalFileIO(), path);
+
+        Map<String, String> options = new HashMap<>(dynamicOptions);
+        options.put(CoreOptions.PATH.key(), root);
+        TableSchema tableSchema =
+                SchemaUtils.forceCommit(
+                        schemaManage,
+                        new Schema(
+                                TestKeyValueGenerator.DEFAULT_ROW_TYPE.getFields(),
+                                Collections.emptyList(),
+                                Collections.emptyList(),
+                                options,
+                                null));
+        return new TestAppendFileStore(
+                fileIO,
+                schemaManage,
+                new CoreOptions(options),
+                tableSchema,
+                RowType.of(),
+                RowType.of(),
+                TestKeyValueGenerator.DEFAULT_ROW_TYPE,
+                (new Path(root)).getName());
+    }
+}