This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 8b080e1 [FLINK-25689] Introduce atomic commit
8b080e1 is described below
commit 8b080e108d3ac3ff4425f2deb41c154e0e0c5504
Author: tsreaper <[email protected]>
AuthorDate: Fri Jan 21 13:38:38 2022 +0800
[FLINK-25689] Introduce atomic commit
This closes #12
---
.../flink/table/store/file/FileStoreOptions.java | 52 +++
.../apache/flink/table/store/file/KeyValue.java | 39 +++
.../apache/flink/table/store/file/Snapshot.java | 132 ++++++++
.../store/file/manifest/ManifestCommittable.java | 30 ++
.../table/store/file/manifest/ManifestEntry.java | 17 +-
.../table/store/file/manifest/ManifestFile.java | 21 +-
.../store/file/manifest/ManifestFileMeta.java | 94 ++++++
.../table/store/file/manifest/ManifestList.java | 21 +-
.../table/store/file/mergetree/sst/SstFile.java | 13 +-
.../store/file/operation/FileStoreCommitImpl.java | 375 +++++++++++++++++++++
.../store/file/operation/FileStoreScanImpl.java | 142 ++++++++
.../store/file/utils/FileStorePathFactory.java | 44 ++-
.../flink/table/store/file/utils/FileUtils.java | 28 ++
.../table/store/file/TestKeyValueGenerator.java | 6 +-
.../store/file/manifest/ManifestFileMetaTest.java | 227 +++++++++++++
.../store/file/manifest/ManifestFileTest.java | 5 +-
.../store/file/manifest/ManifestListTest.java | 5 +-
.../file/mergetree/sst/SstTestDataGenerator.java | 4 +-
.../file/operation/FileStoreCommitTestBase.java | 268 +++++++++++++++
.../store/file/operation/TestCommitThread.java | 251 ++++++++++++++
20 files changed, 1732 insertions(+), 42 deletions(-)
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
new file mode 100644
index 0000000..028b678
--- /dev/null
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.ReadableConfig;
+
+/** Options for {@link FileStore}. */
+public class FileStoreOptions {
+
+ public static final ConfigOption<Integer> BUCKET =
+ ConfigOptions.key("bucket")
+ .intType()
+ .defaultValue(1)
+ .withDescription("Bucket number for file store.");
+
+ public static final ConfigOption<MemorySize> MANIFEST_TARGET_FILE_SIZE =
+ ConfigOptions.key("manifest.target-file-size")
+ .memoryType()
+ .defaultValue(MemorySize.ofMebiBytes(8))
+ .withDescription("Suggested file size of a manifest
file.");
+
+ public final int bucket;
+ public final MemorySize manifestSuggestedSize;
+
+ public FileStoreOptions(int bucket, MemorySize manifestSuggestedSize) {
+ this.bucket = bucket;
+ this.manifestSuggestedSize = manifestSuggestedSize;
+ }
+
+ public FileStoreOptions(ReadableConfig config) {
+ this(config.get(BUCKET), config.get(MANIFEST_TARGET_FILE_SIZE));
+ }
+}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValue.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValue.java
index d5ff507..8461d64 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValue.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValue.java
@@ -18,13 +18,17 @@
package org.apache.flink.table.store.file;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TinyIntType;
import java.util.ArrayList;
import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
/**
* A key value, including user key, sequence number, value kind and value.
This object can be
@@ -73,4 +77,39 @@ public class KeyValue {
fields.addAll(valueType.getFields());
return new RowType(fields);
}
+
+ @VisibleForTesting
+ public KeyValue copy(RowDataSerializer keySerializer, RowDataSerializer
valueSerializer) {
+ return new KeyValue()
+ .replace(
+ keySerializer.copy(key),
+ sequenceNumber,
+ valueKind,
+ valueSerializer.copy(value));
+ }
+
+ @VisibleForTesting
+ public String toString(RowType keyType, RowType valueType) {
+ String keyString = rowDataToString(key, keyType);
+ String valueString = rowDataToString(value, valueType);
+ return "{kind: "
+ + valueKind.name()
+ + ", seq: "
+ + sequenceNumber
+ + ", key: ("
+ + keyString
+ + "), value: ("
+ + valueString
+ + ")}";
+ }
+
+ public static String rowDataToString(RowData row, RowType type) {
+ return IntStream.range(0, type.getFieldCount())
+ .mapToObj(
+ i ->
+ String.valueOf(
+
RowData.createFieldGetter(type.getTypeAt(i), i)
+ .getFieldOrNull(row)))
+ .collect(Collectors.joining(", "));
+ }
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java
new file mode 100644
index 0000000..77f79e6
--- /dev/null
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/Snapshot.java
@@ -0,0 +1,132 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.table.store.file;

import org.apache.flink.core.fs.Path;
import org.apache.flink.table.store.file.utils.FileUtils;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;

/** This file is the entrance to all data committed at some specific time point. */
public class Snapshot {

    public static final long FIRST_SNAPSHOT_ID = 1;

    // ObjectMapper is thread-safe once configured and expensive to build; share one
    // instance instead of constructing a new one on every (de)serialization call
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    private static final String FIELD_ID = "id";
    private static final String FIELD_MANIFEST_LIST = "manifestList";
    private static final String FIELD_COMMIT_USER = "commitUser";
    private static final String FIELD_COMMIT_DIGEST = "commitDigest";
    private static final String FIELD_COMMIT_KIND = "commitKind";

    // monotonically increasing snapshot id, starting from FIRST_SNAPSHOT_ID
    @JsonProperty(FIELD_ID)
    private final long id;

    // name of the manifest list file recording all changes of this snapshot
    @JsonProperty(FIELD_MANIFEST_LIST)
    private final String manifestList;

    // identifier of the committer, used together with the digest for deduplication
    @JsonProperty(FIELD_COMMIT_USER)
    private final String commitUser;

    // for deduplication
    @JsonProperty(FIELD_COMMIT_DIGEST)
    private final String commitDigest;

    @JsonProperty(FIELD_COMMIT_KIND)
    private final CommitKind commitKind;

    @JsonCreator
    public Snapshot(
            @JsonProperty(FIELD_ID) long id,
            @JsonProperty(FIELD_MANIFEST_LIST) String manifestList,
            @JsonProperty(FIELD_COMMIT_USER) String commitUser,
            @JsonProperty(FIELD_COMMIT_DIGEST) String commitDigest,
            @JsonProperty(FIELD_COMMIT_KIND) CommitKind commitKind) {
        this.id = id;
        this.manifestList = manifestList;
        this.commitUser = commitUser;
        this.commitDigest = commitDigest;
        this.commitKind = commitKind;
    }

    @JsonGetter(FIELD_ID)
    public long id() {
        return id;
    }

    @JsonGetter(FIELD_MANIFEST_LIST)
    public String manifestList() {
        return manifestList;
    }

    @JsonGetter(FIELD_COMMIT_USER)
    public String commitUser() {
        return commitUser;
    }

    @JsonGetter(FIELD_COMMIT_DIGEST)
    public String commitDigest() {
        return commitDigest;
    }

    @JsonGetter(FIELD_COMMIT_KIND)
    public CommitKind commitKind() {
        return commitKind;
    }

    /** Serializes this snapshot to a JSON string. */
    public String toJson() {
        try {
            return OBJECT_MAPPER.writeValueAsString(this);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }

    /** Deserializes a snapshot from a JSON string produced by {@link #toJson()}. */
    public static Snapshot fromJson(String json) {
        try {
            return OBJECT_MAPPER.readValue(json, Snapshot.class);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }

    /** Reads and deserializes a snapshot from the given file path. */
    public static Snapshot fromPath(Path path) {
        try {
            String json = FileUtils.readFileUtf8(path);
            return Snapshot.fromJson(json);
        } catch (IOException e) {
            throw new RuntimeException("Fails to read snapshot from path " + path, e);
        }
    }

    /** Type of changes in this snapshot. */
    public enum CommitKind {

        /** Changes flushed from the mem table. */
        APPEND,

        /** Changes by compacting existing sst files. */
        COMPACT
    }
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittable.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittable.java
index a02fd75..4fcd763 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittable.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestCommittable.java
@@ -98,4 +98,34 @@ public class ManifestCommittable {
public int hashCode() {
return Objects.hash(newFiles, compactBefore, compactAfter);
}
+
+ @Override
+ public String toString() {
+ return "new files:\n"
+ + filesToString(newFiles)
+ + "compact before:\n"
+ + filesToString(compactBefore)
+ + "compact after:\n"
+ + filesToString(compactAfter);
+ }
+
+ private static String filesToString(Map<BinaryRowData, Map<Integer,
List<SstFileMeta>>> files) {
+ StringBuilder builder = new StringBuilder();
+ for (Map.Entry<BinaryRowData, Map<Integer, List<SstFileMeta>>>
entryWithPartition :
+ files.entrySet()) {
+ for (Map.Entry<Integer, List<SstFileMeta>> entryWithBucket :
+ entryWithPartition.getValue().entrySet()) {
+ for (SstFileMeta sst : entryWithBucket.getValue()) {
+ builder.append(" * partition: ")
+ .append(entryWithPartition.getKey())
+ .append(", bucket: ")
+ .append(entryWithBucket.getKey())
+ .append(", file: ")
+ .append(sst.fileName())
+ .append("\n");
+ }
+ }
+ }
+ return builder.toString();
+ }
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestEntry.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestEntry.java
index 206b8e1..170b024 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestEntry.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestEntry.java
@@ -73,7 +73,7 @@ public class ManifestEntry {
}
public Identifier identifier() {
- return new Identifier(partition, bucket, file.fileName());
+ return new Identifier(partition, bucket, file.level(),
file.fileName());
}
public static RowType schema(RowType partitionType, RowType keyType,
RowType rowType) {
@@ -114,13 +114,15 @@ public class ManifestEntry {
* file.
*/
public static class Identifier {
- private final BinaryRowData partition;
- private final int bucket;
- private final String fileName;
+ public final BinaryRowData partition;
+ public final int bucket;
+ public final int level;
+ public final String fileName;
- private Identifier(BinaryRowData partition, int bucket, String
fileName) {
+ private Identifier(BinaryRowData partition, int bucket, int level,
String fileName) {
this.partition = partition;
this.bucket = bucket;
+ this.level = level;
this.fileName = fileName;
}
@@ -132,17 +134,18 @@ public class ManifestEntry {
Identifier that = (Identifier) o;
return Objects.equals(partition, that.partition)
&& bucket == that.bucket
+ && level == that.level
&& Objects.equals(fileName, that.fileName);
}
@Override
public int hashCode() {
- return Objects.hash(partition, bucket, fileName);
+ return Objects.hash(partition, bucket, level, fileName);
}
@Override
public String toString() {
- return String.format("{%s, %d, %s}", partition, bucket, fileName);
+ return String.format("{%s, %d, %d, %s}", partition, bucket, level,
fileName);
}
}
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
index 13d8be3..1b2bb92 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java
@@ -32,9 +32,6 @@ import org.apache.flink.table.store.file.utils.FileUtils;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.io.IOException;
import java.util.List;
@@ -44,8 +41,6 @@ import java.util.List;
*/
public class ManifestFile {
- private static final Logger LOG =
LoggerFactory.getLogger(ManifestFile.class);
-
private final RowType partitionType;
private final ManifestEntrySerializer serializer;
private final BulkFormat<RowData, FileSourceSplit> readerFactory;
@@ -66,9 +61,13 @@ public class ManifestFile {
this.pathFactory = pathFactory;
}
- public List<ManifestEntry> read(String fileName) throws IOException {
- return FileUtils.readListFromFile(
- pathFactory.toManifestFilePath(fileName), serializer,
readerFactory);
+ public List<ManifestEntry> read(String fileName) {
+ try {
+ return FileUtils.readListFromFile(
+ pathFactory.toManifestFilePath(fileName), serializer,
readerFactory);
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to read manifest file " +
fileName, e);
+ }
}
/**
@@ -76,7 +75,7 @@ public class ManifestFile {
*
* <p>NOTE: This method is atomic.
*/
- public ManifestFileMeta write(List<ManifestEntry> entries) throws
IOException {
+ public ManifestFileMeta write(List<ManifestEntry> entries) {
Preconditions.checkArgument(
entries.size() > 0, "Manifest entries to write must not be
empty.");
@@ -84,9 +83,9 @@ public class ManifestFile {
try {
return write(entries, path);
} catch (Throwable e) {
- LOG.warn("Exception occurs when writing manifest file " + path +
". Cleaning up.", e);
FileUtils.deleteOrWarn(path);
- throw e;
+ throw new RuntimeException(
+ "Exception occurs when writing manifest file " + path + ".
Clean up.", e);
}
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMeta.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMeta.java
index 38f9467..0c4aa8f 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMeta.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFileMeta.java
@@ -23,10 +23,13 @@ import
org.apache.flink.table.store.file.stats.FieldStatsArraySerializer;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.util.Preconditions;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
/** Metadata of a manifest file. */
@@ -118,4 +121,95 @@ public class ManifestFileMeta {
numDeletedFiles,
Arrays.toString(partitionStats));
}
+
+ /**
+ * Merge several {@link ManifestFileMeta}s. {@link ManifestEntry}s
representing first adding and
+ * then deleting the same sst file will cancel each other.
+ *
+ * <p>NOTE: This method is atomic.
+ */
+ public static List<ManifestFileMeta> merge(
+ List<ManifestFileMeta> metas, ManifestFile manifestFile, long
suggestedMetaSize) {
+ List<ManifestFileMeta> result = new ArrayList<>();
+ // these are the newly created manifest files, clean them up if
exception occurs
+ List<ManifestFileMeta> newMetas = new ArrayList<>();
+ List<ManifestFileMeta> candidate = new ArrayList<>();
+ long totalSize = 0;
+
+ try {
+ for (ManifestFileMeta manifest : metas) {
+ totalSize += manifest.fileSize;
+ candidate.add(manifest);
+ if (totalSize >= suggestedMetaSize) {
+ // reach suggested file size, perform merging and produce
new file
+ merge(candidate, manifestFile, result, newMetas);
+ candidate.clear();
+ totalSize = 0;
+ }
+ }
+ if (!candidate.isEmpty()) {
+ // merge the last bit of metas
+ merge(candidate, manifestFile, result, newMetas);
+ }
+ } catch (Throwable e) {
+ // exception occurs, clean up and rethrow
+ for (ManifestFileMeta manifest : newMetas) {
+ manifestFile.delete(manifest.fileName);
+ }
+ throw e;
+ }
+
+ return result;
+ }
+
+ private static void merge(
+ List<ManifestFileMeta> metas,
+ ManifestFile manifestFile,
+ List<ManifestFileMeta> result,
+ List<ManifestFileMeta> newMetas) {
+ if (metas.size() > 1) {
+ ManifestFileMeta newMeta = merge(metas, manifestFile);
+ result.add(newMeta);
+ newMetas.add(newMeta);
+ } else {
+ result.addAll(metas);
+ }
+ }
+
+ private static ManifestFileMeta merge(List<ManifestFileMeta> metas,
ManifestFile manifestFile) {
+ Preconditions.checkArgument(
+ metas.size() > 1, "Number of ManifestFileMeta <= 1. This is a
bug.");
+
+ Map<ManifestEntry.Identifier, ManifestEntry> map = new
LinkedHashMap<>();
+ for (ManifestFileMeta manifest : metas) {
+ for (ManifestEntry entry : manifestFile.read(manifest.fileName)) {
+ ManifestEntry.Identifier identifier = entry.identifier();
+ switch (entry.kind()) {
+ case ADD:
+ Preconditions.checkState(
+ !map.containsKey(identifier),
+ "Trying to add file %s which is already added.
Manifest might be corrupted.",
+ identifier);
+ map.put(identifier, entry);
+ break;
+ case DELETE:
+ // each sst file will only be added once and deleted
once,
+ // if we know that it is added before then both add
and delete entry can be
+ // removed because there won't be further operations
on this file,
+ // otherwise we have to keep the delete entry because
the add entry must be
+ // in the previous manifest files
+ if (map.containsKey(identifier)) {
+ map.remove(identifier);
+ } else {
+ map.put(identifier, entry);
+ }
+ break;
+ default:
+ throw new UnsupportedOperationException(
+ "Unknown value kind " + entry.kind().name());
+ }
+ }
+ }
+ return manifestFile.write(new ArrayList<>(map.values()));
+ }
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestList.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestList.java
index 73d9ac1..1b008dd 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestList.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestList.java
@@ -31,9 +31,6 @@ import org.apache.flink.table.store.file.utils.FileUtils;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.io.IOException;
import java.util.List;
@@ -43,8 +40,6 @@ import java.util.List;
*/
public class ManifestList {
- private static final Logger LOG =
LoggerFactory.getLogger(ManifestList.class);
-
private final ManifestFileMetaSerializer serializer;
private final BulkFormat<RowData, FileSourceSplit> readerFactory;
private final BulkWriter.Factory<RowData> writerFactory;
@@ -59,9 +54,13 @@ public class ManifestList {
this.pathFactory = pathFactory;
}
- public List<ManifestFileMeta> read(String fileName) throws IOException {
- return FileUtils.readListFromFile(
- pathFactory.toManifestListPath(fileName), serializer,
readerFactory);
+ public List<ManifestFileMeta> read(String fileName) {
+ try {
+ return FileUtils.readListFromFile(
+ pathFactory.toManifestListPath(fileName), serializer,
readerFactory);
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to read manifest list " +
fileName, e);
+ }
}
/**
@@ -69,7 +68,7 @@ public class ManifestList {
*
* <p>NOTE: This method is atomic.
*/
- public String write(List<ManifestFileMeta> metas) throws IOException {
+ public String write(List<ManifestFileMeta> metas) {
Preconditions.checkArgument(
metas.size() > 0, "Manifest file metas to write must not be
empty.");
@@ -77,9 +76,9 @@ public class ManifestList {
try {
return write(metas, path);
} catch (Throwable e) {
- LOG.warn("Exception occurs when writing manifest list " + path +
". Cleaning up.", e);
FileUtils.deleteOrWarn(path);
- throw e;
+ throw new RuntimeException(
+ "Exception occurs when writing manifest list " + path + ".
Clean up.", e);
}
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFile.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFile.java
index d73ff45..caae17b 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFile.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFile.java
@@ -222,11 +222,22 @@ public class SstFile {
this.maxKey = null;
this.minSequenceNumber = Long.MAX_VALUE;
this.maxSequenceNumber = Long.MIN_VALUE;
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Creating new sst file " + path);
+ }
}
private void write(KeyValue kv) throws IOException {
- writer.addElement(serializer.toRow(kv));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Writing key-value to sst file "
+ + path
+ + ", kv: "
+ + kv.toString(keyType, valueType));
+ }
+ writer.addElement(serializer.toRow(kv));
rowCount++;
if (minKey == null) {
minKey = keySerializer.toBinaryRow(kv.key()).copy();
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
new file mode 100644
index 0000000..8cb9fc0
--- /dev/null
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreCommitImpl.java
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.operation;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.manifest.ManifestCommittable;
+import
org.apache.flink.table.store.file.manifest.ManifestCommittableSerializer;
+import org.apache.flink.table.store.file.manifest.ManifestEntry;
+import org.apache.flink.table.store.file.manifest.ManifestFile;
+import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
+import org.apache.flink.table.store.file.manifest.ManifestList;
+import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.utils.FileUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Default implementation of {@link FileStoreCommit}.
+ *
+ * <p>This class provides an atomic commit method to the user.
+ *
+ * <ol>
+ * <li>Before calling {@link FileStoreCommitImpl#commit}, if user cannot
determine if this commit
+ * is done before, user should first call {@link
FileStoreCommitImpl#filterCommitted}.
+ * <li>Before committing, it will first check for conflicts by checking if
all files to be removed
+ * currently exists.
+ * <li>After that it use the external {@link FileStoreCommitImpl#lock} (if
provided) or the atomic
+ * rename of the file system to ensure atomicity.
+ * <li>If commit fails due to conflicts or exception it tries its best to
clean up and aborts.
+ * <li>If atomic rename fails it tries again after reading the latest
snapshot from step 2.
+ * </ol>
+ */
+public class FileStoreCommitImpl implements FileStoreCommit {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(FileStoreCommitImpl.class);
+
+ private final String commitUser;
+ private final ManifestCommittableSerializer committableSerializer;
+
+ private final FileStorePathFactory pathFactory;
+ private final ManifestFile manifestFile;
+ private final ManifestList manifestList;
+ private final FileStoreOptions fileStoreOptions;
+ private final FileStoreScan scan;
+
+ @Nullable private Lock lock;
+
+ public FileStoreCommitImpl(
+ String commitUser,
+ ManifestCommittableSerializer committableSerializer,
+ FileStorePathFactory pathFactory,
+ ManifestFile manifestFile,
+ ManifestList manifestList,
+ FileStoreOptions fileStoreOptions,
+ FileStoreScan scan) {
+ this.commitUser = commitUser;
+ this.committableSerializer = committableSerializer;
+
+ this.pathFactory = pathFactory;
+ this.manifestFile = manifestFile;
+ this.manifestList = manifestList;
+ this.fileStoreOptions = fileStoreOptions;
+ this.scan = scan;
+
+ this.lock = null;
+ }
+
+ @Override
+ public FileStoreCommit withLock(Lock lock) {
+ this.lock = lock;
+ return this;
+ }
+
+ @Override
+ public List<ManifestCommittable> filterCommitted(List<ManifestCommittable>
committableList) {
+ committableList = new ArrayList<>(committableList);
+
+ // filter out commits with no new files
+ committableList.removeIf(committable ->
committable.newFiles().isEmpty());
+
+ // if there is no previous snapshots then nothing should be filtered
+ Long latestSnapshotId = pathFactory.latestSnapshotId();
+ if (latestSnapshotId == null) {
+ return committableList;
+ }
+
+ // check if a committable is already committed by its hash
+ Map<String, ManifestCommittable> digests = new LinkedHashMap<>();
+ for (ManifestCommittable committable : committableList) {
+ digests.put(digestManifestCommittable(committable), committable);
+ }
+
+ for (long id = latestSnapshotId; id >= Snapshot.FIRST_SNAPSHOT_ID;
id--) {
+ Path snapshotPath = pathFactory.toSnapshotPath(id);
+ Snapshot snapshot = Snapshot.fromPath(snapshotPath);
+ if (commitUser.equals(snapshot.commitUser())) {
+ if (digests.containsKey(snapshot.commitDigest())) {
+ digests.remove(snapshot.commitDigest());
+ } else {
+ // early exit, because committableList must be the latest
commits by this
+ // commit user
+ break;
+ }
+ }
+ }
+
+ return new ArrayList<>(digests.values());
+ }
+
+ @Override
+ public void commit(ManifestCommittable committable, Map<String, String>
properties) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Ready to commit\n" + committable.toString());
+ }
+
+ String hash = digestManifestCommittable(committable);
+
+ List<ManifestEntry> appendChanges =
collectChanges(committable.newFiles(), ValueKind.ADD);
+ if (!appendChanges.isEmpty()) {
+ tryCommit(appendChanges, hash, Snapshot.CommitKind.APPEND);
+ }
+
+ List<ManifestEntry> compactChanges = new ArrayList<>();
+ compactChanges.addAll(collectChanges(committable.compactBefore(),
ValueKind.DELETE));
+ compactChanges.addAll(collectChanges(committable.compactAfter(),
ValueKind.ADD));
+ if (!compactChanges.isEmpty()) {
+ tryCommit(compactChanges, hash, Snapshot.CommitKind.COMPACT);
+ }
+ }
+
+ @Override
+ public void overwrite(
+ Map<String, String> partition,
+ ManifestCommittable committable,
+ Map<String, String> properties) {
+ throw new UnsupportedOperationException();
+ }
+
+ private String digestManifestCommittable(ManifestCommittable committable) {
+ try {
+ return new String(
+ Base64.getEncoder()
+ .encode(
+ MessageDigest.getInstance("MD5")
+
.digest(committableSerializer.serialize(committable))));
+ } catch (NoSuchAlgorithmException e) {
+ throw new RuntimeException("MD5 algorithm not found. This is
impossible.", e);
+ } catch (IOException e) {
+ throw new RuntimeException(
+ "Failed to serialize ManifestCommittable. This is
unexpected.", e);
+ }
+ }
+
+ private List<ManifestEntry> collectChanges(
+ Map<BinaryRowData, Map<Integer, List<SstFileMeta>>> map, ValueKind
kind) {
+ List<ManifestEntry> changes = new ArrayList<>();
+ for (Map.Entry<BinaryRowData, Map<Integer, List<SstFileMeta>>>
entryWithPartition :
+ map.entrySet()) {
+ for (Map.Entry<Integer, List<SstFileMeta>> entryWithBucket :
+ entryWithPartition.getValue().entrySet()) {
+ changes.addAll(
+ entryWithBucket.getValue().stream()
+ .map(
+ file ->
+ new ManifestEntry(
+ kind,
+
entryWithPartition.getKey(),
+
entryWithBucket.getKey(),
+
fileStoreOptions.bucket,
+ file))
+ .collect(Collectors.toList()));
+ }
+ }
+ return changes;
+ }
+
+ private void tryCommit(
+ List<ManifestEntry> changes, String hash, Snapshot.CommitKind
commitKind) {
+ while (true) {
+ Long latestSnapshotId = pathFactory.latestSnapshotId();
+ long newSnapshotId =
+ latestSnapshotId == null ? Snapshot.FIRST_SNAPSHOT_ID :
latestSnapshotId + 1;
+ Path newSnapshotPath = pathFactory.toSnapshotPath(newSnapshotId);
+ Path tmpSnapshotPath =
pathFactory.toTmpSnapshotPath(newSnapshotId);
+
+ Snapshot latestSnapshot = null;
+ if (latestSnapshotId != null) {
+ noConflictsOrFail(latestSnapshotId, changes);
+ latestSnapshot =
Snapshot.fromPath(pathFactory.toSnapshotPath(latestSnapshotId));
+ }
+
+ Snapshot newSnapshot;
+ String manifestListName = null;
+ List<ManifestFileMeta> oldMetas = new ArrayList<>();
+ List<ManifestFileMeta> newMetas = new ArrayList<>();
+ try {
+ if (latestSnapshot != null) {
+ // read all previous manifest files
+
oldMetas.addAll(manifestList.read(latestSnapshot.manifestList()));
+ // merge manifest files
+ newMetas.addAll(
+ ManifestFileMeta.merge(
+ oldMetas,
+ manifestFile,
+
fileStoreOptions.manifestSuggestedSize.getBytes()));
+ }
+ // write all changes to manifest file
+ newMetas.add(manifestFile.write(changes));
+ // prepare snapshot file
+ manifestListName = manifestList.write(newMetas);
+ newSnapshot =
+ new Snapshot(newSnapshotId, manifestListName,
commitUser, hash, commitKind);
+ FileUtils.writeFileUtf8(tmpSnapshotPath, newSnapshot.toJson());
+ } catch (Throwable e) {
+ // fails when preparing for commit, we should clean up
+ cleanUpTmpSnapshot(tmpSnapshotPath, manifestListName,
oldMetas, newMetas);
+ throw new RuntimeException(
+ String.format(
+ "Exception occurs when preparing snapshot #%d
(path %s) by user %s "
+ + "with hash %s and kind %s. Clean
up.",
+ newSnapshotId,
+ newSnapshotPath.toString(),
+ commitUser,
+ hash,
+ commitKind.name()),
+ e);
+ }
+
+ boolean success;
+ try {
+ FileSystem fs = tmpSnapshotPath.getFileSystem();
+ // atomic rename
+ if (lock != null) {
+ success =
+ lock.runWithLock(
+ () ->
+ // fs.rename may not return false
if target file
+ // already exists, or even not
atomic
+ // as we're relying on external
locking, we can first
+ // check if file exists then rename
to work around this
+ // case
+ !fs.exists(newSnapshotPath)
+ &&
fs.rename(tmpSnapshotPath, newSnapshotPath));
+ } else {
+ success = fs.rename(tmpSnapshotPath, newSnapshotPath);
+ }
+ } catch (Throwable e) {
+ // exception when performing the atomic rename,
+ // we cannot clean up because we can't determine the success
+ throw new RuntimeException(
+ String.format(
+ "Exception occurs when committing snapshot #%d
(path %s) by user %s "
+ + "with hash %s and kind %s. "
+ + "Cannot clean up because we can't
determine the success.",
+ newSnapshotId,
+ newSnapshotPath.toString(),
+ commitUser,
+ hash,
+ commitKind.name()),
+ e);
+ }
+
+ if (success) {
+ return;
+ }
+
+ // atomic rename fails, clean up and try again
+ LOG.warn(
+ String.format(
+ "Atomic rename failed for snapshot #%d (path %s)
by user %s "
+ + "with hash %s and kind %s. "
+ + "Clean up and try again.",
+ newSnapshotId,
+ newSnapshotPath.toString(),
+ commitUser,
+ hash,
+ commitKind.name()));
+ cleanUpTmpSnapshot(tmpSnapshotPath, manifestListName, oldMetas,
newMetas);
+ }
+ }
+
+ private void noConflictsOrFail(long snapshotId, List<ManifestEntry>
changes) {
+ Set<ManifestEntry.Identifier> removedFiles =
+ changes.stream()
+ .filter(e -> e.kind().equals(ValueKind.DELETE))
+ .map(ManifestEntry::identifier)
+ .collect(Collectors.toSet());
+ if (removedFiles.isEmpty()) {
+ // early exit for append only changes
+ return;
+ }
+
+ try {
+ // TODO use partition filter of scan when implemented
+ for (ManifestEntry entry :
scan.withSnapshot(snapshotId).plan().files()) {
+ removedFiles.remove(entry.identifier());
+ }
+ } catch (Throwable e) {
+ throw new RuntimeException("Cannot determine if conflicts exist.",
e);
+ }
+
+ if (!removedFiles.isEmpty()) {
+ throw new RuntimeException(
+ "Conflicts detected on:\n"
+ + removedFiles.stream()
+ .map(
+ i ->
+
pathFactory.getPartitionString(i.partition)
+ + ", bucket "
+ + i.bucket
+ + ", level "
+ + i.level
+ + ", file "
+ + i.fileName)
+ .collect(Collectors.joining("\n")));
+ }
+ }
+
+ private void cleanUpTmpSnapshot(
+ Path tmpSnapshotPath,
+ String manifestListName,
+ List<ManifestFileMeta> oldMetas,
+ List<ManifestFileMeta> newMetas) {
+ // clean up tmp snapshot file
+ FileUtils.deleteOrWarn(tmpSnapshotPath);
+ // clean up newly created manifest list
+ if (manifestListName != null) {
+ manifestList.delete(manifestListName);
+ }
+ // clean up newly merged manifest files
+ Set<ManifestFileMeta> oldMetaSet = new HashSet<>(oldMetas); // for
faster searching
+ for (ManifestFileMeta suspect : newMetas) {
+ if (!oldMetaSet.contains(suspect)) {
+ manifestList.delete(suspect.fileName());
+ }
+ }
+ }
+}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
new file mode 100644
index 0000000..da72dbf
--- /dev/null
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreScanImpl.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.operation;
+
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.manifest.ManifestEntry;
+import org.apache.flink.table.store.file.manifest.ManifestFile;
+import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
+import org.apache.flink.table.store.file.manifest.ManifestList;
+import org.apache.flink.table.store.file.predicate.Predicate;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Default implementation of {@link FileStoreScan}. */
+public class FileStoreScanImpl implements FileStoreScan {
+
+ private final FileStorePathFactory pathFactory;
+ private final ManifestFile manifestFile;
+ private final ManifestList manifestList;
+
+ private Long snapshotId;
+ private List<ManifestFileMeta> manifests;
+
+ public FileStoreScanImpl(
+ FileStorePathFactory pathFactory,
+ ManifestFile manifestFile,
+ ManifestList manifestList) {
+ this.pathFactory = pathFactory;
+ this.manifestFile = manifestFile;
+ this.manifestList = manifestList;
+
+ this.snapshotId = null;
+ this.manifests = new ArrayList<>();
+ }
+
+ @Override
+ public FileStoreScan withPartitionFilter(Predicate predicate) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public FileStoreScan withKeyFilter(Predicate predicate) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public FileStoreScan withValueFilter(Predicate predicate) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public FileStoreScan withBucket(int bucket) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public FileStoreScan withSnapshot(long snapshotId) {
+ this.snapshotId = snapshotId;
+ Snapshot snapshot =
Snapshot.fromPath(pathFactory.toSnapshotPath(snapshotId));
+ this.manifests = manifestList.read(snapshot.manifestList());
+ return this;
+ }
+
+ @Override
+ public FileStoreScan withManifestList(List<ManifestFileMeta> manifests) {
+ this.manifests = manifests;
+ return this;
+ }
+
+ @Override
+ public Plan plan() {
+ List<ManifestEntry> files = scan();
+
+ return new Plan() {
+ @Nullable
+ @Override
+ public Long snapshotId() {
+ return snapshotId;
+ }
+
+ @Override
+ public List<ManifestEntry> files() {
+ return files;
+ }
+ };
+ }
+
+ private List<ManifestEntry> scan() {
+ Map<ManifestEntry.Identifier, ManifestEntry> map = new
LinkedHashMap<>();
+ for (ManifestFileMeta manifest : manifests) {
+ // TODO read each manifest file concurrently
+ for (ManifestEntry entry : manifestFile.read(manifest.fileName()))
{
+ ManifestEntry.Identifier identifier = entry.identifier();
+ switch (entry.kind()) {
+ case ADD:
+ Preconditions.checkState(
+ !map.containsKey(identifier),
+ "Trying to add file %s which is already added.
"
+ + "Manifest might be corrupted.",
+ identifier);
+ map.put(identifier, entry);
+ break;
+ case DELETE:
+ Preconditions.checkState(
+ map.containsKey(identifier),
+ "Trying to delete file %s which is not
previously added. "
+ + "Manifest might be corrupted.",
+ identifier);
+ map.remove(identifier);
+ break;
+ default:
+ throw new UnsupportedOperationException(
+ "Unknown value kind " + entry.kind().name());
+ }
+ }
+ }
+ return new ArrayList<>(map.values());
+ }
+}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
index 0aaef19..9aa9844 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java
@@ -21,8 +21,11 @@ package org.apache.flink.table.store.file.utils;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.connector.file.table.FileSystemConnectorOptions;
import org.apache.flink.connector.file.table.RowDataPartitionComputer;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.Snapshot;
import org.apache.flink.table.store.file.mergetree.sst.SstPathFactory;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
@@ -30,11 +33,20 @@ import
org.apache.flink.table.types.utils.LogicalTypeDataTypeConverter;
import org.apache.flink.table.utils.PartitionPathUtils;
import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
import java.util.UUID;
/** Factory which produces {@link Path}s for each type of files. */
public class FileStorePathFactory {
+ private static final Logger LOG =
LoggerFactory.getLogger(FileStorePathFactory.class);
+ private static final String SNAPSHOT_PREFIX = "snapshot-";
+
private final Path root;
private final String uuid;
private final RowDataPartitionComputer partitionComputer;
@@ -82,7 +94,11 @@ public class FileStorePathFactory {
}
public Path toSnapshotPath(long id) {
- return new Path(root + "/snapshot/snapshot-" + id);
+ return new Path(root + "/snapshot/" + SNAPSHOT_PREFIX + id);
+ }
+
+ public Path toTmpSnapshotPath(long id) {
+ return new Path(root + "/snapshot/." + SNAPSHOT_PREFIX + id + "-" +
UUID.randomUUID());
}
public SstPathFactory createSstPathFactory(BinaryRowData partition, int
bucket) {
@@ -96,6 +112,32 @@ public class FileStorePathFactory {
partition, "Partition row data is null. This
is unexpected.")));
}
+ @Nullable
+ public Long latestSnapshotId() {
+ // TODO add a `bestEffort` argument and read from a best-effort
CURRENT file if true
+ try {
+ Path snapshotDir = new Path(root + "/snapshot");
+ FileSystem fs = snapshotDir.getFileSystem();
+ FileStatus[] statuses = fs.listStatus(snapshotDir);
+
+ long latestId = Snapshot.FIRST_SNAPSHOT_ID - 1;
+ for (FileStatus status : statuses) {
+ String fileName = status.getPath().getName();
+ if (fileName.startsWith(SNAPSHOT_PREFIX)) {
+ try {
+ long id =
Long.parseLong(fileName.substring(SNAPSHOT_PREFIX.length()));
+ latestId = Math.max(latestId, id);
+ } catch (NumberFormatException e) {
+ LOG.warn("Invalid snapshot file name found " +
fileName, e);
+ }
+ }
+ }
+ return latestId < Snapshot.FIRST_SNAPSHOT_ID ? null : latestId;
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to find latest snapshot id", e);
+ }
+ }
+
@VisibleForTesting
public String uuid() {
return uuid;
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileUtils.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileUtils.java
index b787b9a..33f78b5 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileUtils.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileUtils.java
@@ -23,6 +23,8 @@ import
org.apache.flink.connector.base.source.reader.SourceReaderOptions;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.connector.file.src.util.Utils;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.RowData;
@@ -30,7 +32,11 @@ import org.apache.flink.table.data.RowData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.BufferedReader;
import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
@@ -63,6 +69,28 @@ public class FileUtils {
return path.getFileSystem().getFileStatus(path).getLen();
}
+ public static String readFileUtf8(Path file) throws IOException {
+ try (FSDataInputStream in = file.getFileSystem().open(file)) {
+ BufferedReader reader =
+ new BufferedReader(new InputStreamReader(in,
StandardCharsets.UTF_8));
+ StringBuilder builder = new StringBuilder();
+ String line;
+ while ((line = reader.readLine()) != null) {
+ builder.append(line);
+ }
+ return builder.toString();
+ }
+ }
+
+ public static void writeFileUtf8(Path file, String content) throws
IOException {
+ try (FSDataOutputStream out =
+ file.getFileSystem().create(file,
FileSystem.WriteMode.NO_OVERWRITE)) {
+ OutputStreamWriter writer = new OutputStreamWriter(out,
StandardCharsets.UTF_8);
+ writer.write(content);
+ writer.close();
+ }
+ }
+
public static void deleteOrWarn(Path file) {
try {
FileSystem fs = file.getFileSystem();
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestKeyValueGenerator.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestKeyValueGenerator.java
index 6a39165..3f6ad2f 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestKeyValueGenerator.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestKeyValueGenerator.java
@@ -78,7 +78,7 @@ public class TestKeyValueGenerator {
private static final RowDataSerializer PARTITION_SERIALIZER =
new RowDataSerializer(PARTITION_TYPE);
public static final RowDataSerializer KEY_SERIALIZER = new
RowDataSerializer(KEY_TYPE);
- private static final RecordComparator KEY_COMPARATOR =
+ public static final RecordComparator KEY_COMPARATOR =
(a, b) -> {
int firstResult = a.getInt(0) - b.getInt(0);
if (firstResult != 0) {
@@ -169,10 +169,6 @@ public class TestKeyValueGenerator {
});
}
- public int compareKeys(BinaryRowData a, BinaryRowData b) {
- return KEY_COMPARATOR.compare(a, b);
- }
-
private Order pick(List<Order> list) {
int idx = random.nextInt(list.size());
Order tmp = list.get(idx);
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java
new file mode 100644
index 0000000..027dda5
--- /dev/null
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileMetaTest.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.manifest;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.data.writer.BinaryRowWriter;
+import org.apache.flink.table.store.file.FileFormat;
+import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
+import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.RepeatedTest;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link ManifestFileMeta}. */
+public class ManifestFileMetaTest {
+
+ private static final RowType PARTITION_TYPE = RowType.of(new IntType());
+ private static final RowType KEY_TYPE = RowType.of(new IntType());
+ private static final RowType ROW_TYPE = RowType.of(new BigIntType());
+
+ private final FileFormat avro;
+
+ @TempDir java.nio.file.Path tempDir;
+ private ManifestFile manifestFile;
+
+ public ManifestFileMetaTest() {
+ this.avro =
+ FileFormat.fromIdentifier(
+ ManifestFileMetaTest.class.getClassLoader(), "avro",
new Configuration());
+ }
+
+ @BeforeEach
+ public void beforeEach() {
+ manifestFile = createManifestFile(tempDir.toString());
+ }
+
+ @Test
+ public void testMerge() {
+ List<ManifestFileMeta> input = new ArrayList<>();
+ List<ManifestFileMeta> expected = new ArrayList<>();
+ createData(input, expected);
+
+ List<ManifestFileMeta> actual = ManifestFileMeta.merge(input,
manifestFile, 500);
+ assertThat(actual).hasSameSizeAs(expected);
+
+ // these three manifest files are merged from the input
+ assertSameContent(expected.get(0), actual.get(0), manifestFile);
+ assertSameContent(expected.get(1), actual.get(1), manifestFile);
+ assertSameContent(expected.get(4), actual.get(4), manifestFile);
+
+ // these two manifest files should be kept without modification
+ assertThat(actual.get(2)).isEqualTo(input.get(5));
+ assertThat(actual.get(3)).isEqualTo(input.get(6));
+ }
+
+ private void assertSameContent(
+ ManifestFileMeta expected, ManifestFileMeta actual, ManifestFile
manifestFile) {
+ // check meta
+ assertThat(actual.numAddedFiles()).isEqualTo(expected.numAddedFiles());
+
assertThat(actual.numDeletedFiles()).isEqualTo(expected.numDeletedFiles());
+
assertThat(actual.partitionStats()).isEqualTo(expected.partitionStats());
+
+ // check content
+ assertThat(manifestFile.read(actual.fileName()))
+ .isEqualTo(manifestFile.read(expected.fileName()));
+ }
+
+ @RepeatedTest(10)
+ public void testCleanUpForException() throws IOException {
+ FailingAtomicRenameFileSystem.resetFailCounter(1);
+ FailingAtomicRenameFileSystem.setFailPossibility(10);
+
+ List<ManifestFileMeta> input = new ArrayList<>();
+ createData(input, null);
+ ManifestFile failingManifestFile =
+ createManifestFile(
+ FailingAtomicRenameFileSystem.SCHEME + "://" +
tempDir.toString());
+
+ try {
+ ManifestFileMeta.merge(input, failingManifestFile, 500);
+ } catch (Throwable e) {
+ assertThat(e)
+ .hasRootCauseExactlyInstanceOf(
+
FailingAtomicRenameFileSystem.ArtificialException.class);
+ // old files should be kept untouched, while new files should be
cleaned up
+ Path manifestDir = new Path(tempDir.toString() + "/manifest");
+ FileSystem fs = manifestDir.getFileSystem();
+ assertThat(
+ new TreeSet<>(
+ Arrays.stream(fs.listStatus(manifestDir))
+ .map(s -> s.getPath().getName())
+ .collect(Collectors.toList())))
+ .isEqualTo(
+ new TreeSet<>(
+ input.stream()
+ .map(ManifestFileMeta::fileName)
+ .collect(Collectors.toList())));
+ }
+ }
+
+ private ManifestFile createManifestFile(String path) {
+ return new ManifestFile(
+ PARTITION_TYPE,
+ KEY_TYPE,
+ ROW_TYPE,
+ avro,
+ new FileStorePathFactory(new Path(path), PARTITION_TYPE,
"default"));
+ }
+
+ private void createData(List<ManifestFileMeta> input,
List<ManifestFileMeta> expected) {
+ // suggested size 500
+ // file sizes:
+ // 200, 300, -- multiple files exactly the suggested size
+ // 100, 200, 300, -- multiple files exceeding the suggested size
+ // 500, -- single file exactly the suggested size
+ // 600, -- single file exceeding the suggested size
+ // 100, 200 -- sizes not reaching the suggested size, but kept as the
last remaining files
+ input.add(makeManifest(makeEntry(true, "A"), makeEntry(true, "B")));
+ input.add(makeManifest(makeEntry(true, "C"), makeEntry(false, "B"),
makeEntry(true, "D")));
+
+ input.add(makeManifest(makeEntry(false, "A")));
+ input.add(makeManifest(makeEntry(true, "E"), makeEntry(true, "F")));
+ input.add(makeManifest(makeEntry(true, "G"), makeEntry(false, "E"),
makeEntry(false, "G")));
+
+ input.add(
+ makeManifest(
+ makeEntry(false, "C"),
+ makeEntry(false, "F"),
+ makeEntry(true, "H"),
+ makeEntry(true, "I"),
+ makeEntry(false, "H")));
+
+ input.add(
+ makeManifest(
+ makeEntry(false, "I"),
+ makeEntry(true, "J"),
+ makeEntry(true, "K"),
+ makeEntry(false, "J"),
+ makeEntry(false, "K"),
+ makeEntry(true, "L")));
+
+ input.add(makeManifest(makeEntry(true, "M")));
+ input.add(makeManifest(makeEntry(false, "M"), makeEntry(true, "N")));
+
+ if (expected == null) {
+ return;
+ }
+
+ expected.add(
+ makeManifest(makeEntry(true, "A"), makeEntry(true, "C"),
makeEntry(true, "D")));
+ expected.add(makeManifest(makeEntry(false, "A"), makeEntry(true,
"F")));
+ expected.add(input.get(5));
+ expected.add(input.get(6));
+ expected.add(makeManifest(makeEntry(true, "N")));
+ }
+
+ private ManifestFileMeta makeManifest(ManifestEntry... entries) {
+ ManifestFileMeta writtenMeta =
manifestFile.write(Arrays.asList(entries));
+ return new ManifestFileMeta(
+ writtenMeta.fileName(),
+ entries.length * 100, // for testing purpose
+ writtenMeta.numAddedFiles(),
+ writtenMeta.numDeletedFiles(),
+ writtenMeta.partitionStats());
+ }
+
+ private ManifestEntry makeEntry(boolean isAdd, String fileName) {
+ BinaryRowData binaryRowData = new BinaryRowData(1);
+ BinaryRowWriter writer = new BinaryRowWriter(binaryRowData);
+ writer.writeInt(0, 0);
+ writer.complete();
+
+ return new ManifestEntry(
+ isAdd ? ValueKind.ADD : ValueKind.DELETE,
+ binaryRowData, // not used
+ 0, // not used
+ 0, // not used
+ new SstFileMeta(
+ fileName,
+ 0, // not used
+ 0, // not used
+ binaryRowData, // not used
+ binaryRowData, // not used
+ new FieldStats[] {new FieldStats(null, null, 0)}, //
not used
+ 0, // not used
+ 0, // not used
+ 0 // not used
+ ));
+ }
+}
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java
index b9f768a..ee09d9a 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestFileTest.java
@@ -46,7 +46,7 @@ public class ManifestFileTest {
@TempDir java.nio.file.Path tempDir;
@RepeatedTest(10)
- public void testWriteAndReadManifestFile() throws IOException {
+ public void testWriteAndReadManifestFile() {
List<ManifestEntry> entries = generateData();
ManifestFileMeta meta = gen.createManifestFileMeta(entries);
ManifestFile manifestFile = createManifestFile(tempDir.toString());
@@ -71,7 +71,8 @@ public class ManifestFileTest {
manifestFile.write(entries);
} catch (Throwable e) {
assertThat(e)
-
.isExactlyInstanceOf(FailingAtomicRenameFileSystem.ArtificialException.class);
+ .hasRootCauseExactlyInstanceOf(
+
FailingAtomicRenameFileSystem.ArtificialException.class);
Path manifestDir = new Path(tempDir.toString() + "/manifest");
FileSystem fs = manifestDir.getFileSystem();
assertThat(fs.listStatus(manifestDir)).isEmpty();
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestListTest.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestListTest.java
index c9117a1..e9df1c1 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestListTest.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestListTest.java
@@ -47,7 +47,7 @@ public class ManifestListTest {
@TempDir java.nio.file.Path tempDir;
@RepeatedTest(10)
- public void testWriteAndReadManifestList() throws IOException {
+ public void testWriteAndReadManifestList() {
List<ManifestFileMeta> metas = generateData();
ManifestList manifestList = createManifestList(tempDir.toString());
@@ -69,7 +69,8 @@ public class ManifestListTest {
manifestList.write(metas);
} catch (Throwable e) {
assertThat(e)
-
.isExactlyInstanceOf(FailingAtomicRenameFileSystem.ArtificialException.class);
+ .hasRootCauseExactlyInstanceOf(
+
FailingAtomicRenameFileSystem.ArtificialException.class);
Path manifestDir = new Path(tempDir.toString() + "/manifest");
FileSystem fs = manifestDir.getFileSystem();
assertThat(fs.listStatus(manifestDir)).isEmpty();
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstTestDataGenerator.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstTestDataGenerator.java
index f1ef606..bb39d7d 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstTestDataGenerator.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstTestDataGenerator.java
@@ -109,10 +109,10 @@ public class SstTestDataGenerator {
BinaryRowData value = (BinaryRowData) kv.value();
totalSize += key.getSizeInBytes() + value.getSizeInBytes();
collector.collect(value);
- if (minKey == null || gen.compareKeys(key, minKey) < 0) {
+ if (minKey == null ||
TestKeyValueGenerator.KEY_COMPARATOR.compare(key, minKey) < 0) {
minKey = key;
}
- if (maxKey == null || gen.compareKeys(key, maxKey) > 0) {
+ if (maxKey == null ||
TestKeyValueGenerator.KEY_COMPARATOR.compare(key, maxKey) > 0) {
maxKey = key;
}
minSequenceNumber = Math.min(minSequenceNumber,
kv.sequenceNumber());
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTestBase.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTestBase.java
new file mode 100644
index 0000000..0c8f678
--- /dev/null
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreCommitTestBase.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.operation;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.FileFormat;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.TestKeyValueGenerator;
+import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.manifest.ManifestEntry;
+import org.apache.flink.table.store.file.manifest.ManifestFile;
+import org.apache.flink.table.store.file.manifest.ManifestList;
+import org.apache.flink.table.store.file.mergetree.sst.SstFile;
+import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.utils.RecordReaderIterator;
+import org.apache.flink.table.store.file.utils.TestAtomicRenameFileSystem;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.RepeatedTest;
+import org.junit.jupiter.api.io.TempDir;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link FileStoreCommitImpl}. */
+public abstract class FileStoreCommitTestBase {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(FileStoreCommitTestBase.class);
+
+ private final FileFormat avro =
+ FileFormat.fromIdentifier(
+ FileStoreCommitTestBase.class.getClassLoader(), "avro",
new Configuration());
+
+ private TestKeyValueGenerator gen;
+ @TempDir java.nio.file.Path tempDir;
+
+ @BeforeEach
+ public void beforeEach() throws IOException {
+ gen = new TestKeyValueGenerator();
+ Path root = new Path(tempDir.toString());
+ root.getFileSystem().mkdirs(new Path(root + "/snapshot"));
+ }
+
+ protected abstract String getSchema();
+
+ @RepeatedTest(10)
+ public void testSingleCommitUser() throws Exception {
+ testRandomConcurrentNoConflict(1);
+ }
+
+ @RepeatedTest(10)
+ public void testManyCommitUsersNoConflict() throws Exception {
+ testRandomConcurrentNoConflict(ThreadLocalRandom.current().nextInt(3)
+ 2);
+ }
+
+ protected void testRandomConcurrentNoConflict(int numThreads) throws
Exception {
+ // prepare test data
+ Map<BinaryRowData, List<KeyValue>> data =
+ generateData(ThreadLocalRandom.current().nextInt(1000) + 1);
+ logData(
+ () ->
+ data.values().stream()
+ .flatMap(Collection::stream)
+ .collect(Collectors.toList()),
+ "input");
+ Map<BinaryRowData, BinaryRowData> expected =
+ toKvMap(
+ data.values().stream()
+ .flatMap(Collection::stream)
+ .collect(Collectors.toList()));
+
+ List<Map<BinaryRowData, List<KeyValue>>> dataPerThread = new
ArrayList<>();
+ for (int i = 0; i < numThreads; i++) {
+ dataPerThread.add(new HashMap<>());
+ }
+ for (Map.Entry<BinaryRowData, List<KeyValue>> entry : data.entrySet())
{
+ dataPerThread
+ .get(ThreadLocalRandom.current().nextInt(numThreads))
+ .put(entry.getKey(), entry.getValue());
+ }
+
+ // concurrent commits
+ List<TestCommitThread> threads = new ArrayList<>();
+ for (int i = 0; i < numThreads; i++) {
+ TestCommitThread thread =
+ new TestCommitThread(
+ dataPerThread.get(i), createTestPathFactory(),
createSafePathFactory());
+ thread.start();
+ threads.add(thread);
+ }
+ for (TestCommitThread thread : threads) {
+ thread.join();
+ }
+
+ // read actual data and compare
+ Map<BinaryRowData, BinaryRowData> actual =
toKvMap(readKvsFromLatestSnapshot());
+ logData(() -> kvMapToKvList(expected), "expected");
+ logData(() -> kvMapToKvList(actual), "actual");
+ assertThat(actual).isEqualTo(expected);
+ }
+
+ private Map<BinaryRowData, List<KeyValue>> generateData(int numRecords) {
+ Map<BinaryRowData, List<KeyValue>> data = new HashMap<>();
+ for (int i = 0; i < numRecords; i++) {
+ KeyValue kv = gen.next();
+ data.compute(gen.getPartition(kv), (p, l) -> l == null ? new
ArrayList<>() : l).add(kv);
+ }
+ return data;
+ }
+
+ private List<KeyValue> readKvsFromLatestSnapshot() throws IOException {
+ FileStorePathFactory pathFactory = createSafePathFactory();
+ Long latestSnapshotId = pathFactory.latestSnapshotId();
+ assertThat(latestSnapshotId).isNotNull();
+
+ ManifestFile manifestFile =
+ new ManifestFile(
+ TestKeyValueGenerator.PARTITION_TYPE,
+ TestKeyValueGenerator.KEY_TYPE,
+ TestKeyValueGenerator.ROW_TYPE,
+ avro,
+ pathFactory);
+ ManifestList manifestList =
+ new ManifestList(TestKeyValueGenerator.PARTITION_TYPE, avro,
pathFactory);
+
+ List<KeyValue> kvs = new ArrayList<>();
+ List<ManifestEntry> entries =
+ new FileStoreScanImpl(pathFactory, manifestFile, manifestList)
+ .withSnapshot(latestSnapshotId)
+ .plan()
+ .files();
+ for (ManifestEntry entry : entries) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Reading actual key-values from file " +
entry.file().fileName());
+ }
+ SstFile sstFile =
+ new SstFile(
+ TestKeyValueGenerator.KEY_TYPE,
+ TestKeyValueGenerator.ROW_TYPE,
+ avro,
+
pathFactory.createSstPathFactory(entry.partition(), 0),
+ 1024 * 1024 // not used
+ );
+ RecordReaderIterator iterator =
+ new
RecordReaderIterator(sstFile.read(entry.file().fileName()));
+ while (iterator.hasNext()) {
+ kvs.add(
+ iterator.next()
+ .copy(
+ TestKeyValueGenerator.KEY_SERIALIZER,
+ TestKeyValueGenerator.ROW_SERIALIZER));
+ }
+ }
+
+ gen.sort(kvs);
+ logData(() -> kvs, "raw read results");
+ return kvs;
+ }
+
+ private Map<BinaryRowData, BinaryRowData> toKvMap(List<KeyValue> kvs) {
+ Map<BinaryRowData, BinaryRowData> result = new HashMap<>();
+ for (KeyValue kv : kvs) {
+ BinaryRowData key =
TestKeyValueGenerator.KEY_SERIALIZER.toBinaryRow(kv.key()).copy();
+ BinaryRowData value =
+
TestKeyValueGenerator.ROW_SERIALIZER.toBinaryRow(kv.value()).copy();
+ switch (kv.valueKind()) {
+ case ADD:
+ result.put(key, value);
+ break;
+ case DELETE:
+ result.remove(key);
+ break;
+ default:
+ throw new UnsupportedOperationException(
+ "Unknown value kind " + kv.valueKind().name());
+ }
+ }
+ return result;
+ }
+
+ private FileStorePathFactory createTestPathFactory() {
+ return new FileStorePathFactory(
+ new Path(getSchema() + "://" + tempDir.toString()),
+ TestKeyValueGenerator.PARTITION_TYPE,
+ "default");
+ }
+
+ private FileStorePathFactory createSafePathFactory() {
+ return new FileStorePathFactory(
+ new Path(TestAtomicRenameFileSystem.SCHEME + "://" +
tempDir.toString()),
+ TestKeyValueGenerator.PARTITION_TYPE,
+ "default");
+ }
+
+ private List<KeyValue> kvMapToKvList(Map<BinaryRowData, BinaryRowData>
map) {
+ return map.entrySet().stream()
+ .map(e -> new KeyValue().replace(e.getKey(), -1,
ValueKind.ADD, e.getValue()))
+ .collect(Collectors.toList());
+ }
+
+ private void logData(Supplier<List<KeyValue>> supplier, String name) {
+ if (!LOG.isDebugEnabled()) {
+ return;
+ }
+
+ LOG.debug("========== Beginning of " + name + " ==========");
+ for (KeyValue kv : supplier.get()) {
+ LOG.debug(kv.toString(TestKeyValueGenerator.KEY_TYPE,
TestKeyValueGenerator.ROW_TYPE));
+ }
+ LOG.debug("========== End of " + name + " ==========");
+ }
+
+ /** Tests for {@link FileStoreCommitImpl} with {@link
TestAtomicRenameFileSystem}. */
+ public static class WithTestAtomicRenameFileSystem extends
FileStoreCommitTestBase {
+
+ @Override
+ protected String getSchema() {
+ return TestAtomicRenameFileSystem.SCHEME;
+ }
+ }
+
+ /** Tests for {@link FileStoreCommitImpl} with {@link
FailingAtomicRenameFileSystem}. */
+ public static class WithFailingAtomicRenameFileSystem extends
FileStoreCommitTestBase {
+
+ @BeforeEach
+ @Override
+ public void beforeEach() throws IOException {
+ super.beforeEach();
+ FailingAtomicRenameFileSystem.resetFailCounter(100);
+ FailingAtomicRenameFileSystem.setFailPossibility(5000);
+ }
+
+ @Override
+ protected String getSchema() {
+ return FailingAtomicRenameFileSystem.SCHEME;
+ }
+ }
+}
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
new file mode 100644
index 0000000..f7f4c86
--- /dev/null
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.operation;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.FileFormat;
+import org.apache.flink.table.store.file.FileStoreOptions;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.TestKeyValueGenerator;
+import org.apache.flink.table.store.file.manifest.ManifestCommittable;
+import
org.apache.flink.table.store.file.manifest.ManifestCommittableSerializer;
+import org.apache.flink.table.store.file.manifest.ManifestEntry;
+import org.apache.flink.table.store.file.manifest.ManifestFile;
+import org.apache.flink.table.store.file.manifest.ManifestList;
+import org.apache.flink.table.store.file.mergetree.MergeTree;
+import org.apache.flink.table.store.file.mergetree.MergeTreeOptions;
+import org.apache.flink.table.store.file.mergetree.MergeTreeWriter;
+import
org.apache.flink.table.store.file.mergetree.compact.DeduplicateAccumulator;
+import org.apache.flink.table.store.file.mergetree.sst.SstFile;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+
+/** Testing {@link Thread}s to perform concurrent commits. */
+public class TestCommitThread extends Thread {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(TestCommitThread.class);
+
+ private static final MergeTreeOptions MERGE_TREE_OPTIONS;
+ private static final long SUGGESTED_SST_FILE_SIZE = 1024;
+
+ static {
+ Configuration mergeTreeConf = new Configuration();
+ mergeTreeConf.set(MergeTreeOptions.WRITE_BUFFER_SIZE,
MemorySize.parse("16 kb"));
+ mergeTreeConf.set(MergeTreeOptions.PAGE_SIZE, MemorySize.parse("4
kb"));
+ MERGE_TREE_OPTIONS = new MergeTreeOptions(mergeTreeConf);
+ }
+
+ private final FileFormat avro =
+ FileFormat.fromIdentifier(
+ FileStoreCommitTestBase.class.getClassLoader(), "avro",
new Configuration());
+
+ private final Map<BinaryRowData, List<KeyValue>> data;
+ private final FileStorePathFactory safePathFactory;
+
+ private final Map<BinaryRowData, MergeTreeWriter> writers;
+
+ private final FileStoreScan scan;
+ private final FileStoreCommit commit;
+
+ public TestCommitThread(
+ Map<BinaryRowData, List<KeyValue>> data,
+ FileStorePathFactory testPathFactory,
+ FileStorePathFactory safePathFactory) {
+ this.data = data;
+ this.safePathFactory = safePathFactory;
+
+ this.writers = new HashMap<>();
+
+ this.scan =
+ new FileStoreScanImpl(
+ safePathFactory,
+ createManifestFile(safePathFactory),
+ createManifestList(safePathFactory));
+
+ ManifestCommittableSerializer serializer =
+ new ManifestCommittableSerializer(
+ TestKeyValueGenerator.PARTITION_TYPE,
+ TestKeyValueGenerator.KEY_TYPE,
+ TestKeyValueGenerator.ROW_TYPE);
+ ManifestFile testManifestFile = createManifestFile(testPathFactory);
+ ManifestList testManifestList = createManifestList(testPathFactory);
+ Configuration fileStoreConf = new Configuration();
+ fileStoreConf.set(FileStoreOptions.BUCKET, 1);
+ fileStoreConf.set(
+ FileStoreOptions.MANIFEST_TARGET_FILE_SIZE,
+ MemorySize.parse((ThreadLocalRandom.current().nextInt(16) + 1)
+ "kb"));
+ FileStoreOptions fileStoreOptions = new
FileStoreOptions(fileStoreConf);
+ FileStoreScanImpl testScan =
+ new FileStoreScanImpl(testPathFactory, testManifestFile,
testManifestList);
+ this.commit =
+ new FileStoreCommitImpl(
+ UUID.randomUUID().toString(),
+ serializer,
+ testPathFactory,
+ testManifestFile,
+ testManifestList,
+ fileStoreOptions,
+ testScan);
+ }
+
+ private ManifestFile createManifestFile(FileStorePathFactory pathFactory) {
+ return new ManifestFile(
+ TestKeyValueGenerator.PARTITION_TYPE,
+ TestKeyValueGenerator.KEY_TYPE,
+ TestKeyValueGenerator.ROW_TYPE,
+ avro,
+ pathFactory);
+ }
+
+ private ManifestList createManifestList(FileStorePathFactory pathFactory) {
+ return new ManifestList(TestKeyValueGenerator.PARTITION_TYPE, avro,
pathFactory);
+ }
+
+ @Override
+ public void run() {
+ while (!data.isEmpty()) {
+ ManifestCommittable committable;
+ try {
+ committable = createCommittable();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ boolean shouldCheckFilter = false;
+ while (true) {
+ try {
+ if (shouldCheckFilter) {
+ if
(commit.filterCommitted(Collections.singletonList(committable))
+ .isEmpty()) {
+ break;
+ }
+ }
+ commit.commit(committable, Collections.emptyMap());
+ break;
+ } catch (Throwable e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.warn(
+ "["
+ + Thread.currentThread().getName()
+ + "] Failed to commit because of
exception, try again",
+ e);
+ }
+ writers.clear();
+ shouldCheckFilter = true;
+ }
+ }
+ }
+
+ for (MergeTreeWriter writer : writers.values()) {
+ try {
+ writer.sync();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ writer.close();
+ }
+ }
+
+ private ManifestCommittable createCommittable() throws Exception {
+ int numWrites = ThreadLocalRandom.current().nextInt(3) + 1;
+ for (int i = 0; i < numWrites && !data.isEmpty(); i++) {
+ writeData();
+ }
+
+ ManifestCommittable committable = new ManifestCommittable();
+ for (Map.Entry<BinaryRowData, MergeTreeWriter> entry :
writers.entrySet()) {
+ committable.add(entry.getKey(), 0,
entry.getValue().prepareCommit());
+ }
+ return committable;
+ }
+
+ private void writeData() throws Exception {
+ List<KeyValue> changes = new ArrayList<>();
+ BinaryRowData partition = pickData(changes);
+ MergeTreeWriter writer =
+ writers.compute(partition, (k, v) -> v == null ?
createWriter(k) : v);
+ for (KeyValue kv : changes) {
+ writer.write(kv.valueKind(), kv.key(), kv.value());
+ }
+ }
+
+ private BinaryRowData pickData(List<KeyValue> changes) {
+ List<BinaryRowData> keys = new ArrayList<>(data.keySet());
+ BinaryRowData partition =
keys.get(ThreadLocalRandom.current().nextInt(keys.size()));
+ List<KeyValue> remaining = data.get(partition);
+ int numChanges = ThreadLocalRandom.current().nextInt(Math.min(100,
remaining.size() + 1));
+ changes.addAll(remaining.subList(0, numChanges));
+ if (numChanges == remaining.size()) {
+ data.remove(partition);
+ } else {
+ remaining.subList(0, numChanges).clear();
+ }
+ return partition;
+ }
+
+ private MergeTreeWriter createWriter(BinaryRowData partition) {
+ SstFile sstFile =
+ new SstFile(
+ TestKeyValueGenerator.KEY_TYPE,
+ TestKeyValueGenerator.ROW_TYPE,
+ avro,
+ safePathFactory.createSstPathFactory(partition, 0),
+ SUGGESTED_SST_FILE_SIZE);
+ ExecutorService service =
+ Executors.newSingleThreadExecutor(
+ r -> {
+ Thread t = new Thread(r);
+ t.setName(Thread.currentThread().getName() +
"-writer-service-pool");
+ return t;
+ });
+ MergeTree mergeTree =
+ new MergeTree(
+ MERGE_TREE_OPTIONS,
+ sstFile,
+ TestKeyValueGenerator.KEY_COMPARATOR,
+ service,
+ new DeduplicateAccumulator());
+ Long latestSnapshotId = safePathFactory.latestSnapshotId();
+ if (latestSnapshotId == null) {
+ return (MergeTreeWriter)
mergeTree.createWriter(Collections.emptyList());
+ } else {
+ return (MergeTreeWriter)
+ mergeTree.createWriter(
+
scan.withSnapshot(latestSnapshotId).plan().files().stream()
+ .filter(e ->
partition.equals(e.partition()))
+ .map(ManifestEntry::file)
+ .collect(Collectors.toList()));
+ }
+ }
+}