This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.3
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit 95c540a6e486e34841444e98728119f30f5314e5
Author: yuzelin <33053040+yuze...@users.noreply.github.com>
AuthorDate: Wed Sep 17 16:32:52 2025 +0800

    [append] Fix error when compacting append table with deletion vectors (#6258)
---
 .../apache/paimon/append/AppendCompactTask.java    |  20 +-
 .../org/apache/paimon/TestAppendFileStore.java     |   4 +
 .../paimon/append/AppendCompactTaskTest.java       | 214 +++++++++++++++++++++
 3 files changed, 230 insertions(+), 8 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendCompactTask.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendCompactTask.java
index af8997f981..b3b9a0cb17 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/AppendCompactTask.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendCompactTask.java
@@ -38,7 +38,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
-import java.util.stream.Collectors;
 
 import static org.apache.paimon.table.BucketMode.UNAWARE_BUCKET;
 
@@ -91,13 +90,18 @@ public class AppendCompactTask {
             compactBefore.forEach(
                     f -> dvIndexFileMaintainer.notifyRemovedDeletionVector(f.fileName()));
             List<IndexManifestEntry> indexEntries = dvIndexFileMaintainer.persist();
-            Preconditions.checkArgument(
-                    indexEntries.stream().noneMatch(i -> i.kind() == FileKind.ADD));
-            List<IndexFileMeta> removed =
-                    indexEntries.stream()
-                            .map(IndexManifestEntry::indexFile)
-                            .collect(Collectors.toList());
-            indexIncrement = new IndexIncrement(Collections.emptyList(), removed);
+            // If the compact task didn't compact all files, the remaining deletion
+            // vectors will be written into new deletion files.
+            List<IndexFileMeta> newIndexFiles = new ArrayList<>();
+            List<IndexFileMeta> deletedIndexFiles = new ArrayList<>();
+            for (IndexManifestEntry entry : indexEntries) {
+                if (entry.kind() == FileKind.ADD) {
+                    newIndexFiles.add(entry.indexFile());
+                } else {
+                    deletedIndexFiles.add(entry.indexFile());
+                }
+            }
+            indexIncrement = new IndexIncrement(newIndexFiles, deletedIndexFiles);
         } else {
             compactAfter.addAll(
                     write.compactRewrite(partition, UNAWARE_BUCKET, null, compactBefore));
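
The removed code asserted that dvIndexFileMaintainer.persist() never returns
ADD entries; that assumption only holds when the task compacts every file.
When the task compacts a subset, the deletion vectors of the untouched files
are rewritten into a fresh index file, which persist() reports as an ADD
entry. Below is a minimal, self-contained sketch of the corrected bookkeeping;
the Kind and Entry types are hypothetical stand-ins for Paimon's FileKind and
IndexManifestEntry, not the real API:

    import java.util.ArrayList;
    import java.util.List;

    public class PartitionIndexEntriesSketch {

        // Hypothetical stand-ins for FileKind and IndexManifestEntry.
        enum Kind { ADD, DELETE }

        record Entry(Kind kind, String indexFile) {}

        public static void main(String[] args) {
            // After a partial compaction, persist() can report both kinds:
            // the old index files are deleted, and the deletion vectors of
            // the files that were not compacted land in a new index file.
            List<Entry> entries =
                    List.of(
                            new Entry(Kind.DELETE, "index-old-0"),
                            new Entry(Kind.DELETE, "index-old-1"),
                            new Entry(Kind.ADD, "index-new-0"));

            List<String> newIndexFiles = new ArrayList<>();
            List<String> deletedIndexFiles = new ArrayList<>();
            for (Entry entry : entries) {
                if (entry.kind() == Kind.ADD) {
                    newIndexFiles.add(entry.indexFile());
                } else {
                    deletedIndexFiles.add(entry.indexFile());
                }
            }

            System.out.println("new:     " + newIndexFiles);     // [index-new-0]
            System.out.println("deleted: " + deletedIndexFiles); // [index-old-0, index-old-1]
        }
    }

Both lists then feed IndexIncrement, rather than the old single "removed" list.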
diff --git a/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java b/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java
index 9caa379884..60a00ab567 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java
@@ -93,6 +93,10 @@ public class TestAppendFileStore extends AppendOnlyFileStore {
         return this.fileIO;
     }
 
+    public TableSchema schema() {
+        return schema;
+    }
+
     public FileStoreCommitImpl newCommit() {
         return super.newCommit(commitUser, null);
     }
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/AppendCompactTaskTest.java b/paimon-core/src/test/java/org/apache/paimon/append/AppendCompactTaskTest.java
new file mode 100644
index 0000000000..e03dd2cee2
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/append/AppendCompactTaskTest.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.append;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.TestAppendFileStore;
+import org.apache.paimon.TestKeyValueGenerator;
+import org.apache.paimon.compact.CompactManager;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
+import org.apache.paimon.deletionvectors.DeletionVector;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.FileIOFinder;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.IndexIncrement;
+import org.apache.paimon.operation.BaseAppendFileStoreWrite;
+import org.apache.paimon.operation.FileStoreScan;
+import org.apache.paimon.operation.RawFileSplitRead;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.SchemaUtils;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.TraceableFileIO;
+
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Function;
+
+import static org.apache.paimon.io.DataFileTestUtils.newFile;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link AppendCompactTask}. */
+public class AppendCompactTaskTest {
+
+    @TempDir java.nio.file.Path tempDir;
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testAppendCompactionWithDeletionVectors(boolean compactBeforeAllFiles)
+            throws Exception {
+        TestAppendFileStore store =
+                createAppendStore(
+                        tempDir,
+                        Collections.singletonMap(
+                                CoreOptions.DELETION_VECTORS_ENABLED.key(), "true"));
+
+        // Create deletion vectors for two files
+        // Each file has some deleted rows
+        Map<String, List<Integer>> dvs = new HashMap<>();
+        dvs.put("data-0.orc", Arrays.asList(1, 3, 5));
+        dvs.put("data-1.orc", Arrays.asList(2, 4, 6));
+
+        // Write deletion vectors for all files to simulate existing deletion vectors
+        CommitMessageImpl commitMessage = store.writeDVIndexFiles(BinaryRow.EMPTY_ROW, 0, dvs);
+        store.commit(commitMessage);
+
+        List<DataFileMeta> allFiles =
+                Arrays.asList(
+                        newFile("data-0.orc", 0, 0, 100, 100),
+                        newFile("data-1.orc", 0, 101, 200, 200));
+
+        List<DataFileMeta> beforeFiles =
+                compactBeforeAllFiles ? allFiles : Collections.singletonList(allFiles.get(0));
+        AppendCompactTask compactTask = new AppendCompactTask(BinaryRow.EMPTY_ROW, beforeFiles);
+
+        FileStoreTable table =
+                FileStoreTableFactory.create(
+                        store.fileIO(), store.options().path(), store.schema());
+        BaseAppendFileStoreWrite write = new NoopAppendWrite(store);
+
+        CommitMessageImpl compactMessage = (CommitMessageImpl) compactTask.doCompact(table, write);
+
+        IndexIncrement indexIncrement = compactMessage.indexIncrement();
+        assertThat(indexIncrement.deletedIndexFiles()).isNotEmpty();
+        if (compactBeforeAllFiles) {
+            assertThat(indexIncrement.newIndexFiles()).isEmpty();
+        } else {
+            assertThat(indexIncrement.newIndexFiles()).isNotEmpty();
+        }
+    }
+
+    private static class NoopAppendWrite extends BaseAppendFileStoreWrite {
+
+        public NoopAppendWrite(TestAppendFileStore store) {
+            this(
+                    store.fileIO(),
+                    store.newRead(),
+                    0L,
+                    store.schema().logicalRowType(),
+                    store.schema().logicalPartitionType(),
+                    store.pathFactory(),
+                    store.snapshotManager(),
+                    store.newScan(),
+                    store.options(),
+                    BucketedDvMaintainer.factory(store.newIndexFileHandler()),
+                    "test");
+        }
+
+        private NoopAppendWrite(
+                FileIO fileIO,
+                RawFileSplitRead readForCompact,
+                long schemaId,
+                RowType rowType,
+                RowType partitionType,
+                FileStorePathFactory pathFactory,
+                SnapshotManager snapshotManager,
+                FileStoreScan scan,
+                CoreOptions options,
+                @Nullable BucketedDvMaintainer.Factory dvMaintainerFactory,
+                String tableName) {
+            super(
+                    fileIO,
+                    readForCompact,
+                    schemaId,
+                    rowType,
+                    partitionType,
+                    pathFactory,
+                    snapshotManager,
+                    scan,
+                    options,
+                    dvMaintainerFactory,
+                    tableName);
+        }
+
+        @Override
+        public List<DataFileMeta> compactRewrite(
+                BinaryRow partition,
+                int bucket,
+                @Nullable Function<String, DeletionVector> dvFactory,
+                List<DataFileMeta> toCompact)
+                throws Exception {
+            return Collections.emptyList();
+        }
+
+        @Override
+        protected CompactManager getCompactManager(
+                BinaryRow partition,
+                int bucket,
+                List<DataFileMeta> restoredFiles,
+                ExecutorService compactExecutor,
+                @Nullable BucketedDvMaintainer dvMaintainer) {
+            return null;
+        }
+
+        @Override
+        protected Function<WriterContainer<InternalRow>, Boolean> createWriterCleanChecker() {
+            return null;
+        }
+    }
+
+    private TestAppendFileStore createAppendStore(
+            java.nio.file.Path tempDir, Map<String, String> dynamicOptions) throws Exception {
+        String root = TraceableFileIO.SCHEME + "://" + tempDir.toString();
+        Path path = new Path(tempDir.toUri());
+        FileIO fileIO = FileIOFinder.find(new Path(root));
+        SchemaManager schemaManage = new SchemaManager(new LocalFileIO(), path);
+
+        Map<String, String> options = new HashMap<>(dynamicOptions);
+        options.put(CoreOptions.PATH.key(), root);
+        TableSchema tableSchema =
+                SchemaUtils.forceCommit(
+                        schemaManage,
+                        new Schema(
+                                TestKeyValueGenerator.DEFAULT_ROW_TYPE.getFields(),
+                                Collections.emptyList(),
+                                Collections.emptyList(),
+                                options,
+                                null));
+        return new TestAppendFileStore(
+                fileIO,
+                schemaManage,
+                new CoreOptions(options),
+                tableSchema,
+                RowType.of(),
+                RowType.of(),
+                TestKeyValueGenerator.DEFAULT_ROW_TYPE,
+                (new Path(root)).getName());
+    }
+}
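
Taken together, the two parameterized cases map directly onto the fix: with
compactBeforeAllFiles = true, every deletion vector is consumed by the rewrite,
so the task only deletes index files; with false, the untouched data-1.orc
keeps its deletion vector, which must be re-persisted and therefore shows up
in newIndexFiles. That is exactly the ADD case the removed assertion rejected.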
