This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 153a219b20 [core] Introduce basic structure for global index write
(#6628)
153a219b20 is described below
commit 153a219b20eaed5d8474a8e3e3901603066559e2
Author: YeJunHao <[email protected]>
AuthorDate: Wed Nov 19 16:03:56 2025 +0800
[core] Introduce basic structure for global index write (#6628)
---
.../paimon/globalindex/GlobalIndexResult.java | 26 ++
.../paimon/globalindex/GlobalIndexWriter.java | 62 ++++
.../{GlobalFileReader.java => GlobalIndexer.java} | 20 +-
...alFileReader.java => GlobalIndexerFactory.java} | 12 +-
.../globalindex/GlobalIndexerFactoryUtils.java | 56 ++++
.../globalindex/bitmap/BitmapGlobalIndex.java | 22 +-
.../BitmapGlobalIndexerFactory.java} | 39 ++-
.../bitmap/BitmapIndexResultWrapper.java | 4 +
.../GlobalIndexFileReader.java} | 6 +-
.../GlobalIndexFileWriter.java} | 13 +-
.../globalindex/wrap/FileIndexWriterWrapper.java | 67 +++++
.../main/java/org/apache/paimon/utils/Range.java | 4 +
....apache.paimon.globalindex.GlobalIndexerFactory | 16 ++
.../bitmapindex/BitmapGlobalIndexTest.java | 314 +++++++++++++++++++++
14 files changed, 617 insertions(+), 44 deletions(-)
diff --git
a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexResult.java
b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexResult.java
index f1ac2213cb..3b00f8029c 100644
---
a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexResult.java
+++
b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexResult.java
@@ -18,8 +18,10 @@
package org.apache.paimon.globalindex;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.NoSuchElementException;
+import java.util.Set;
/**
* Global index result represents row ids.
@@ -42,4 +44,28 @@ public interface GlobalIndexResult extends Iterable<Long> {
}
};
}
+
+ default GlobalIndexResult and(GlobalIndexResult other) {
+ Set<Long> set = new HashSet<>();
+ this.forEach(set::add);
+
+ Set<Long> result = new HashSet<>();
+ for (Long l : other) {
+ if (set.contains(l)) {
+ result.add(l);
+ }
+ }
+ return wrap(result);
+ }
+
+ default GlobalIndexResult or(GlobalIndexResult other) {
+ Set<Long> result = new HashSet<>();
+ this.forEach(result::add);
+ other.forEach(result::add);
+ return wrap(result);
+ }
+
+ static GlobalIndexResult wrap(Set<Long> longs) {
+ return longs::iterator;
+ }
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexWriter.java
b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexWriter.java
new file mode 100644
index 0000000000..d04d71aa5e
--- /dev/null
+++
b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexWriter.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.globalindex;
+
+import org.apache.paimon.utils.Range;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+/** Index writer for global index. */
+public interface GlobalIndexWriter {
+
+ void write(Object key);
+
+ List<ResultEntry> finish();
+
+ /** Result of a finished write: the index file name, its row range and optional meta bytes. */
+ class ResultEntry {
+ private final String fileName;
+ private final Range rowRange;
+ @Nullable private final byte[] meta;
+
+ public ResultEntry(String fileName, @Nullable byte[] meta, Range
rowRange) {
+ this.fileName = fileName;
+ this.meta = meta;
+ this.rowRange = rowRange;
+ }
+
+ public String fileName() {
+ return fileName;
+ }
+
+ public Range rowRange() {
+ return rowRange;
+ }
+
+ public byte[] meta() {
+ return meta;
+ }
+
+ public static ResultEntry of(String fileName, byte[] meta, Range
rowRange) {
+ return new ResultEntry(fileName, meta, rowRange);
+ }
+ }
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalFileReader.java
b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexer.java
similarity index 53%
copy from
paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalFileReader.java
copy to
paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexer.java
index 7524d1e1f9..6badadc878 100644
---
a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalFileReader.java
+++
b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexer.java
@@ -18,12 +18,24 @@
package org.apache.paimon.globalindex;
-import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.globalindex.io.GlobalIndexFileReader;
+import org.apache.paimon.globalindex.io.GlobalIndexFileWriter;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.types.DataType;
import java.io.IOException;
+import java.util.List;
-/** File reader for global index. */
-public interface GlobalFileReader {
+/** Interface for global indexers, which create the writers and readers of a global index. */
+public interface GlobalIndexer {
- SeekableInputStream create(String fileName) throws IOException;
+ GlobalIndexWriter createWriter(GlobalIndexFileWriter fileWriter) throws
IOException;
+
+ GlobalIndexReader createReader(GlobalIndexFileReader fileReader,
List<GlobalIndexMeta> files)
+ throws IOException;
+
+ static GlobalIndexer create(String type, DataType dataType, Options
options) {
+ GlobalIndexerFactory globalIndexerFactory =
GlobalIndexerFactoryUtils.load(type);
+ return globalIndexerFactory.create(dataType, options);
+ }
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalFileReader.java
b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexerFactory.java
similarity index 73%
copy from
paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalFileReader.java
copy to
paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexerFactory.java
index 7524d1e1f9..53154ef50b 100644
---
a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalFileReader.java
+++
b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexerFactory.java
@@ -18,12 +18,14 @@
package org.apache.paimon.globalindex;
-import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.fileindex.FileIndexer;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.types.DataType;
-import java.io.IOException;
+/** Factory to construct {@link GlobalIndexer}, the global counterpart of {@link FileIndexer}. */
+public interface GlobalIndexerFactory {
-/** File reader for global index. */
-public interface GlobalFileReader {
+ String identifier();
- SeekableInputStream create(String fileName) throws IOException;
+ GlobalIndexer create(DataType type, Options options);
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexerFactoryUtils.java
b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexerFactoryUtils.java
new file mode 100644
index 0000000000..43a8768f76
--- /dev/null
+++
b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexerFactoryUtils.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.globalindex;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.ServiceLoader;
+
+/** Utility to load {@link GlobalIndexerFactory} implementations through {@link ServiceLoader}. */
+public class GlobalIndexerFactoryUtils {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(GlobalIndexerFactoryUtils.class);
+
+ private static final Map<String, GlobalIndexerFactory> factories = new
HashMap<>();
+
+ static {
+ ServiceLoader<GlobalIndexerFactory> serviceLoader =
+ ServiceLoader.load(GlobalIndexerFactory.class);
+
+ for (GlobalIndexerFactory indexerFactory : serviceLoader) {
+ if (factories.put(indexerFactory.identifier(), indexerFactory) !=
null) {
+ LOG.warn(
+ "Found multiple GlobalIndexer for type: "
+ + indexerFactory.identifier()
+ + ", choose one of them");
+ }
+ }
+ }
+
+ public static GlobalIndexerFactory load(String type) {
+ GlobalIndexerFactory globalIndexerFactory = factories.get(type);
+ if (globalIndexerFactory == null) {
+ throw new RuntimeException("Can't find global index for type: " +
type);
+ }
+ return globalIndexerFactory;
+ }
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/globalindex/bitmap/BitmapGlobalIndex.java
b/paimon-common/src/main/java/org/apache/paimon/globalindex/bitmap/BitmapGlobalIndex.java
index 6798dfe5d6..19bbcccb71 100644
---
a/paimon-common/src/main/java/org/apache/paimon/globalindex/bitmap/BitmapGlobalIndex.java
+++
b/paimon-common/src/main/java/org/apache/paimon/globalindex/bitmap/BitmapGlobalIndex.java
@@ -20,14 +20,19 @@ package org.apache.paimon.globalindex.bitmap;
import org.apache.paimon.fileindex.FileIndexReader;
import org.apache.paimon.fileindex.FileIndexResult;
+import org.apache.paimon.fileindex.FileIndexWriter;
import org.apache.paimon.fileindex.bitmap.BitmapFileIndex;
import org.apache.paimon.fileindex.bitmap.BitmapIndexResult;
import org.apache.paimon.fs.SeekableInputStream;
-import org.apache.paimon.globalindex.GlobalFileReader;
import org.apache.paimon.globalindex.GlobalIndexMeta;
import org.apache.paimon.globalindex.GlobalIndexReader;
import org.apache.paimon.globalindex.GlobalIndexResult;
+import org.apache.paimon.globalindex.GlobalIndexWriter;
+import org.apache.paimon.globalindex.GlobalIndexer;
+import org.apache.paimon.globalindex.io.GlobalIndexFileReader;
+import org.apache.paimon.globalindex.io.GlobalIndexFileWriter;
import org.apache.paimon.globalindex.wrap.FileIndexReaderWrapper;
+import org.apache.paimon.globalindex.wrap.FileIndexWriterWrapper;
import org.apache.paimon.utils.Range;
import java.io.IOException;
@@ -36,7 +41,7 @@ import java.util.List;
import static org.apache.paimon.utils.Preconditions.checkArgument;
/** Bitmap global index. */
-public class BitmapGlobalIndex {
+public class BitmapGlobalIndex implements GlobalIndexer {
private final BitmapFileIndex index;
@@ -44,11 +49,18 @@ public class BitmapGlobalIndex {
this.index = index;
}
- public GlobalIndexReader createReader(GlobalFileReader fileReader,
List<GlobalIndexMeta> files)
- throws IOException {
+ @Override
+ public GlobalIndexWriter createWriter(GlobalIndexFileWriter fileWriter)
throws IOException {
+ FileIndexWriter writer = index.createWriter();
+ return new FileIndexWriterWrapper(
+ fileWriter, writer, BitmapGlobalIndexerFactory.IDENTIFIER);
+ }
+
+ public GlobalIndexReader createReader(
+ GlobalIndexFileReader fileReader, List<GlobalIndexMeta> files)
throws IOException {
checkArgument(files.size() == 1);
GlobalIndexMeta indexMeta = files.get(0);
- SeekableInputStream input = fileReader.create(indexMeta.fileName());
+ SeekableInputStream input =
fileReader.getInputStream(indexMeta.fileName());
FileIndexReader reader = index.createReader(input, 0, (int)
indexMeta.fileSize());
return new FileIndexReaderWrapper(
reader, r -> toGlobalResult(indexMeta.rowIdRange(), r), input);
diff --git
a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexResult.java
b/paimon-common/src/main/java/org/apache/paimon/globalindex/bitmap/BitmapGlobalIndexerFactory.java
similarity index 51%
copy from
paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexResult.java
copy to
paimon-common/src/main/java/org/apache/paimon/globalindex/bitmap/BitmapGlobalIndexerFactory.java
index f1ac2213cb..efbe0bbd2f 100644
---
a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexResult.java
+++
b/paimon-common/src/main/java/org/apache/paimon/globalindex/bitmap/BitmapGlobalIndexerFactory.java
@@ -16,30 +16,27 @@
* limitations under the License.
*/
-package org.apache.paimon.globalindex;
+package org.apache.paimon.globalindex.bitmap;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
+import org.apache.paimon.fileindex.bitmap.BitmapFileIndex;
+import org.apache.paimon.globalindex.GlobalIndexer;
+import org.apache.paimon.globalindex.GlobalIndexerFactory;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.types.DataType;
-/**
- * Global index result represents row ids.
- *
- * <p>TODO introduce ranges interface
- */
-public interface GlobalIndexResult extends Iterable<Long> {
+/** Factory for creating bitmap global indexers. */
+public class BitmapGlobalIndexerFactory implements GlobalIndexerFactory {
- static GlobalIndexResult createEmpty() {
- return () ->
- new Iterator<Long>() {
- @Override
- public boolean hasNext() {
- return false;
- }
+ public static final String IDENTIFIER = "bitmap";
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
- @Override
- public Long next() {
- throw new NoSuchElementException();
- }
- };
+ @Override
+ public GlobalIndexer create(DataType type, Options options) {
+ BitmapFileIndex bitmapFileIndex = new BitmapFileIndex(type, options);
+ return new BitmapGlobalIndex(bitmapFileIndex);
}
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/globalindex/bitmap/BitmapIndexResultWrapper.java
b/paimon-common/src/main/java/org/apache/paimon/globalindex/bitmap/BitmapIndexResultWrapper.java
index 6b10f010d3..badbcfa219 100644
---
a/paimon-common/src/main/java/org/apache/paimon/globalindex/bitmap/BitmapIndexResultWrapper.java
+++
b/paimon-common/src/main/java/org/apache/paimon/globalindex/bitmap/BitmapIndexResultWrapper.java
@@ -57,4 +57,8 @@ public class BitmapIndexResultWrapper implements
GlobalIndexResult {
return new BitmapIndexResultWrapper(
new BitmapIndexResult(() -> bitmapOfRange(0, range.to -
range.from)), range.from);
}
+
+ public BitmapIndexResult getBitmapIndexResult() {
+ return result;
+ }
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalFileReader.java
b/paimon-common/src/main/java/org/apache/paimon/globalindex/io/GlobalIndexFileReader.java
similarity index 85%
copy from
paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalFileReader.java
copy to
paimon-common/src/main/java/org/apache/paimon/globalindex/io/GlobalIndexFileReader.java
index 7524d1e1f9..7c88057fb4 100644
---
a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalFileReader.java
+++
b/paimon-common/src/main/java/org/apache/paimon/globalindex/io/GlobalIndexFileReader.java
@@ -16,14 +16,14 @@
* limitations under the License.
*/
-package org.apache.paimon.globalindex;
+package org.apache.paimon.globalindex.io;
import org.apache.paimon.fs.SeekableInputStream;
import java.io.IOException;
/** File reader for global index. */
-public interface GlobalFileReader {
+public interface GlobalIndexFileReader {
- SeekableInputStream create(String fileName) throws IOException;
+ SeekableInputStream getInputStream(String fileName) throws IOException;
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalFileReader.java
b/paimon-common/src/main/java/org/apache/paimon/globalindex/io/GlobalIndexFileWriter.java
similarity index 76%
rename from
paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalFileReader.java
rename to
paimon-common/src/main/java/org/apache/paimon/globalindex/io/GlobalIndexFileWriter.java
index 7524d1e1f9..850cbb9451 100644
---
a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalFileReader.java
+++
b/paimon-common/src/main/java/org/apache/paimon/globalindex/io/GlobalIndexFileWriter.java
@@ -16,14 +16,15 @@
* limitations under the License.
*/
-package org.apache.paimon.globalindex;
-
-import org.apache.paimon.fs.SeekableInputStream;
+package org.apache.paimon.globalindex.io;
import java.io.IOException;
+import java.io.OutputStream;
+
+/** File IO operations for global index writers: naming new files and opening output streams. */
+public interface GlobalIndexFileWriter {
-/** File reader for global index. */
-public interface GlobalFileReader {
+ String newFileName(String prefix);
- SeekableInputStream create(String fileName) throws IOException;
+ OutputStream newOutputStream(String fileName) throws IOException;
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/globalindex/wrap/FileIndexWriterWrapper.java
b/paimon-common/src/main/java/org/apache/paimon/globalindex/wrap/FileIndexWriterWrapper.java
new file mode 100644
index 0000000000..b7bbb5b2a7
--- /dev/null
+++
b/paimon-common/src/main/java/org/apache/paimon/globalindex/wrap/FileIndexWriterWrapper.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.globalindex.wrap;
+
+import org.apache.paimon.fileindex.FileIndexWriter;
+import org.apache.paimon.globalindex.GlobalIndexReader;
+import org.apache.paimon.globalindex.GlobalIndexWriter;
+import org.apache.paimon.globalindex.io.GlobalIndexFileWriter;
+import org.apache.paimon.utils.Range;
+
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * A {@link GlobalIndexWriter} wrapper for {@link FileIndexWriter}, the write-side counterpart of
+ * the {@link GlobalIndexReader} wrapper.
+ */
+public class FileIndexWriterWrapper implements GlobalIndexWriter {
+
+ private final GlobalIndexFileWriter fileWriter;
+ private final FileIndexWriter writer;
+ private final String indexType;
+ private long count = 0;
+
+ public FileIndexWriterWrapper(
+ GlobalIndexFileWriter fileWriter, FileIndexWriter writer, String
indexType) {
+ this.fileWriter = fileWriter;
+ this.writer = writer;
+ this.indexType = indexType;
+ }
+
+ @Override
+ public void write(Object key) {
+ count++;
+ writer.write(key);
+ }
+
+ @Override
+ public List<ResultEntry> finish() {
+ if (count > 0) {
+ String fileName = fileWriter.newFileName(indexType);
+ try (OutputStream outputStream =
fileWriter.newOutputStream(fileName)) {
+ outputStream.write(writer.serializedBytes());
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to write global index file:
" + fileName, e);
+ }
+ return Collections.singletonList(
+ ResultEntry.of(fileName, null, new Range(0, count - 1)));
+ } else {
+ return Collections.emptyList();
+ }
+ }
+}
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/Range.java
b/paimon-common/src/main/java/org/apache/paimon/utils/Range.java
index 57a35e59d6..3a77b309f2 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/Range.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/Range.java
@@ -37,6 +37,10 @@ public class Range {
return to - from + 1;
}
+ public Range addOffset(long offset) {
+ return new Range(from + offset, to + offset);
+ }
+
public boolean isBefore(Range other) {
return to < other.from;
}
diff --git
a/paimon-common/src/main/resources/META-INF/services/org.apache.paimon.globalindex.GlobalIndexerFactory
b/paimon-common/src/main/resources/META-INF/services/org.apache.paimon.globalindex.GlobalIndexerFactory
new file mode 100644
index 0000000000..e1e63b1057
--- /dev/null
+++
b/paimon-common/src/main/resources/META-INF/services/org.apache.paimon.globalindex.GlobalIndexerFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.paimon.globalindex.bitmap.BitmapGlobalIndexerFactory
diff --git
a/paimon-common/src/test/java/org/apache/paimon/globalindex/bitmapindex/BitmapGlobalIndexTest.java
b/paimon-common/src/test/java/org/apache/paimon/globalindex/bitmapindex/BitmapGlobalIndexTest.java
new file mode 100644
index 0000000000..871ecafc3c
--- /dev/null
+++
b/paimon-common/src/test/java/org/apache/paimon/globalindex/bitmapindex/BitmapGlobalIndexTest.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.globalindex.bitmapindex;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.fileindex.bitmap.BitmapFileIndex;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.globalindex.GlobalIndexMeta;
+import org.apache.paimon.globalindex.GlobalIndexReader;
+import org.apache.paimon.globalindex.GlobalIndexResult;
+import org.apache.paimon.globalindex.GlobalIndexWriter;
+import org.apache.paimon.globalindex.bitmap.BitmapGlobalIndex;
+import org.apache.paimon.globalindex.bitmap.BitmapIndexResultWrapper;
+import org.apache.paimon.globalindex.io.GlobalIndexFileReader;
+import org.apache.paimon.globalindex.io.GlobalIndexFileWriter;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.predicate.FieldRef;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.utils.Range;
+import org.apache.paimon.utils.RoaringBitmap32;
+
+import org.junit.Rule;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.function.Consumer;
+
+/** Tests for {@link BitmapGlobalIndex}. */
+public class BitmapGlobalIndexTest {
+
+ @TempDir private File tempDir;
+
+ @Rule public TemporaryFolder folder = new TemporaryFolder();
+
+ @Test
+ public void testV1() throws Exception {
+ testIntType(BitmapFileIndex.VERSION_1);
+ testStringType(BitmapFileIndex.VERSION_1);
+ testBooleanType(BitmapFileIndex.VERSION_1);
+ testHighCardinality(BitmapFileIndex.VERSION_1, 1000000, 100000, null);
+ testStringTypeWithReusing(BitmapFileIndex.VERSION_1);
+ testAllNull(BitmapFileIndex.VERSION_1);
+ }
+
+ @Test
+ public void testV2() throws Exception {
+ testIntType(BitmapFileIndex.VERSION_2);
+ testStringType(BitmapFileIndex.VERSION_2);
+ testBooleanType(BitmapFileIndex.VERSION_2);
+ testHighCardinality(BitmapFileIndex.VERSION_2, 1000000, 100000, null);
+ testStringTypeWithReusing(BitmapFileIndex.VERSION_2);
+ testAllNull(BitmapFileIndex.VERSION_2);
+ }
+
+ private void testStringType(int version) throws Exception {
+ FieldRef fieldRef = new FieldRef(0, "", DataTypes.STRING());
+ BinaryString a = BinaryString.fromString("a");
+ BinaryString b = BinaryString.fromString("b");
+ Object[] dataColumn = {a, null, b, null, a};
+ GlobalIndexReader reader =
+ createTestReaderOnWriter(
+ version,
+ null,
+ DataTypes.STRING(),
+ writer -> {
+ for (Object o : dataColumn) {
+ writer.write(o);
+ }
+ });
+ assert ((BitmapIndexResultWrapper) reader.visitEqual(fieldRef, a))
+ .getBitmapIndexResult()
+ .get()
+ .equals(RoaringBitmap32.bitmapOf(0, 4));
+ assert ((BitmapIndexResultWrapper) reader.visitEqual(fieldRef, b))
+ .getBitmapIndexResult()
+ .get()
+ .equals(RoaringBitmap32.bitmapOf(2));
+ assert ((BitmapIndexResultWrapper) reader.visitIsNull(fieldRef))
+ .getBitmapIndexResult()
+ .get()
+ .equals(RoaringBitmap32.bitmapOf(1, 3));
+ assert ((BitmapIndexResultWrapper) reader.visitIn(fieldRef,
Arrays.asList(a, b)))
+ .getBitmapIndexResult()
+ .get()
+ .equals(RoaringBitmap32.bitmapOf(0, 2, 4));
+ assert !reader.visitEqual(fieldRef,
BinaryString.fromString("c")).iterator().hasNext();
+ }
+
+ private void testIntType(int version) throws Exception {
+ FieldRef fieldRef = new FieldRef(0, "", DataTypes.INT());
+ Object[] dataColumn = {0, 1, null};
+ GlobalIndexReader reader =
+ createTestReaderOnWriter(
+ version,
+ null,
+ DataTypes.INT(),
+ writer -> {
+ for (Object o : dataColumn) {
+ writer.write(o);
+ }
+ });
+ assert ((BitmapIndexResultWrapper) reader.visitEqual(fieldRef, 0))
+ .getBitmapIndexResult()
+ .get()
+ .equals(RoaringBitmap32.bitmapOf(0));
+ assert ((BitmapIndexResultWrapper) reader.visitEqual(fieldRef, 1))
+ .getBitmapIndexResult()
+ .get()
+ .equals(RoaringBitmap32.bitmapOf(1));
+ assert ((BitmapIndexResultWrapper) reader.visitIsNull(fieldRef))
+ .getBitmapIndexResult()
+ .get()
+ .equals(RoaringBitmap32.bitmapOf(2));
+ assert ((BitmapIndexResultWrapper) reader.visitIn(fieldRef,
Arrays.asList(0, 1, 2)))
+ .getBitmapIndexResult()
+ .get()
+ .equals(RoaringBitmap32.bitmapOf(0, 1));
+
+ assert !reader.visitEqual(fieldRef, 2).iterator().hasNext();
+ }
+
+ private void testBooleanType(int version) throws Exception {
+ FieldRef fieldRef = new FieldRef(0, "", DataTypes.BOOLEAN());
+ Object[] dataColumn = {Boolean.TRUE, Boolean.FALSE, Boolean.TRUE,
Boolean.FALSE, null};
+ GlobalIndexReader reader =
+ createTestReaderOnWriter(
+ version,
+ null,
+ DataTypes.BOOLEAN(),
+ writer -> {
+ for (Object o : dataColumn) {
+ writer.write(o);
+ }
+ });
+ assert ((BitmapIndexResultWrapper) reader.visitEqual(fieldRef,
Boolean.TRUE))
+ .getBitmapIndexResult()
+ .get()
+ .equals(RoaringBitmap32.bitmapOf(0, 2));
+ assert ((BitmapIndexResultWrapper) reader.visitIsNull(fieldRef))
+ .getBitmapIndexResult()
+ .get()
+ .equals(RoaringBitmap32.bitmapOf(4));
+ }
+
+ private void testHighCardinality(
+ int version, int rowCount, int approxCardinality, Integer
secondaryBlockSize)
+ throws Exception {
+ FieldRef fieldRef = new FieldRef(0, "", DataTypes.STRING());
+ RoaringBitmap32 middleBm = new RoaringBitmap32();
+ RoaringBitmap32 nullBm = new RoaringBitmap32();
+ long time1 = System.currentTimeMillis();
+ String prefix = "ssssssssss";
+ GlobalIndexReader reader =
+ createTestReaderOnWriter(
+ version,
+ secondaryBlockSize,
+ DataTypes.STRING(),
+ writer -> {
+ for (int i = 0; i < rowCount; i++) {
+
+ int sid = (int) (Math.random() *
approxCardinality);
+ if (sid == approxCardinality / 2) {
+ middleBm.add(i);
+ } else if (Math.random() < 0.01) {
+ nullBm.add(i);
+ writer.write(null);
+ continue;
+ }
+ writer.write(BinaryString.fromString(prefix +
sid));
+ }
+ });
+ System.out.println("write time: " + (System.currentTimeMillis() -
time1));
+ long time2 = System.currentTimeMillis();
+ GlobalIndexResult result =
+ reader.visitEqual(
+ fieldRef, BinaryString.fromString(prefix +
(approxCardinality / 2)));
+ RoaringBitmap32 resultBm = ((BitmapIndexResultWrapper)
result).getBitmapIndexResult().get();
+ System.out.println("read time: " + (System.currentTimeMillis() -
time2));
+ assert resultBm.equals(middleBm);
+ long time3 = System.currentTimeMillis();
+ GlobalIndexResult resultNull = reader.visitIsNull(fieldRef);
+ RoaringBitmap32 resultNullBm =
+ ((BitmapIndexResultWrapper)
resultNull).getBitmapIndexResult().get();
+ System.out.println("read null bitmap time: " +
(System.currentTimeMillis() - time3));
+ assert resultNullBm.equals(nullBm);
+ }
+
+ private GlobalIndexReader createTestReaderOnWriter(
+ int writerVersion,
+ Integer indexBlockSize,
+ DataType dataType,
+ Consumer<GlobalIndexWriter> consumer)
+ throws Exception {
+ Options options = new Options();
+ options.setInteger(BitmapFileIndex.VERSION, writerVersion);
+ if (indexBlockSize != null) {
+ options.setInteger(BitmapFileIndex.INDEX_BLOCK_SIZE,
indexBlockSize);
+ }
+ BitmapFileIndex bitmapFileIndex = new BitmapFileIndex(dataType,
options);
+ BitmapGlobalIndex bitmapGlobalIndex = new
BitmapGlobalIndex(bitmapFileIndex);
+ final FileIO fileIO = new LocalFileIO();
+ GlobalIndexFileWriter fileWriter =
+ new GlobalIndexFileWriter() {
+ @Override
+ public String newFileName(String prefix) {
+ return prefix + UUID.randomUUID();
+ }
+
+ @Override
+ public OutputStream newOutputStream(String fileName)
throws IOException {
+ return fileIO.newOutputStream(new
Path(tempDir.toString(), fileName), true);
+ }
+ };
+ GlobalIndexWriter globalIndexWriter =
bitmapGlobalIndex.createWriter(fileWriter);
+ consumer.accept(globalIndexWriter);
+ String fileName = globalIndexWriter.finish().get(0).fileName();
+ Path path = new Path(tempDir.toString(), fileName);
+ long fileSize = fileIO.getFileSize(path);
+ Range range = new Range(0, Long.MAX_VALUE);
+
+ GlobalIndexFileReader fileReader =
+ prefix -> fileIO.newInputStream(new Path(tempDir.toString(),
prefix));
+
+ GlobalIndexMeta globalIndexMeta = new GlobalIndexMeta(fileName,
fileSize, range, null);
+
+ return bitmapGlobalIndex.createReader(
+ fileReader, Collections.singletonList(globalIndexMeta));
+ }
+
+ private void testStringTypeWithReusing(int version) throws Exception {
+ FieldRef fieldRef = new FieldRef(0, "", DataTypes.STRING());
+ BinaryString a = BinaryString.fromString("a");
+ BinaryString b = BinaryString.fromString("b");
+ BinaryString c = BinaryString.fromString("a");
+ GlobalIndexReader reader =
+ createTestReaderOnWriter(
+ version,
+ null,
+ DataTypes.STRING(),
+ writer -> {
+ writer.write(a);
+ writer.write(null);
+ a.pointTo(b.getSegments(), b.getOffset(),
b.getSizeInBytes());
+ writer.write(null);
+ writer.write(a);
+ writer.write(null);
+ a.pointTo(c.getSegments(), c.getOffset(),
c.getSizeInBytes());
+ writer.write(null);
+ });
+ assert ((BitmapIndexResultWrapper) reader.visitEqual(fieldRef, a))
+ .getBitmapIndexResult()
+ .get()
+ .equals(RoaringBitmap32.bitmapOf(0));
+ assert ((BitmapIndexResultWrapper) reader.visitEqual(fieldRef, b))
+ .getBitmapIndexResult()
+ .get()
+ .equals(RoaringBitmap32.bitmapOf(3));
+ assert ((BitmapIndexResultWrapper) reader.visitIsNull(fieldRef))
+ .getBitmapIndexResult()
+ .get()
+ .equals(RoaringBitmap32.bitmapOf(1, 2, 4, 5));
+ assert ((BitmapIndexResultWrapper) reader.visitIn(fieldRef,
Arrays.asList(a, b)))
+ .getBitmapIndexResult()
+ .get()
+ .equals(RoaringBitmap32.bitmapOf(0, 3));
+ assert !reader.visitEqual(fieldRef,
BinaryString.fromString("c")).iterator().hasNext();
+ }
+
+ private void testAllNull(int version) throws Exception {
+ FieldRef fieldRef = new FieldRef(0, "", DataTypes.INT());
+ Object[] dataColumn = {null, null, null};
+ GlobalIndexReader reader =
+ createTestReaderOnWriter(
+ version,
+ null,
+ DataTypes.INT(),
+ writer -> {
+ for (Object o : dataColumn) {
+ writer.write(o);
+ }
+ });
+ assert ((BitmapIndexResultWrapper) reader.visitIsNull(fieldRef))
+ .getBitmapIndexResult()
+ .get()
+ .equals(RoaringBitmap32.bitmapOf(0, 1, 2));
+ assert !reader.visitIsNotNull(fieldRef).iterator().hasNext();
+ }
+}