This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new ec198499 [FLINK-28108] Support "ALTER TABLE ... COMPACT" for
append-only table
ec198499 is described below
commit ec198499edcb5acaa804f21bd60022bdb633c85c
Author: Jane Chan <[email protected]>
AuthorDate: Fri Jul 15 10:43:55 2022 +0800
[FLINK-28108] Support "ALTER TABLE ... COMPACT" for append-only table
This closes #214
---
.../store/connector/TableStoreManagedFactory.java | 13 +-
.../store/connector/AlterTableCompactITCase.java | 36 ++-
.../connector/TableStoreManagedFactoryTest.java | 16 --
.../store/file/data/AppendOnlyCompactManager.java | 128 +++++++--
.../file/operation/AppendOnlyFileStoreWrite.java | 12 +-
.../file/data/AppendOnlyCompactManagerTest.java | 5 +-
.../store/file/data/IterativeCompactTaskTest.java | 297 +++++++++++++++++++++
7 files changed, 458 insertions(+), 49 deletions(-)
diff --git
a/flink-table-store-connector/src/main/1.15.0/org/apache/flink/table/store/connector/TableStoreManagedFactory.java
b/flink-table-store-connector/src/main/1.15.0/org/apache/flink/table/store/connector/TableStoreManagedFactory.java
index 37dfda61..19358f25 100644
---
a/flink-table-store-connector/src/main/1.15.0/org/apache/flink/table/store/connector/TableStoreManagedFactory.java
+++
b/flink-table-store-connector/src/main/1.15.0/org/apache/flink/table/store/connector/TableStoreManagedFactory.java
@@ -29,7 +29,6 @@ import org.apache.flink.table.store.file.utils.JsonSerdeUtil;
import org.apache.flink.table.store.log.LogStoreTableFactory;
import org.apache.flink.util.Preconditions;
-
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
@@ -84,7 +83,8 @@ public class TableStoreManagedFactory extends
AbstractTableStoreFactory
createOptionalLogStoreFactory(context.getClassLoader(),
enrichedOptions);
logFactory.ifPresent(
factory ->
- factory.enrichOptions(new
TableStoreDynamicContext(context, enrichedOptions))
+ factory.enrichOptions(
+ new TableStoreDynamicContext(context,
enrichedOptions))
.forEach(enrichedOptions::putIfAbsent));
return enrichedOptions;
@@ -167,20 +167,13 @@ public class TableStoreManagedFactory extends
AbstractTableStoreFactory
throw new UncheckedIOException(e);
}
createOptionalLogStoreFactory(context)
- .ifPresent(
- factory ->
- factory.onDropTable(context,
ignoreIfNotExists));
+ .ifPresent(factory -> factory.onDropTable(context,
ignoreIfNotExists));
}
@Override
public Map<String, String> onCompactTable(
Context context, CatalogPartitionSpec catalogPartitionSpec) {
Map<String, String> newOptions = new
HashMap<>(context.getCatalogTable().getOptions());
- if
(APPEND_ONLY.toString().equals(newOptions.get(CoreOptions.WRITE_MODE.key()))) {
- throw new UnsupportedOperationException(
- "ALTER TABLE COMPACT is not yet supported for append only
table.");
- }
-
newOptions.put(COMPACTION_MANUAL_TRIGGERED.key(),
String.valueOf(true));
newOptions.put(
COMPACTION_PARTITION_SPEC.key(),
diff --git
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AlterTableCompactITCase.java
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AlterTableCompactITCase.java
index 34b97668..fa9fdc32 100644
---
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AlterTableCompactITCase.java
+++
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AlterTableCompactITCase.java
@@ -66,7 +66,13 @@ public class AlterTableCompactITCase extends
FileStoreTableITCase {
+ "shopId INT\n, "
+ "orderId BIGINT NOT NULL\n, "
+ "itemId BIGINT)"
- + "PARTITIONED BY (dt, hr)");
+ + "PARTITIONED BY (dt, hr)",
+ "CREATE TABLE IF NOT EXISTS T3 (\n"
+ + "f0 INT\n, "
+ + "f1 STRING NOT NULL\n"
+ + ") WITH (\n"
+ + "'write-mode' = 'append-only'\n"
+ + ")");
}
@Test
@@ -113,6 +119,34 @@ public class AlterTableCompactITCase extends
FileStoreTableITCase {
.isEqualTo(snapshot);
}
+ @Test
+ public void testAppendOnlyTable() {
+ innerTest("INSERT INTO T3 VALUES(1, 'AAA')", 1L,
Snapshot.CommitKind.APPEND);
+ innerTest("ALTER TABLE T3 COMPACT", 1L, Snapshot.CommitKind.APPEND);
+
+ innerTest("INSERT INTO T3 VALUES(2, 'BBB')", 2L,
Snapshot.CommitKind.APPEND);
+ innerTest("ALTER TABLE T3 COMPACT", 2L, Snapshot.CommitKind.APPEND);
+
+ innerTest("INSERT INTO T3 VALUES(3, 'CCC')", 3L,
Snapshot.CommitKind.APPEND);
+ innerTest("ALTER TABLE T3 COMPACT", 3L, Snapshot.CommitKind.APPEND);
+
+ innerTest("INSERT INTO T3 VALUES(4, 'DDD')", 4L,
Snapshot.CommitKind.APPEND);
+ innerTest("ALTER TABLE T3 COMPACT", 4L, Snapshot.CommitKind.APPEND);
+
+ innerTest("INSERT INTO T3 VALUES(5, 'AAA')", 5L,
Snapshot.CommitKind.APPEND);
+
+ batchSql("ALTER TABLE T3 SET ('compaction.early-max.file-num' = '5')");
+ innerTest("ALTER TABLE T3 COMPACT", 6L, Snapshot.CommitKind.COMPACT);
+ }
+
+ private void innerTest(
+ String sql, long expectedSnapshotId, Snapshot.CommitKind
expectedCommitKind) {
+ batchSql(sql);
+ Snapshot snapshot = findLatestSnapshot("T3", true);
+ assertThat(snapshot.id()).isEqualTo(expectedSnapshotId);
+ assertThat(snapshot.commitKind()).isEqualTo(expectedCommitKind);
+ }
+
private void innerTest(
String tableName, int batchNum,
TestKeyValueGenerator.GeneratorMode mode) {
// increase trigger to avoid auto-compaction
diff --git
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreManagedFactoryTest.java
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreManagedFactoryTest.java
index 843b99f5..4cedbbc8 100644
---
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreManagedFactoryTest.java
+++
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/TableStoreManagedFactoryTest.java
@@ -28,7 +28,6 @@ import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.FactoryUtil;
-import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.utils.JsonSerdeUtil;
import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.table.types.logical.RowType;
@@ -69,7 +68,6 @@ import static
org.apache.flink.table.store.file.TestKeyValueGenerator.DEFAULT_PA
import static
org.apache.flink.table.store.file.TestKeyValueGenerator.DEFAULT_ROW_TYPE;
import static
org.apache.flink.table.store.file.TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED;
import static
org.apache.flink.table.store.file.TestKeyValueGenerator.getPrimaryKeys;
-import static org.apache.flink.table.store.file.WriteMode.APPEND_ONLY;
import static
org.apache.flink.table.store.kafka.KafkaLogOptions.BOOTSTRAP_SERVERS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -246,20 +244,6 @@ public class TableStoreManagedFactoryTest {
.containsEntry(COMPACTION_PARTITION_SPEC.key(),
JsonSerdeUtil.toJson(partSpec));
}
- @Test
- public void testOnCompactAppendOnlyTable() {
- context =
- createEnrichedContext(
- Collections.singletonMap(
- CoreOptions.WRITE_MODE.key(),
APPEND_ONLY.toString()));
- assertThatThrownBy(
- () ->
- tableStoreManagedFactory.onCompactTable(
- context, new
CatalogPartitionSpec(Collections.emptyMap())))
- .isInstanceOf(UnsupportedOperationException.class)
- .hasMessage("ALTER TABLE COMPACT is not yet supported for
append only table.");
- }
-
// ~ Tools
------------------------------------------------------------------
private static ResolvedCatalogTable getDummyTable(Map<String, String>
tableOptions) {
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyCompactManager.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyCompactManager.java
index e30b0e43..1f27efe4 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyCompactManager.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyCompactManager.java
@@ -22,10 +22,15 @@ import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.table.store.file.compact.CompactManager;
import org.apache.flink.table.store.file.compact.CompactResult;
import org.apache.flink.table.store.file.compact.CompactTask;
+import org.apache.flink.table.store.file.utils.FileUtils;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.ExecutorService;
/** Compact manager for {@link
org.apache.flink.table.store.file.AppendOnlyFileStore}. */
@@ -62,12 +67,19 @@ public class AppendOnlyCompactManager extends
CompactManager {
.ifPresent(
(inputs) ->
taskFuture =
- executor.submit(
- new
AppendOnlyCompactTask(inputs, rewriter)));
+ executor.submit(new
AutoCompactTask(inputs, rewriter)));
}
@VisibleForTesting
Optional<List<DataFileMeta>> pickCompactBefore() {
+ return pick(toCompact, targetFileSize, minFileNum, maxFileNum);
+ }
+
+ private static Optional<List<DataFileMeta>> pick(
+ LinkedList<DataFileMeta> toCompact,
+ long targetFileSize,
+ int minFileNum,
+ int maxFileNum) {
long totalFileSize = 0L;
int fileNum = 0;
LinkedList<DataFileMeta> candidates = new LinkedList<>();
@@ -81,7 +93,7 @@ public class AppendOnlyCompactManager extends CompactManager {
|| fileNum >= maxFileNum) {
return Optional.of(candidates);
} else if (totalFileSize >= targetFileSize) {
- // left pointer shift one pos to right
+ // let pointer shift one pos to right
DataFileMeta removed = candidates.pollFirst();
assert removed != null;
totalFileSize -= removed.fileSize();
@@ -97,33 +109,113 @@ public class AppendOnlyCompactManager extends
CompactManager {
return toCompact;
}
- /** A {@link CompactTask} impl for append-only table. */
- public static class AppendOnlyCompactTask extends CompactTask {
+ /**
+ * A {@link CompactTask} impl for ALTER TABLE COMPACT of append-only table.
+ *
+ * <p>This task accepts a pre-scanned file list as input and picks the
candidate files to compact
+ * iteratively until it reaches the end of the input. Multiple rounds of
rewrite
+ * may happen during one task.
+ */
+ public static class IterativeCompactTask extends CompactTask {
+
+ private final long targetFileSize;
+ private final int minFileNum;
+ private final int maxFileNum;
+ private final CompactRewriter rewriter;
+ private final DataFilePathFactory factory;
+
+ public IterativeCompactTask(
+ List<DataFileMeta> inputs,
+ long targetFileSize,
+ int minFileNum,
+ int maxFileNum,
+ CompactRewriter rewriter,
+ DataFilePathFactory factory) {
+ super(inputs);
+ this.targetFileSize = targetFileSize;
+ this.minFileNum = minFileNum;
+ this.maxFileNum = maxFileNum;
+ this.rewriter = rewriter;
+ this.factory = factory;
+ }
+
+ @Override
+ protected CompactResult doCompact(List<DataFileMeta> inputs) throws
Exception {
+ LinkedList<DataFileMeta> toCompact = new LinkedList<>(inputs);
+ Set<DataFileMeta> compactBefore = new LinkedHashSet<>();
+ List<DataFileMeta> compactAfter = new ArrayList<>();
+ while (!toCompact.isEmpty()) {
+ Optional<List<DataFileMeta>> candidates =
+ AppendOnlyCompactManager.pick(
+ toCompact, targetFileSize, minFileNum,
maxFileNum);
+ if (candidates.isPresent()) {
+ List<DataFileMeta> before = candidates.get();
+ compactBefore.addAll(before);
+ List<DataFileMeta> after = rewriter.rewrite(before);
+ compactAfter.addAll(after);
+ DataFileMeta lastFile = after.get(after.size() - 1);
+ if (lastFile.fileSize() < targetFileSize) {
+ toCompact.offerFirst(lastFile);
+ }
+ } else {
+ break;
+ }
+ }
+ // remove and delete intermediate files
+ Iterator<DataFileMeta> afterIterator = compactAfter.iterator();
+ while (afterIterator.hasNext()) {
+ DataFileMeta file = afterIterator.next();
+ if (compactBefore.contains(file)) {
+ compactBefore.remove(file);
+ afterIterator.remove();
+ delete(file);
+ }
+ }
+ return result(new ArrayList<>(compactBefore), compactAfter);
+ }
+
+ @VisibleForTesting
+ void delete(DataFileMeta tmpFile) {
+ FileUtils.deleteOrWarn(factory.toPath(tmpFile.fileName()));
+ }
+ }
+
+ /**
+ * A {@link CompactTask} impl for append-only table auto-compaction.
+ *
+ * <p>This task accepts an already-picked candidate to perform a one-time
rewrite. For the
+ * rest of the input files, it is the duty of {@link AppendOnlyWriter} to
trigger the next
+ * compaction.
+ */
+ public static class AutoCompactTask extends CompactTask {
private final CompactRewriter rewriter;
- public AppendOnlyCompactTask(List<DataFileMeta> toCompact,
CompactRewriter rewriter) {
+ public AutoCompactTask(List<DataFileMeta> toCompact, CompactRewriter
rewriter) {
super(toCompact);
this.rewriter = rewriter;
}
@Override
protected CompactResult doCompact(List<DataFileMeta> inputs) throws
Exception {
- List<DataFileMeta> compactAfter = rewriter.rewrite(inputs);
- return new CompactResult() {
- @Override
- public List<DataFileMeta> before() {
- return inputs;
- }
-
- @Override
- public List<DataFileMeta> after() {
- return compactAfter;
- }
- };
+ return result(inputs, rewriter.rewrite(inputs));
}
}
+ private static CompactResult result(List<DataFileMeta> before,
List<DataFileMeta> after) {
+ return new CompactResult() {
+ @Override
+ public List<DataFileMeta> before() {
+ return before;
+ }
+
+ @Override
+ public List<DataFileMeta> after() {
+ return after;
+ }
+ };
+ }
+
/** Compact rewriter for append-only table. */
public interface CompactRewriter {
List<DataFileMeta> rewrite(List<DataFileMeta> compactBefore) throws
Exception;
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java
index 30456991..b2bed275 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java
@@ -95,8 +95,16 @@ public class AppendOnlyFileStoreWrite extends
AbstractFileStoreWrite<RowData> {
@Override
public Callable<CompactResult> createCompactWriter(
BinaryRowData partition, int bucket, @Nullable List<DataFileMeta>
compactFiles) {
- throw new UnsupportedOperationException(
- "Currently append only write mode does not support
compaction.");
+ if (compactFiles == null) {
+ compactFiles = scanExistingFileMetas(partition, bucket);
+ }
+ return new AppendOnlyCompactManager.IterativeCompactTask(
+ compactFiles,
+ targetFileSize,
+ minFileNum,
+ maxFileNum,
+ compactRewriter(partition, bucket),
+ pathFactory.createDataFilePathFactory(partition, bucket));
}
private RecordWriter<RowData> createWriter(
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/AppendOnlyCompactManagerTest.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/AppendOnlyCompactManagerTest.java
index 9979f080..94959520 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/AppendOnlyCompactManagerTest.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/AppendOnlyCompactManagerTest.java
@@ -200,12 +200,13 @@ public class AppendOnlyCompactManagerTest {
long targetFileSize = 1024;
AppendOnlyCompactManager manager =
new AppendOnlyCompactManager(
- null,
+ null, // not used
new LinkedList<>(toCompactBeforePick),
minFileNum,
maxFileNum,
targetFileSize,
- null);
+ null // not used
+ );
Optional<List<DataFileMeta>> actual = manager.pickCompactBefore();
assertThat(actual.isPresent()).isEqualTo(expectedPresent);
if (expectedPresent) {
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/IterativeCompactTaskTest.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/IterativeCompactTaskTest.java
new file mode 100644
index 00000000..c5610ca6
--- /dev/null
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/IterativeCompactTaskTest.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.data;
+
+import org.apache.flink.table.store.file.compact.CompactResult;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.flink.table.store.file.data.DataFileTestUtils.newFile;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.fail;
+
+/** Test for {@link AppendOnlyCompactManager.IterativeCompactTask}. */
+public class IterativeCompactTaskTest {
+
+ private static final long TARGET_FILE_SIZE = 1024L;
+ private static final int MIN_FILE_NUM = 3;
+ private static final int MAX_FILE_NUM = 10;
+
+ @Test
+ public void testNoCompact() {
+ // empty
+ innerTest(
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList());
+
+ // single small file
+ innerTest(
+ Collections.singletonList(newFile(1L, 10L)),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList());
+
+ // single large file
+ innerTest(
+ Collections.singletonList(newFile(1L, 1024L)),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList());
+
+ // almost-full files
+ innerTest(
+ Arrays.asList(newFile(1L, 1024L), newFile(2L, 2048L)),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList());
+
+ // large files
+ innerTest(
+ Arrays.asList(newFile(1L, 1000L), newFile(1001L, 1100L)),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList());
+ }
+
+ @Test
+ public void testCompactOnce() {
+ // small files on the head
+ innerTest(
+ Arrays.asList(
+ newFile(1L, 500L),
+ newFile(501L, 1000L),
+ newFile(1001L, 1010L),
+ newFile(1011L, 1024L),
+ newFile(1025L, 3000L)),
+ Arrays.asList(
+ newFile(1L, 500L),
+ newFile(501L, 1000L),
+ newFile(1001L, 1010L),
+ newFile(1011L, 1024L)),
+ Collections.singletonList(newFile(1L, 1024L)),
+ Collections.emptyList());
+
+ innerTest(
+ Arrays.asList(
+ newFile(1L, 500L),
+ newFile(501L, 1000L),
+ newFile(1001L, 1010L),
+ newFile(1011L, 2000L),
+ newFile(2001L, 3000L)),
+ Arrays.asList(
+ newFile(1L, 500L),
+ newFile(501L, 1000L),
+ newFile(1001L, 1010L),
+ newFile(1011L, 2000L)),
+ Arrays.asList(newFile(1L, 1024L), newFile(1025L, 2000L)),
+ Collections.emptyList());
+
+ // small files in the middle
+ innerTest(
+ Arrays.asList(
+ newFile(1L, 2000L),
+ newFile(2001L, 4000L),
+ newFile(4001L, 4500L),
+ newFile(4501L, 4600L),
+ newFile(4601L, 4700L),
+ newFile(4701L, 5024L),
+ newFile(5025L, 7000L)),
+ Arrays.asList(
+ newFile(4001L, 4500L),
+ newFile(4501L, 4600L),
+ newFile(4601L, 4700L),
+ newFile(4701L, 5024L)),
+ Collections.singletonList(newFile(4001L, 5024L)),
+ Collections.emptyList());
+
+ // small files on the tail
+ innerTest(
+ Arrays.asList(
+ newFile(1L, 2000L),
+ newFile(2001L, 4000L),
+ newFile(4001L, 4010L),
+ newFile(4011L, 4020L),
+ newFile(4021L, 4030L),
+ newFile(4031L, 4040L),
+ newFile(4041L, 4050L),
+ newFile(4051L, 4060L),
+ newFile(4061L, 4070L),
+ newFile(4071L, 4080L),
+ newFile(4081L, 4090L),
+ newFile(4091L, 4110L)),
+ Arrays.asList(
+ newFile(4001L, 4010L),
+ newFile(4011L, 4020L),
+ newFile(4021L, 4030L),
+ newFile(4031L, 4040L),
+ newFile(4041L, 4050L),
+ newFile(4051L, 4060L),
+ newFile(4061L, 4070L),
+ newFile(4071L, 4080L),
+ newFile(4081L, 4090L),
+ newFile(4091L, 4110L)),
+ Collections.singletonList(newFile(4001L, 4110L)),
+ Collections.emptyList());
+ }
+
+ @Test
+ public void testCompactMultiple() {
+ // continuous compact
+ innerTest(
+ Arrays.asList(
+ newFile(1L, 2000L),
+ newFile(2001L, 4000L),
+ // 4001~4010, ..., 4091~4110
+ newFile(4001L, 4010L),
+ newFile(4011L, 4020L),
+ newFile(4021L, 4030L),
+ newFile(4031L, 4040L),
+ newFile(4041L, 4050L),
+ newFile(4051L, 4060L),
+ newFile(4061L, 4070L),
+ newFile(4071L, 4080L),
+ newFile(4081L, 4090L),
+ newFile(4091L, 4110L),
+ // 4001~4110, 5015~5024
+ newFile(4111L, 5000L),
+ newFile(5001L, 5014L),
+ newFile(5015L, 5024L)),
+ Arrays.asList(
+ newFile(4001L, 4010L),
+ newFile(4011L, 4020L),
+ newFile(4021L, 4030L),
+ newFile(4031L, 4040L),
+ newFile(4041L, 4050L),
+ newFile(4051L, 4060L),
+ newFile(4061L, 4070L),
+ newFile(4071L, 4080L),
+ newFile(4081L, 4090L),
+ newFile(4091L, 4110L),
+ newFile(4111L, 5000L),
+ newFile(5001L, 5014L),
+ newFile(5015L, 5024L)),
+ Collections.singletonList(newFile(4001L, 5024L)),
+ Collections.singletonList(newFile(4001L, 4110L)));
+
+ // alternate compact
+ innerTest(
+ Arrays.asList(
+ newFile(1L, 2000L),
+ newFile(2001L, 4000L),
+ // 4001~4500, ..., 4701~6000
+ newFile(4001L, 4500L),
+ newFile(4501L, 4600L),
+ newFile(4601L, 4700L),
+ newFile(4701L, 6000L),
+ newFile(6001L, 7500L),
+ // 7501~8000, 8201~8900
+ newFile(7501L, 8000L),
+ newFile(8001L, 8200L),
+ newFile(8201L, 8900L),
+ newFile(8901L, 9550L)),
+ Arrays.asList(
+ newFile(4001L, 4500L),
+ newFile(4501L, 4600L),
+ newFile(4601L, 4700L),
+ newFile(4701L, 6000L),
+ newFile(7501L, 8000L),
+ newFile(8001L, 8200L),
+ newFile(8201L, 8900L)),
+ Arrays.asList(
+ newFile(4001L, 5024L),
+ newFile(5025L, 6000L),
+ newFile(7501L, 8524L),
+ newFile(8525L, 8900L)),
+ Collections.emptyList());
+ }
+
+ private void innerTest(
+ List<DataFileMeta> compactFiles,
+ List<DataFileMeta> expectBefore,
+ List<DataFileMeta> expectAfter,
+ List<DataFileMeta> expectDeleted) {
+ MockIterativeCompactTask task =
+ new MockIterativeCompactTask(
+ compactFiles, TARGET_FILE_SIZE, MIN_FILE_NUM,
MAX_FILE_NUM, rewriter());
+ try {
+ CompactResult actual = task.doCompact(compactFiles);
+
assertThat(actual.before()).containsExactlyInAnyOrderElementsOf(expectBefore);
+
assertThat(actual.after()).containsExactlyInAnyOrderElementsOf(expectAfter);
+
+ // assert the temporary files are deleted
+
assertThat(task.deleted).containsExactlyInAnyOrderElementsOf(expectDeleted);
+ } catch (Exception e) {
+ fail("This should not happen");
+ }
+ }
+
+ /** A Mock {@link AppendOnlyCompactManager.IterativeCompactTask} to test.
*/
+ private static class MockIterativeCompactTask
+ extends AppendOnlyCompactManager.IterativeCompactTask {
+
+ private final Set<DataFileMeta> deleted;
+
+ public MockIterativeCompactTask(
+ List<DataFileMeta> inputs,
+ long targetFileSize,
+ int minFileNum,
+ int maxFileNum,
+ AppendOnlyCompactManager.CompactRewriter rewriter) {
+ super(inputs, targetFileSize, minFileNum, maxFileNum, rewriter,
null);
+ deleted = new HashSet<>();
+ }
+
+ @Override
+ void delete(DataFileMeta tmpFile) {
+ deleted.add(tmpFile);
+ }
+ }
+
+ private AppendOnlyCompactManager.CompactRewriter rewriter() {
+ return compactBefore -> {
+ List<DataFileMeta> compactAfter = new ArrayList<>();
+ long totalFileSize = 0L;
+ long minSeq = -1L;
+ for (int i = 0; i < compactBefore.size(); i++) {
+ DataFileMeta file = compactBefore.get(i);
+ if (i == 0) {
+ minSeq = file.minSequenceNumber();
+ }
+ totalFileSize += file.fileSize();
+ if (totalFileSize >= TARGET_FILE_SIZE) {
+ compactAfter.add(newFile(minSeq, minSeq + TARGET_FILE_SIZE
- 1));
+ minSeq += TARGET_FILE_SIZE;
+ }
+ if (i == compactBefore.size() - 1 && minSeq <=
file.maxSequenceNumber()) {
+ compactAfter.add(newFile(minSeq,
file.maxSequenceNumber()));
+ }
+ }
+ return compactAfter;
+ };
+ }
+}