This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 153a219b20 [core] Introduce basic structure for global index write 
(#6628)
153a219b20 is described below

commit 153a219b20eaed5d8474a8e3e3901603066559e2
Author: YeJunHao <[email protected]>
AuthorDate: Wed Nov 19 16:03:56 2025 +0800

    [core] Introduce basic structure for global index write (#6628)
---
 .../paimon/globalindex/GlobalIndexResult.java      |  26 ++
 .../paimon/globalindex/GlobalIndexWriter.java      |  62 ++++
 .../{GlobalFileReader.java => GlobalIndexer.java}  |  20 +-
 ...alFileReader.java => GlobalIndexerFactory.java} |  12 +-
 .../globalindex/GlobalIndexerFactoryUtils.java     |  56 ++++
 .../globalindex/bitmap/BitmapGlobalIndex.java      |  22 +-
 .../BitmapGlobalIndexerFactory.java}               |  39 ++-
 .../bitmap/BitmapIndexResultWrapper.java           |   4 +
 .../GlobalIndexFileReader.java}                    |   6 +-
 .../GlobalIndexFileWriter.java}                    |  13 +-
 .../globalindex/wrap/FileIndexWriterWrapper.java   |  67 +++++
 .../main/java/org/apache/paimon/utils/Range.java   |   4 +
 ....apache.paimon.globalindex.GlobalIndexerFactory |  16 ++
 .../bitmapindex/BitmapGlobalIndexTest.java         | 314 +++++++++++++++++++++
 14 files changed, 617 insertions(+), 44 deletions(-)

diff --git 
a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexResult.java
 
b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexResult.java
index f1ac2213cb..3b00f8029c 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexResult.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexResult.java
@@ -18,8 +18,10 @@
 
 package org.apache.paimon.globalindex;
 
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
+import java.util.Set;
 
 /**
  * Global index result represents row ids.
@@ -42,4 +44,28 @@ public interface GlobalIndexResult extends Iterable<Long> {
                     }
                 };
     }
+
+    default GlobalIndexResult and(GlobalIndexResult other) {
+        Set<Long> set = new HashSet<>();
+        this.forEach(set::add);
+
+        Set<Long> result = new HashSet<>();
+        for (Long l : other) {
+            if (set.contains(l)) {
+                result.add(l);
+            }
+        }
+        return wrap(result);
+    }
+
+    default GlobalIndexResult or(GlobalIndexResult other) {
+        Set<Long> result = new HashSet<>();
+        this.forEach(result::add);
+        other.forEach(result::add);
+        return wrap(result);
+    }
+
+    static GlobalIndexResult wrap(Set<Long> longs) {
+        return longs::iterator;
+    }
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexWriter.java
 
b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexWriter.java
new file mode 100644
index 0000000000..d04d71aa5e
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexWriter.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.globalindex;
+
+import org.apache.paimon.utils.Range;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+/** Index writer for global index. */
+public interface GlobalIndexWriter {
+
+    void write(Object key);
+
+    List<ResultEntry> finish();
+
+    /** Result entry of a write: index file name, row range and optional meta bytes. */
+    class ResultEntry {
+        private final String fileName;
+        private final Range rowRange;
+        @Nullable private final byte[] meta;
+
+        public ResultEntry(String fileName, @Nullable byte[] meta, Range 
rowRange) {
+            this.fileName = fileName;
+            this.meta = meta;
+            this.rowRange = rowRange;
+        }
+
+        public String fileName() {
+            return fileName;
+        }
+
+        public Range rowRange() {
+            return rowRange;
+        }
+
+        public byte[] meta() {
+            return meta;
+        }
+
+        public static ResultEntry of(String fileName, byte[] meta, Range 
rowRange) {
+            return new ResultEntry(fileName, meta, rowRange);
+        }
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalFileReader.java
 b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexer.java
similarity index 53%
copy from 
paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalFileReader.java
copy to 
paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexer.java
index 7524d1e1f9..6badadc878 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalFileReader.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexer.java
@@ -18,12 +18,24 @@
 
 package org.apache.paimon.globalindex;
 
-import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.globalindex.io.GlobalIndexFileReader;
+import org.apache.paimon.globalindex.io.GlobalIndexFileWriter;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.types.DataType;
 
 import java.io.IOException;
+import java.util.List;
 
-/** File reader for global index. */
-public interface GlobalFileReader {
+/** Interface for global indexers, which create global index writers and readers. */
+public interface GlobalIndexer {
 
-    SeekableInputStream create(String fileName) throws IOException;
+    GlobalIndexWriter createWriter(GlobalIndexFileWriter fileWriter) throws 
IOException;
+
+    GlobalIndexReader createReader(GlobalIndexFileReader fileReader, 
List<GlobalIndexMeta> files)
+            throws IOException;
+
+    static GlobalIndexer create(String type, DataType dataType, Options 
options) {
+        GlobalIndexerFactory globalIndexerFactory = 
GlobalIndexerFactoryUtils.load(type);
+        return globalIndexerFactory.create(dataType, options);
+    }
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalFileReader.java
 
b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexerFactory.java
similarity index 73%
copy from 
paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalFileReader.java
copy to 
paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexerFactory.java
index 7524d1e1f9..53154ef50b 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalFileReader.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexerFactory.java
@@ -18,12 +18,14 @@
 
 package org.apache.paimon.globalindex;
 
-import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.fileindex.FileIndexer;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.types.DataType;
 
-import java.io.IOException;
+/** Global index factory to construct {@link GlobalIndexer}. */
+public interface GlobalIndexerFactory {
 
-/** File reader for global index. */
-public interface GlobalFileReader {
+    String identifier();
 
-    SeekableInputStream create(String fileName) throws IOException;
+    GlobalIndexer create(DataType type, Options options);
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexerFactoryUtils.java
 
b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexerFactoryUtils.java
new file mode 100644
index 0000000000..43a8768f76
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexerFactoryUtils.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.globalindex;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.ServiceLoader;
+
+/** Utils to load {@link GlobalIndexerFactory} implementations by type identifier. */
+public class GlobalIndexerFactoryUtils {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(GlobalIndexerFactoryUtils.class);
+
+    private static final Map<String, GlobalIndexerFactory> factories = new 
HashMap<>();
+
+    static {
+        ServiceLoader<GlobalIndexerFactory> serviceLoader =
+                ServiceLoader.load(GlobalIndexerFactory.class);
+
+        for (GlobalIndexerFactory indexerFactory : serviceLoader) {
+            if (factories.put(indexerFactory.identifier(), indexerFactory) != 
null) {
+                LOG.warn(
+                        "Found multiple GlobalIndexer for type: "
+                                + indexerFactory.identifier()
+                                + ", choose one of them");
+            }
+        }
+    }
+
+    public static GlobalIndexerFactory load(String type) {
+        GlobalIndexerFactory globalIndexerFactory = factories.get(type);
+        if (globalIndexerFactory == null) {
+            throw new RuntimeException("Can't find global index for type: " + 
type);
+        }
+        return globalIndexerFactory;
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/globalindex/bitmap/BitmapGlobalIndex.java
 
b/paimon-common/src/main/java/org/apache/paimon/globalindex/bitmap/BitmapGlobalIndex.java
index 6798dfe5d6..19bbcccb71 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/globalindex/bitmap/BitmapGlobalIndex.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/globalindex/bitmap/BitmapGlobalIndex.java
@@ -20,14 +20,19 @@ package org.apache.paimon.globalindex.bitmap;
 
 import org.apache.paimon.fileindex.FileIndexReader;
 import org.apache.paimon.fileindex.FileIndexResult;
+import org.apache.paimon.fileindex.FileIndexWriter;
 import org.apache.paimon.fileindex.bitmap.BitmapFileIndex;
 import org.apache.paimon.fileindex.bitmap.BitmapIndexResult;
 import org.apache.paimon.fs.SeekableInputStream;
-import org.apache.paimon.globalindex.GlobalFileReader;
 import org.apache.paimon.globalindex.GlobalIndexMeta;
 import org.apache.paimon.globalindex.GlobalIndexReader;
 import org.apache.paimon.globalindex.GlobalIndexResult;
+import org.apache.paimon.globalindex.GlobalIndexWriter;
+import org.apache.paimon.globalindex.GlobalIndexer;
+import org.apache.paimon.globalindex.io.GlobalIndexFileReader;
+import org.apache.paimon.globalindex.io.GlobalIndexFileWriter;
 import org.apache.paimon.globalindex.wrap.FileIndexReaderWrapper;
+import org.apache.paimon.globalindex.wrap.FileIndexWriterWrapper;
 import org.apache.paimon.utils.Range;
 
 import java.io.IOException;
@@ -36,7 +41,7 @@ import java.util.List;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Bitmap global index. */
-public class BitmapGlobalIndex {
+public class BitmapGlobalIndex implements GlobalIndexer {
 
     private final BitmapFileIndex index;
 
@@ -44,11 +49,18 @@ public class BitmapGlobalIndex {
         this.index = index;
     }
 
-    public GlobalIndexReader createReader(GlobalFileReader fileReader, 
List<GlobalIndexMeta> files)
-            throws IOException {
+    @Override
+    public GlobalIndexWriter createWriter(GlobalIndexFileWriter fileWriter) 
throws IOException {
+        FileIndexWriter writer = index.createWriter();
+        return new FileIndexWriterWrapper(
+                fileWriter, writer, BitmapGlobalIndexerFactory.IDENTIFIER);
+    }
+
+    public GlobalIndexReader createReader(
+            GlobalIndexFileReader fileReader, List<GlobalIndexMeta> files) 
throws IOException {
         checkArgument(files.size() == 1);
         GlobalIndexMeta indexMeta = files.get(0);
-        SeekableInputStream input = fileReader.create(indexMeta.fileName());
+        SeekableInputStream input = 
fileReader.getInputStream(indexMeta.fileName());
         FileIndexReader reader = index.createReader(input, 0, (int) 
indexMeta.fileSize());
         return new FileIndexReaderWrapper(
                 reader, r -> toGlobalResult(indexMeta.rowIdRange(), r), input);
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexResult.java
 
b/paimon-common/src/main/java/org/apache/paimon/globalindex/bitmap/BitmapGlobalIndexerFactory.java
similarity index 51%
copy from 
paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexResult.java
copy to 
paimon-common/src/main/java/org/apache/paimon/globalindex/bitmap/BitmapGlobalIndexerFactory.java
index f1ac2213cb..efbe0bbd2f 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexResult.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/globalindex/bitmap/BitmapGlobalIndexerFactory.java
@@ -16,30 +16,27 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.globalindex;
+package org.apache.paimon.globalindex.bitmap;
 
-import java.util.Iterator;
-import java.util.NoSuchElementException;
+import org.apache.paimon.fileindex.bitmap.BitmapFileIndex;
+import org.apache.paimon.globalindex.GlobalIndexer;
+import org.apache.paimon.globalindex.GlobalIndexerFactory;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.types.DataType;
 
-/**
- * Global index result represents row ids.
- *
- * <p>TODO introduce ranges interface
- */
-public interface GlobalIndexResult extends Iterable<Long> {
+/** Factory for creating bitmap global indexers. */
+public class BitmapGlobalIndexerFactory implements GlobalIndexerFactory {
 
-    static GlobalIndexResult createEmpty() {
-        return () ->
-                new Iterator<Long>() {
-                    @Override
-                    public boolean hasNext() {
-                        return false;
-                    }
+    public static final String IDENTIFIER = "bitmap";
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
 
-                    @Override
-                    public Long next() {
-                        throw new NoSuchElementException();
-                    }
-                };
+    @Override
+    public GlobalIndexer create(DataType type, Options options) {
+        BitmapFileIndex bitmapFileIndex = new BitmapFileIndex(type, options);
+        return new BitmapGlobalIndex(bitmapFileIndex);
     }
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/globalindex/bitmap/BitmapIndexResultWrapper.java
 
b/paimon-common/src/main/java/org/apache/paimon/globalindex/bitmap/BitmapIndexResultWrapper.java
index 6b10f010d3..badbcfa219 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/globalindex/bitmap/BitmapIndexResultWrapper.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/globalindex/bitmap/BitmapIndexResultWrapper.java
@@ -57,4 +57,8 @@ public class BitmapIndexResultWrapper implements 
GlobalIndexResult {
         return new BitmapIndexResultWrapper(
                 new BitmapIndexResult(() -> bitmapOfRange(0, range.to - 
range.from)), range.from);
     }
+
+    public BitmapIndexResult getBitmapIndexResult() {
+        return result;
+    }
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalFileReader.java
 
b/paimon-common/src/main/java/org/apache/paimon/globalindex/io/GlobalIndexFileReader.java
similarity index 85%
copy from 
paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalFileReader.java
copy to 
paimon-common/src/main/java/org/apache/paimon/globalindex/io/GlobalIndexFileReader.java
index 7524d1e1f9..7c88057fb4 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalFileReader.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/globalindex/io/GlobalIndexFileReader.java
@@ -16,14 +16,14 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.globalindex;
+package org.apache.paimon.globalindex.io;
 
 import org.apache.paimon.fs.SeekableInputStream;
 
 import java.io.IOException;
 
 /** File reader for global index. */
-public interface GlobalFileReader {
+public interface GlobalIndexFileReader {
 
-    SeekableInputStream create(String fileName) throws IOException;
+    SeekableInputStream getInputStream(String fileName) throws IOException;
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalFileReader.java
 
b/paimon-common/src/main/java/org/apache/paimon/globalindex/io/GlobalIndexFileWriter.java
similarity index 76%
rename from 
paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalFileReader.java
rename to 
paimon-common/src/main/java/org/apache/paimon/globalindex/io/GlobalIndexFileWriter.java
index 7524d1e1f9..850cbb9451 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalFileReader.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/globalindex/io/GlobalIndexFileWriter.java
@@ -16,14 +16,15 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.globalindex;
-
-import org.apache.paimon.fs.SeekableInputStream;
+package org.apache.paimon.globalindex.io;
 
 import java.io.IOException;
+import java.io.OutputStream;
+
+/** I/O operations used when writing global index files. */
+public interface GlobalIndexFileWriter {
 
-/** File reader for global index. */
-public interface GlobalFileReader {
+    String newFileName(String prefix);
 
-    SeekableInputStream create(String fileName) throws IOException;
+    OutputStream newOutputStream(String fileName) throws IOException;
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/globalindex/wrap/FileIndexWriterWrapper.java
 
b/paimon-common/src/main/java/org/apache/paimon/globalindex/wrap/FileIndexWriterWrapper.java
new file mode 100644
index 0000000000..b7bbb5b2a7
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/globalindex/wrap/FileIndexWriterWrapper.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.globalindex.wrap;
+
+import org.apache.paimon.fileindex.FileIndexWriter;
+import org.apache.paimon.globalindex.GlobalIndexReader;
+import org.apache.paimon.globalindex.GlobalIndexWriter;
+import org.apache.paimon.globalindex.io.GlobalIndexFileWriter;
+import org.apache.paimon.utils.Range;
+
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.List;
+
+/** A {@link GlobalIndexWriter} wrapper for {@link FileIndexWriter}. */
+public class FileIndexWriterWrapper implements GlobalIndexWriter {
+
+    private final GlobalIndexFileWriter fileWriter;
+    private final FileIndexWriter writer;
+    private final String indexType;
+    private long count = 0;
+
+    public FileIndexWriterWrapper(
+            GlobalIndexFileWriter fileWriter, FileIndexWriter writer, String 
indexType) {
+        this.fileWriter = fileWriter;
+        this.writer = writer;
+        this.indexType = indexType;
+    }
+
+    @Override
+    public void write(Object key) {
+        count++;
+        writer.write(key);
+    }
+
+    @Override
+    public List<ResultEntry> finish() {
+        if (count > 0) {
+            String fileName = fileWriter.newFileName(indexType);
+            try (OutputStream outputStream = 
fileWriter.newOutputStream(fileName)) {
+                outputStream.write(writer.serializedBytes());
+            } catch (Exception e) {
+                throw new RuntimeException("Failed to write global index file: 
" + fileName, e);
+            }
+            return Collections.singletonList(
+                    ResultEntry.of(fileName, null, new Range(0, count - 1)));
+        } else {
+            return Collections.emptyList();
+        }
+    }
+}
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/Range.java 
b/paimon-common/src/main/java/org/apache/paimon/utils/Range.java
index 57a35e59d6..3a77b309f2 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/Range.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/Range.java
@@ -37,6 +37,10 @@ public class Range {
         return to - from + 1;
     }
 
+    public Range addOffset(long offset) {
+        return new Range(from + offset, to + offset);
+    }
+
     public boolean isBefore(Range other) {
         return to < other.from;
     }
diff --git 
a/paimon-common/src/main/resources/META-INF/services/org.apache.paimon.globalindex.GlobalIndexerFactory
 
b/paimon-common/src/main/resources/META-INF/services/org.apache.paimon.globalindex.GlobalIndexerFactory
new file mode 100644
index 0000000000..e1e63b1057
--- /dev/null
+++ 
b/paimon-common/src/main/resources/META-INF/services/org.apache.paimon.globalindex.GlobalIndexerFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.paimon.globalindex.bitmap.BitmapGlobalIndexerFactory
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/globalindex/bitmapindex/BitmapGlobalIndexTest.java
 
b/paimon-common/src/test/java/org/apache/paimon/globalindex/bitmapindex/BitmapGlobalIndexTest.java
new file mode 100644
index 0000000000..871ecafc3c
--- /dev/null
+++ 
b/paimon-common/src/test/java/org/apache/paimon/globalindex/bitmapindex/BitmapGlobalIndexTest.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.globalindex.bitmapindex;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.fileindex.bitmap.BitmapFileIndex;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.globalindex.GlobalIndexMeta;
+import org.apache.paimon.globalindex.GlobalIndexReader;
+import org.apache.paimon.globalindex.GlobalIndexResult;
+import org.apache.paimon.globalindex.GlobalIndexWriter;
+import org.apache.paimon.globalindex.bitmap.BitmapGlobalIndex;
+import org.apache.paimon.globalindex.bitmap.BitmapIndexResultWrapper;
+import org.apache.paimon.globalindex.io.GlobalIndexFileReader;
+import org.apache.paimon.globalindex.io.GlobalIndexFileWriter;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.predicate.FieldRef;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.utils.Range;
+import org.apache.paimon.utils.RoaringBitmap32;
+
+import org.junit.Rule;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.function.Consumer;
+
+/** Tests for {@link BitmapGlobalIndex}. */
+public class BitmapGlobalIndexTest {
+
+    @TempDir private File tempDir;
+
+    @Rule public TemporaryFolder folder = new TemporaryFolder();
+
+    @Test
+    public void testV1() throws Exception {
+        testIntType(BitmapFileIndex.VERSION_1);
+        testStringType(BitmapFileIndex.VERSION_1);
+        testBooleanType(BitmapFileIndex.VERSION_1);
+        testHighCardinality(BitmapFileIndex.VERSION_1, 1000000, 100000, null);
+        testStringTypeWithReusing(BitmapFileIndex.VERSION_1);
+        testAllNull(BitmapFileIndex.VERSION_1);
+    }
+
+    @Test
+    public void testV2() throws Exception {
+        testIntType(BitmapFileIndex.VERSION_2);
+        testStringType(BitmapFileIndex.VERSION_2);
+        testBooleanType(BitmapFileIndex.VERSION_2);
+        testHighCardinality(BitmapFileIndex.VERSION_2, 1000000, 100000, null);
+        testStringTypeWithReusing(BitmapFileIndex.VERSION_2);
+        testAllNull(BitmapFileIndex.VERSION_2);
+    }
+
+    private void testStringType(int version) throws Exception {
+        FieldRef fieldRef = new FieldRef(0, "", DataTypes.STRING());
+        BinaryString a = BinaryString.fromString("a");
+        BinaryString b = BinaryString.fromString("b");
+        Object[] dataColumn = {a, null, b, null, a};
+        GlobalIndexReader reader =
+                createTestReaderOnWriter(
+                        version,
+                        null,
+                        DataTypes.STRING(),
+                        writer -> {
+                            for (Object o : dataColumn) {
+                                writer.write(o);
+                            }
+                        });
+        assert ((BitmapIndexResultWrapper) reader.visitEqual(fieldRef, a))
+                .getBitmapIndexResult()
+                .get()
+                .equals(RoaringBitmap32.bitmapOf(0, 4));
+        assert ((BitmapIndexResultWrapper) reader.visitEqual(fieldRef, b))
+                .getBitmapIndexResult()
+                .get()
+                .equals(RoaringBitmap32.bitmapOf(2));
+        assert ((BitmapIndexResultWrapper) reader.visitIsNull(fieldRef))
+                .getBitmapIndexResult()
+                .get()
+                .equals(RoaringBitmap32.bitmapOf(1, 3));
+        assert ((BitmapIndexResultWrapper) reader.visitIn(fieldRef, 
Arrays.asList(a, b)))
+                .getBitmapIndexResult()
+                .get()
+                .equals(RoaringBitmap32.bitmapOf(0, 2, 4));
+        assert !reader.visitEqual(fieldRef, 
BinaryString.fromString("c")).iterator().hasNext();
+    }
+
+    private void testIntType(int version) throws Exception {
+        FieldRef fieldRef = new FieldRef(0, "", DataTypes.INT());
+        Object[] dataColumn = {0, 1, null};
+        GlobalIndexReader reader =
+                createTestReaderOnWriter(
+                        version,
+                        null,
+                        DataTypes.INT(),
+                        writer -> {
+                            for (Object o : dataColumn) {
+                                writer.write(o);
+                            }
+                        });
+        assert ((BitmapIndexResultWrapper) reader.visitEqual(fieldRef, 0))
+                .getBitmapIndexResult()
+                .get()
+                .equals(RoaringBitmap32.bitmapOf(0));
+        assert ((BitmapIndexResultWrapper) reader.visitEqual(fieldRef, 1))
+                .getBitmapIndexResult()
+                .get()
+                .equals(RoaringBitmap32.bitmapOf(1));
+        assert ((BitmapIndexResultWrapper) reader.visitIsNull(fieldRef))
+                .getBitmapIndexResult()
+                .get()
+                .equals(RoaringBitmap32.bitmapOf(2));
+        assert ((BitmapIndexResultWrapper) reader.visitIn(fieldRef, 
Arrays.asList(0, 1, 2)))
+                .getBitmapIndexResult()
+                .get()
+                .equals(RoaringBitmap32.bitmapOf(0, 1));
+
+        assert !reader.visitEqual(fieldRef, 2).iterator().hasNext();
+    }
+
+    private void testBooleanType(int version) throws Exception {
+        FieldRef fieldRef = new FieldRef(0, "", DataTypes.BOOLEAN());
+        Object[] dataColumn = {Boolean.TRUE, Boolean.FALSE, Boolean.TRUE, 
Boolean.FALSE, null};
+        GlobalIndexReader reader =
+                createTestReaderOnWriter(
+                        version,
+                        null,
+                        DataTypes.BOOLEAN(),
+                        writer -> {
+                            for (Object o : dataColumn) {
+                                writer.write(o);
+                            }
+                        });
+        assert ((BitmapIndexResultWrapper) reader.visitEqual(fieldRef, 
Boolean.TRUE))
+                .getBitmapIndexResult()
+                .get()
+                .equals(RoaringBitmap32.bitmapOf(0, 2));
+        assert ((BitmapIndexResultWrapper) reader.visitIsNull(fieldRef))
+                .getBitmapIndexResult()
+                .get()
+                .equals(RoaringBitmap32.bitmapOf(4));
+    }
+
+    /**
+     * Writes {@code rowCount} randomly chosen string keys (with occasional nulls) and checks that
+     * the bitmaps read back for one chosen key and for null equal the row ids recorded while
+     * writing. Elapsed times for the write and both lookups are printed to stdout.
+     *
+     * @param version writer version forwarded to {@code createTestReaderOnWriter}
+     * @param rowCount number of rows to write
+     * @param approxCardinality multiplier for the random key suffix; the looked-up key uses
+     *     {@code approxCardinality / 2} as its suffix
+     * @param secondaryBlockSize index block size forwarded to {@code createTestReaderOnWriter};
+     *     may be null
+     * @throws Exception if creating the reader or running a lookup fails
+     */
+    private void testHighCardinality(
+            int version, int rowCount, int approxCardinality, Integer secondaryBlockSize)
+            throws Exception {
+        FieldRef stringField = new FieldRef(0, "", DataTypes.STRING());
+        RoaringBitmap32 expectedMiddle = new RoaringBitmap32();
+        RoaringBitmap32 expectedNulls = new RoaringBitmap32();
+        long writeStart = System.currentTimeMillis();
+        String keyPrefix = "ssssssssss";
+        int middleSid = approxCardinality / 2;
+        GlobalIndexReader indexReader =
+                createTestReaderOnWriter(
+                        version,
+                        secondaryBlockSize,
+                        DataTypes.STRING(),
+                        indexWriter -> {
+                            for (int row = 0; row < rowCount; row++) {
+                                int sid = (int) (Math.random() * approxCardinality);
+                                boolean isMiddle = sid == middleSid;
+                                // The null draw is only made for rows that missed the middle key.
+                                if (!isMiddle && Math.random() < 0.01) {
+                                    expectedNulls.add(row);
+                                    indexWriter.write(null);
+                                } else {
+                                    if (isMiddle) {
+                                        expectedMiddle.add(row);
+                                    }
+                                    indexWriter.write(BinaryString.fromString(keyPrefix + sid));
+                                }
+                            }
+                        });
+        System.out.println("write time: " + (System.currentTimeMillis() - writeStart));
+
+        long equalStart = System.currentTimeMillis();
+        BinaryString middleKey = BinaryString.fromString(keyPrefix + middleSid);
+        RoaringBitmap32 actualMiddle =
+                ((BitmapIndexResultWrapper) indexReader.visitEqual(stringField, middleKey))
+                        .getBitmapIndexResult()
+                        .get();
+        System.out.println("read time: " + (System.currentTimeMillis() - equalStart));
+        assert actualMiddle.equals(expectedMiddle);
+
+        long nullStart = System.currentTimeMillis();
+        RoaringBitmap32 actualNulls =
+                ((BitmapIndexResultWrapper) indexReader.visitIsNull(stringField))
+                        .getBitmapIndexResult()
+                        .get();
+        System.out.println("read null bitmap time: " + (System.currentTimeMillis() - nullStart));
+        assert actualNulls.equals(expectedNulls);
+    }
+
+    private GlobalIndexReader createTestReaderOnWriter(
+            int writerVersion,
+            Integer indexBlockSize,
+            DataType dataType,
+            Consumer<GlobalIndexWriter> consumer)
+            throws Exception {
+        Options options = new Options();
+        options.setInteger(BitmapFileIndex.VERSION, writerVersion);
+        if (indexBlockSize != null) {
+            options.setInteger(BitmapFileIndex.INDEX_BLOCK_SIZE, 
indexBlockSize);
+        }
+        BitmapFileIndex bitmapFileIndex = new BitmapFileIndex(dataType, 
options);
+        BitmapGlobalIndex bitmapGlobalIndex = new 
BitmapGlobalIndex(bitmapFileIndex);
+        final FileIO fileIO = new LocalFileIO();
+        GlobalIndexFileWriter fileWriter =
+                new GlobalIndexFileWriter() {
+                    @Override
+                    public String newFileName(String prefix) {
+                        return prefix + UUID.randomUUID();
+                    }
+
+                    @Override
+                    public OutputStream newOutputStream(String fileName) 
throws IOException {
+                        return fileIO.newOutputStream(new 
Path(tempDir.toString(), fileName), true);
+                    }
+                };
+        GlobalIndexWriter globalIndexWriter = 
bitmapGlobalIndex.createWriter(fileWriter);
+        consumer.accept(globalIndexWriter);
+        String fileName = globalIndexWriter.finish().get(0).fileName();
+        Path path = new Path(tempDir.toString(), fileName);
+        long fileSize = fileIO.getFileSize(path);
+        Range range = new Range(0, Long.MAX_VALUE);
+
+        GlobalIndexFileReader fileReader =
+                prefix -> fileIO.newInputStream(new Path(tempDir.toString(), 
prefix));
+
+        GlobalIndexMeta globalIndexMeta = new GlobalIndexMeta(fileName, 
fileSize, range, null);
+
+        return bitmapGlobalIndex.createReader(
+                fileReader, Collections.singletonList(globalIndexMeta));
+    }
+
+    /**
+     * Writes a STRING column through a single {@code BinaryString} instance that is re-pointed
+     * between writes, then checks the equality, is-null and in lookups of the resulting reader.
+     *
+     * @param version writer version forwarded to {@code createTestReaderOnWriter}
+     * @throws Exception if creating the reader or running a lookup fails
+     */
+    private void testStringTypeWithReusing(int version) throws Exception {
+        FieldRef stringField = new FieldRef(0, "", DataTypes.STRING());
+        BinaryString reused = BinaryString.fromString("a");
+        BinaryString bValue = BinaryString.fromString("b");
+        BinaryString aValue = BinaryString.fromString("a");
+        GlobalIndexReader indexReader =
+                createTestReaderOnWriter(
+                        version,
+                        null,
+                        DataTypes.STRING(),
+                        indexWriter -> {
+                            // Rows 0-1: the reused instance as created from "a", then null.
+                            indexWriter.write(reused);
+                            indexWriter.write(null);
+
+                            // Re-point the instance at bValue; rows 2-4: null, reused, null.
+                            reused.pointTo(
+                                    bValue.getSegments(),
+                                    bValue.getOffset(),
+                                    bValue.getSizeInBytes());
+                            indexWriter.write(null);
+                            indexWriter.write(reused);
+                            indexWriter.write(null);
+
+                            // Re-point the instance at aValue; row 5: null.
+                            reused.pointTo(
+                                    aValue.getSegments(),
+                                    aValue.getOffset(),
+                                    aValue.getSizeInBytes());
+                            indexWriter.write(null);
+                        });
+
+        // At this point the reused instance is pointed at aValue again.
+        assert ((BitmapIndexResultWrapper) indexReader.visitEqual(stringField, reused))
+                .getBitmapIndexResult()
+                .get()
+                .equals(RoaringBitmap32.bitmapOf(0));
+        assert ((BitmapIndexResultWrapper) indexReader.visitEqual(stringField, bValue))
+                .getBitmapIndexResult()
+                .get()
+                .equals(RoaringBitmap32.bitmapOf(3));
+        assert ((BitmapIndexResultWrapper) indexReader.visitIsNull(stringField))
+                .getBitmapIndexResult()
+                .get()
+                .equals(RoaringBitmap32.bitmapOf(1, 2, 4, 5));
+        assert ((BitmapIndexResultWrapper)
+                        indexReader.visitIn(stringField, Arrays.asList(reused, bValue)))
+                .getBitmapIndexResult()
+                .get()
+                .equals(RoaringBitmap32.bitmapOf(0, 3));
+
+        // "c" was never written, so its lookup must yield no rows.
+        assert !indexReader
+                .visitEqual(stringField, BinaryString.fromString("c"))
+                .iterator()
+                .hasNext();
+    }
+
+    /**
+     * Indexes an INT column made of three nulls and verifies that the is-null lookup returns
+     * every row while the is-not-null lookup returns none.
+     *
+     * @param version writer version forwarded to {@code createTestReaderOnWriter}
+     * @throws Exception if creating the reader or running a lookup fails
+     */
+    private void testAllNull(int version) throws Exception {
+        FieldRef intField = new FieldRef(0, "", DataTypes.INT());
+        Object[] rows = {null, null, null};
+        GlobalIndexReader indexReader =
+                createTestReaderOnWriter(
+                        version,
+                        null,
+                        DataTypes.INT(),
+                        indexWriter -> {
+                            for (int position = 0; position < rows.length; position++) {
+                                Object row = rows[position];
+                                indexWriter.write(row);
+                            }
+                        });
+
+        // All three positions hold null.
+        assert ((BitmapIndexResultWrapper) indexReader.visitIsNull(intField))
+                .getBitmapIndexResult()
+                .get()
+                .equals(RoaringBitmap32.bitmapOf(0, 1, 2));
+
+        // No non-null value was written.
+        assert !indexReader.visitIsNotNull(intField).iterator().hasNext();
+    }
+}


Reply via email to