This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 32ad5d876 [core] Introduce index metadata (#1344)
32ad5d876 is described below
commit 32ad5d87662324a5d17e6501195eef619872f803
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Jun 12 15:53:49 2023 +0800
[core] Introduce index metadata (#1344)
---
.../java/org/apache/paimon/utils/IntFileUtils.java | 80 ++++++++++
.../java/org/apache/paimon/utils/IntHashSet.java | 30 +++-
.../java/org/apache/paimon/utils/IntIterator.java | 70 +++++++++
.../org/apache/paimon/utils/IntHashSetTest.java | 2 +-
.../java/org/apache/paimon/AbstractFileStore.java | 16 ++
.../src/main/java/org/apache/paimon/FileStore.java | 3 +
.../src/main/java/org/apache/paimon/Snapshot.java | 19 +++
.../org/apache/paimon/index/HashIndexFile.java | 62 ++++++++
.../org/apache/paimon/index/IndexFileHandler.java | 126 +++++++++++++++
.../org/apache/paimon/index/IndexFileMeta.java | 105 +++++++++++++
.../paimon/index/IndexFileMetaSerializer.java | 51 ++++++
.../java/org/apache/paimon/io/IndexIncrement.java | 64 ++++++++
.../apache/paimon/manifest/IndexManifestEntry.java | 174 +++++++++++++++++++++
.../manifest/IndexManifestEntrySerializer.java | 71 +++++++++
.../apache/paimon/manifest/IndexManifestFile.java | 105 +++++++++++++
.../paimon/operation/FileStoreCommitImpl.java | 75 ++++++++-
.../paimon/table/sink/CommitMessageImpl.java | 39 ++++-
.../paimon/table/sink/CommitMessageSerializer.java | 8 +-
.../apache/paimon/utils/FileStorePathFactory.java | 38 +++++
.../test/java/org/apache/paimon/TestFileStore.java | 27 +++-
.../org/apache/paimon/index/HashIndexFileTest.java | 75 +++++++++
.../paimon/index/IndexFileMetaSerializerTest.java | 44 +++---
.../manifest/IndexManifestEntrySerializerTest.java | 55 +++++++
.../apache/paimon/operation/FileDeletionTest.java | 1 +
.../paimon/operation/FileStoreCommitTest.java | 97 ++++++++++++
.../table/sink/CommitMessageSerializerTest.java | 17 +-
.../apache/paimon/utils/SnapshotManagerTest.java | 2 +
27 files changed, 1411 insertions(+), 45 deletions(-)
diff --git
a/paimon-common/src/main/java/org/apache/paimon/utils/IntFileUtils.java
b/paimon-common/src/main/java/org/apache/paimon/utils/IntFileUtils.java
new file mode 100644
index 000000000..e1758dd74
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/IntFileUtils.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+
+import it.unimi.dsi.fastutil.io.FastBufferedInputStream;
+import it.unimi.dsi.fastutil.io.FastBufferedOutputStream;
+
+import java.io.EOFException;
+import java.io.IOException;
+
+/** Utils for reading ints from and writing ints to a file. */
+public class IntFileUtils {
+
+ public static IntIterator readInts(FileIO fileIO, Path path) throws
IOException {
+ FastBufferedInputStream in = new
FastBufferedInputStream(fileIO.newInputStream(path));
+ return new IntIterator() {
+
+ @Override
+ public int next() throws IOException {
+ return readInt(in);
+ }
+
+ @Override
+ public void close() throws IOException {
+ in.close();
+ }
+ };
+ }
+
+ public static void writeInts(FileIO fileIO, Path path, IntIterator input)
throws IOException {
+ try (FastBufferedOutputStream out =
+ new
FastBufferedOutputStream(fileIO.newOutputStream(path, false));
+ IntIterator iterator = input) {
+ while (true) {
+ try {
+ writeInt(out, iterator.next());
+ } catch (EOFException ignored) {
+ break;
+ }
+ }
+ }
+ }
+
+ private static int readInt(FastBufferedInputStream in) throws IOException {
+ int ch1 = in.read();
+ int ch2 = in.read();
+ int ch3 = in.read();
+ int ch4 = in.read();
+ if ((ch1 | ch2 | ch3 | ch4) < 0) {
+ throw new EOFException();
+ }
+ return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + ch4);
+ }
+
+ private static void writeInt(FastBufferedOutputStream out, int v) throws
IOException {
+ out.write((v >>> 24) & 0xFF);
+ out.write((v >>> 16) & 0xFF);
+ out.write((v >>> 8) & 0xFF);
+ out.write(v & 0xFF);
+ }
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/utils/IntHashSet.java
b/paimon-common/src/main/java/org/apache/paimon/utils/IntHashSet.java
index f5924b42d..2d73bc6d6 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/IntHashSet.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/IntHashSet.java
@@ -18,9 +18,10 @@
package org.apache.paimon.utils;
-import it.unimi.dsi.fastutil.ints.IntArrays;
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
+import java.io.EOFException;
+
/** A hash set for ints. */
public class IntHashSet {
@@ -38,9 +39,28 @@ public class IntHashSet {
set.add(value);
}
- public int[] toSortedInts() {
- int[] ints = set.toIntArray();
- IntArrays.stableSort(ints);
- return ints;
+ public int size() {
+ return set.size();
+ }
+
+ public IntIterator toIntIterator() {
+ it.unimi.dsi.fastutil.ints.IntIterator iterator = set.intIterator();
+ return new IntIterator() {
+
+ @Override
+ public int next() throws EOFException {
+ if (!iterator.hasNext()) {
+ throw new EOFException();
+ }
+ return iterator.nextInt();
+ }
+
+ @Override
+ public void close() {}
+ };
+ }
+
+ public int[] toInts() {
+ return set.toIntArray();
}
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/utils/IntIterator.java
b/paimon-common/src/main/java/org/apache/paimon/utils/IntIterator.java
new file mode 100644
index 000000000..f2ef79367
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/IntIterator.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import java.io.Closeable;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/** Iterator for ints. */
+public interface IntIterator extends Closeable {
+
+ int next() throws IOException;
+
+ static int[] toInts(IntIterator input) {
+ return toIntList(input).stream().mapToInt(Integer::intValue).toArray();
+ }
+
+ static List<Integer> toIntList(IntIterator input) {
+ List<Integer> ints = new ArrayList<>();
+ try (IntIterator iterator = input) {
+ while (true) {
+ try {
+ ints.add(iterator.next());
+ } catch (EOFException ignored) {
+ break;
+ }
+ }
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ return ints;
+ }
+
+ static IntIterator create(int[] ints) {
+ return new IntIterator() {
+
+ int pos = -1;
+
+ @Override
+ public int next() throws EOFException {
+ if (pos >= ints.length - 1) {
+ throw new EOFException();
+ }
+ return ints[++pos];
+ }
+
+ @Override
+ public void close() {}
+ };
+ }
+}
diff --git
a/paimon-common/src/test/java/org/apache/paimon/utils/IntHashSetTest.java
b/paimon-common/src/test/java/org/apache/paimon/utils/IntHashSetTest.java
index c476ff563..25e3e033f 100644
--- a/paimon-common/src/test/java/org/apache/paimon/utils/IntHashSetTest.java
+++ b/paimon-common/src/test/java/org/apache/paimon/utils/IntHashSetTest.java
@@ -46,6 +46,6 @@ public class IntHashSetTest {
values.forEach(set::add);
int[] expected =
values.stream().mapToInt(Integer::intValue).sorted().toArray();
- assertThat(set.toSortedInts()).containsExactly(expected);
+ assertThat(set.toInts()).containsExactlyInAnyOrder(expected);
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 26e711121..d1025706e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -21,6 +21,9 @@ package org.apache.paimon;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.index.HashIndexFile;
+import org.apache.paimon.index.IndexFileHandler;
+import org.apache.paimon.manifest.IndexManifestFile;
import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.operation.FileStoreCommitImpl;
@@ -116,6 +119,18 @@ public abstract class AbstractFileStore<T> implements
FileStore<T> {
forWrite ? writeManifestCache : null);
}
+ protected IndexManifestFile.Factory indexManifestFileFactory() {
+ return new IndexManifestFile.Factory(fileIO, options.manifestFormat(),
pathFactory());
+ }
+
+ @Override
+ public IndexFileHandler newIndexFileHandler() {
+ return new IndexFileHandler(
+ snapshotManager(),
+ indexManifestFileFactory().create(),
+ new HashIndexFile(fileIO, pathFactory().indexFileFactory()));
+ }
+
@Override
public RowType partitionType() {
return partitionType;
@@ -137,6 +152,7 @@ public abstract class AbstractFileStore<T> implements
FileStore<T> {
snapshotManager(),
manifestFileFactory(),
manifestListFactory(),
+ indexManifestFileFactory(),
newScan(),
options.bucket(),
options.manifestTargetSize(),
diff --git a/paimon-core/src/main/java/org/apache/paimon/FileStore.java
b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
index 1bc724a4b..d0d1b94c9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/FileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
@@ -18,6 +18,7 @@
package org.apache.paimon;
+import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.manifest.ManifestCacheFilter;
import org.apache.paimon.operation.FileStoreCommit;
import org.apache.paimon.operation.FileStoreExpire;
@@ -51,6 +52,8 @@ public interface FileStore<T> extends Serializable {
FileStoreScan newScan();
+ IndexFileHandler newIndexFileHandler();
+
FileStoreRead<T> newRead();
FileStoreWrite<T> newWrite(String commitUser);
diff --git a/paimon-core/src/main/java/org/apache/paimon/Snapshot.java
b/paimon-core/src/main/java/org/apache/paimon/Snapshot.java
index 8c9139b94..8cb82922c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/Snapshot.java
+++ b/paimon-core/src/main/java/org/apache/paimon/Snapshot.java
@@ -73,6 +73,7 @@ public class Snapshot {
private static final String FIELD_BASE_MANIFEST_LIST = "baseManifestList";
private static final String FIELD_DELTA_MANIFEST_LIST =
"deltaManifestList";
private static final String FIELD_CHANGELOG_MANIFEST_LIST =
"changelogManifestList";
+ private static final String FIELD_INDEX_MANIFEST = "indexManifest";
private static final String FIELD_COMMIT_USER = "commitUser";
private static final String FIELD_COMMIT_IDENTIFIER = "commitIdentifier";
private static final String FIELD_COMMIT_KIND = "commitKind";
@@ -110,6 +111,12 @@ public class Snapshot {
@Nullable
private final String changelogManifestList;
+ // a manifest recording all index files of this table
+ // null if no index file
+ @JsonProperty(FIELD_INDEX_MANIFEST)
+ @Nullable
+ private final String indexManifest;
+
@JsonProperty(FIELD_COMMIT_USER)
private final String commitUser;
@@ -164,6 +171,7 @@ public class Snapshot {
String baseManifestList,
String deltaManifestList,
@Nullable String changelogManifestList,
+ @Nullable String indexManifest,
String commitUser,
long commitIdentifier,
CommitKind commitKind,
@@ -180,6 +188,7 @@ public class Snapshot {
baseManifestList,
deltaManifestList,
changelogManifestList,
+ indexManifest,
commitUser,
commitIdentifier,
commitKind,
@@ -199,6 +208,7 @@ public class Snapshot {
@JsonProperty(FIELD_BASE_MANIFEST_LIST) String baseManifestList,
@JsonProperty(FIELD_DELTA_MANIFEST_LIST) String deltaManifestList,
@JsonProperty(FIELD_CHANGELOG_MANIFEST_LIST) @Nullable String
changelogManifestList,
+ @JsonProperty(FIELD_INDEX_MANIFEST) @Nullable String indexManifest,
@JsonProperty(FIELD_COMMIT_USER) String commitUser,
@JsonProperty(FIELD_COMMIT_IDENTIFIER) long commitIdentifier,
@JsonProperty(FIELD_COMMIT_KIND) CommitKind commitKind,
@@ -214,6 +224,7 @@ public class Snapshot {
this.baseManifestList = baseManifestList;
this.deltaManifestList = deltaManifestList;
this.changelogManifestList = changelogManifestList;
+ this.indexManifest = indexManifest;
this.commitUser = commitUser;
this.commitIdentifier = commitIdentifier;
this.commitKind = commitKind;
@@ -257,6 +268,12 @@ public class Snapshot {
return changelogManifestList;
}
+ @JsonGetter(FIELD_INDEX_MANIFEST)
+ @Nullable
+ public String indexManifest() {
+ return indexManifest;
+ }
+
@JsonGetter(FIELD_COMMIT_USER)
public String commitUser() {
return commitUser;
@@ -397,6 +414,7 @@ public class Snapshot {
baseManifestList,
deltaManifestList,
changelogManifestList,
+ indexManifest,
commitUser,
commitIdentifier,
commitKind,
@@ -420,6 +438,7 @@ public class Snapshot {
&& Objects.equals(baseManifestList, that.baseManifestList)
&& Objects.equals(deltaManifestList, that.deltaManifestList)
&& Objects.equals(changelogManifestList,
that.changelogManifestList)
+ && Objects.equals(indexManifest, that.indexManifest)
&& Objects.equals(commitUser, that.commitUser)
&& commitIdentifier == that.commitIdentifier
&& commitKind == that.commitKind
diff --git
a/paimon-core/src/main/java/org/apache/paimon/index/HashIndexFile.java
b/paimon-core/src/main/java/org/apache/paimon/index/HashIndexFile.java
new file mode 100644
index 000000000..0c13b69ce
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/index/HashIndexFile.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.index;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.utils.IntIterator;
+import org.apache.paimon.utils.PathFactory;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+
+import static org.apache.paimon.utils.IntFileUtils.readInts;
+import static org.apache.paimon.utils.IntFileUtils.writeInts;
+
+/** Reads and writes hash index files, which contain ints. */
+public class HashIndexFile {
+
+ public static final String HASH_INDEX = "HASH";
+
+ private final FileIO fileIO;
+ private final PathFactory pathFactory;
+
+ public HashIndexFile(FileIO fileIO, PathFactory pathFactory) {
+ this.fileIO = fileIO;
+ this.pathFactory = pathFactory;
+ }
+
+ public long fileSize(String fileName) {
+ try {
+ return fileIO.getFileSize(pathFactory.toPath(fileName));
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ public IntIterator read(String fileName) throws IOException {
+ return readInts(fileIO, pathFactory.toPath(fileName));
+ }
+
+ public String write(IntIterator input) throws IOException {
+ Path path = pathFactory.newPath();
+ writeInts(fileIO, path, input);
+ return path.getName();
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
new file mode 100644
index 000000000..725b9772d
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.index;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.manifest.IndexManifestFile;
+import org.apache.paimon.utils.IntIterator;
+import org.apache.paimon.utils.SnapshotManager;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.paimon.index.HashIndexFile.HASH_INDEX;
+
+/** Handle index files. */
+public class IndexFileHandler {
+
+ private final SnapshotManager snapshotManager;
+ private final IndexManifestFile indexManifestFile;
+ private final HashIndexFile hashIndex;
+
+ public IndexFileHandler(
+ SnapshotManager snapshotManager,
+ IndexManifestFile indexManifestFile,
+ HashIndexFile hashIndex) {
+ this.snapshotManager = snapshotManager;
+ this.indexManifestFile = indexManifestFile;
+ this.hashIndex = hashIndex;
+ }
+
+ public Optional<IndexFileMeta> scan(
+ long snapshotId, String indexType, BinaryRow partition, int
bucket) {
+ List<IndexManifestEntry> entries = scan(snapshotId, indexType,
partition);
+ List<IndexManifestEntry> result = new ArrayList<>();
+ for (IndexManifestEntry file : entries) {
+ if (file.bucket() == bucket) {
+ result.add(file);
+ }
+ }
+ if (result.size() > 1) {
+ throw new IllegalArgumentException(
+ "Find multiple index files for one bucket: " + result);
+ }
+ return result.isEmpty() ? Optional.empty() :
Optional.of(result.get(0).indexFile());
+ }
+
+ public List<IndexManifestEntry> scan(String indexType, BinaryRow
partition) {
+ Long snapshot = snapshotManager.latestSnapshotId();
+ if (snapshot == null) {
+ return Collections.emptyList();
+ }
+
+ return scan(snapshot, indexType, partition);
+ }
+
+ public List<IndexManifestEntry> scan(long snapshotId, String indexType,
BinaryRow partition) {
+ Snapshot snapshot = snapshotManager.snapshot(snapshotId);
+ String indexManifest = snapshot.indexManifest();
+ if (indexManifest == null) {
+ throw new IllegalArgumentException("Index manifest is null in
snapshot: " + snapshot);
+ }
+
+ List<IndexManifestEntry> allFiles =
indexManifestFile.read(indexManifest);
+ List<IndexManifestEntry> result = new ArrayList<>();
+ for (IndexManifestEntry file : allFiles) {
+ if (file.indexFile().indexType().equals(indexType)
+ && file.partition().equals(partition)) {
+ result.add(file);
+ }
+ }
+
+ return result;
+ }
+
+ public List<Integer> readHashIndexList(IndexFileMeta file) {
+ return IntIterator.toIntList(readHashIndex(file));
+ }
+
+ public IntIterator readHashIndex(IndexFileMeta file) {
+ if (!file.indexType().equals(HASH_INDEX)) {
+ throw new IllegalArgumentException("Input file is not hash index:
" + file.indexType());
+ }
+
+ try {
+ return hashIndex.read(file.fileName());
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ public IndexFileMeta writeHashIndex(int[] ints) {
+ return writeHashIndex(ints.length, IntIterator.create(ints));
+ }
+
+ public IndexFileMeta writeHashIndex(int size, IntIterator iterator) {
+ String file;
+ try {
+ file = hashIndex.write(iterator);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ return new IndexFileMeta(HASH_INDEX, file, hashIndex.fileSize(file),
size);
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMeta.java
b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMeta.java
new file mode 100644
index 000000000..6fb3e176b
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMeta.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.index;
+
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+import static org.apache.paimon.utils.SerializationUtils.newStringType;
+
+/** Metadata of index file. */
+public class IndexFileMeta {
+
+ private final String indexType;
+ private final String fileName;
+ private final long fileSize;
+ private final long rowCount;
+
+ public IndexFileMeta(String indexType, String fileName, long fileSize,
long rowCount) {
+ this.indexType = indexType;
+ this.fileName = fileName;
+ this.fileSize = fileSize;
+ this.rowCount = rowCount;
+ }
+
+ public String indexType() {
+ return indexType;
+ }
+
+ public String fileName() {
+ return fileName;
+ }
+
+ public long fileSize() {
+ return fileSize;
+ }
+
+ public long rowCount() {
+ return rowCount;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ IndexFileMeta that = (IndexFileMeta) o;
+ return Objects.equals(indexType, that.indexType)
+ && Objects.equals(fileName, that.fileName)
+ && fileSize == that.fileSize
+ && rowCount == that.rowCount;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(indexType, fileName, fileSize, rowCount);
+ }
+
+ @Override
+ public String toString() {
+ return "IndexManifestEntry{"
+ + "indexType="
+ + indexType
+ + ", fileName='"
+ + fileName
+ + '\''
+ + ", fileSize="
+ + fileSize
+ + ", rowCount="
+ + rowCount
+ + '}';
+ }
+
+ public static RowType schema() {
+ List<DataField> fields = new ArrayList<>();
+ fields.add(new DataField(0, "_INDEX_TYPE", newStringType(false)));
+ fields.add(new DataField(1, "_FILE_NAME", newStringType(false)));
+ fields.add(new DataField(2, "_FILE_SIZE", new BigIntType(false)));
+ fields.add(new DataField(3, "_ROW_COUNT", new BigIntType(false)));
+ return new RowType(fields);
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMetaSerializer.java
b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMetaSerializer.java
new file mode 100644
index 000000000..2813dd48b
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMetaSerializer.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.index;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.utils.ObjectSerializer;
+import org.apache.paimon.utils.VersionedObjectSerializer;
+
+/** An {@link ObjectSerializer} for {@link IndexFileMeta}. */
+public class IndexFileMetaSerializer extends ObjectSerializer<IndexFileMeta> {
+
+ public IndexFileMetaSerializer() {
+ super(IndexFileMeta.schema());
+ }
+
+ @Override
+ public InternalRow toRow(IndexFileMeta record) {
+ return GenericRow.of(
+ BinaryString.fromString(record.indexType()),
+ BinaryString.fromString(record.fileName()),
+ record.fileSize(),
+ record.rowCount());
+ }
+
+ @Override
+ public IndexFileMeta fromRow(InternalRow row) {
+ return new IndexFileMeta(
+ row.getString(0).toString(),
+ row.getString(1).toString(),
+ row.getLong(2),
+ row.getLong(3));
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/IndexIncrement.java
b/paimon-core/src/main/java/org/apache/paimon/io/IndexIncrement.java
new file mode 100644
index 000000000..a62d24109
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/io/IndexIncrement.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.io;
+
+import org.apache.paimon.index.IndexFileMeta;
+
+import java.util.List;
+import java.util.Objects;
+
+/** Incremental index files. */
+public class IndexIncrement {
+
+ private final List<IndexFileMeta> newIndexFiles;
+
+ public IndexIncrement(List<IndexFileMeta> newIndexFiles) {
+ this.newIndexFiles = newIndexFiles;
+ }
+
+ public List<IndexFileMeta> newIndexFiles() {
+ return newIndexFiles;
+ }
+
+ public boolean isEmpty() {
+ return newIndexFiles.isEmpty();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ IndexIncrement that = (IndexIncrement) o;
+ return Objects.equals(newIndexFiles, that.newIndexFiles);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(newIndexFiles);
+ }
+
+ @Override
+ public String toString() {
+ return "IndexIncrement{" + "newIndexFiles=" + newIndexFiles + '}';
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntry.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntry.java
new file mode 100644
index 000000000..e2b136855
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntry.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.manifest;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.TinyIntType;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.paimon.utils.SerializationUtils.newBytesType;
+import static org.apache.paimon.utils.SerializationUtils.newStringType;
+
+/**
+ * Manifest entry for index file.
+ *
+ * <p>An entry records a {@link FileKind}, the partition and bucket the index belongs to, and the
+ * {@link IndexFileMeta} describing the index file itself.
+ */
+public class IndexManifestEntry {
+
+    private final FileKind kind;
+    private final BinaryRow partition;
+    private final int bucket;
+    private final IndexFileMeta indexFile;
+
+    public IndexManifestEntry(
+            FileKind kind, BinaryRow partition, int bucket, IndexFileMeta indexFile) {
+        this.kind = kind;
+        this.partition = partition;
+        this.bucket = bucket;
+        this.indexFile = indexFile;
+    }
+
+    /**
+     * Returns an entry with the same partition, bucket and index file but kind {@link
+     * FileKind#DELETE}. Only valid on an {@link FileKind#ADD} entry; this is enforced by {@code
+     * checkArgument}.
+     */
+    public IndexManifestEntry toDeleteEntry() {
+        checkArgument(kind == FileKind.ADD);
+        return new IndexManifestEntry(FileKind.DELETE, partition, bucket, indexFile);
+    }
+
+    public FileKind kind() {
+        return kind;
+    }
+
+    public BinaryRow partition() {
+        return partition;
+    }
+
+    public int bucket() {
+        return bucket;
+    }
+
+    public IndexFileMeta indexFile() {
+        return indexFile;
+    }
+
+    /**
+     * Row type used to persist an entry: seven non-null columns, in this order. The field
+     * positions are what {@link IndexManifestEntrySerializer} reads and writes, so ids, names and
+     * order must not change independently of it.
+     */
+    public static RowType schema() {
+        List<DataField> columns = new ArrayList<>(7);
+        columns.add(new DataField(0, "_KIND", new TinyIntType(false)));
+        columns.add(new DataField(1, "_PARTITION", newBytesType(false)));
+        columns.add(new DataField(2, "_BUCKET", new IntType(false)));
+        columns.add(new DataField(3, "_INDEX_TYPE", newStringType(false)));
+        columns.add(new DataField(4, "_FILE_NAME", newStringType(false)));
+        columns.add(new DataField(5, "_FILE_SIZE", new BigIntType(false)));
+        columns.add(new DataField(6, "_ROW_COUNT", new BigIntType(false)));
+        return new RowType(columns);
+    }
+
+    /** Builds the (partition, bucket, index type) key of this entry. */
+    public Identifier identifier() {
+        return new Identifier(partition, bucket, indexFile.indexType());
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (o == null || o.getClass() != getClass()) {
+            return false;
+        }
+        IndexManifestEntry other = (IndexManifestEntry) o;
+        if (bucket != other.bucket || kind != other.kind) {
+            return false;
+        }
+        return Objects.equals(partition, other.partition)
+                && Objects.equals(indexFile, other.indexFile);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(kind, partition, bucket, indexFile);
+    }
+
+    @Override
+    public String toString() {
+        return "IndexManifestEntry{kind="
+                + kind
+                + ", partition="
+                + partition
+                + ", bucket="
+                + bucket
+                + ", indexFile="
+                + indexFile
+                + "}";
+    }
+
+    /**
+     * The {@link Identifier} of a {@link IndexFileMeta}: partition, bucket and index type. Two
+     * entries with equal identifiers are treated as the same slot when used as a map key.
+     */
+    public static class Identifier {
+
+        public final BinaryRow partition;
+        public final int bucket;
+        public final String indexType;
+
+        // Hash code cache; null until hashCode() is called for the first time.
+        private Integer hash;
+
+        private Identifier(BinaryRow partition, int bucket, String indexType) {
+            this.partition = partition;
+            this.bucket = bucket;
+            this.indexType = indexType;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (o == this) {
+                return true;
+            }
+            if (o == null || o.getClass() != getClass()) {
+                return false;
+            }
+            Identifier other = (Identifier) o;
+            if (bucket != other.bucket) {
+                return false;
+            }
+            return Objects.equals(partition, other.partition)
+                    && Objects.equals(indexType, other.indexType);
+        }
+
+        @Override
+        public int hashCode() {
+            Integer cached = hash;
+            if (cached == null) {
+                cached = Objects.hash(partition, bucket, indexType);
+                hash = cached;
+            }
+            return cached;
+        }
+
+        @Override
+        public String toString() {
+            return "Identifier{partition="
+                    + partition
+                    + ", bucket="
+                    + bucket
+                    + ", indexType='"
+                    + indexType
+                    + "'}";
+        }
+    }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntrySerializer.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntrySerializer.java
new file mode 100644
index 000000000..40c4cb5a8
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntrySerializer.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.manifest;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.utils.VersionedObjectSerializer;
+
+import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
+import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow;
+
+/** A {@link VersionedObjectSerializer} for {@link IndexManifestEntry}. */
+public class IndexManifestEntrySerializer extends
VersionedObjectSerializer<IndexManifestEntry> {
+
+ public IndexManifestEntrySerializer() {
+ super(IndexManifestEntry.schema());
+ }
+
+ @Override
+ public int getVersion() {
+ return 1;
+ }
+
+ @Override
+ public InternalRow convertTo(IndexManifestEntry record) {
+ IndexFileMeta indexFile = record.indexFile();
+ return GenericRow.of(
+ record.kind().toByteValue(),
+ serializeBinaryRow(record.partition()),
+ record.bucket(),
+ BinaryString.fromString(indexFile.indexType()),
+ BinaryString.fromString(indexFile.fileName()),
+ indexFile.fileSize(),
+ indexFile.rowCount());
+ }
+
+ @Override
+ public IndexManifestEntry convertFrom(int version, InternalRow row) {
+ if (version != 1) {
+ throw new UnsupportedOperationException("Unsupported version: " +
version);
+ }
+
+ return new IndexManifestEntry(
+ FileKind.fromByteValue(row.getByte(0)),
+ deserializeBinaryRow(row.getBinary(1)),
+ row.getInt(2),
+ new IndexFileMeta(
+ row.getString(3).toString(),
+ row.getString(4).toString(),
+ row.getLong(5),
+ row.getLong(6)));
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFile.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFile.java
new file mode 100644
index 000000000..8df6d67ec
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFile.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.manifest;
+
+import org.apache.paimon.format.FileFormat;
+import org.apache.paimon.format.FormatReaderFactory;
+import org.apache.paimon.format.FormatWriterFactory;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.manifest.IndexManifestEntry.Identifier;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.ObjectsFile;
+import org.apache.paimon.utils.PathFactory;
+import org.apache.paimon.utils.VersionedObjectSerializer;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Index manifest file: stores the {@link IndexManifestEntry}s that are live in a snapshot.
+ *
+ * <p>Instances are created through {@link Factory}.
+ */
+public class IndexManifestFile extends ObjectsFile<IndexManifestEntry> {
+
+    /**
+     * Resolves index file names to paths. This is deliberately distinct from the inherited
+     * {@code pathFactory}, which resolves index <em>manifest</em> names under the manifest
+     * directory; index files themselves live under the index directory.
+     */
+    private final PathFactory indexFilePathFactory;
+
+    private IndexManifestFile(
+            FileIO fileIO,
+            FormatReaderFactory readerFactory,
+            FormatWriterFactory writerFactory,
+            PathFactory pathFactory,
+            PathFactory indexFilePathFactory) {
+        super(
+                fileIO,
+                new IndexManifestEntrySerializer(),
+                readerFactory,
+                writerFactory,
+                pathFactory,
+                null);
+        this.indexFilePathFactory = indexFilePathFactory;
+    }
+
+    /**
+     * Merge new index files to index manifest.
+     *
+     * <p>Entries of the previous manifest and {@code newIndexFiles} are replayed in order, keyed
+     * by {@link IndexManifestEntry#identifier()}: an ADD replaces the entry of its identifier, a
+     * DELETE removes it. An ADD whose index file has zero rows removes the identifier instead and
+     * the empty index file is deleted (best effort).
+     *
+     * @param previousIndexManifest index manifest of the previous snapshot, or {@code null} if
+     *     there is none
+     * @param newIndexFiles changes to apply
+     * @return {@code previousIndexManifest} unchanged when {@code newIndexFiles} is empty,
+     *     otherwise the result of writing the merged entries to a new index manifest
+     */
+    @Nullable
+    public String merge(
+            @Nullable String previousIndexManifest, List<IndexManifestEntry> newIndexFiles) {
+        if (newIndexFiles.isEmpty()) {
+            return previousIndexManifest;
+        }
+
+        // Copy so that the list handed back by read() is never mutated here.
+        List<IndexManifestEntry> entries =
+                previousIndexManifest == null
+                        ? new ArrayList<>()
+                        : new ArrayList<>(read(previousIndexManifest));
+        entries.addAll(newIndexFiles);
+
+        // LinkedHashMap keeps the entries in first-insertion order.
+        Map<Identifier, IndexManifestEntry> indexEntries = new LinkedHashMap<>();
+        for (IndexManifestEntry file : entries) {
+            Identifier identifier = file.identifier();
+            if (file.kind() != FileKind.ADD) {
+                indexEntries.remove(identifier);
+            } else if (file.indexFile().rowCount() == 0) {
+                indexEntries.remove(identifier);
+                // Resolve against the index directory, not the manifest directory, so the
+                // empty index file is actually found and removed.
+                fileIO.deleteQuietly(indexFilePathFactory.toPath(file.indexFile().fileName()));
+            } else {
+                indexEntries.put(identifier, file);
+            }
+        }
+        return writeWithoutRolling(indexEntries.values());
+    }
+
+    /** Creator of {@link IndexManifestFile}. */
+    public static class Factory {
+
+        private final FileIO fileIO;
+        private final FileFormat fileFormat;
+        private final FileStorePathFactory pathFactory;
+
+        public Factory(FileIO fileIO, FileFormat fileFormat, FileStorePathFactory pathFactory) {
+            this.fileIO = fileIO;
+            this.fileFormat = fileFormat;
+            this.pathFactory = pathFactory;
+        }
+
+        /** Creates an {@link IndexManifestFile} reading and writing the versioned entry schema. */
+        public IndexManifestFile create() {
+            RowType schema = VersionedObjectSerializer.versionType(IndexManifestEntry.schema());
+            return new IndexManifestFile(
+                    fileIO,
+                    fileFormat.createReaderFactory(schema),
+                    fileFormat.createWriterFactory(schema),
+                    pathFactory.indexManifestFileFactory(),
+                    pathFactory.indexFileFactory());
+        }
+    }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index c7628e677..c7e242e80 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -27,6 +27,8 @@ import org.apache.paimon.fs.Path;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFilePathFactory;
import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.manifest.IndexManifestFile;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFile;
@@ -98,6 +100,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
private final SnapshotManager snapshotManager;
private final ManifestFile manifestFile;
private final ManifestList manifestList;
+ private final IndexManifestFile indexManifestFile;
private final FileStoreScan scan;
private final int numBucket;
private final MemorySize manifestTargetSize;
@@ -118,6 +121,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
SnapshotManager snapshotManager,
ManifestFile.Factory manifestFileFactory,
ManifestList.Factory manifestListFactory,
+ IndexManifestFile.Factory indexManifestFileFactory,
FileStoreScan scan,
int numBucket,
MemorySize manifestTargetSize,
@@ -134,6 +138,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
this.snapshotManager = snapshotManager;
this.manifestFile = manifestFileFactory.create();
this.manifestList = manifestListFactory.create();
+ this.indexManifestFile = indexManifestFileFactory.create();
this.scan = scan;
this.numBucket = numBucket;
this.manifestTargetSize = manifestTargetSize;
@@ -194,14 +199,19 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
List<ManifestEntry> appendChangelog = new ArrayList<>();
List<ManifestEntry> compactTableFiles = new ArrayList<>();
List<ManifestEntry> compactChangelog = new ArrayList<>();
+ List<IndexManifestEntry> appendIndexFiles = new ArrayList<>();
collectChanges(
committable.fileCommittables(),
appendTableFiles,
appendChangelog,
compactTableFiles,
- compactChangelog);
+ compactChangelog,
+ appendIndexFiles);
- if (!ignoreEmptyCommit || !appendTableFiles.isEmpty() ||
!appendChangelog.isEmpty()) {
+ if (!ignoreEmptyCommit
+ || !appendTableFiles.isEmpty()
+ || !appendChangelog.isEmpty()
+ || !appendIndexFiles.isEmpty()) {
// Optimization for common path.
// Step 1:
// Read manifest entries from changed partitions here and check
for conflicts.
@@ -222,6 +232,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
tryCommit(
appendTableFiles,
appendChangelog,
+ appendIndexFiles,
committable.identifier(),
committable.watermark(),
committable.logOffsets(),
@@ -246,6 +257,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
tryCommit(
compactTableFiles,
compactChangelog,
+ Collections.emptyList(),
committable.identifier(),
committable.watermark(),
committable.logOffsets(),
@@ -271,12 +283,14 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
List<ManifestEntry> appendChangelog = new ArrayList<>();
List<ManifestEntry> compactTableFiles = new ArrayList<>();
List<ManifestEntry> compactChangelog = new ArrayList<>();
+ List<IndexManifestEntry> appendIndexFiles = new ArrayList<>();
collectChanges(
committable.fileCommittables(),
appendTableFiles,
appendChangelog,
compactTableFiles,
- compactChangelog);
+ compactChangelog,
+ appendIndexFiles);
if (!appendChangelog.isEmpty() || !compactChangelog.isEmpty()) {
StringBuilder warnMessage =
@@ -337,6 +351,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
tryOverwrite(
partitionFilter,
appendTableFiles,
+ appendIndexFiles,
committable.identifier(),
committable.watermark(),
committable.logOffsets());
@@ -346,6 +361,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
tryCommit(
compactTableFiles,
Collections.emptyList(),
+ Collections.emptyList(),
committable.identifier(),
committable.watermark(),
committable.logOffsets(),
@@ -373,6 +389,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
tryOverwrite(
partitionFilter,
Collections.emptyList(),
+ Collections.emptyList(),
commitIdentifier,
null,
Collections.emptyMap());
@@ -406,7 +423,8 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
List<ManifestEntry> appendTableFiles,
List<ManifestEntry> appendChangelog,
List<ManifestEntry> compactTableFiles,
- List<ManifestEntry> compactChangelog) {
+ List<ManifestEntry> compactChangelog,
+ List<IndexManifestEntry> appendIndexFiles) {
for (CommitMessage message : commitMessages) {
CommitMessageImpl commitMessage = (CommitMessageImpl) message;
commitMessage
@@ -432,6 +450,17 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
.compactIncrement()
.changelogFiles()
.forEach(m -> compactChangelog.add(makeEntry(FileKind.ADD,
commitMessage, m)));
+ commitMessage
+ .indexIncrement()
+ .newIndexFiles()
+ .forEach(
+ f ->
+ appendIndexFiles.add(
+ new IndexManifestEntry(
+ FileKind.ADD,
+ commitMessage.partition(),
+ commitMessage.bucket(),
+ f)));
}
}
@@ -443,6 +472,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
private void tryCommit(
List<ManifestEntry> tableFiles,
List<ManifestEntry> changelogFiles,
+ List<IndexManifestEntry> indexFiles,
long identifier,
@Nullable Long watermark,
Map<Integer, Long> logOffsets,
@@ -453,6 +483,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
if (tryCommitOnce(
tableFiles,
changelogFiles,
+ indexFiles,
identifier,
watermark,
logOffsets,
@@ -467,6 +498,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
private void tryOverwrite(
Predicate partitionFilter,
List<ManifestEntry> changes,
+ List<IndexManifestEntry> indexFiles,
long identifier,
@Nullable Long watermark,
Map<Integer, Long> logOffsets) {
@@ -474,6 +506,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
Long latestSnapshotId = snapshotManager.latestSnapshotId();
List<ManifestEntry> changesWithOverwrite = new ArrayList<>();
+ List<IndexManifestEntry> indexChangesWithOverwrite = new
ArrayList<>();
if (latestSnapshotId != null) {
List<ManifestEntry> currentEntries =
scan.withSnapshot(latestSnapshotId)
@@ -489,12 +522,29 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
entry.totalBuckets(),
entry.file()));
}
+
+ // collect index files
+ Snapshot snapshot = snapshotManager.snapshot(latestSnapshotId);
+ if (snapshot.indexManifest() != null) {
+ RowDataToObjectArrayConverter converter =
+ new RowDataToObjectArrayConverter(partitionType);
+ List<IndexManifestEntry> entries =
+ indexManifestFile.read(snapshot.indexManifest());
+ for (IndexManifestEntry entry : entries) {
+ if (partitionFilter == null
+ ||
partitionFilter.test(converter.convert(entry.partition()))) {
+
indexChangesWithOverwrite.add(entry.toDeleteEntry());
+ }
+ }
+ }
}
changesWithOverwrite.addAll(changes);
+ indexChangesWithOverwrite.addAll(indexFiles);
if (tryCommitOnce(
changesWithOverwrite,
Collections.emptyList(),
+ indexChangesWithOverwrite,
identifier,
watermark,
logOffsets,
@@ -510,6 +560,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
public boolean tryCommitOnce(
List<ManifestEntry> tableFiles,
List<ManifestEntry> changelogFiles,
+ List<IndexManifestEntry> indexFiles,
long identifier,
@Nullable Long watermark,
Map<Integer, Long> logOffsets,
@@ -545,12 +596,14 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
String previousChangesListName = null;
String newChangesListName = null;
String changelogListName = null;
+ String newIndexManifest = null;
List<ManifestFileMeta> oldMetas = new ArrayList<>();
List<ManifestFileMeta> newMetas = new ArrayList<>();
List<ManifestFileMeta> changelogMetas = new ArrayList<>();
try {
long previousTotalRecordCount = 0L;
Long currentWatermark = watermark;
+ String previousIndexManifest = null;
if (latestSnapshot != null) {
previousTotalRecordCount =
latestSnapshot.totalRecordCount(scan);
List<ManifestFileMeta> previousManifests =
@@ -567,6 +620,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
? latestWatermark
: Math.max(currentWatermark,
latestWatermark);
}
+ previousIndexManifest = latestSnapshot.indexManifest();
}
// merge manifest files with changes
newMetas.addAll(
@@ -591,6 +645,12 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
changelogListName = manifestList.write(changelogMetas);
}
+ // write new index manifest
+ String indexManifest =
indexManifestFile.merge(previousIndexManifest, indexFiles);
+ if (!Objects.equals(indexManifest, previousIndexManifest)) {
+ newIndexManifest = indexManifest;
+ }
+
// prepare snapshot file
newSnapshot =
new Snapshot(
@@ -599,6 +659,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
previousChangesListName,
newChangesListName,
changelogListName,
+ indexManifest,
commitUser,
identifier,
commitKind,
@@ -614,6 +675,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
previousChangesListName,
newChangesListName,
changelogListName,
+ newIndexManifest,
oldMetas,
newMetas,
changelogMetas);
@@ -695,6 +757,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
previousChangesListName,
newChangesListName,
changelogListName,
+ newIndexManifest,
oldMetas,
newMetas,
changelogMetas);
@@ -813,6 +876,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
String previousChangesListName,
String newChangesListName,
String changelogListName,
+ String newIndexManifest,
List<ManifestFileMeta> oldMetas,
List<ManifestFileMeta> newMetas,
List<ManifestFileMeta> changelogMetas) {
@@ -826,6 +890,9 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
if (changelogListName != null) {
manifestList.delete(changelogListName);
}
+ if (newIndexManifest != null) {
+ indexManifestFile.delete(newIndexManifest);
+ }
// clean up newly merged manifest files
Set<ManifestFileMeta> oldMetaSet = new HashSet<>(oldMetas); // for
faster searching
for (ManifestFileMeta suspect : newMetas) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageImpl.java
index ee45f121a..1730fd7e6 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageImpl.java
@@ -18,15 +18,18 @@
package org.apache.paimon.table.sink;
+import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataInputViewStreamWrapper;
import org.apache.paimon.io.DataOutputViewStreamWrapper;
+import org.apache.paimon.io.IndexIncrement;
import org.apache.paimon.io.NewFilesIncrement;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
+import java.util.Collections;
import java.util.Objects;
import static org.apache.paimon.utils.SerializationUtils.deserializedBytes;
@@ -44,16 +47,33 @@ public class CommitMessageImpl implements CommitMessage {
private transient int bucket;
private transient NewFilesIncrement newFilesIncrement;
private transient CompactIncrement compactIncrement;
+ private transient IndexIncrement indexIncrement;
+ @VisibleForTesting
public CommitMessageImpl(
BinaryRow partition,
int bucket,
NewFilesIncrement newFilesIncrement,
CompactIncrement compactIncrement) {
+ this(
+ partition,
+ bucket,
+ newFilesIncrement,
+ compactIncrement,
+ new IndexIncrement(Collections.emptyList()));
+ }
+
+ public CommitMessageImpl(
+ BinaryRow partition,
+ int bucket,
+ NewFilesIncrement newFilesIncrement,
+ CompactIncrement compactIncrement,
+ IndexIncrement indexIncrement) {
this.partition = partition;
this.bucket = bucket;
this.newFilesIncrement = newFilesIncrement;
this.compactIncrement = compactIncrement;
+ this.indexIncrement = indexIncrement;
}
@Override
@@ -74,8 +94,14 @@ public class CommitMessageImpl implements CommitMessage {
return compactIncrement;
}
+ /** Returns the {@link IndexIncrement} carried by this commit message. */
+ public IndexIncrement indexIncrement() {
+ return indexIncrement;
+ }
+
public boolean isEmpty() {
- return newFilesIncrement.isEmpty() && compactIncrement.isEmpty();
+ return newFilesIncrement.isEmpty()
+ && compactIncrement.isEmpty()
+ && indexIncrement.isEmpty();
}
private void writeObject(ObjectOutputStream out) throws IOException {
@@ -94,6 +120,7 @@ public class CommitMessageImpl implements CommitMessage {
this.bucket = message.bucket;
this.newFilesIncrement = message.newFilesIncrement;
this.compactIncrement = message.compactIncrement;
+ this.indexIncrement = message.indexIncrement;
}
@Override
@@ -109,12 +136,13 @@ public class CommitMessageImpl implements CommitMessage {
return bucket == that.bucket
&& Objects.equals(partition, that.partition)
&& Objects.equals(newFilesIncrement, that.newFilesIncrement)
- && Objects.equals(compactIncrement, that.compactIncrement);
+ && Objects.equals(compactIncrement, that.compactIncrement)
+ && Objects.equals(indexIncrement, that.indexIncrement);
}
@Override
public int hashCode() {
- return Objects.hash(partition, bucket, newFilesIncrement,
compactIncrement);
+ return Objects.hash(partition, bucket, newFilesIncrement,
compactIncrement, indexIncrement);
}
@Override
@@ -124,7 +152,8 @@ public class CommitMessageImpl implements CommitMessage {
+ "partition = %s, "
+ "bucket = %d, "
+ "newFilesIncrement = %s, "
- + "compactIncrement = %s}",
- partition, bucket, newFilesIncrement, compactIncrement);
+ + "compactIncrement = %s, "
+ + "indexIncrement = %s}",
+ partition, bucket, newFilesIncrement, compactIncrement,
indexIncrement);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
index 38f993a72..da70c766d 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
@@ -19,12 +19,14 @@
package org.apache.paimon.table.sink;
import org.apache.paimon.data.serializer.VersionedSerializer;
+import org.apache.paimon.index.IndexFileMetaSerializer;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMetaSerializer;
import org.apache.paimon.io.DataInputDeserializer;
import org.apache.paimon.io.DataInputView;
import org.apache.paimon.io.DataOutputView;
import org.apache.paimon.io.DataOutputViewStreamWrapper;
+import org.apache.paimon.io.IndexIncrement;
import org.apache.paimon.io.NewFilesIncrement;
import java.io.ByteArrayOutputStream;
@@ -41,9 +43,11 @@ public class CommitMessageSerializer implements
VersionedSerializer<CommitMessag
private static final int CURRENT_VERSION = 2;
private final DataFileMetaSerializer dataFileSerializer;
+ private final IndexFileMetaSerializer indexEntrySerializer;
public CommitMessageSerializer() {
this.dataFileSerializer = new DataFileMetaSerializer();
+ this.indexEntrySerializer = new IndexFileMetaSerializer();
}
@Override
@@ -75,6 +79,7 @@ public class CommitMessageSerializer implements
VersionedSerializer<CommitMessag
dataFileSerializer.serializeList(message.compactIncrement().compactBefore(),
view);
dataFileSerializer.serializeList(message.compactIncrement().compactAfter(),
view);
dataFileSerializer.serializeList(message.compactIncrement().changelogFiles(),
view);
+
indexEntrySerializer.serializeList(message.indexIncrement().newIndexFiles(),
view);
}
@Override
@@ -116,6 +121,7 @@ public class CommitMessageSerializer implements
VersionedSerializer<CommitMessag
new CompactIncrement(
dataFileSerializer.deserializeList(view),
dataFileSerializer.deserializeList(view),
- dataFileSerializer.deserializeList(view)));
+ dataFileSerializer.deserializeList(view)),
+ new
IndexIncrement(indexEntrySerializer.deserializeList(view)));
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
index 304fbdb82..17d167d72 100644
---
a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
+++
b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
@@ -54,6 +54,8 @@ public class FileStorePathFactory {
private final AtomicInteger manifestFileCount;
private final AtomicInteger manifestListCount;
+ private final AtomicInteger indexManifestCount;
+ private final AtomicInteger indexFileCount;
public FileStorePathFactory(Path root) {
this(
@@ -74,6 +76,8 @@ public class FileStorePathFactory {
this.manifestFileCount = new AtomicInteger(0);
this.manifestListCount = new AtomicInteger(0);
+ this.indexManifestCount = new AtomicInteger(0);
+ this.indexFileCount = new AtomicInteger(0);
}
public Path root() {
@@ -169,4 +173,38 @@ public class FileStorePathFactory {
}
};
}
+
+ public PathFactory indexManifestFileFactory() {
+ return new PathFactory() {
+ @Override
+ public Path newPath() {
+ return new Path(
+ root
+ + "/manifest/index-manifest-"
+ + uuid
+ + "-"
+ + indexManifestCount.getAndIncrement());
+ }
+
+ @Override
+ public Path toPath(String fileName) {
+ return new Path(root + "/manifest/" + fileName);
+ }
+ };
+ }
+
+ public PathFactory indexFileFactory() {
+ return new PathFactory() {
+ @Override
+ public Path newPath() {
+ return new Path(
+ root + "/index/index-" + uuid + "-" +
indexFileCount.getAndIncrement());
+ }
+
+ @Override
+ public Path toPath(String fileName) {
+ return new Path(root + "/index/" + fileName);
+ }
+ };
+ }
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
index 4552008e1..68ce6e190 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
@@ -23,7 +23,9 @@ import
org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileIOFinder;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.IndexIncrement;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFileMeta;
@@ -58,6 +60,7 @@ import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -156,6 +159,7 @@ public class TestFileStore extends KeyValueFileStore {
false,
null,
watermark,
+ Collections.emptyList(),
(commit, committable) -> commit.commit(committable,
Collections.emptyMap()));
}
@@ -172,6 +176,7 @@ public class TestFileStore extends KeyValueFileStore {
false,
null,
null,
+ Collections.emptyList(),
(commit, committable) -> {
logOffsets.forEach(committable::addLogOffset);
commit.commit(committable, Collections.emptyMap());
@@ -191,6 +196,7 @@ public class TestFileStore extends KeyValueFileStore {
true,
null,
null,
+ Collections.emptyList(),
(commit, committable) ->
commit.overwrite(partition, committable,
Collections.emptyMap()));
}
@@ -212,6 +218,23 @@ public class TestFileStore extends KeyValueFileStore {
return snapshotManager.snapshot(snapshotIdAfterCommit);
}
+    /**
+     * Commits a single {@link KeyValue} into the given bucket, attaching the given index files to
+     * the commit message. Delegates to {@code commitDataImpl} with no explicit identifier or
+     * watermark.
+     */
+    public List<Snapshot> commitDataIndex(
+            KeyValue kv,
+            Function<KeyValue, BinaryRow> partitionCalculator,
+            int bucket,
+            IndexFileMeta... indexFiles)
+            throws Exception {
+        List<KeyValue> kvs = Collections.singletonList(kv);
+        List<IndexFileMeta> indexFileList = Arrays.asList(indexFiles);
+        return commitDataImpl(
+                kvs,
+                partitionCalculator,
+                ignore -> bucket,
+                false,
+                null,
+                null,
+                indexFileList,
+                (commit, committable) -> commit.commit(committable, Collections.emptyMap()));
+    }
+
public List<Snapshot> commitDataImpl(
List<KeyValue> kvs,
Function<KeyValue, BinaryRow> partitionCalculator,
@@ -219,6 +242,7 @@ public class TestFileStore extends KeyValueFileStore {
boolean emptyWriter,
Long identifier,
Long watermark,
+ List<IndexFileMeta> indexFiles,
BiConsumer<FileStoreCommit, ManifestCommittable> commitFunction)
throws Exception {
AbstractFileStoreWrite<KeyValue> write = newWrite();
@@ -262,7 +286,8 @@ public class TestFileStore extends KeyValueFileStore {
entryWithPartition.getKey(),
entryWithBucket.getKey(),
increment.newFilesIncrement(),
- increment.compactIncrement()));
+ increment.compactIncrement(),
+ new IndexIncrement(indexFiles)));
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/index/HashIndexFileTest.java
b/paimon-core/src/test/java/org/apache/paimon/index/HashIndexFileTest.java
new file mode 100644
index 000000000..9fcc459c3
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/index/HashIndexFileTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.index;
+
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.utils.IntIterator;
+import org.apache.paimon.utils.PathFactory;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Round-trip (write, read, size) test for {@link HashIndexFile}. */
+public class HashIndexFileTest {
+
+ @TempDir java.nio.file.Path tempPath;
+
+ @Test
+ public void test() throws IOException {
+ Path dir = new Path(tempPath.toUri());
+ PathFactory pathFactory =
+ new PathFactory() {
+ @Override
+ public Path newPath() {
+ return new Path(dir, UUID.randomUUID().toString());
+ }
+
+ @Override
+ public Path toPath(String fileName) {
+ return new Path(dir, fileName);
+ }
+ };
+
+ HashIndexFile file = new HashIndexFile(LocalFileIO.create(),
pathFactory);
+ // draw the element count once, uniformly from [0, 100_000)
+ Random rnd = new Random();
+ List<Integer> random = new ArrayList<>();
+ for (int i = 0, count = rnd.nextInt(100_000); i < count; i++) {
+ random.add(rnd.nextInt());
+ }
+ // write all ints into a new index file; write returns its name
+ String name =
+ file.write(
+
IntIterator.create(random.stream().mapToInt(Integer::intValue).toArray()));
+ // reading the file back must yield the same ints (order not checked)
+ List<Integer> result = IntIterator.toIntList(file.read(name));
+ assertThat(result).containsExactlyInAnyOrderElementsOf(random);
+ // the reported file size must be 4 bytes per written int
+ assertThat(file.fileSize(name)).isEqualTo(random.size() * 4L);
+ }
+}
diff --git
a/paimon-common/src/test/java/org/apache/paimon/utils/IntHashSetTest.java
b/paimon-core/src/test/java/org/apache/paimon/index/IndexFileMetaSerializerTest.java
similarity index 52%
copy from
paimon-common/src/test/java/org/apache/paimon/utils/IntHashSetTest.java
copy to
paimon-core/src/test/java/org/apache/paimon/index/IndexFileMetaSerializerTest.java
index c476ff563..8f1ef392a 100644
--- a/paimon-common/src/test/java/org/apache/paimon/utils/IntHashSetTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/index/IndexFileMetaSerializerTest.java
@@ -16,36 +16,32 @@
* limitations under the License.
*/
-package org.apache.paimon.utils;
+package org.apache.paimon.index;
-import org.junit.jupiter.api.Test;
+import org.apache.paimon.utils.ObjectSerializer;
+import org.apache.paimon.utils.ObjectSerializerTestBase;
-import java.util.HashSet;
import java.util.Random;
-import java.util.Set;
-import static org.assertj.core.api.Assertions.assertThat;
+/** Test for {@link org.apache.paimon.index.IndexFileMetaSerializer}. */
+public class IndexFileMetaSerializerTest extends
ObjectSerializerTestBase<IndexFileMeta> {
-/** Test for {@link IntHashSet}. */
-public class IntHashSetTest {
-
- @Test
- public void testRandom() {
- Set<Integer> values = new HashSet<>();
- Random rnd = new Random();
- for (int i = 0; i < rnd.nextInt(100); i++) {
- values.add(rnd.nextInt());
- }
- if (rnd.nextBoolean()) {
- values.add(0);
- values.add(-1);
- values.add(1);
- }
+ @Override
+ protected ObjectSerializer<IndexFileMeta> serializer() {
+ return new IndexFileMetaSerializer();
+ }
- IntHashSet set = new IntHashSet();
- values.forEach(set::add);
+ @Override
+ protected IndexFileMeta object() {
+ return randomIndexFile();
+ }
- int[] expected =
values.stream().mapToInt(Integer::intValue).sorted().toArray();
- assertThat(set.toSortedInts()).containsExactly(expected);
+ public static IndexFileMeta randomIndexFile() {
+ Random rnd = new Random();
+ return new IndexFileMeta(
+ HashIndexFile.HASH_INDEX,
+ "my_file_name" + rnd.nextLong(),
+ rnd.nextInt(),
+ rnd.nextInt());
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/manifest/IndexManifestEntrySerializerTest.java
b/paimon-core/src/test/java/org/apache/paimon/manifest/IndexManifestEntrySerializerTest.java
new file mode 100644
index 000000000..1ee3d6b5d
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/manifest/IndexManifestEntrySerializerTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.manifest;
+
+import org.apache.paimon.index.HashIndexFile;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.utils.ObjectSerializer;
+import org.apache.paimon.utils.ObjectSerializerTestBase;
+
+import java.util.Random;
+
+import static org.apache.paimon.io.DataFileTestUtils.row;
+
+/** Test for {@link IndexManifestEntrySerializer} with random entries. */
+public class IndexManifestEntrySerializerTest extends
ObjectSerializerTestBase<IndexManifestEntry> {
+ /** Returns a new {@link IndexManifestEntrySerializer}. */
+ @Override
+ protected ObjectSerializer<IndexManifestEntry> serializer() {
+ return new IndexManifestEntrySerializer();
+ }
+ /** Returns a fresh random entry, see {@link #randomIndexEntry()}. */
+ @Override
+ protected IndexManifestEntry object() {
+ return randomIndexEntry();
+ }
+ /** Creates an entry with random kind, row, int and hash index file. */
+ public static IndexManifestEntry randomIndexEntry() {
+ Random rnd = new Random();
+ return new IndexManifestEntry(
+ rnd.nextBoolean() ? FileKind.ADD : FileKind.DELETE,
+ row(rnd.nextInt()),
+ rnd.nextInt(),
+ new IndexFileMeta(
+ HashIndexFile.HASH_INDEX,
+ "my_file_name" + rnd.nextLong(),
+ rnd.nextInt(),
+ rnd.nextInt()));
+ }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
index c7469c735..d31332be6 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
@@ -434,6 +434,7 @@ public class FileDeletionTest {
.tryCommitOnce(
delete,
Collections.emptyList(),
+ Collections.emptyList(),
commitIdentifier++,
null,
Collections.emptyMap(),
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
index fbcb4c054..e42dce33a 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
@@ -26,6 +26,9 @@ import org.apache.paimon.TestKeyValueGenerator;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.index.IndexFileHandler;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction;
import org.apache.paimon.predicate.PredicateBuilder;
@@ -56,12 +59,14 @@ import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
+import static org.apache.paimon.index.HashIndexFile.HASH_INDEX;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -501,6 +506,7 @@ public class FileStoreCommitTest {
false,
null,
null,
+ Collections.emptyList(),
(commit, committable) -> commit.commit(committable,
Collections.emptyMap()));
assertThat(store.snapshotManager().latestSnapshotId()).isEqualTo(snapshot.id());
@@ -512,6 +518,7 @@ public class FileStoreCommitTest {
false,
null,
null,
+ Collections.emptyList(),
(commit, committable) -> {
commit.ignoreEmptyCommit(false);
commit.commit(committable, Collections.emptyMap());
@@ -533,6 +540,7 @@ public class FileStoreCommitTest {
false,
(long) i,
null,
+ Collections.emptyList(),
(commit, committable) -> {
commit.commit(committable, Collections.emptyMap());
committables.add(committable);
@@ -679,6 +687,95 @@ public class FileStoreCommitTest {
"Partitions list cannot be empty."));
}
+ @Test
+ public void testIndexFiles() throws Exception {
+ TestFileStore store = createStore(false, 2);
+ IndexFileHandler indexFileHandler = store.newIndexFileHandler();
+ // draw records until record2 lands in a different partition
+ KeyValue record1 = gen.next();
+ BinaryRow part1 = gen.getPartition(record1);
+ KeyValue record2 = record1;
+ BinaryRow part2 = part1;
+ while (part1.equals(part2)) {
+ record2 = gen.next();
+ part2 = gen.getPartition(record2);
+ }
+
+ // init write: part1 buckets 0 and 1, part2 bucket 2
+ store.commitDataIndex(
+ record1,
+ gen::getPartition,
+ 0,
+ indexFileHandler.writeHashIndex(new int[] {1, 2, 5}));
+ store.commitDataIndex(
+ record1, gen::getPartition, 1,
indexFileHandler.writeHashIndex(new int[] {6, 8}));
+ store.commitDataIndex(
+ record2, gen::getPartition, 2,
indexFileHandler.writeHashIndex(new int[] {3, 5}));
+
+ Snapshot snapshot = store.snapshotManager().latestSnapshot();
+
+ // assert part1: one index entry per bucket
+ List<IndexManifestEntry> part1Index =
+ indexFileHandler.scan(snapshot.id(), HASH_INDEX, part1);
+ assertThat(part1Index).hasSize(2);
+
+ assertThat(part1Index.get(0).bucket()).isEqualTo(0);
+
assertThat(indexFileHandler.readHashIndexList(part1Index.get(0).indexFile()))
+ .containsExactlyInAnyOrder(1, 2, 5);
+
+ assertThat(part1Index.get(1).bucket()).isEqualTo(1);
+
assertThat(indexFileHandler.readHashIndexList(part1Index.get(1).indexFile()))
+ .containsExactlyInAnyOrder(6, 8);
+
+ // assert part2: a single entry for bucket 2
+ List<IndexManifestEntry> part2Index =
+ indexFileHandler.scan(snapshot.id(), HASH_INDEX, part2);
+ assertThat(part2Index).hasSize(1);
+ assertThat(part2Index.get(0).bucket()).isEqualTo(2);
+
assertThat(indexFileHandler.readHashIndexList(part2Index.get(0).indexFile()))
+ .containsExactlyInAnyOrder(3, 5);
+
+ // update part1: commit a new index file for bucket 0
+ store.commitDataIndex(
+ record1, gen::getPartition, 0,
indexFileHandler.writeHashIndex(new int[] {1, 4}));
+ snapshot = store.snapshotManager().latestSnapshot();
+
+ // assert update part1: bucket 0 replaced, bucket 1 unchanged
+ part1Index = indexFileHandler.scan(snapshot.id(), HASH_INDEX, part1);
+ assertThat(part1Index).hasSize(2);
+
+ assertThat(part1Index.get(0).bucket()).isEqualTo(0);
+
assertThat(indexFileHandler.readHashIndexList(part1Index.get(0).indexFile()))
+ .containsExactlyInAnyOrder(1, 4);
+
+ assertThat(part1Index.get(1).bucket()).isEqualTo(1);
+
assertThat(indexFileHandler.readHashIndexList(part1Index.get(1).indexFile()))
+ .containsExactlyInAnyOrder(6, 8);
+
+ // assert scan one bucket
+ Optional<IndexFileMeta> file = indexFileHandler.scan(snapshot.id(),
HASH_INDEX, part1, 0);
+ assertThat(file).isPresent();
+
assertThat(indexFileHandler.readHashIndexList(file.get())).containsExactlyInAnyOrder(1,
4);
+
+ // overwrite one partition: part1 index is dropped, part2 index is kept
+
store.options().toConfiguration().set(CoreOptions.DYNAMIC_PARTITION_OVERWRITE,
true);
+ store.overwriteData(
+ Collections.singletonList(record1), gen::getPartition, kv ->
0, new HashMap<>());
+ snapshot = store.snapshotManager().latestSnapshot();
+ file = indexFileHandler.scan(snapshot.id(), HASH_INDEX, part1, 0);
+ assertThat(file).isEmpty();
+ file = indexFileHandler.scan(snapshot.id(), HASH_INDEX, part2, 2);
+ assertThat(file).isPresent();
+
+ // overwrite all partitions: part2 index is dropped as well
+
store.options().toConfiguration().set(CoreOptions.DYNAMIC_PARTITION_OVERWRITE,
false);
+ store.overwriteData(
+ Collections.singletonList(record1), gen::getPartition, kv ->
0, new HashMap<>());
+ snapshot = store.snapshotManager().latestSnapshot();
+ file = indexFileHandler.scan(snapshot.id(), HASH_INDEX, part2, 2);
+ assertThat(file).isEmpty();
+ }
+
private TestFileStore createStore(boolean failing) throws Exception {
return createStore(failing, 1);
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/sink/CommitMessageSerializerTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/sink/CommitMessageSerializerTest.java
index d789d4176..a6cf04b0e 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/sink/CommitMessageSerializerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/sink/CommitMessageSerializerTest.java
@@ -19,12 +19,15 @@
package org.apache.paimon.table.sink;
import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.IndexIncrement;
import org.apache.paimon.io.NewFilesIncrement;
import org.junit.jupiter.api.Test;
import java.io.IOException;
+import java.util.Arrays;
+import static
org.apache.paimon.index.IndexFileMetaSerializerTest.randomIndexFile;
import static
org.apache.paimon.manifest.ManifestCommittableSerializerTest.randomCompactIncrement;
import static
org.apache.paimon.manifest.ManifestCommittableSerializerTest.randomNewFilesIncrement;
import static
org.apache.paimon.mergetree.compact.MergeTreeCompactManagerTest.row;
@@ -38,9 +41,15 @@ public class CommitMessageSerializerTest {
CommitMessageSerializer serializer = new CommitMessageSerializer();
NewFilesIncrement newFilesIncrement = randomNewFilesIncrement();
CompactIncrement compactIncrement = randomCompactIncrement();
- CommitMessage committable =
- new CommitMessageImpl(row(0), 1, newFilesIncrement,
compactIncrement);
- CommitMessage newCommittable = serializer.deserialize(2,
serializer.serialize(committable));
- assertThat(newCommittable).isEqualTo(committable);
+ IndexIncrement indexIncrement =
+ new IndexIncrement(Arrays.asList(randomIndexFile(),
randomIndexFile()));
+ CommitMessageImpl committable =
+ new CommitMessageImpl(
+ row(0), 1, newFilesIncrement, compactIncrement,
indexIncrement);
+ CommitMessageImpl newCommittable =
+ (CommitMessageImpl) serializer.deserialize(2,
serializer.serialize(committable));
+
assertThat(newCommittable.compactIncrement()).isEqualTo(committable.compactIncrement());
+
assertThat(newCommittable.newFilesIncrement()).isEqualTo(committable.newFilesIncrement());
+
assertThat(newCommittable.indexIncrement()).isEqualTo(committable.indexIncrement());
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java
b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java
index a5735cc51..3052b093b 100644
--- a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java
@@ -66,6 +66,7 @@ public class SnapshotManagerTest {
null,
null,
null,
+ null,
0L,
Snapshot.CommitKind.APPEND,
millis + i * 1000,
@@ -102,6 +103,7 @@ public class SnapshotManagerTest {
null,
null,
null,
+ null,
0L,
Snapshot.CommitKind.APPEND,
i * 1000,