This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new ec198499 [FLINK-28108] Support "ALTER TABLE ... COMPACT" for 
append-only table
ec198499 is described below

commit ec198499edcb5acaa804f21bd60022bdb633c85c
Author: Jane Chan <[email protected]>
AuthorDate: Fri Jul 15 10:43:55 2022 +0800

    [FLINK-28108] Support "ALTER TABLE ... COMPACT" for append-only table
    
    This closes #214
---
 .../store/connector/TableStoreManagedFactory.java  |  13 +-
 .../store/connector/AlterTableCompactITCase.java   |  36 ++-
 .../connector/TableStoreManagedFactoryTest.java    |  16 --
 .../store/file/data/AppendOnlyCompactManager.java  | 128 +++++++--
 .../file/operation/AppendOnlyFileStoreWrite.java   |  12 +-
 .../file/data/AppendOnlyCompactManagerTest.java    |   5 +-
 .../store/file/data/IterativeCompactTaskTest.java  | 297 +++++++++++++++++++++
 7 files changed, 458 insertions(+), 49 deletions(-)

diff --git 
a/flink-table-store-connector/src/main/1.15.0/org/apache/flink/table/store/connector/TableStoreManagedFactory.java
 
b/flink-table-store-connector/src/main/1.15.0/org/apache/flink/table/store/connector/TableStoreManagedFactory.java
index 37dfda61..19358f25 100644
--- 
a/flink-table-store-connector/src/main/1.15.0/org/apache/flink/table/store/connector/TableStoreManagedFactory.java
+++ 
b/flink-table-store-connector/src/main/1.15.0/org/apache/flink/table/store/connector/TableStoreManagedFactory.java
@@ -29,7 +29,6 @@ import org.apache.flink.table.store.file.utils.JsonSerdeUtil;
 import org.apache.flink.table.store.log.LogStoreTableFactory;
 import org.apache.flink.util.Preconditions;
 
-
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.HashMap;
@@ -84,7 +83,8 @@ public class TableStoreManagedFactory extends 
AbstractTableStoreFactory
                 createOptionalLogStoreFactory(context.getClassLoader(), 
enrichedOptions);
         logFactory.ifPresent(
                 factory ->
-                        factory.enrichOptions(new 
TableStoreDynamicContext(context, enrichedOptions))
+                        factory.enrichOptions(
+                                        new TableStoreDynamicContext(context, 
enrichedOptions))
                                 .forEach(enrichedOptions::putIfAbsent));
 
         return enrichedOptions;
@@ -167,20 +167,13 @@ public class TableStoreManagedFactory extends 
AbstractTableStoreFactory
             throw new UncheckedIOException(e);
         }
         createOptionalLogStoreFactory(context)
-                .ifPresent(
-                        factory ->
-                                factory.onDropTable(context, 
ignoreIfNotExists));
+                .ifPresent(factory -> factory.onDropTable(context, 
ignoreIfNotExists));
     }
 
     @Override
     public Map<String, String> onCompactTable(
             Context context, CatalogPartitionSpec catalogPartitionSpec) {
         Map<String, String> newOptions = new 
HashMap<>(context.getCatalogTable().getOptions());
-        if 
(APPEND_ONLY.toString().equals(newOptions.get(CoreOptions.WRITE_MODE.key()))) {
-            throw new UnsupportedOperationException(
-                    "ALTER TABLE COMPACT is not yet supported for append only 
table.");
-        }
-
         newOptions.put(COMPACTION_MANUAL_TRIGGERED.key(), 
String.valueOf(true));
         newOptions.put(
                 COMPACTION_PARTITION_SPEC.key(),
diff --git 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AlterTableCompactITCase.java
 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AlterTableCompactITCase.java
index 34b97668..fa9fdc32 100644
--- 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AlterTableCompactITCase.java
+++ 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AlterTableCompactITCase.java
@@ -66,7 +66,13 @@ public class AlterTableCompactITCase extends 
FileStoreTableITCase {
                         + "shopId INT\n, "
                         + "orderId BIGINT NOT NULL\n, "
                         + "itemId BIGINT)"
-                        + "PARTITIONED BY (dt, hr)");
+                        + "PARTITIONED BY (dt, hr)",
+                "CREATE TABLE IF NOT EXISTS T3 (\n"
+                        + "f0 INT\n, "
+                        + "f1 STRING NOT NULL\n"
+                        + ") WITH (\n"
+                        + "'write-mode' = 'append-only'\n"
+                        + ")");
     }
 
     @Test
@@ -113,6 +119,34 @@ public class AlterTableCompactITCase extends 
FileStoreTableITCase {
                 .isEqualTo(snapshot);
     }
 
+    @Test
+    public void testAppendOnlyTable() {
+        innerTest("INSERT INTO T3 VALUES(1, 'AAA')", 1L, 
Snapshot.CommitKind.APPEND);
+        innerTest("ALTER TABLE T3 COMPACT", 1L, Snapshot.CommitKind.APPEND);
+
+        innerTest("INSERT INTO T3 VALUES(2, 'BBB')", 2L, 
Snapshot.CommitKind.APPEND);
+        innerTest("ALTER TABLE T3 COMPACT", 2L, Snapshot.CommitKind.APPEND);
+
+        innerTest("INSERT INTO T3 VALUES(3, 'CCC')", 3L, 
Snapshot.CommitKind.APPEND);
+        innerTest("ALTER TABLE T3 COMPACT", 3L, Snapshot.CommitKind.APPEND);
+
+        innerTest("INSERT INTO T3 VALUES(4, 'DDD')", 4L, 
Snapshot.CommitKind.APPEND);
+        innerTest("ALTER TABLE T3 COMPACT", 4L, Snapshot.CommitKind.APPEND);
+
+        innerTest("INSERT INTO T3 VALUES(5, 'AAA')", 5L, 
Snapshot.CommitKind.APPEND);
+
+        batchSql("ALTER TABLE T3 SET ('compaction.early-max.file-num' = '5')");
+        innerTest("ALTER TABLE T3 COMPACT", 6L, Snapshot.CommitKind.COMPACT);
+    }
+
+    private void innerTest(
+            String sql, long expectedSnapshotId, Snapshot.CommitKind 
expectedCommitKind) {
+        batchSql(sql);
+        Snapshot snapshot = findLatestSnapshot("T3", true);
+        assertThat(snapshot.id()).isEqualTo(expectedSnapshotId);
+        assertThat(snapshot.commitKind()).isEqualTo(expectedCommitKind);
+    }
+
     private void innerTest(
             String tableName, int batchNum, 
TestKeyValueGenerator.GeneratorMode mode) {
         // increase trigger to avoid auto-compaction
diff --git 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreManagedFactoryTest.java
 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreManagedFactoryTest.java
index 843b99f5..4cedbbc8 100644
--- 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreManagedFactoryTest.java
+++ 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreManagedFactoryTest.java
@@ -28,7 +28,6 @@ import org.apache.flink.table.catalog.ResolvedCatalogTable;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.factories.DynamicTableFactory;
 import org.apache.flink.table.factories.FactoryUtil;
-import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.file.utils.JsonSerdeUtil;
 import org.apache.flink.table.store.table.FileStoreTable;
 import org.apache.flink.table.types.logical.RowType;
@@ -69,7 +68,6 @@ import static 
org.apache.flink.table.store.file.TestKeyValueGenerator.DEFAULT_PA
 import static 
org.apache.flink.table.store.file.TestKeyValueGenerator.DEFAULT_ROW_TYPE;
 import static 
org.apache.flink.table.store.file.TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED;
 import static 
org.apache.flink.table.store.file.TestKeyValueGenerator.getPrimaryKeys;
-import static org.apache.flink.table.store.file.WriteMode.APPEND_ONLY;
 import static 
org.apache.flink.table.store.kafka.KafkaLogOptions.BOOTSTRAP_SERVERS;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -246,20 +244,6 @@ public class TableStoreManagedFactoryTest {
                 .containsEntry(COMPACTION_PARTITION_SPEC.key(), 
JsonSerdeUtil.toJson(partSpec));
     }
 
-    @Test
-    public void testOnCompactAppendOnlyTable() {
-        context =
-                createEnrichedContext(
-                        Collections.singletonMap(
-                                CoreOptions.WRITE_MODE.key(), 
APPEND_ONLY.toString()));
-        assertThatThrownBy(
-                        () ->
-                                tableStoreManagedFactory.onCompactTable(
-                                        context, new 
CatalogPartitionSpec(Collections.emptyMap())))
-                .isInstanceOf(UnsupportedOperationException.class)
-                .hasMessage("ALTER TABLE COMPACT is not yet supported for 
append only table.");
-    }
-
     // ~ Tools 
------------------------------------------------------------------
 
     private static ResolvedCatalogTable getDummyTable(Map<String, String> 
tableOptions) {
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyCompactManager.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyCompactManager.java
index e30b0e43..1f27efe4 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyCompactManager.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyCompactManager.java
@@ -22,10 +22,15 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.table.store.file.compact.CompactManager;
 import org.apache.flink.table.store.file.compact.CompactResult;
 import org.apache.flink.table.store.file.compact.CompactTask;
+import org.apache.flink.table.store.file.utils.FileUtils;
 
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 
 /** Compact manager for {@link 
org.apache.flink.table.store.file.AppendOnlyFileStore}. */
@@ -62,12 +67,19 @@ public class AppendOnlyCompactManager extends 
CompactManager {
                 .ifPresent(
                         (inputs) ->
                                 taskFuture =
-                                        executor.submit(
-                                                new 
AppendOnlyCompactTask(inputs, rewriter)));
+                                        executor.submit(new 
AutoCompactTask(inputs, rewriter)));
     }
 
     @VisibleForTesting
     Optional<List<DataFileMeta>> pickCompactBefore() {
+        return pick(toCompact, targetFileSize, minFileNum, maxFileNum);
+    }
+
+    private static Optional<List<DataFileMeta>> pick(
+            LinkedList<DataFileMeta> toCompact,
+            long targetFileSize,
+            int minFileNum,
+            int maxFileNum) {
         long totalFileSize = 0L;
         int fileNum = 0;
         LinkedList<DataFileMeta> candidates = new LinkedList<>();
@@ -81,7 +93,7 @@ public class AppendOnlyCompactManager extends CompactManager {
                     || fileNum >= maxFileNum) {
                 return Optional.of(candidates);
             } else if (totalFileSize >= targetFileSize) {
-                // left pointer shift one pos to right
+                // let pointer shift one pos to right
                 DataFileMeta removed = candidates.pollFirst();
                 assert removed != null;
                 totalFileSize -= removed.fileSize();
@@ -97,33 +109,113 @@ public class AppendOnlyCompactManager extends 
CompactManager {
         return toCompact;
     }
 
-    /** A {@link CompactTask} impl for append-only table. */
-    public static class AppendOnlyCompactTask extends CompactTask {
+    /**
+     * A {@link CompactTask} impl for ALTER TABLE COMPACT of append-only table.
+     *
+     * <p>This task accepts a pre-scanned file list as input and iteratively 
picks candidate files
+     * to compact until it reaches the end of the input. Multiple rewrites may 
happen
+     * during one task.
+     */
+    public static class IterativeCompactTask extends CompactTask {
+
+        private final long targetFileSize;
+        private final int minFileNum;
+        private final int maxFileNum;
+        private final CompactRewriter rewriter;
+        private final DataFilePathFactory factory;
+
+        public IterativeCompactTask(
+                List<DataFileMeta> inputs,
+                long targetFileSize,
+                int minFileNum,
+                int maxFileNum,
+                CompactRewriter rewriter,
+                DataFilePathFactory factory) {
+            super(inputs);
+            this.targetFileSize = targetFileSize;
+            this.minFileNum = minFileNum;
+            this.maxFileNum = maxFileNum;
+            this.rewriter = rewriter;
+            this.factory = factory;
+        }
+
+        @Override
+        protected CompactResult doCompact(List<DataFileMeta> inputs) throws 
Exception {
+            LinkedList<DataFileMeta> toCompact = new LinkedList<>(inputs);
+            Set<DataFileMeta> compactBefore = new LinkedHashSet<>();
+            List<DataFileMeta> compactAfter = new ArrayList<>();
+            while (!toCompact.isEmpty()) {
+                Optional<List<DataFileMeta>> candidates =
+                        AppendOnlyCompactManager.pick(
+                                toCompact, targetFileSize, minFileNum, 
maxFileNum);
+                if (candidates.isPresent()) {
+                    List<DataFileMeta> before = candidates.get();
+                    compactBefore.addAll(before);
+                    List<DataFileMeta> after = rewriter.rewrite(before);
+                    compactAfter.addAll(after);
+                    DataFileMeta lastFile = after.get(after.size() - 1);
+                    if (lastFile.fileSize() < targetFileSize) {
+                        toCompact.offerFirst(lastFile);
+                    }
+                } else {
+                    break;
+                }
+            }
+            // remove and delete intermediate files
+            Iterator<DataFileMeta> afterIterator = compactAfter.iterator();
+            while (afterIterator.hasNext()) {
+                DataFileMeta file = afterIterator.next();
+                if (compactBefore.contains(file)) {
+                    compactBefore.remove(file);
+                    afterIterator.remove();
+                    delete(file);
+                }
+            }
+            return result(new ArrayList<>(compactBefore), compactAfter);
+        }
+
+        @VisibleForTesting
+        void delete(DataFileMeta tmpFile) {
+            FileUtils.deleteOrWarn(factory.toPath(tmpFile.fileName()));
+        }
+    }
+
+    /**
+     * A {@link CompactTask} impl for append-only table auto-compaction.
+     *
+     * <p>This task accepts an already-picked candidate and performs a one-time 
rewrite. For the
+     * rest of the input files, it is the duty of {@link AppendOnlyWriter} to 
trigger the next
+     * compaction.
+     */
+    public static class AutoCompactTask extends CompactTask {
 
         private final CompactRewriter rewriter;
 
-        public AppendOnlyCompactTask(List<DataFileMeta> toCompact, 
CompactRewriter rewriter) {
+        public AutoCompactTask(List<DataFileMeta> toCompact, CompactRewriter 
rewriter) {
             super(toCompact);
             this.rewriter = rewriter;
         }
 
         @Override
         protected CompactResult doCompact(List<DataFileMeta> inputs) throws 
Exception {
-            List<DataFileMeta> compactAfter = rewriter.rewrite(inputs);
-            return new CompactResult() {
-                @Override
-                public List<DataFileMeta> before() {
-                    return inputs;
-                }
-
-                @Override
-                public List<DataFileMeta> after() {
-                    return compactAfter;
-                }
-            };
+            return result(inputs, rewriter.rewrite(inputs));
         }
     }
 
+    private static CompactResult result(List<DataFileMeta> before, 
List<DataFileMeta> after) {
+        return new CompactResult() {
+            @Override
+            public List<DataFileMeta> before() {
+                return before;
+            }
+
+            @Override
+            public List<DataFileMeta> after() {
+                return after;
+            }
+        };
+    }
+
     /** Compact rewriter for append-only table. */
     public interface CompactRewriter {
         List<DataFileMeta> rewrite(List<DataFileMeta> compactBefore) throws 
Exception;
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java
index 30456991..b2bed275 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java
@@ -95,8 +95,16 @@ public class AppendOnlyFileStoreWrite extends 
AbstractFileStoreWrite<RowData> {
     @Override
     public Callable<CompactResult> createCompactWriter(
             BinaryRowData partition, int bucket, @Nullable List<DataFileMeta> 
compactFiles) {
-        throw new UnsupportedOperationException(
-                "Currently append only write mode does not support 
compaction.");
+        if (compactFiles == null) {
+            compactFiles = scanExistingFileMetas(partition, bucket);
+        }
+        return new AppendOnlyCompactManager.IterativeCompactTask(
+                compactFiles,
+                targetFileSize,
+                minFileNum,
+                maxFileNum,
+                compactRewriter(partition, bucket),
+                pathFactory.createDataFilePathFactory(partition, bucket));
     }
 
     private RecordWriter<RowData> createWriter(
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/AppendOnlyCompactManagerTest.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/AppendOnlyCompactManagerTest.java
index 9979f080..94959520 100644
--- 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/AppendOnlyCompactManagerTest.java
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/AppendOnlyCompactManagerTest.java
@@ -200,12 +200,13 @@ public class AppendOnlyCompactManagerTest {
         long targetFileSize = 1024;
         AppendOnlyCompactManager manager =
                 new AppendOnlyCompactManager(
-                        null,
+                        null, // not used
                         new LinkedList<>(toCompactBeforePick),
                         minFileNum,
                         maxFileNum,
                         targetFileSize,
-                        null);
+                        null // not used
+                        );
         Optional<List<DataFileMeta>> actual = manager.pickCompactBefore();
         assertThat(actual.isPresent()).isEqualTo(expectedPresent);
         if (expectedPresent) {
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/IterativeCompactTaskTest.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/IterativeCompactTaskTest.java
new file mode 100644
index 00000000..c5610ca6
--- /dev/null
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/IterativeCompactTaskTest.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.data;
+
+import org.apache.flink.table.store.file.compact.CompactResult;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.flink.table.store.file.data.DataFileTestUtils.newFile;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.fail;
+
+/** Test for {@link AppendOnlyCompactManager.IterativeCompactTask}. */
+public class IterativeCompactTaskTest {
+
+    private static final long TARGET_FILE_SIZE = 1024L;
+    private static final int MIN_FILE_NUM = 3;
+    private static final int MAX_FILE_NUM = 10;
+
+    @Test
+    public void testNoCompact() {
+        // empty
+        innerTest(
+                Collections.emptyList(),
+                Collections.emptyList(),
+                Collections.emptyList(),
+                Collections.emptyList());
+
+        // single small file
+        innerTest(
+                Collections.singletonList(newFile(1L, 10L)),
+                Collections.emptyList(),
+                Collections.emptyList(),
+                Collections.emptyList());
+
+        // single large file
+        innerTest(
+                Collections.singletonList(newFile(1L, 1024L)),
+                Collections.emptyList(),
+                Collections.emptyList(),
+                Collections.emptyList());
+
+        // almost-full files
+        innerTest(
+                Arrays.asList(newFile(1L, 1024L), newFile(2L, 2048L)),
+                Collections.emptyList(),
+                Collections.emptyList(),
+                Collections.emptyList());
+
+        // large files
+        innerTest(
+                Arrays.asList(newFile(1L, 1000L), newFile(1001L, 1100L)),
+                Collections.emptyList(),
+                Collections.emptyList(),
+                Collections.emptyList());
+    }
+
+    @Test
+    public void testCompactOnce() {
+        // small files on the head
+        innerTest(
+                Arrays.asList(
+                        newFile(1L, 500L),
+                        newFile(501L, 1000L),
+                        newFile(1001L, 1010L),
+                        newFile(1011L, 1024L),
+                        newFile(1025L, 3000L)),
+                Arrays.asList(
+                        newFile(1L, 500L),
+                        newFile(501L, 1000L),
+                        newFile(1001L, 1010L),
+                        newFile(1011L, 1024L)),
+                Collections.singletonList(newFile(1L, 1024L)),
+                Collections.emptyList());
+
+        innerTest(
+                Arrays.asList(
+                        newFile(1L, 500L),
+                        newFile(501L, 1000L),
+                        newFile(1001L, 1010L),
+                        newFile(1011L, 2000L),
+                        newFile(2001L, 3000L)),
+                Arrays.asList(
+                        newFile(1L, 500L),
+                        newFile(501L, 1000L),
+                        newFile(1001L, 1010L),
+                        newFile(1011L, 2000L)),
+                Arrays.asList(newFile(1L, 1024L), newFile(1025L, 2000L)),
+                Collections.emptyList());
+
+        // small files in the middle
+        innerTest(
+                Arrays.asList(
+                        newFile(1L, 2000L),
+                        newFile(2001L, 4000L),
+                        newFile(4001L, 4500L),
+                        newFile(4501L, 4600L),
+                        newFile(4601L, 4700L),
+                        newFile(4701L, 5024L),
+                        newFile(5025L, 7000L)),
+                Arrays.asList(
+                        newFile(4001L, 4500L),
+                        newFile(4501L, 4600L),
+                        newFile(4601L, 4700L),
+                        newFile(4701L, 5024L)),
+                Collections.singletonList(newFile(4001L, 5024L)),
+                Collections.emptyList());
+
+        // small files on the tail
+        innerTest(
+                Arrays.asList(
+                        newFile(1L, 2000L),
+                        newFile(2001L, 4000L),
+                        newFile(4001L, 4010L),
+                        newFile(4011L, 4020L),
+                        newFile(4021L, 4030L),
+                        newFile(4031L, 4040L),
+                        newFile(4041L, 4050L),
+                        newFile(4051L, 4060L),
+                        newFile(4061L, 4070L),
+                        newFile(4071L, 4080L),
+                        newFile(4081L, 4090L),
+                        newFile(4091L, 4110L)),
+                Arrays.asList(
+                        newFile(4001L, 4010L),
+                        newFile(4011L, 4020L),
+                        newFile(4021L, 4030L),
+                        newFile(4031L, 4040L),
+                        newFile(4041L, 4050L),
+                        newFile(4051L, 4060L),
+                        newFile(4061L, 4070L),
+                        newFile(4071L, 4080L),
+                        newFile(4081L, 4090L),
+                        newFile(4091L, 4110L)),
+                Collections.singletonList(newFile(4001L, 4110L)),
+                Collections.emptyList());
+    }
+
+    @Test
+    public void testCompactMultiple() {
+        // continuous compact
+        innerTest(
+                Arrays.asList(
+                        newFile(1L, 2000L),
+                        newFile(2001L, 4000L),
+                        // 4001~4010, ..., 4091~4110
+                        newFile(4001L, 4010L),
+                        newFile(4011L, 4020L),
+                        newFile(4021L, 4030L),
+                        newFile(4031L, 4040L),
+                        newFile(4041L, 4050L),
+                        newFile(4051L, 4060L),
+                        newFile(4061L, 4070L),
+                        newFile(4071L, 4080L),
+                        newFile(4081L, 4090L),
+                        newFile(4091L, 4110L),
+                        // 4001~4110, 5015~5024
+                        newFile(4111L, 5000L),
+                        newFile(5001L, 5014L),
+                        newFile(5015L, 5024L)),
+                Arrays.asList(
+                        newFile(4001L, 4010L),
+                        newFile(4011L, 4020L),
+                        newFile(4021L, 4030L),
+                        newFile(4031L, 4040L),
+                        newFile(4041L, 4050L),
+                        newFile(4051L, 4060L),
+                        newFile(4061L, 4070L),
+                        newFile(4071L, 4080L),
+                        newFile(4081L, 4090L),
+                        newFile(4091L, 4110L),
+                        newFile(4111L, 5000L),
+                        newFile(5001L, 5014L),
+                        newFile(5015L, 5024L)),
+                Collections.singletonList(newFile(4001L, 5024L)),
+                Collections.singletonList(newFile(4001L, 4110L)));
+
+        // alternate compact
+        innerTest(
+                Arrays.asList(
+                        newFile(1L, 2000L),
+                        newFile(2001L, 4000L),
+                        // 4001~4500, ..., 4701~6000
+                        newFile(4001L, 4500L),
+                        newFile(4501L, 4600L),
+                        newFile(4601L, 4700L),
+                        newFile(4701L, 6000L),
+                        newFile(6001L, 7500L),
+                        // 7501~8000, 8201~8900
+                        newFile(7501L, 8000L),
+                        newFile(8001L, 8200L),
+                        newFile(8201L, 8900L),
+                        newFile(8901L, 9550L)),
+                Arrays.asList(
+                        newFile(4001L, 4500L),
+                        newFile(4501L, 4600L),
+                        newFile(4601L, 4700L),
+                        newFile(4701L, 6000L),
+                        newFile(7501L, 8000L),
+                        newFile(8001L, 8200L),
+                        newFile(8201L, 8900L)),
+                Arrays.asList(
+                        newFile(4001L, 5024L),
+                        newFile(5025L, 6000L),
+                        newFile(7501L, 8524L),
+                        newFile(8525L, 8900L)),
+                Collections.emptyList());
+    }
+
+    private void innerTest(
+            List<DataFileMeta> compactFiles,
+            List<DataFileMeta> expectBefore,
+            List<DataFileMeta> expectAfter,
+            List<DataFileMeta> expectDeleted) {
+        MockIterativeCompactTask task =
+                new MockIterativeCompactTask(
+                        compactFiles, TARGET_FILE_SIZE, MIN_FILE_NUM, 
MAX_FILE_NUM, rewriter());
+        try {
+            CompactResult actual = task.doCompact(compactFiles);
+            
assertThat(actual.before()).containsExactlyInAnyOrderElementsOf(expectBefore);
+            
assertThat(actual.after()).containsExactlyInAnyOrderElementsOf(expectAfter);
+
+            // assert the temporary files are deleted
+            
assertThat(task.deleted).containsExactlyInAnyOrderElementsOf(expectDeleted);
+        } catch (Exception e) {
+            fail("This should not happen");
+        }
+    }
+
+    /** A Mock {@link AppendOnlyCompactManager.IterativeCompactTask} to test. 
*/
+    private static class MockIterativeCompactTask
+            extends AppendOnlyCompactManager.IterativeCompactTask {
+
+        private final Set<DataFileMeta> deleted;
+
+        public MockIterativeCompactTask(
+                List<DataFileMeta> inputs,
+                long targetFileSize,
+                int minFileNum,
+                int maxFileNum,
+                AppendOnlyCompactManager.CompactRewriter rewriter) {
+            super(inputs, targetFileSize, minFileNum, maxFileNum, rewriter, 
null);
+            deleted = new HashSet<>();
+        }
+
+        @Override
+        void delete(DataFileMeta tmpFile) {
+            deleted.add(tmpFile);
+        }
+    }
+
+    private AppendOnlyCompactManager.CompactRewriter rewriter() {
+        return compactBefore -> {
+            List<DataFileMeta> compactAfter = new ArrayList<>();
+            long totalFileSize = 0L;
+            long minSeq = -1L;
+            for (int i = 0; i < compactBefore.size(); i++) {
+                DataFileMeta file = compactBefore.get(i);
+                if (i == 0) {
+                    minSeq = file.minSequenceNumber();
+                }
+                totalFileSize += file.fileSize();
+                if (totalFileSize >= TARGET_FILE_SIZE) {
+                    compactAfter.add(newFile(minSeq, minSeq + TARGET_FILE_SIZE 
- 1));
+                    minSeq += TARGET_FILE_SIZE;
+                }
+                if (i == compactBefore.size() - 1 && minSeq <= 
file.maxSequenceNumber()) {
+                    compactAfter.add(newFile(minSeq, 
file.maxSequenceNumber()));
+                }
+            }
+            return compactAfter;
+        };
+    }
+}

Reply via email to