This is an automated email from the ASF dual-hosted git repository. hope pushed a commit to branch release-1.4 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit 773c438296776aa89344c7ee11c1d8965314ac81 Author: Jingsong Lee <[email protected]> AuthorDate: Mon Mar 30 17:53:23 2026 +0800 [tantivy] Introduce paimon-tantivy for full text search index (#7551) Introduced a complete Tantivy full text search module, including Rust JNI layer, Java integration layer, archive format CI workflow. --- ...titcase-vortex.yml => utitcase-rust-native.yml} | 39 ++- .gitignore | 5 +- docs/content/append-table/global-index.md | 55 +++- paimon-tantivy/paimon-tantivy-index/README.md | 148 +++++++++ paimon-tantivy/paimon-tantivy-index/pom.xml | 91 ++++++ .../index/TantivyFullTextGlobalIndexReader.java | 311 ++++++++++++++++++ .../index/TantivyFullTextGlobalIndexWriter.java | 217 +++++++++++++ .../index/TantivyFullTextGlobalIndexer.java | 45 +++ .../index/TantivyFullTextGlobalIndexerFactory.java | 40 +++ .../index/TantivyScoredGlobalIndexResult.java | 57 ++++ ....apache.paimon.globalindex.GlobalIndexerFactory | 16 + .../index/TantivyFullTextGlobalIndexTest.java | 300 +++++++++++++++++ paimon-tantivy/paimon-tantivy-jni/README.md | 60 ++++ paimon-tantivy/paimon-tantivy-jni/pom.xml | 69 ++++ paimon-tantivy/paimon-tantivy-jni/rust/Cargo.toml | 13 + .../paimon-tantivy-jni/rust/src/jni_directory.rs | 259 +++++++++++++++ paimon-tantivy/paimon-tantivy-jni/rust/src/lib.rs | 358 +++++++++++++++++++++ .../org/apache/paimon/tantivy/NativeLoader.java | 83 +++++ .../org/apache/paimon/tantivy/SearchResult.java | 43 +++ .../org/apache/paimon/tantivy/StreamFileInput.java | 32 ++ .../apache/paimon/tantivy/TantivyIndexWriter.java | 70 ++++ .../org/apache/paimon/tantivy/TantivySearcher.java | 91 ++++++ .../src/main/resources/META-INF/NOTICE | 17 + .../org/apache/paimon/tantivy/TantivyJniTest.java | 77 +++++ .../paimon/tantivy/TantivyStreamSearchTest.java | 174 ++++++++++ paimon-tantivy/pom.xml | 39 +++ pom.xml | 2 + 27 files changed, 2708 insertions(+), 3 deletions(-) diff --git a/.github/workflows/utitcase-vortex.yml 
b/.github/workflows/utitcase-rust-native.yml similarity index 67% rename from .github/workflows/utitcase-vortex.yml rename to .github/workflows/utitcase-rust-native.yml index fa83ab22d8..3493193881 100644 --- a/.github/workflows/utitcase-vortex.yml +++ b/.github/workflows/utitcase-rust-native.yml @@ -16,15 +16,17 @@ # limitations under the License. ################################################################################ -name: UTCase Vortex +name: UTCase Rust Native on: push: paths: - 'paimon-vortex/**' + - 'paimon-tantivy/**' pull_request: paths: - 'paimon-vortex/**' + - 'paimon-tantivy/**' env: JDK_VERSION: 8 @@ -70,3 +72,38 @@ jobs: mvn -B -ntp verify -pl paimon-vortex/paimon-vortex-jni,paimon-vortex/paimon-vortex-format -Dcheckstyle.skip=true -Dspotless.check.skip=true env: MAVEN_OPTS: -Xmx4096m + + tantivy_test: + runs-on: ubuntu-latest + + steps: + - name: Checkout code + uses: actions/checkout@v6 + + - name: Set up JDK ${{ env.JDK_VERSION }} + uses: actions/setup-java@v5 + with: + java-version: ${{ env.JDK_VERSION }} + distribution: 'temurin' + + - name: Install Rust toolchain + uses: dtolnay/rust-toolchain@stable + + - name: Build Tantivy native library + run: | + cd paimon-tantivy/paimon-tantivy-jni/rust + cargo build --release + + - name: Copy native library to resources + run: | + RESOURCE_DIR=paimon-tantivy/paimon-tantivy-jni/src/main/resources/native/linux-amd64 + mkdir -p ${RESOURCE_DIR} + cp paimon-tantivy/paimon-tantivy-jni/rust/target/release/libtantivy_jni.so ${RESOURCE_DIR}/ + + - name: Build and test Tantivy modules + timeout-minutes: 30 + run: | + mvn -T 2C -B -ntp clean install -DskipTests + mvn -B -ntp verify -pl paimon-tantivy/paimon-tantivy-jni,paimon-tantivy/paimon-tantivy-index -Dcheckstyle.skip=true -Dspotless.check.skip=true + env: + MAVEN_OPTS: -Xmx4096m diff --git a/.gitignore b/.gitignore index 6d7a8a971e..045e76e926 100644 --- a/.gitignore +++ b/.gitignore @@ -44,6 +44,9 @@ paimon-python/dev/log *.swp .cache +### Rust ### 
+Cargo.lock ### Vortex lib ### - *libvortex_jni* +### Tantivy lib ### +*libtantivy_jni* diff --git a/docs/content/append-table/global-index.md b/docs/content/append-table/global-index.md index dc90a4664f..39a7b6b0be 100644 --- a/docs/content/append-table/global-index.md +++ b/docs/content/append-table/global-index.md @@ -33,6 +33,7 @@ without full-table scans. Paimon supports multiple global index types: - **BTree Index**: A B-tree based index for scalar column lookups. Supports equality, IN, range predicates, and can be combined across multiple columns with AND/OR logic. - **Vector Index**: An approximate nearest neighbor (ANN) index powered by DiskANN for vector similarity search. +- **Full-Text Index**: A full-text search index powered by Tantivy for text retrieval. Supports term matching and relevance scoring. Global indexes work on top of Data Evolution tables. To use global indexes, your table **must** have: @@ -48,7 +49,8 @@ Create a table with the required properties: CREATE TABLE my_table ( id INT, name STRING, - embedding ARRAY<FLOAT> + embedding ARRAY<FLOAT>, + content STRING ) TBLPROPERTIES ( 'bucket' = '-1', 'row-tracking.enabled' = 'true', @@ -133,3 +135,54 @@ try (RecordReader<InternalRow> reader = readBuilder.newRead().createReader(plan) {{< /tab >}} {{< /tabs >}} + +## Full-Text Index + +Full-Text Index provides text search capabilities powered by Tantivy. It is suitable for text retrieval scenarios +such as document search, log analysis, and content-based filtering. 
+ +**Build Full-Text Index** + +```sql +-- Create full-text index on 'content' column +CALL sys.create_global_index( + table => 'db.my_table', + index_column => 'content', + index_type => 'tantivy-fulltext' +); +``` + +**Full-Text Search** + +{{< tabs "fulltext-search" >}} + +{{< tab "Spark SQL" >}} +```sql +-- Search for top-10 documents matching the query +SELECT * FROM full_text_search('my_table', 'content', 'paimon lake format', 10); +``` +{{< /tab >}} + +{{< tab "Java API" >}} +```java +Table table = catalog.getTable(identifier); + +// Step 1: Build full-text search +GlobalIndexResult result = table.newFullTextSearchBuilder() + .withQueryText("paimon lake format") + .withLimit(10) + .withTextColumn("content") + .executeLocal(); + +// Step 2: Read matching rows using the search result +ReadBuilder readBuilder = table.newReadBuilder(); +TableScan.Plan plan = readBuilder.newScan().withGlobalIndexResult(result).plan(); +try (RecordReader<InternalRow> reader = readBuilder.newRead().createReader(plan)) { + reader.forEachRemaining(row -> { + System.out.println("id=" + row.getInt(0) + ", content=" + row.getString(3)); + }); +} +``` +{{< /tab >}} + +{{< /tabs >}} diff --git a/paimon-tantivy/paimon-tantivy-index/README.md b/paimon-tantivy/paimon-tantivy-index/README.md new file mode 100644 index 0000000000..18ea8bfc58 --- /dev/null +++ b/paimon-tantivy/paimon-tantivy-index/README.md @@ -0,0 +1,148 @@ +# Paimon Tantivy Index + +Full-text search global index for Apache Paimon, powered by [Tantivy](https://github.com/quickwit-oss/tantivy) (a Rust full-text search engine). + +## Overview + +This module provides full-text search capabilities for Paimon's Data Evolution (append) tables through the Global Index framework. The feature is implemented by two sub-modules of `paimon-tantivy`: + +- **paimon-tantivy-jni**: Rust/JNI bridge that wraps Tantivy's indexing and search APIs as native methods callable from Java.
+- **paimon-tantivy-index**: Java integration layer that implements Paimon's `GlobalIndexer` SPI, handling index building, archive packing, and query execution. + +### Architecture + +``` +┌─────────────────────────────────────────────────────┐ +│ Paimon Engine │ +│ (FullTextSearchBuilder / FullTextScan / FullTextRead)│ +└──────────────────────┬──────────────────────────────┘ + │ GlobalIndexer SPI +┌──────────────────────▼──────────────────────────────┐ +│ paimon-tantivy-index │ +│ TantivyFullTextGlobalIndexWriter (build index) │ +│ TantivyFullTextGlobalIndexReader (search index) │ +└──────────────────────┬──────────────────────────────┘ + │ JNI +┌──────────────────────▼──────────────────────────────┐ +│ paimon-tantivy-jni │ +│ TantivyIndexWriter (write docs via JNI) │ +│ TantivySearcher (search via JNI / stream I/O) │ +└──────────────────────┬──────────────────────────────┘ + │ FFI +┌──────────────────────▼──────────────────────────────┐ +│ Rust (lib.rs + jni_directory.rs) │ +│ Tantivy index writer / reader / query parser │ +└─────────────────────────────────────────────────────┘ +``` + +### Index Schema + +Tantivy index uses a fixed two-field schema: + +| Field | Tantivy Type | Description | +|-----------|-------------|--------------------------------------------------| +| `row_id` | u64 (stored, indexed) | Paimon's global row ID, used to map search results back to table rows | +| `text` | TEXT (tokenized, indexed) | The text content from the indexed column | + +## Archive File Format + +The writer produces a **single archive file** that bundles all Tantivy segment files into one sequential stream. This format is designed to be stored on any Paimon-supported file system (HDFS, S3, OSS, etc.) and read back without extracting to local disk. + +### Layout + +All integers are **big-endian**. 
+ +``` +┌─────────────────────────────────────────────────┐ +│ File Count (4 bytes, int32) │ +├─────────────────────────────────────────────────┤ +│ File Entry 1 │ +│ ┌─────────────────────────────────────────────┐│ +│ │ Name Length (4 bytes, int32) ││ +│ │ Name (N bytes, UTF-8) ││ +│ │ Data Length (8 bytes, int64) ││ +│ │ Data (M bytes, raw) ││ +│ └─────────────────────────────────────────────┘│ +├─────────────────────────────────────────────────┤ +│ File Entry 2 │ +│ ┌─────────────────────────────────────────────┐│ +│ │ Name Length (4 bytes, int32) ││ +│ │ Name (N bytes, UTF-8) ││ +│ │ Data Length (8 bytes, int64) ││ +│ │ Data (M bytes, raw) ││ +│ └─────────────────────────────────────────────┘│ +├─────────────────────────────────────────────────┤ +│ ... │ +└─────────────────────────────────────────────────┘ +``` + +### Field Details + +| Field | Size | Type | Description | +|-------------|---------|--------|------------------------------------------------| +| File Count | 4 bytes | int32 | Number of files in the archive | +| Name Length | 4 bytes | int32 | Byte length of the file name | +| Name | N bytes | UTF-8 | Tantivy segment file name (e.g. `meta.json`, `*.term`, `*.pos`, `*.store`) | +| Data Length | 8 bytes | int64 | Byte length of the file data | +| Data | M bytes | raw | Raw file content | + +### Write Path + +1. `TantivyFullTextGlobalIndexWriter` receives text values via `write(Object)`, one per row. +2. Each non-null text is passed to `TantivyIndexWriter` (JNI) as `addDocument(rowId, text)`, where `rowId` is a 0-based sequential counter. +3. On `finish()`, the Tantivy index is committed and all files in the local temp directory are packed into the archive format above. +4. The archive is written as a single file to Paimon's global index file system. +5. The local temp directory is deleted. + +### Read Path + +1. `TantivyFullTextGlobalIndexReader` opens the archive file as a `SeekableInputStream`. +2. 
The archive header is parsed to build a file layout table (name → offset, length). +3. A `TantivySearcher` is created with the layout and a `StreamFileInput` callback — Tantivy reads file data on demand via JNI callbacks to `seek()` + `read()` on the stream. No temp files are created. +4. Search queries are executed via Tantivy's `QueryParser` with BM25 scoring, returning `(rowId, score)` pairs. + +## Usage + +### Build Index + +```sql +CALL sys.create_global_index( + table => 'db.my_table', + index_column => 'content', + index_type => 'tantivy-fulltext' +); +``` + +### Search + +```sql +SELECT * FROM full_text_search('my_table', 'content', 'search query', 10); +``` + +### Java API + +```java +Table table = catalog.getTable(identifier); + +GlobalIndexResult result = table.newFullTextSearchBuilder() + .withQueryText("search query") + .withLimit(10) + .withTextColumn("content") + .executeLocal(); + +ReadBuilder readBuilder = table.newReadBuilder(); +TableScan.Plan plan = readBuilder.newScan() + .withGlobalIndexResult(result).plan(); +try (RecordReader<InternalRow> reader = readBuilder.newRead().createReader(plan)) { + reader.forEachRemaining(row -> System.out.println(row)); +} +``` + +## SPI Registration + +The index type `tantivy-fulltext` is registered via Java SPI: + +``` +META-INF/services/org.apache.paimon.globalindex.GlobalIndexerFactory + → org.apache.paimon.tantivy.index.TantivyFullTextGlobalIndexerFactory +``` diff --git a/paimon-tantivy/paimon-tantivy-index/pom.xml b/paimon-tantivy/paimon-tantivy-index/pom.xml new file mode 100644 index 0000000000..b4cd732546 --- /dev/null +++ b/paimon-tantivy/paimon-tantivy-index/pom.xml @@ -0,0 +1,91 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. 
The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <artifactId>paimon-tantivy</artifactId> + <groupId>org.apache.paimon</groupId> + <version>1.5-SNAPSHOT</version> + </parent> + + <artifactId>paimon-tantivy-index</artifactId> + <name>Paimon : Tantivy Index</name> + + <dependencies> + <dependency> + <groupId>org.apache.paimon</groupId> + <artifactId>paimon-tantivy-jni</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.paimon</groupId> + <artifactId>paimon-common</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + <!-- test dependencies --> + <dependency> + <groupId>org.junit.jupiter</groupId> + <artifactId>junit-jupiter</artifactId> + <version>${junit5.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.paimon</groupId> + <artifactId>paimon-core</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.paimon</groupId> + <artifactId>paimon-format</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.paimon</groupId> 
+ <artifactId>paimon-test-utils</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <forkCount>1</forkCount> + <redirectTestOutputToFile>true</redirectTestOutputToFile> + <parallel>none</parallel> + </configuration> + </plugin> + </plugins> + </build> +</project> diff --git a/paimon-tantivy/paimon-tantivy-index/src/main/java/org/apache/paimon/tantivy/index/TantivyFullTextGlobalIndexReader.java b/paimon-tantivy/paimon-tantivy-index/src/main/java/org/apache/paimon/tantivy/index/TantivyFullTextGlobalIndexReader.java new file mode 100644 index 0000000000..d45c18bd6e --- /dev/null +++ b/paimon-tantivy/paimon-tantivy-index/src/main/java/org/apache/paimon/tantivy/index/TantivyFullTextGlobalIndexReader.java @@ -0,0 +1,311 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.tantivy.index; + +import org.apache.paimon.fs.SeekableInputStream; +import org.apache.paimon.globalindex.GlobalIndexIOMeta; +import org.apache.paimon.globalindex.GlobalIndexReader; +import org.apache.paimon.globalindex.GlobalIndexResult; +import org.apache.paimon.globalindex.ScoredGlobalIndexResult; +import org.apache.paimon.globalindex.io.GlobalIndexFileReader; +import org.apache.paimon.predicate.FieldRef; +import org.apache.paimon.predicate.FullTextSearch; +import org.apache.paimon.tantivy.SearchResult; +import org.apache.paimon.tantivy.StreamFileInput; +import org.apache.paimon.tantivy.TantivySearcher; +import org.apache.paimon.utils.RoaringNavigableMap64; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Optional; + +import static org.apache.paimon.utils.Preconditions.checkArgument; + +/** + * Full-text global index reader using Tantivy. + * + * <p>Reads the archive header to get file layout, then opens a Tantivy searcher backed by JNI + * callbacks to the {@link SeekableInputStream}. No temp files are created. 
+ */
+public class TantivyFullTextGlobalIndexReader implements GlobalIndexReader {
+
+    private final GlobalIndexIOMeta ioMeta;
+    private final GlobalIndexFileReader fileReader;
+
+    // Both fields are written only while holding this reader's monitor (ensureLoaded/close).
+    // searcher is published last, so a thread that observes a non-null searcher also observes
+    // the matching openStream.
+    private volatile TantivySearcher searcher;
+    private volatile SeekableInputStream openStream;
+
+    /**
+     * Creates a reader for a single packed Tantivy archive.
+     *
+     * @param fileReader opens input streams for global index files
+     * @param ioMetas metadata of this shard's index files; must contain exactly one entry
+     * @throws IllegalArgumentException if {@code ioMetas} does not contain exactly one entry
+     */
+    public TantivyFullTextGlobalIndexReader(
+            GlobalIndexFileReader fileReader, List<GlobalIndexIOMeta> ioMetas) {
+        checkArgument(ioMetas.size() == 1, "Expected exactly one index file per shard");
+        this.fileReader = fileReader;
+        this.ioMeta = ioMetas.get(0);
+    }
+
+    /**
+     * Runs the query text against the index, opening the archive on first use.
+     *
+     * @return the matched row IDs together with their scores
+     * @throws RuntimeException wrapping any {@link IOException} raised while opening or searching
+     */
+    @Override
+    public Optional<ScoredGlobalIndexResult> visitFullTextSearch(FullTextSearch fullTextSearch) {
+        try {
+            // Use the instance returned by ensureLoaded() instead of re-reading the volatile
+            // field, which a concurrent close() may have cleared in the meantime.
+            TantivySearcher loaded = ensureLoaded();
+            SearchResult result =
+                    loaded.search(fullTextSearch.queryText(), fullTextSearch.limit());
+            return Optional.of(toScoredResult(result));
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to search Tantivy full-text index", e);
+        }
+    }
+
+    /** Converts the parallel row-ID / score arrays into a row-ID bitmap plus a score map. */
+    private ScoredGlobalIndexResult toScoredResult(SearchResult result) {
+        RoaringNavigableMap64 bitmap = new RoaringNavigableMap64();
+        HashMap<Long, Float> id2scores = new HashMap<>(result.size());
+        for (int i = 0; i < result.size(); i++) {
+            long rowId = result.getRowIds()[i];
+            bitmap.add(rowId);
+            id2scores.put(rowId, result.getScores()[i]);
+        }
+        return new TantivyScoredGlobalIndexResult(bitmap, id2scores);
+    }
+
+    /**
+     * Opens the archive and creates the searcher unless that already happened.
+     *
+     * <p>Double-checked locking on the volatile {@code searcher} field. If anything fails while
+     * opening, the input stream is closed before the failure is rethrown.
+     *
+     * @return the loaded searcher, never {@code null}
+     */
+    private TantivySearcher ensureLoaded() throws IOException {
+        TantivySearcher current = searcher;
+        if (current != null) {
+            return current;
+        }
+        synchronized (this) {
+            if (searcher == null) {
+                SeekableInputStream in = fileReader.getInputStream(ioMeta);
+                try {
+                    ArchiveLayout layout = parseArchiveHeader(in);
+                    StreamFileInput streamInput = new SynchronizedStreamFileInput(in);
+                    TantivySearcher created =
+                            new TantivySearcher(
+                                    layout.fileNames,
+                                    layout.fileOffsets,
+                                    layout.fileLengths,
+                                    streamInput);
+                    openStream = in;
+                    searcher = created;
+                } catch (Throwable t) {
+                    // Also covers Errors (e.g. raised from the JNI layer), which would otherwise
+                    // leak the stream. Precise rethrow keeps the declared type IOException.
+                    closeAfterFailure(in, t);
+                    throw t;
+                }
+            }
+            // close() is synchronized too, so searcher cannot be cleared while we hold the lock.
+            return searcher;
+        }
+    }
+
+    /** Closes {@code in} after a failed load, attaching any close failure to {@code failure}. */
+    private static void closeAfterFailure(SeekableInputStream in, Throwable failure) {
+        try {
+            in.close();
+        } catch (Throwable closeError) {
+            failure.addSuppressed(closeError);
+        }
+    }
+
+    /**
+     * Parse the archive header to extract file names, offsets, and lengths. The archive format is:
+     * [fileCount(4)] then for each file: [nameLen(4)] [name(utf8)] [dataLen(8)] [data].
+     *
+     * <p>This method reads the header sequentially and computes the absolute byte offset of each
+     * file's data within the stream. Negative counts or lengths are rejected, so a corrupted
+     * archive fails with a descriptive {@link IOException} rather than an unrelated runtime error
+     * such as {@link NegativeArraySizeException}.
+     */
+    private static ArchiveLayout parseArchiveHeader(SeekableInputStream in) throws IOException {
+        int fileCount = readInt(in);
+        if (fileCount < 0) {
+            throw new IOException("Corrupted Tantivy archive: negative file count " + fileCount);
+        }
+        // Not pre-sized from fileCount: a corrupted count must not trigger a huge allocation.
+        List<String> names = new ArrayList<>();
+        List<Long> offsets = new ArrayList<>();
+        List<Long> lengths = new ArrayList<>();
+
+        for (int i = 0; i < fileCount; i++) {
+            int nameLen = readInt(in);
+            if (nameLen < 0) {
+                throw new IOException(
+                        "Corrupted Tantivy archive: negative name length " + nameLen);
+            }
+            byte[] nameBytes = new byte[nameLen];
+            readFully(in, nameBytes);
+            String name = new String(nameBytes, StandardCharsets.UTF_8);
+
+            long dataLen = readLong(in);
+            if (dataLen < 0) {
+                throw new IOException(
+                        "Corrupted Tantivy archive: negative data length "
+                                + dataLen
+                                + " for file "
+                                + name);
+            }
+            long dataOffset = in.getPos();
+            names.add(name);
+            offsets.add(dataOffset);
+            lengths.add(dataLen);
+
+            // Skip past the file data
+            in.seek(dataOffset + dataLen);
+        }
+
+        return new ArchiveLayout(
+                names.toArray(new String[0]),
+                offsets.stream().mapToLong(Long::longValue).toArray(),
+                lengths.stream().mapToLong(Long::longValue).toArray());
+    }
+
+    /** Reads a big-endian int32, failing if the stream ends early. */
+    private static int readInt(SeekableInputStream in) throws IOException {
+        int b1 = in.read();
+        int b2 = in.read();
+        int b3 = in.read();
+        int b4 = in.read();
+        if ((b1 | b2 | b3 | b4) < 0) {
+            throw new IOException("Unexpected end of stream");
+        }
+        return (b1 << 24) | (b2 << 16) | (b3 << 8) | b4;
+    }
+
+    /** Reads a big-endian int64 as two int32 halves. */
+    private static long readLong(SeekableInputStream in) throws IOException {
+        return ((long) readInt(in) << 32) | (readInt(in) & 0xFFFFFFFFL);
+    }
+
+    /** Fills {@code buf} completely, failing if the stream ends early. */
+    private static void readFully(SeekableInputStream in, byte[] buf) throws IOException {
+        int off = 0;
+        while (off < buf.length) {
+            int read = in.read(buf, off, buf.length - off);
+            if (read == -1) {
+                throw new IOException("Unexpected end of stream");
+            }
+            off += read;
+        }
+    }
+
+    /**
+     * Closes the searcher and then the underlying stream. Both are attempted even if the first
+     * fails; the first failure is rethrown with later ones attached as suppressed exceptions.
+     * Synchronized so it cannot interleave with a concurrent {@code ensureLoaded()}.
+     */
+    @Override
+    public synchronized void close() throws IOException {
+        Throwable firstException = null;
+
+        if (searcher != null) {
+            try {
+                searcher.close();
+            } catch (Throwable t) {
+                firstException = t;
+            }
+            searcher = null;
+        }
+
+        if (openStream != null) {
+            try {
+                openStream.close();
+            } catch (Throwable t) {
+                if (firstException == null) {
+                    firstException = t;
+                } else {
+                    firstException.addSuppressed(t);
+                }
+            }
+            openStream = null;
+        }
+
+        if (firstException != null) {
+            if (firstException instanceof IOException) {
+                throw (IOException) firstException;
+            }
+            throw new RuntimeException("Failed to close Tantivy reader", firstException);
+        }
+    }
+
+    // =================== unsupported =====================
+
+    @Override
+    public Optional<GlobalIndexResult> visitIsNotNull(FieldRef fieldRef) {
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<GlobalIndexResult> visitIsNull(FieldRef fieldRef) {
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<GlobalIndexResult> visitStartsWith(FieldRef fieldRef, Object literal) {
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<GlobalIndexResult> visitEndsWith(FieldRef fieldRef, Object literal) {
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<GlobalIndexResult> visitContains(FieldRef fieldRef, Object literal) {
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<GlobalIndexResult> visitLike(FieldRef fieldRef, Object literal) {
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<GlobalIndexResult> visitLessThan(FieldRef fieldRef, Object literal) {
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<GlobalIndexResult> visitGreaterOrEqual(FieldRef fieldRef, Object literal) {
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<GlobalIndexResult> visitNotEqual(FieldRef fieldRef, Object literal) {
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<GlobalIndexResult> visitLessOrEqual(FieldRef fieldRef, Object literal) {
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<GlobalIndexResult> visitEqual(FieldRef fieldRef, Object literal) {
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<GlobalIndexResult> visitGreaterThan(FieldRef fieldRef, Object literal) {
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<GlobalIndexResult> visitIn(FieldRef fieldRef, List<Object> literals) {
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<GlobalIndexResult> visitNotIn(FieldRef fieldRef, List<Object> literals) {
+        return Optional.empty();
+    }
+
+    /** Parsed archive layout: file names with their offsets and lengths in the stream. */
+    private static class ArchiveLayout {
+        final String[] fileNames;
+        final long[] fileOffsets;
+        final long[] fileLengths;
+
+        ArchiveLayout(String[] fileNames, long[] fileOffsets, long[] fileLengths) {
+            this.fileNames = fileNames;
+            this.fileOffsets = fileOffsets;
+            this.fileLengths = fileLengths;
+        }
+    }
+
+    /**
+     * Thread-safe wrapper around {@link SeekableInputStream} implementing {@link StreamFileInput}.
+     * Rust JNI holds a Mutex across seek+read to prevent interleaving from concurrent threads.
+     */
+    private static class SynchronizedStreamFileInput implements StreamFileInput {
+        private final SeekableInputStream in;
+
+        SynchronizedStreamFileInput(SeekableInputStream in) {
+            this.in = in;
+        }
+
+        @Override
+        public synchronized void seek(long position) throws IOException {
+            in.seek(position);
+        }
+
+        @Override
+        public synchronized int read(byte[] buf, int off, int len) throws IOException {
+            return in.read(buf, off, len);
+        }
+    }
+}
diff --git a/paimon-tantivy/paimon-tantivy-index/src/main/java/org/apache/paimon/tantivy/index/TantivyFullTextGlobalIndexWriter.java b/paimon-tantivy/paimon-tantivy-index/src/main/java/org/apache/paimon/tantivy/index/TantivyFullTextGlobalIndexWriter.java
new file mode 100644
index 0000000000..6def9cb69e
--- /dev/null
+++ b/paimon-tantivy/paimon-tantivy-index/src/main/java/org/apache/paimon/tantivy/index/TantivyFullTextGlobalIndexWriter.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.tantivy.index; + +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.fs.PositionOutputStream; +import org.apache.paimon.globalindex.GlobalIndexSingletonWriter; +import org.apache.paimon.globalindex.ResultEntry; +import org.apache.paimon.globalindex.io.GlobalIndexFileWriter; +import org.apache.paimon.tantivy.TantivyIndexWriter; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.FileVisitResult; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.SimpleFileVisitor; +import java.nio.file.attribute.BasicFileAttributes; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * Full-text global index writer using Tantivy. + * + * <p>Text data is written to a local Tantivy index via JNI. On {@link #finish()}, the index + * directory is packed into a single file and written to the global index file system. 
+ */
+public class TantivyFullTextGlobalIndexWriter implements GlobalIndexSingletonWriter, Closeable {
+
+    private static final String FILE_NAME_PREFIX = "tantivy";
+    private static final Logger LOG =
+            LoggerFactory.getLogger(TantivyFullTextGlobalIndexWriter.class);
+
+    private final GlobalIndexFileWriter fileWriter;
+    private File tempIndexDir;
+    private TantivyIndexWriter writer;
+    // Number of rows seen so far (nulls included); also the row ID given to the next row.
+    private long rowId;
+    private boolean closed;
+
+    /**
+     * Creates a writer that indexes into a fresh local temp directory.
+     *
+     * @param fileWriter target file system for the packed archive
+     * @throws RuntimeException if the temp index directory cannot be created
+     */
+    public TantivyFullTextGlobalIndexWriter(GlobalIndexFileWriter fileWriter) {
+        this.fileWriter = fileWriter;
+        this.rowId = 0;
+        this.closed = false;
+
+        try {
+            this.tempIndexDir = Files.createTempDirectory("tantivy-index-").toFile();
+            this.tempIndexDir.deleteOnExit();
+            this.writer = new TantivyIndexWriter(tempIndexDir.getAbsolutePath());
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to create temp index directory", e);
+        }
+    }
+
+    /**
+     * Indexes one row's value. Every call consumes exactly one row ID, including {@code null}
+     * values (which are not indexed), so row IDs follow the order in which rows are written.
+     *
+     * @param fieldData a {@link BinaryString}, a {@link String}, or {@code null}
+     * @throws IllegalArgumentException if {@code fieldData} has any other type
+     * @throws IllegalStateException if this writer has already been finished or closed
+     */
+    @Override
+    public void write(Object fieldData) {
+        if (closed) {
+            throw new IllegalStateException("Tantivy full-text index writer is already closed");
+        }
+        if (fieldData == null) {
+            rowId++;
+            return;
+        }
+
+        String text;
+        if (fieldData instanceof BinaryString) {
+            text = fieldData.toString();
+        } else if (fieldData instanceof String) {
+            text = (String) fieldData;
+        } else {
+            throw new IllegalArgumentException(
+                    "Unsupported field type: " + fieldData.getClass().getName());
+        }
+
+        writer.addDocument(rowId, text);
+        rowId++;
+    }
+
+    /**
+     * Commits the index, packs it into a single archive file and releases all local resources.
+     *
+     * @return an empty list if no rows were written, otherwise one entry for the archive file
+     * @throws RuntimeException if committing or packing the index fails
+     */
+    @Override
+    public List<ResultEntry> finish() {
+        try {
+            if (rowId == 0) {
+                return Collections.emptyList();
+            }
+
+            writer.commit();
+            // Close the native writer before packing the directory it wrote to.
+            closeNativeWriter();
+
+            return Collections.singletonList(packIndex());
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to write Tantivy full-text global index", e);
+        } finally {
+            closed = true;
+            try {
+                closeNativeWriter();
+            } finally {
+                // Runs even if closing the native writer throws.
+                deleteTempDir();
+            }
+        }
+    }
+
+    /**
+     * Closes the native writer at most once. The field is cleared before {@code close()} is
+     * invoked, so a close that throws is never attempted a second time.
+     */
+    private void closeNativeWriter() {
+        TantivyIndexWriter toClose = writer;
+        writer = null;
+        if (toClose != null) {
+            toClose.close();
+        }
+    }
+
+    /**
+     * Writes every regular file of the temp index directory into one archive file.
+     *
+     * <p>Archive layout (big-endian): [fileCount(4)] then for each file: [nameLen(4)]
+     * [name(utf8)] [dataLen(8)] [data].
+     */
+    private ResultEntry packIndex() throws IOException {
+        LOG.info("Packing Tantivy index: {} documents", rowId);
+
+        String fileName = fileWriter.newFileName(FILE_NAME_PREFIX);
+        try (PositionOutputStream out = fileWriter.newOutputStream(fileName)) {
+            File[] allFiles = tempIndexDir.listFiles();
+            if (allFiles == null) {
+                // listFiles() returns null on an I/O error or if the path is not a directory.
+                throw new IOException("Failed to list Tantivy index directory " + tempIndexDir);
+            }
+
+            // Filter to regular files only before writing count
+            List<File> indexFiles = new ArrayList<>();
+            for (File file : allFiles) {
+                if (file.isFile()) {
+                    indexFiles.add(file);
+                }
+            }
+
+            writeInt(out, indexFiles.size());
+            for (File file : indexFiles) {
+                writeEntry(out, file);
+            }
+            out.flush();
+        }
+
+        LOG.info("Tantivy index packed: {} documents", rowId);
+        return new ResultEntry(fileName, rowId, null);
+    }
+
+    /**
+     * Writes one archive entry and copies exactly the declared number of data bytes. The reader
+     * locates each entry from the previous entry's declared length, so a short copy would
+     * misalign every following entry; it is therefore reported as an error.
+     */
+    private static void writeEntry(PositionOutputStream out, File file) throws IOException {
+        byte[] nameBytes = file.getName().getBytes(StandardCharsets.UTF_8);
+        long fileLen = file.length();
+
+        writeInt(out, nameBytes.length);
+        out.write(nameBytes);
+        writeLong(out, fileLen);
+
+        long remaining = fileLen;
+        try (FileInputStream fis = new FileInputStream(file)) {
+            byte[] buf = new byte[8192];
+            while (remaining > 0) {
+                int read = fis.read(buf, 0, (int) Math.min(buf.length, remaining));
+                if (read == -1) {
+                    break;
+                }
+                out.write(buf, 0, read);
+                remaining -= read;
+            }
+        }
+        if (remaining != 0) {
+            throw new IOException(
+                    "Tantivy index file "
+                            + file.getName()
+                            + " ended "
+                            + remaining
+                            + " bytes before its recorded length "
+                            + fileLen);
+        }
+    }
+
+    /** Writes a big-endian int32. */
+    private static void writeInt(PositionOutputStream out, int value) throws IOException {
+        out.write((value >>> 24) & 0xFF);
+        out.write((value >>> 16) & 0xFF);
+        out.write((value >>> 8) & 0xFF);
+        out.write(value & 0xFF);
+    }
+
+    /** Writes a big-endian int64 as two int32 halves. */
+    private static void writeLong(PositionOutputStream out, long value) throws IOException {
+        writeInt(out, (int) (value >>> 32));
+        writeInt(out, (int) value);
+    }
+
+    /** Best-effort recursive deletion of the temp index directory; failures are only logged. */
+    private void deleteTempDir() {
+        if (tempIndexDir != null) {
+            try {
+                Files.walkFileTree(
+                        tempIndexDir.toPath(),
+                        new SimpleFileVisitor<Path>() {
+                            @Override
+                            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs)
+                                    throws IOException {
+                                Files.delete(file);
+                                return FileVisitResult.CONTINUE;
+                            }
+
+                            @Override
+                            public FileVisitResult postVisitDirectory(Path dir, IOException exc)
+                                    throws IOException {
+                                Files.delete(dir);
+                                return FileVisitResult.CONTINUE;
+                            }
+                        });
+            } catch (IOException e) {
+                LOG.warn("Failed to delete Tantivy temp index directory {}", tempIndexDir, e);
+            }
+            tempIndexDir = null;
+        }
+    }
+
+    /** Releases the native writer and the temp directory; calling it again has no effect. */
+    @Override
+    public void close() {
+        if (closed) {
+            return;
+        }
+        closed = true;
+        try {
+            closeNativeWriter();
+        } finally {
+            deleteTempDir();
+        }
+    }
+}
diff --git a/paimon-tantivy/paimon-tantivy-index/src/main/java/org/apache/paimon/tantivy/index/TantivyFullTextGlobalIndexer.java b/paimon-tantivy/paimon-tantivy-index/src/main/java/org/apache/paimon/tantivy/index/TantivyFullTextGlobalIndexer.java
new file mode 100644
index 0000000000..e8d882acae
--- /dev/null
+++ b/paimon-tantivy/paimon-tantivy-index/src/main/java/org/apache/paimon/tantivy/index/TantivyFullTextGlobalIndexer.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.tantivy.index;
+
+import org.apache.paimon.globalindex.GlobalIndexIOMeta;
+import org.apache.paimon.globalindex.GlobalIndexReader;
+import org.apache.paimon.globalindex.GlobalIndexWriter;
+import org.apache.paimon.globalindex.GlobalIndexer;
+import org.apache.paimon.globalindex.io.GlobalIndexFileReader;
+import org.apache.paimon.globalindex.io.GlobalIndexFileWriter;
+
+import java.util.List;
+
+/** Tantivy full-text global indexer.
*/ +public class TantivyFullTextGlobalIndexer implements GlobalIndexer { + + public TantivyFullTextGlobalIndexer() {} + + @Override + public GlobalIndexWriter createWriter(GlobalIndexFileWriter fileWriter) { + return new TantivyFullTextGlobalIndexWriter(fileWriter); + } + + @Override + public GlobalIndexReader createReader( + GlobalIndexFileReader fileReader, List<GlobalIndexIOMeta> files) { + return new TantivyFullTextGlobalIndexReader(fileReader, files); + } +} diff --git a/paimon-tantivy/paimon-tantivy-index/src/main/java/org/apache/paimon/tantivy/index/TantivyFullTextGlobalIndexerFactory.java b/paimon-tantivy/paimon-tantivy-index/src/main/java/org/apache/paimon/tantivy/index/TantivyFullTextGlobalIndexerFactory.java new file mode 100644 index 0000000000..d4e6febc15 --- /dev/null +++ b/paimon-tantivy/paimon-tantivy-index/src/main/java/org/apache/paimon/tantivy/index/TantivyFullTextGlobalIndexerFactory.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.tantivy.index; + +import org.apache.paimon.globalindex.GlobalIndexer; +import org.apache.paimon.globalindex.GlobalIndexerFactory; +import org.apache.paimon.options.Options; +import org.apache.paimon.types.DataField; + +/** Factory for creating Tantivy full-text index. */ +public class TantivyFullTextGlobalIndexerFactory implements GlobalIndexerFactory { + + public static final String IDENTIFIER = "tantivy-fulltext"; + + @Override + public String identifier() { + return IDENTIFIER; + } + + @Override + public GlobalIndexer create(DataField field, Options options) { + return new TantivyFullTextGlobalIndexer(); + } +} diff --git a/paimon-tantivy/paimon-tantivy-index/src/main/java/org/apache/paimon/tantivy/index/TantivyScoredGlobalIndexResult.java b/paimon-tantivy/paimon-tantivy-index/src/main/java/org/apache/paimon/tantivy/index/TantivyScoredGlobalIndexResult.java new file mode 100644 index 0000000000..758e583437 --- /dev/null +++ b/paimon-tantivy/paimon-tantivy-index/src/main/java/org/apache/paimon/tantivy/index/TantivyScoredGlobalIndexResult.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.tantivy.index; + +import org.apache.paimon.globalindex.ScoreGetter; +import org.apache.paimon.globalindex.ScoredGlobalIndexResult; +import org.apache.paimon.utils.RoaringNavigableMap64; + +import java.util.Map; + +/** Full-text search global index result for Tantivy. */ +public class TantivyScoredGlobalIndexResult implements ScoredGlobalIndexResult { + + private final RoaringNavigableMap64 results; + private final Map<Long, Float> id2scores; + + public TantivyScoredGlobalIndexResult( + RoaringNavigableMap64 results, Map<Long, Float> id2scores) { + this.results = results; + this.id2scores = id2scores; + } + + @Override + public ScoreGetter scoreGetter() { + return rowId -> { + Float score = id2scores.get(rowId); + if (score == null) { + throw new IllegalArgumentException( + "No score found for rowId: " + + rowId + + ". Only rowIds present in results() are valid."); + } + return score; + }; + } + + @Override + public RoaringNavigableMap64 results() { + return results; + } +} diff --git a/paimon-tantivy/paimon-tantivy-index/src/main/resources/META-INF/services/org.apache.paimon.globalindex.GlobalIndexerFactory b/paimon-tantivy/paimon-tantivy-index/src/main/resources/META-INF/services/org.apache.paimon.globalindex.GlobalIndexerFactory new file mode 100644 index 0000000000..f9f321648e --- /dev/null +++ b/paimon-tantivy/paimon-tantivy-index/src/main/resources/META-INF/services/org.apache.paimon.globalindex.GlobalIndexerFactory @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.paimon.tantivy.index.TantivyFullTextGlobalIndexerFactory diff --git a/paimon-tantivy/paimon-tantivy-index/src/test/java/org/apache/paimon/tantivy/index/TantivyFullTextGlobalIndexTest.java b/paimon-tantivy/paimon-tantivy-index/src/test/java/org/apache/paimon/tantivy/index/TantivyFullTextGlobalIndexTest.java new file mode 100644 index 0000000000..beb92ee0a2 --- /dev/null +++ b/paimon-tantivy/paimon-tantivy-index/src/test/java/org/apache/paimon/tantivy/index/TantivyFullTextGlobalIndexTest.java @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.tantivy.index; + +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.PositionOutputStream; +import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.globalindex.GlobalIndexIOMeta; +import org.apache.paimon.globalindex.ResultEntry; +import org.apache.paimon.globalindex.ScoredGlobalIndexResult; +import org.apache.paimon.globalindex.io.GlobalIndexFileReader; +import org.apache.paimon.globalindex.io.GlobalIndexFileWriter; +import org.apache.paimon.predicate.FullTextSearch; +import org.apache.paimon.tantivy.NativeLoader; +import org.apache.paimon.utils.RoaringNavigableMap64; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.UUID; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assumptions.assumeTrue; + +/** + * Test for {@link TantivyFullTextGlobalIndexWriter} and {@link TantivyFullTextGlobalIndexReader}. 
+ */ +public class TantivyFullTextGlobalIndexTest { + + @BeforeAll + static void checkNativeLibrary() { + assumeTrue(isNativeAvailable(), "Tantivy native library not available, skipping tests"); + } + + private static boolean isNativeAvailable() { + try { + NativeLoader.loadJni(); + return true; + } catch (Throwable t) { + return false; + } + } + + @TempDir java.nio.file.Path tempDir; + + private FileIO fileIO; + private Path indexPath; + + @BeforeEach + public void setup() { + fileIO = new LocalFileIO(); + indexPath = new Path(tempDir.toString()); + } + + @AfterEach + public void cleanup() throws IOException { + if (fileIO != null) { + fileIO.delete(indexPath, true); + } + } + + private GlobalIndexFileWriter createFileWriter(Path path) { + return new GlobalIndexFileWriter() { + @Override + public String newFileName(String prefix) { + return prefix + "-" + UUID.randomUUID(); + } + + @Override + public PositionOutputStream newOutputStream(String fileName) throws IOException { + return fileIO.newOutputStream(new Path(path, fileName), false); + } + }; + } + + private GlobalIndexFileReader createFileReader() { + return meta -> fileIO.newInputStream(meta.filePath()); + } + + private List<GlobalIndexIOMeta> toIOMetas(List<ResultEntry> results, Path path) + throws IOException { + assertThat(results).hasSize(1); + ResultEntry result = results.get(0); + Path filePath = new Path(path, result.fileName()); + return Collections.singletonList( + new GlobalIndexIOMeta(filePath, fileIO.getFileSize(filePath), result.meta())); + } + + @Test + public void testEndToEnd() throws IOException { + GlobalIndexFileWriter fileWriter = createFileWriter(indexPath); + TantivyFullTextGlobalIndexWriter writer = new TantivyFullTextGlobalIndexWriter(fileWriter); + + writer.write(BinaryString.fromString("Apache Paimon is a streaming data lake platform")); + writer.write(BinaryString.fromString("Tantivy is a full-text search engine in Rust")); + writer.write(BinaryString.fromString("Paimon supports 
real-time data ingestion")); + + List<ResultEntry> results = writer.finish(); + assertThat(results).hasSize(1); + assertThat(results.get(0).rowCount()).isEqualTo(3); + + List<GlobalIndexIOMeta> metas = toIOMetas(results, indexPath); + GlobalIndexFileReader fileReader = createFileReader(); + + try (TantivyFullTextGlobalIndexReader reader = + new TantivyFullTextGlobalIndexReader(fileReader, metas)) { + FullTextSearch search = new FullTextSearch("paimon", 10, "text"); + Optional<ScoredGlobalIndexResult> searchResult = reader.visitFullTextSearch(search); + assertThat(searchResult).isPresent(); + + ScoredGlobalIndexResult scored = searchResult.get(); + RoaringNavigableMap64 rowIds = scored.results(); + // Row 0 and row 2 mention "paimon" + assertThat(rowIds.getLongCardinality()).isEqualTo(2); + assertThat(rowIds.contains(0L)).isTrue(); + assertThat(rowIds.contains(2L)).isTrue(); + + // Scores should be positive + float score0 = scored.scoreGetter().score(0L); + float score2 = scored.scoreGetter().score(2L); + assertThat(score0).isGreaterThan(0); + assertThat(score2).isGreaterThan(0); + } + } + + @Test + public void testSearchNoResults() throws IOException { + GlobalIndexFileWriter fileWriter = createFileWriter(indexPath); + TantivyFullTextGlobalIndexWriter writer = new TantivyFullTextGlobalIndexWriter(fileWriter); + + writer.write(BinaryString.fromString("Hello world")); + writer.write(BinaryString.fromString("Foo bar baz")); + + List<ResultEntry> results = writer.finish(); + List<GlobalIndexIOMeta> metas = toIOMetas(results, indexPath); + GlobalIndexFileReader fileReader = createFileReader(); + + try (TantivyFullTextGlobalIndexReader reader = + new TantivyFullTextGlobalIndexReader(fileReader, metas)) { + FullTextSearch search = new FullTextSearch("nonexistent", 10, "text"); + Optional<ScoredGlobalIndexResult> searchResult = reader.visitFullTextSearch(search); + assertThat(searchResult).isPresent(); + + RoaringNavigableMap64 rowIds = searchResult.get().results(); + 
assertThat(rowIds.getLongCardinality()).isEqualTo(0); + } + } + + @Test + public void testNullFieldSkipped() throws IOException { + GlobalIndexFileWriter fileWriter = createFileWriter(indexPath); + TantivyFullTextGlobalIndexWriter writer = new TantivyFullTextGlobalIndexWriter(fileWriter); + + writer.write(BinaryString.fromString("Paimon data lake")); + writer.write(null); // row 1 is null, should be skipped + writer.write(BinaryString.fromString("Paimon streaming")); + + List<ResultEntry> results = writer.finish(); + assertThat(results.get(0).rowCount()).isEqualTo(3); + + List<GlobalIndexIOMeta> metas = toIOMetas(results, indexPath); + GlobalIndexFileReader fileReader = createFileReader(); + + try (TantivyFullTextGlobalIndexReader reader = + new TantivyFullTextGlobalIndexReader(fileReader, metas)) { + FullTextSearch search = new FullTextSearch("paimon", 10, "text"); + Optional<ScoredGlobalIndexResult> searchResult = reader.visitFullTextSearch(search); + assertThat(searchResult).isPresent(); + + ScoredGlobalIndexResult scored = searchResult.get(); + RoaringNavigableMap64 rowIds = scored.results(); + // Row 0 and row 2 match, row 1 was null + assertThat(rowIds.getLongCardinality()).isEqualTo(2); + assertThat(rowIds.contains(0L)).isTrue(); + assertThat(rowIds.contains(1L)).isFalse(); + assertThat(rowIds.contains(2L)).isTrue(); + } + } + + @Test + public void testEmptyIndex() { + GlobalIndexFileWriter fileWriter = createFileWriter(indexPath); + TantivyFullTextGlobalIndexWriter writer = new TantivyFullTextGlobalIndexWriter(fileWriter); + + List<ResultEntry> results = writer.finish(); + assertThat(results).isEmpty(); + } + + @Test + public void testLargeDataset() throws IOException { + GlobalIndexFileWriter fileWriter = createFileWriter(indexPath); + TantivyFullTextGlobalIndexWriter writer = new TantivyFullTextGlobalIndexWriter(fileWriter); + + int numDocs = 500; + for (int i = 0; i < numDocs; i++) { + String text = "document number " + i + " with some searchable 
content"; + if (i % 10 == 0) { + text += " special_keyword"; + } + writer.write(BinaryString.fromString(text)); + } + + List<ResultEntry> results = writer.finish(); + assertThat(results.get(0).rowCount()).isEqualTo(numDocs); + + List<GlobalIndexIOMeta> metas = toIOMetas(results, indexPath); + GlobalIndexFileReader fileReader = createFileReader(); + + try (TantivyFullTextGlobalIndexReader reader = + new TantivyFullTextGlobalIndexReader(fileReader, metas)) { + // Search for the special keyword — should match every 10th doc + FullTextSearch search = new FullTextSearch("special_keyword", 1000, "text"); + Optional<ScoredGlobalIndexResult> searchResult = reader.visitFullTextSearch(search); + assertThat(searchResult).isPresent(); + + ScoredGlobalIndexResult scored = searchResult.get(); + RoaringNavigableMap64 rowIds = scored.results(); + assertThat(rowIds.getLongCardinality()).isEqualTo(50); + assertThat(rowIds.contains(0L)).isTrue(); + assertThat(rowIds.contains(10L)).isTrue(); + assertThat(rowIds.contains(490L)).isTrue(); + assertThat(rowIds.contains(1L)).isFalse(); + } + } + + @Test + public void testLimitRespected() throws IOException { + GlobalIndexFileWriter fileWriter = createFileWriter(indexPath); + TantivyFullTextGlobalIndexWriter writer = new TantivyFullTextGlobalIndexWriter(fileWriter); + + for (int i = 0; i < 20; i++) { + writer.write(BinaryString.fromString("paimon document " + i)); + } + + List<ResultEntry> results = writer.finish(); + List<GlobalIndexIOMeta> metas = toIOMetas(results, indexPath); + GlobalIndexFileReader fileReader = createFileReader(); + + try (TantivyFullTextGlobalIndexReader reader = + new TantivyFullTextGlobalIndexReader(fileReader, metas)) { + // Limit to 5 results + FullTextSearch search = new FullTextSearch("paimon", 5, "text"); + Optional<ScoredGlobalIndexResult> searchResult = reader.visitFullTextSearch(search); + assertThat(searchResult).isPresent(); + + RoaringNavigableMap64 rowIds = searchResult.get().results(); + 
assertThat(rowIds.getLongCardinality()).isEqualTo(5); + } + } + + @Test + public void testViaIndexer() throws IOException { + TantivyFullTextGlobalIndexer indexer = new TantivyFullTextGlobalIndexer(); + + GlobalIndexFileWriter fileWriter = createFileWriter(indexPath); + TantivyFullTextGlobalIndexWriter writer = + (TantivyFullTextGlobalIndexWriter) indexer.createWriter(fileWriter); + + writer.write(BinaryString.fromString("test via indexer factory")); + List<ResultEntry> results = writer.finish(); + assertThat(results).hasSize(1); + + List<GlobalIndexIOMeta> metas = toIOMetas(results, indexPath); + GlobalIndexFileReader fileReader = createFileReader(); + + try (TantivyFullTextGlobalIndexReader reader = + (TantivyFullTextGlobalIndexReader) indexer.createReader(fileReader, metas)) { + FullTextSearch search = new FullTextSearch("indexer", 10, "text"); + Optional<ScoredGlobalIndexResult> searchResult = reader.visitFullTextSearch(search); + assertThat(searchResult).isPresent(); + assertThat(searchResult.get().results().getLongCardinality()).isEqualTo(1); + assertThat(searchResult.get().results().contains(0L)).isTrue(); + } + } +} diff --git a/paimon-tantivy/paimon-tantivy-jni/README.md b/paimon-tantivy/paimon-tantivy-jni/README.md new file mode 100644 index 0000000000..f7bf86b800 --- /dev/null +++ b/paimon-tantivy/paimon-tantivy-jni/README.md @@ -0,0 +1,60 @@ +# Paimon Tantivy JNI + +JNI wrapper around [Tantivy](https://github.com/quickwit-oss/tantivy) full-text search engine. Fixed schema: `rowId` (long) + `text` (String). Search returns top-N results with scores for ranking. + +## Build + +### 1. Build the Rust native library + +```bash +cd rust +cargo build --release +``` + +Output artifacts in `rust/target/release/`: +- macOS: `libtantivy_jni.dylib` +- Linux: `libtantivy_jni.so` + +### 2. 
Copy native library to resources + +```bash +# macOS (Apple Silicon) +mkdir -p src/main/resources/native/darwin-aarch64 +cp rust/target/release/libtantivy_jni.dylib src/main/resources/native/darwin-aarch64/ + +# macOS (Intel) +mkdir -p src/main/resources/native/darwin-x86_64 +cp rust/target/release/libtantivy_jni.dylib src/main/resources/native/darwin-x86_64/ + +# Linux (x86_64) +mkdir -p src/main/resources/native/linux-amd64 +cp rust/target/release/libtantivy_jni.so src/main/resources/native/linux-amd64/ +``` + +### 3. Build the Java module + +```bash +# From the project root +mvn compile -pl paimon-tantivy/paimon-tantivy-jni -am +``` + +## Usage + +```java +// Create index and write documents +try (TantivyIndexWriter writer = new TantivyIndexWriter("/tmp/my_index")) { + writer.addDocument(1L, "Apache Paimon is a streaming data lake platform"); + writer.addDocument(2L, "Tantivy is a full-text search engine written in Rust"); + writer.addDocument(3L, "Paimon supports real-time data ingestion"); + writer.commit(); +} + +// Search — returns (rowId, score) pairs ranked by relevance +try (TantivySearcher searcher = new TantivySearcher("/tmp/my_index")) { + SearchResult result = searcher.search("paimon", 10); + for (int i = 0; i < result.size(); i++) { + System.out.println("rowId=" + result.getRowIds()[i] + + " score=" + result.getScores()[i]); + } +} +``` diff --git a/paimon-tantivy/paimon-tantivy-jni/pom.xml b/paimon-tantivy/paimon-tantivy-jni/pom.xml new file mode 100644 index 0000000000..4277b6a592 --- /dev/null +++ b/paimon-tantivy/paimon-tantivy-jni/pom.xml @@ -0,0 +1,69 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. 
The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <artifactId>paimon-tantivy</artifactId> + <groupId>org.apache.paimon</groupId> + <version>1.5-SNAPSHOT</version> + </parent> + + <artifactId>paimon-tantivy-jni</artifactId> + <name>Paimon : Tantivy JNI</name> + + <properties> + <target.java.version>1.8</target.java.version> + <spotless.check.skip>true</spotless.check.skip> + <spotless.apply.skip>true</spotless.apply.skip> + <checkstyle.skip>true</checkstyle.skip> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.paimon</groupId> + <artifactId>paimon-shade-guava-30</artifactId> + <version>${paimon.shade.guava.version}-${paimon.shade.version}</version> + </dependency> + + <dependency> + <groupId>org.junit.jupiter</groupId> + <artifactId>junit-jupiter</artifactId> + <version>${junit5.version}</version> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <forkCount>1</forkCount> + <redirectTestOutputToFile>true</redirectTestOutputToFile> + <parallel>none</parallel> + </configuration> + </plugin> + </plugins> + </build> +</project> diff 
--git a/paimon-tantivy/paimon-tantivy-jni/rust/Cargo.toml b/paimon-tantivy/paimon-tantivy-jni/rust/Cargo.toml new file mode 100644 index 0000000000..a064ad8dd4 --- /dev/null +++ b/paimon-tantivy/paimon-tantivy-jni/rust/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "tantivy-jni" +version = "0.1.0" +edition = "2021" + +[lib] +name = "tantivy_jni" +crate-type = ["cdylib"] + +[dependencies] +jni = "0.21" +tantivy = "0.22" +serde_json = "1" diff --git a/paimon-tantivy/paimon-tantivy-jni/rust/src/jni_directory.rs b/paimon-tantivy/paimon-tantivy-jni/rust/src/jni_directory.rs new file mode 100644 index 0000000000..ee667a68c3 --- /dev/null +++ b/paimon-tantivy/paimon-tantivy-jni/rust/src/jni_directory.rs @@ -0,0 +1,259 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use jni::objects::GlobalRef; +use jni::JavaVM; +use std::collections::HashMap; +use std::fmt; +use std::io; +use std::ops::Range; +use std::path::{Path, PathBuf}; +use std::sync::{Arc, Mutex}; +use tantivy::directory::error::{DeleteError, LockError, OpenReadError, OpenWriteError}; +use tantivy::directory::{ + DirectoryLock, FileHandle, Lock, OwnedBytes, WatchCallback, WatchHandle, WritePtr, +}; +use tantivy::directory::{Directory, TerminatingWrite, AntiCallToken}; +use tantivy::HasLen; + +/// File metadata within the archive: offset and length in the stream. +#[derive(Clone, Debug)] +struct FileMeta { + offset: u64, + length: u64, +} + +/// A read-only Tantivy Directory backed by JNI callbacks to a Java SeekableInputStream. +/// +/// The archive is a single stream containing multiple files. Each file's position +/// (offset, length) is known upfront. Reads are dispatched via JNI to the Java side. +#[derive(Clone)] +pub struct JniDirectory { + jvm: Arc<JavaVM>, + stream_ref: Arc<GlobalRef>, + files: Arc<HashMap<PathBuf, FileMeta>>, + /// meta.json content cached after atomic_write + atomic_data: Arc<Mutex<HashMap<PathBuf, Vec<u8>>>>, + /// Mutex to protect seek+read atomicity on the shared stream + stream_lock: Arc<Mutex<()>>, +} + +impl JniDirectory { + pub fn new( + jvm: JavaVM, + stream_ref: GlobalRef, + file_entries: Vec<(String, u64, u64)>, + ) -> Self { + let mut files = HashMap::new(); + for (name, offset, length) in file_entries { + files.insert( + PathBuf::from(&name), + FileMeta { offset, length }, + ); + } + JniDirectory { + jvm: Arc::new(jvm), + stream_ref: Arc::new(stream_ref), + files: Arc::new(files), + atomic_data: Arc::new(Mutex::new(HashMap::new())), + stream_lock: Arc::new(Mutex::new(())), + } + } + + /// Read bytes from the Java stream at a given absolute position. + /// The stream_lock ensures seek+read is atomic across concurrent Rust threads. 
+ fn read_from_stream(&self, position: u64, length: usize) -> io::Result<Vec<u8>> { + let _guard = self.stream_lock.lock().map_err(|e| { + io::Error::new(io::ErrorKind::Other, format!("Stream lock poisoned: {}", e)) + })?; + + let mut env = self + .jvm + .attach_current_thread() + .map_err(|e| io::Error::new(io::ErrorKind::Other, format!("JNI attach failed: {}", e)))?; + + // Call seek(long) + env.call_method(self.stream_ref.as_obj(), "seek", "(J)V", &[jni::objects::JValue::Long(position as i64)]) + .map_err(|e| io::Error::new(io::ErrorKind::Other, format!("JNI seek failed: {}", e)))?; + + // Create byte array and call read(byte[], int, int) + let buf = env + .new_byte_array(length as i32) + .map_err(|e| io::Error::new(io::ErrorKind::Other, format!("JNI new_byte_array failed: {}", e)))?; + + let mut total_read = 0i32; + while (total_read as usize) < length { + let remaining = length as i32 - total_read; + let n = env + .call_method( + self.stream_ref.as_obj(), + "read", + "([BII)I", + &[ + jni::objects::JValue::Object(&buf), + jni::objects::JValue::Int(total_read), + jni::objects::JValue::Int(remaining), + ], + ) + .map_err(|e| io::Error::new(io::ErrorKind::Other, format!("JNI read failed: {}", e)))? 
+ .i() + .map_err(|e| io::Error::new(io::ErrorKind::Other, format!("JNI read return type: {}", e)))?; + + if n <= 0 { + return Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + format!("Unexpected EOF: read {} of {} bytes", total_read, length), + )); + } + total_read += n; + } + + let mut result = vec![0i8; length]; + env.get_byte_array_region(&buf, 0, &mut result) + .map_err(|e| io::Error::new(io::ErrorKind::Other, format!("JNI get_byte_array_region: {}", e)))?; + + // Safe: i8 and u8 have the same layout + let result: Vec<u8> = result.into_iter().map(|b| b as u8).collect(); + Ok(result) + } +} + +impl fmt::Debug for JniDirectory { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("JniDirectory") + .field("files", &self.files.keys().collect::<Vec<_>>()) + .finish() + } +} + +/// A FileHandle backed by JNI stream reads for a single file within the archive. +#[derive(Clone)] +struct JniFileHandle { + directory: JniDirectory, + file_offset: u64, + file_length: usize, +} + +impl fmt::Debug for JniFileHandle { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("JniFileHandle") + .field("offset", &self.file_offset) + .field("length", &self.file_length) + .finish() + } +} + +impl HasLen for JniFileHandle { + fn len(&self) -> usize { + self.file_length + } +} + +impl FileHandle for JniFileHandle { + fn read_bytes(&self, range: Range<usize>) -> io::Result<OwnedBytes> { + let start = self.file_offset + range.start as u64; + let length = range.end - range.start; + let data = self.directory.read_from_stream(start, length)?; + Ok(OwnedBytes::new(data)) + } +} + +impl Directory for JniDirectory { + fn get_file_handle(&self, path: &Path) -> Result<Arc<dyn FileHandle>, OpenReadError> { + let meta = self + .files + .get(path) + .ok_or_else(|| OpenReadError::FileDoesNotExist(path.to_path_buf()))?; + + Ok(Arc::new(JniFileHandle { + directory: self.clone(), + file_offset: meta.offset, + file_length: meta.length as usize, + 
})) + } + + fn exists(&self, path: &Path) -> Result<bool, OpenReadError> { + Ok(self.files.contains_key(path) || self.atomic_data.lock().unwrap().contains_key(path)) + } + + fn atomic_read(&self, path: &Path) -> Result<Vec<u8>, OpenReadError> { + // Check in-memory atomic data first + if let Some(data) = self.atomic_data.lock().unwrap().get(path) { + return Ok(data.clone()); + } + // Fall back to archive + let meta = self + .files + .get(path) + .ok_or_else(|| OpenReadError::FileDoesNotExist(path.to_path_buf()))?; + self.read_from_stream(meta.offset, meta.length as usize) + .map_err(|e| OpenReadError::wrap_io_error(e, path.to_path_buf())) + } + + fn atomic_write(&self, path: &Path, data: &[u8]) -> io::Result<()> { + self.atomic_data + .lock() + .unwrap() + .insert(path.to_path_buf(), data.to_vec()); + Ok(()) + } + + // --- Read-only: the following are no-ops or unsupported --- + + fn delete(&self, _path: &Path) -> Result<(), DeleteError> { + Ok(()) + } + + fn open_write(&self, _path: &Path) -> Result<WritePtr, OpenWriteError> { + // Tantivy needs this for lock files; provide a dummy writer + let buf: Vec<u8> = Vec::new(); + Ok(io::BufWriter::new(Box::new(VecTerminatingWrite(buf)))) + } + + fn sync_directory(&self) -> io::Result<()> { + Ok(()) + } + + fn acquire_lock(&self, _lock: &Lock) -> Result<DirectoryLock, LockError> { + // Read-only: no locking needed + Ok(DirectoryLock::from(Box::new(()))) + } + + fn watch(&self, _watch_callback: WatchCallback) -> tantivy::Result<WatchHandle> { + Ok(WatchHandle::empty()) + } +} + +/// A dummy writer that implements TerminatingWrite for lock file support. 
+struct VecTerminatingWrite(Vec<u8>); + +impl io::Write for VecTerminatingWrite { + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { + self.0.extend_from_slice(buf); + Ok(buf.len()) + } + + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } +} + +impl TerminatingWrite for VecTerminatingWrite { + fn terminate_ref(&mut self, _token: AntiCallToken) -> io::Result<()> { + Ok(()) + } +} diff --git a/paimon-tantivy/paimon-tantivy-jni/rust/src/lib.rs b/paimon-tantivy/paimon-tantivy-jni/rust/src/lib.rs new file mode 100644 index 0000000000..f768a0d8a6 --- /dev/null +++ b/paimon-tantivy/paimon-tantivy-jni/rust/src/lib.rs @@ -0,0 +1,358 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod jni_directory; + +use jni::objects::{JClass, JObject, JString, JValue}; +use jni::sys::{jfloat, jint, jlong, jobject}; +use jni::JNIEnv; +use std::ptr; +use tantivy::collector::TopDocs; +use tantivy::query::QueryParser; +use tantivy::schema::{Field, NumericOptions, Schema, TEXT}; +use tantivy::{Index, IndexReader, IndexWriter, ReloadPolicy}; + +use crate::jni_directory::JniDirectory; + +/// Helper: throw a Java RuntimeException and return a default value. 
+fn throw_and_return<T: Default>(env: &mut JNIEnv, msg: &str) -> T { + let _ = env.throw_new("java/lang/RuntimeException", msg); + T::default() +} + +/// Helper: throw a Java RuntimeException and return a null jobject. +fn throw_and_return_null(env: &mut JNIEnv, msg: &str) -> jobject { + let _ = env.throw_new("java/lang/RuntimeException", msg); + ptr::null_mut() +} + +/// Fixed schema: rowId (u64 fast field) + text (full-text indexed). +struct TantivyIndex { + writer: IndexWriter, + row_id_field: Field, + text_field: Field, +} + +struct TantivySearcherHandle { + reader: IndexReader, + text_field: Field, +} + +fn build_schema() -> (Schema, Field, Field) { + let mut builder = Schema::builder(); + let row_id_field = builder.add_u64_field( + "row_id", + NumericOptions::default() + .set_stored() + .set_indexed() + .set_fast(), + ); + let text_field = builder.add_text_field("text", TEXT); + (builder.build(), row_id_field, text_field) +} + +// --------------------------------------------------------------------------- +// TantivyIndexWriter native methods +// --------------------------------------------------------------------------- + +#[no_mangle] +pub extern "system" fn Java_org_apache_paimon_tantivy_TantivyIndexWriter_createIndex( + mut env: JNIEnv, + _class: JClass, + index_path: JString, +) -> jlong { + let path: String = match env.get_string(&index_path) { + Ok(s) => s.into(), + Err(e) => return throw_and_return(&mut env, &format!("Failed to get index path: {}", e)), + }; + let (schema, row_id_field, text_field) = build_schema(); + + let dir = std::path::Path::new(&path); + if let Err(e) = std::fs::create_dir_all(dir) { + return throw_and_return(&mut env, &format!("Failed to create directory: {}", e)); + } + let index = match Index::create_in_dir(dir, schema) { + Ok(i) => i, + Err(e) => return throw_and_return(&mut env, &format!("Failed to create index: {}", e)), + }; + let writer = match index.writer(50_000_000) { + Ok(w) => w, + Err(e) => return 
throw_and_return(&mut env, &format!("Failed to create writer: {}", e)), + }; + + let handle = Box::new(TantivyIndex { + writer, + row_id_field, + text_field, + }); + Box::into_raw(handle) as jlong +} + +#[no_mangle] +pub extern "system" fn Java_org_apache_paimon_tantivy_TantivyIndexWriter_writeDocument( + mut env: JNIEnv, + _class: JClass, + index_ptr: jlong, + row_id: jlong, + text: JString, +) { + let handle = unsafe { &mut *(index_ptr as *mut TantivyIndex) }; + let text_str: String = match env.get_string(&text) { + Ok(s) => s.into(), + Err(e) => { + throw_and_return::<()>(&mut env, &format!("Failed to get text string: {}", e)); + return; + } + }; + + let mut doc = tantivy::TantivyDocument::new(); + doc.add_u64(handle.row_id_field, row_id as u64); + doc.add_text(handle.text_field, &text_str); + if let Err(e) = handle.writer.add_document(doc) { + throw_and_return::<()>(&mut env, &format!("Failed to add document: {}", e)); + } +} + +#[no_mangle] +pub extern "system" fn Java_org_apache_paimon_tantivy_TantivyIndexWriter_commitIndex( + mut env: JNIEnv, + _class: JClass, + index_ptr: jlong, +) { + let handle = unsafe { &mut *(index_ptr as *mut TantivyIndex) }; + if let Err(e) = handle.writer.commit() { + throw_and_return::<()>(&mut env, &format!("Failed to commit index: {}", e)); + } +} + +#[no_mangle] +pub extern "system" fn Java_org_apache_paimon_tantivy_TantivyIndexWriter_freeIndex( + _env: JNIEnv, + _class: JClass, + index_ptr: jlong, +) { + unsafe { + let _ = Box::from_raw(index_ptr as *mut TantivyIndex); + } +} + +// --------------------------------------------------------------------------- +// TantivySearcher native methods +// --------------------------------------------------------------------------- + +#[no_mangle] +pub extern "system" fn Java_org_apache_paimon_tantivy_TantivySearcher_openIndex( + mut env: JNIEnv, + _class: JClass, + index_path: JString, +) -> jlong { + let path: String = match env.get_string(&index_path) { + Ok(s) => s.into(), + Err(e) => 
return throw_and_return(&mut env, &format!("Failed to get index path: {}", e)), + }; + let index = match Index::open_in_dir(&path) { + Ok(i) => i, + Err(e) => return throw_and_return(&mut env, &format!("Failed to open index: {}", e)), + }; + let schema = index.schema(); + + let text_field = schema.get_field("text").unwrap(); + + let reader = match index + .reader_builder() + .reload_policy(ReloadPolicy::OnCommitWithDelay) + .try_into() + { + Ok(r) => r, + Err(e) => return throw_and_return(&mut env, &format!("Failed to create reader: {}", e)), + }; + + let handle = Box::new(TantivySearcherHandle { + reader, + text_field, + }); + Box::into_raw(handle) as jlong +} + +/// Open an index from a Java StreamFileInput callback object. +/// +/// fileNames: String[] — names of files in the archive +/// fileOffsets: long[] — byte offset of each file in the stream +/// fileLengths: long[] — byte length of each file +/// streamInput: StreamFileInput — Java object with seek(long) and read(byte[], int, int) methods +#[no_mangle] +pub extern "system" fn Java_org_apache_paimon_tantivy_TantivySearcher_openFromStream( + mut env: JNIEnv, + _class: JClass, + file_names: jni::objects::JObjectArray, + file_offsets: jni::objects::JLongArray, + file_lengths: jni::objects::JLongArray, + stream_input: JObject, +) -> jlong { + // Parse file metadata from Java arrays + let count = match env.get_array_length(&file_names) { + Ok(c) => c as usize, + Err(e) => return throw_and_return(&mut env, &format!("Failed to get array length: {}", e)), + }; + let mut offsets_buf = vec![0i64; count]; + let mut lengths_buf = vec![0i64; count]; + if let Err(e) = env.get_long_array_region(&file_offsets, 0, &mut offsets_buf) { + return throw_and_return(&mut env, &format!("Failed to get offsets: {}", e)); + } + if let Err(e) = env.get_long_array_region(&file_lengths, 0, &mut lengths_buf) { + return throw_and_return(&mut env, &format!("Failed to get lengths: {}", e)); + } + + let mut files = 
Vec::with_capacity(count); + for i in 0..count { + let obj = match env.get_object_array_element(&file_names, i as i32) { + Ok(o) => o, + Err(e) => return throw_and_return(&mut env, &format!("Failed to get file name at {}: {}", i, e)), + }; + let jstr = JString::from(obj); + let name: String = match env.get_string(&jstr) { + Ok(s) => s.into(), + Err(e) => return throw_and_return(&mut env, &format!("Failed to convert file name: {}", e)), + }; + files.push((name, offsets_buf[i] as u64, lengths_buf[i] as u64)); + } + + // Create a global ref to the Java stream callback + let jvm = match env.get_java_vm() { + Ok(v) => v, + Err(e) => return throw_and_return(&mut env, &format!("Failed to get JVM: {}", e)), + }; + let stream_ref = match env.new_global_ref(stream_input) { + Ok(r) => r, + Err(e) => return throw_and_return(&mut env, &format!("Failed to create global ref: {}", e)), + }; + + let directory = JniDirectory::new(jvm, stream_ref, files); + let index = match Index::open(directory) { + Ok(i) => i, + Err(e) => return throw_and_return(&mut env, &format!("Failed to open index from stream: {}", e)), + }; + let schema = index.schema(); + + let text_field = schema.get_field("text").unwrap(); + + let reader = match index + .reader_builder() + .reload_policy(ReloadPolicy::Manual) + .try_into() + { + Ok(r) => r, + Err(e) => return throw_and_return(&mut env, &format!("Failed to create reader: {}", e)), + }; + + let handle = Box::new(TantivySearcherHandle { + reader, + text_field, + }); + Box::into_raw(handle) as jlong +} + +/// Search and return a SearchResult(long[] rowIds, float[] scores). 
+#[no_mangle] +pub extern "system" fn Java_org_apache_paimon_tantivy_TantivySearcher_searchIndex( + mut env: JNIEnv, + _class: JClass, + searcher_ptr: jlong, + query_string: JString, + limit: jint, +) -> jobject { + let handle = unsafe { &*(searcher_ptr as *const TantivySearcherHandle) }; + let query_str: String = match env.get_string(&query_string) { + Ok(s) => s.into(), + Err(e) => return throw_and_return_null(&mut env, &format!("Failed to get query string: {}", e)), + }; + + let searcher = handle.reader.searcher(); + let query_parser = QueryParser::for_index(&searcher.index(), vec![handle.text_field]); + let query = match query_parser.parse_query(&query_str) { + Ok(q) => q, + Err(e) => return throw_and_return_null(&mut env, &format!("Failed to parse query '{}': {}", query_str, e)), + }; + let top_docs = match searcher.search(&query, &TopDocs::with_limit(limit as usize)) { + Ok(d) => d, + Err(e) => return throw_and_return_null(&mut env, &format!("Search failed: {}", e)), + }; + + let count = top_docs.len(); + + // Build Java long[] and float[] + let row_id_array = match env.new_long_array(count as i32) { + Ok(a) => a, + Err(e) => return throw_and_return_null(&mut env, &format!("Failed to create long array: {}", e)), + }; + let score_array = match env.new_float_array(count as i32) { + Ok(a) => a, + Err(e) => return throw_and_return_null(&mut env, &format!("Failed to create float array: {}", e)), + }; + + let mut row_ids: Vec<jlong> = Vec::with_capacity(count); + let mut scores: Vec<jfloat> = Vec::with_capacity(count); + + // Use fast field reader for efficient row_id retrieval + for (score, doc_address) in &top_docs { + let segment_reader = searcher.segment_reader(doc_address.segment_ord); + let fast_fields = match segment_reader.fast_fields().u64("row_id") { + Ok(f) => f, + Err(e) => return throw_and_return_null(&mut env, &format!("Failed to get fast field: {}", e)), + }; + let row_id = fast_fields.first(doc_address.doc_id).unwrap_or(0) as jlong; + 
row_ids.push(row_id); + scores.push(*score as jfloat); + } + + if let Err(e) = env.set_long_array_region(&row_id_array, 0, &row_ids) { + return throw_and_return_null(&mut env, &format!("Failed to set long array: {}", e)); + } + if let Err(e) = env.set_float_array_region(&score_array, 0, &scores) { + return throw_and_return_null(&mut env, &format!("Failed to set float array: {}", e)); + } + + // Construct SearchResult object + let class = match env.find_class("org/apache/paimon/tantivy/SearchResult") { + Ok(c) => c, + Err(e) => return throw_and_return_null(&mut env, &format!("Failed to find SearchResult class: {}", e)), + }; + let obj = match env.new_object( + class, + "([J[F)V", + &[ + JValue::Object(&JObject::from(row_id_array)), + JValue::Object(&JObject::from(score_array)), + ], + ) { + Ok(o) => o, + Err(e) => return throw_and_return_null(&mut env, &format!("Failed to create SearchResult: {}", e)), + }; + + obj.into_raw() +} + +#[no_mangle] +pub extern "system" fn Java_org_apache_paimon_tantivy_TantivySearcher_freeSearcher( + _env: JNIEnv, + _class: JClass, + searcher_ptr: jlong, +) { + unsafe { + let _ = Box::from_raw(searcher_ptr as *mut TantivySearcherHandle); + } +} diff --git a/paimon-tantivy/paimon-tantivy-jni/src/main/java/org/apache/paimon/tantivy/NativeLoader.java b/paimon-tantivy/paimon-tantivy-jni/src/main/java/org/apache/paimon/tantivy/NativeLoader.java new file mode 100644 index 0000000000..af4a446127 --- /dev/null +++ b/paimon-tantivy/paimon-tantivy-jni/src/main/java/org/apache/paimon/tantivy/NativeLoader.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.tantivy; + +import org.apache.paimon.shade.guava30.com.google.common.io.ByteStreams; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Locale; + +/** Utility class for loading the native Tantivy JNI library. */ +public final class NativeLoader { + private static boolean loaded = false; + + private NativeLoader() {} + + public static synchronized void loadJni() { + if (loaded) { + return; + } + + String osName = System.getProperty("os.name").toLowerCase(Locale.ROOT); + String osArch = System.getProperty("os.arch").toLowerCase(Locale.ROOT); + String libName = "libtantivy_jni"; + + String osShortName; + String libExt; + if (osName.contains("win")) { + osShortName = "win"; + libExt = ".dll"; + libName += libExt; + } else if (osName.contains("mac")) { + osShortName = "darwin"; + libExt = ".dylib"; + libName += libExt; + } else if (osName.contains("nix") || osName.contains("nux")) { + osShortName = "linux"; + libExt = ".so"; + libName += libExt; + } else { + throw new UnsupportedOperationException("Unsupported OS: " + osName); + } + + String libPath = "/native/" + osShortName + "-" + osArch + "/" + libName; + try (InputStream in = NativeLoader.class.getResourceAsStream(libPath)) { + if (in == null) { + throw new FileNotFoundException("Library not found: " + libPath); + } + File tempFile = File.createTempFile("libtantivy_jni", libExt); + tempFile.deleteOnExit(); + + try 
(OutputStream out = new FileOutputStream(tempFile)) { + ByteStreams.copy(in, out); + } + libName = tempFile.getAbsolutePath(); + } catch (IOException e) { + throw new RuntimeException("Failed to load library: " + e.getMessage(), e); + } + + System.load(libName); + loaded = true; + } +} diff --git a/paimon-tantivy/paimon-tantivy-jni/src/main/java/org/apache/paimon/tantivy/SearchResult.java b/paimon-tantivy/paimon-tantivy-jni/src/main/java/org/apache/paimon/tantivy/SearchResult.java new file mode 100644 index 0000000000..a2086237a7 --- /dev/null +++ b/paimon-tantivy/paimon-tantivy-jni/src/main/java/org/apache/paimon/tantivy/SearchResult.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.tantivy; + +/** Search result containing parallel arrays of rowIds and scores, ordered by score descending. 
*/ +public class SearchResult { + + private final long[] rowIds; + private final float[] scores; + + public SearchResult(long[] rowIds, float[] scores) { + this.rowIds = rowIds; + this.scores = scores; + } + + public long[] getRowIds() { + return rowIds; + } + + public float[] getScores() { + return scores; + } + + public int size() { + return rowIds.length; + } +} diff --git a/paimon-tantivy/paimon-tantivy-jni/src/main/java/org/apache/paimon/tantivy/StreamFileInput.java b/paimon-tantivy/paimon-tantivy-jni/src/main/java/org/apache/paimon/tantivy/StreamFileInput.java new file mode 100644 index 0000000000..5f3aa83ea4 --- /dev/null +++ b/paimon-tantivy/paimon-tantivy-jni/src/main/java/org/apache/paimon/tantivy/StreamFileInput.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.tantivy; + +import java.io.IOException; + +/** + * Callback interface for Rust JNI to read from a seekable stream. Implementations must be + * thread-safe — Rust may call seek/read from multiple threads. 
+ */ +public interface StreamFileInput { + + void seek(long position) throws IOException; + + int read(byte[] buf, int off, int len) throws IOException; +} diff --git a/paimon-tantivy/paimon-tantivy-jni/src/main/java/org/apache/paimon/tantivy/TantivyIndexWriter.java b/paimon-tantivy/paimon-tantivy-jni/src/main/java/org/apache/paimon/tantivy/TantivyIndexWriter.java new file mode 100644 index 0000000000..51ccabf3a0 --- /dev/null +++ b/paimon-tantivy/paimon-tantivy-jni/src/main/java/org/apache/paimon/tantivy/TantivyIndexWriter.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.tantivy; + +/** Java wrapper for Tantivy index writer via JNI. Fixed schema: rowId (long) + text (String). 
*/ +public class TantivyIndexWriter implements AutoCloseable { + + static { + NativeLoader.loadJni(); + } + + private long indexPtr; + private boolean closed; + + public TantivyIndexWriter(String indexPath) { + this.indexPtr = createIndex(indexPath); + this.closed = false; + } + + public void addDocument(long rowId, String text) { + checkNotClosed(); + writeDocument(indexPtr, rowId, text); + } + + public void commit() { + checkNotClosed(); + commitIndex(indexPtr); + } + + @Override + public void close() { + if (!closed) { + freeIndex(indexPtr); + indexPtr = 0; + closed = true; + } + } + + private void checkNotClosed() { + if (closed) { + throw new IllegalStateException("IndexWriter is already closed"); + } + } + + // ---- native methods ---- + + static native long createIndex(String indexPath); + + static native void writeDocument(long indexPtr, long rowId, String text); + + static native void commitIndex(long indexPtr); + + static native void freeIndex(long indexPtr); +} diff --git a/paimon-tantivy/paimon-tantivy-jni/src/main/java/org/apache/paimon/tantivy/TantivySearcher.java b/paimon-tantivy/paimon-tantivy-jni/src/main/java/org/apache/paimon/tantivy/TantivySearcher.java new file mode 100644 index 0000000000..65efde90b4 --- /dev/null +++ b/paimon-tantivy/paimon-tantivy-jni/src/main/java/org/apache/paimon/tantivy/TantivySearcher.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.tantivy; + +/** Java wrapper for Tantivy index searcher via JNI. Returns (rowId, score) pairs. */ +public class TantivySearcher implements AutoCloseable { + + static { + NativeLoader.loadJni(); + } + + private long searcherPtr; + private boolean closed; + + /** Open a searcher from a local index directory. */ + public TantivySearcher(String indexPath) { + this.searcherPtr = openIndex(indexPath); + this.closed = false; + } + + /** + * Open a searcher from a stream-backed archive. + * + * @param fileNames names of files in the archive + * @param fileOffsets byte offset of each file in the stream + * @param fileLengths byte length of each file + * @param streamInput callback for seek/read operations on the stream + */ + public TantivySearcher( + String[] fileNames, + long[] fileOffsets, + long[] fileLengths, + StreamFileInput streamInput) { + this.searcherPtr = openFromStream(fileNames, fileOffsets, fileLengths, streamInput); + this.closed = false; + } + + /** + * Search the index with a query string, returning top N results ranked by score. 
+ * + * @param queryString the query text + * @param limit max number of results + * @return search results containing rowIds and scores + */ + public SearchResult search(String queryString, int limit) { + checkNotClosed(); + return searchIndex(searcherPtr, queryString, limit); + } + + @Override + public void close() { + if (!closed) { + freeSearcher(searcherPtr); + searcherPtr = 0; + closed = true; + } + } + + private void checkNotClosed() { + if (closed) { + throw new IllegalStateException("Searcher is already closed"); + } + } + + // ---- native methods ---- + + static native long openIndex(String indexPath); + + static native long openFromStream( + String[] fileNames, long[] fileOffsets, long[] fileLengths, StreamFileInput streamInput); + + static native SearchResult searchIndex(long searcherPtr, String queryString, int limit); + + static native void freeSearcher(long searcherPtr); +} diff --git a/paimon-tantivy/paimon-tantivy-jni/src/main/resources/META-INF/NOTICE b/paimon-tantivy/paimon-tantivy-jni/src/main/resources/META-INF/NOTICE new file mode 100644 index 0000000000..00074de3a3 --- /dev/null +++ b/paimon-tantivy/paimon-tantivy-jni/src/main/resources/META-INF/NOTICE @@ -0,0 +1,17 @@ +Paimon : Tantivy JNI +Copyright 2023-2026 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). + +This product includes software from the Tantivy project +(https://github.com/quickwit-oss/tantivy), licensed under the +MIT License. + +This product includes software from the jni crate +(https://github.com/jni-rs/jni-rs), licensed under the +MIT License / Apache License, Version 2.0. + +This product includes software from the serde_json crate +(https://github.com/serde-rs/json), licensed under the +MIT License / Apache License, Version 2.0. 
diff --git a/paimon-tantivy/paimon-tantivy-jni/src/test/java/org/apache/paimon/tantivy/TantivyJniTest.java b/paimon-tantivy/paimon-tantivy-jni/src/test/java/org/apache/paimon/tantivy/TantivyJniTest.java new file mode 100644 index 0000000000..4313f5768d --- /dev/null +++ b/paimon-tantivy/paimon-tantivy-jni/src/test/java/org/apache/paimon/tantivy/TantivyJniTest.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.tantivy; + +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.nio.file.Path; + +import static org.junit.jupiter.api.Assertions.*; +import static org.junit.jupiter.api.Assumptions.assumeTrue; + +/** Smoke test for Tantivy JNI. 
*/ +class TantivyJniTest { + + @BeforeAll + static void checkNativeLibrary() { + assumeTrue(isNativeAvailable(), "Tantivy native library not available, skipping tests"); + } + + private static boolean isNativeAvailable() { + try { + NativeLoader.loadJni(); + return true; + } catch (Throwable t) { + return false; + } + } + + @Test + void testWriteAndSearch(@TempDir Path tempDir) { + String indexPath = tempDir.resolve("test_index").toString(); + + try (TantivyIndexWriter writer = new TantivyIndexWriter(indexPath)) { + writer.addDocument(1L, "Apache Paimon is a streaming data lake platform"); + writer.addDocument(2L, "Tantivy is a full-text search engine written in Rust"); + writer.addDocument(3L, "Paimon supports real-time data ingestion"); + writer.commit(); + } + + try (TantivySearcher searcher = new TantivySearcher(indexPath)) { + SearchResult result = searcher.search("paimon", 10); + assertTrue(result.size() > 0, "Should find at least one result"); + assertEquals(result.getRowIds().length, result.getScores().length); + + // Both doc 1 and doc 3 mention "paimon" + assertEquals(2, result.size()); + for (int i = 0; i < result.size(); i++) { + long rowId = result.getRowIds()[i]; + assertTrue(rowId == 1L || rowId == 3L, "Unexpected rowId: " + rowId); + assertTrue(result.getScores()[i] > 0, "Score should be positive"); + } + + // Scores should be descending + if (result.size() > 1) { + assertTrue(result.getScores()[0] >= result.getScores()[1]); + } + } + } +} diff --git a/paimon-tantivy/paimon-tantivy-jni/src/test/java/org/apache/paimon/tantivy/TantivyStreamSearchTest.java b/paimon-tantivy/paimon-tantivy-jni/src/test/java/org/apache/paimon/tantivy/TantivyStreamSearchTest.java new file mode 100644 index 0000000000..a7afa7915b --- /dev/null +++ b/paimon-tantivy/paimon-tantivy-jni/src/test/java/org/apache/paimon/tantivy/TantivyStreamSearchTest.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.tantivy; + +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.*; +import static org.junit.jupiter.api.Assumptions.assumeTrue; + +/** Test for stream-based TantivySearcher via JNI Directory callback. */ +class TantivyStreamSearchTest { + + @BeforeAll + static void checkNativeLibrary() { + assumeTrue(isNativeAvailable(), "Tantivy native library not available, skipping tests"); + } + + private static boolean isNativeAvailable() { + try { + NativeLoader.loadJni(); + return true; + } catch (Throwable t) { + return false; + } + } + + @Test + void testStreamBasedSearch(@TempDir Path tempDir) throws IOException { + // 1. 
Create index on disk + String indexPath = tempDir.resolve("test_index").toString(); + try (TantivyIndexWriter writer = new TantivyIndexWriter(indexPath)) { + writer.addDocument(0L, "Apache Paimon is a streaming data lake platform"); + writer.addDocument(1L, "Tantivy is a full-text search engine written in Rust"); + writer.addDocument(2L, "Paimon supports real-time data ingestion"); + writer.commit(); + } + + // 2. Pack the index into an archive (same format as TantivyFullTextGlobalIndexWriter) + File indexDir = new File(indexPath); + File[] indexFiles = indexDir.listFiles(); + assertNotNull(indexFiles); + + ByteArrayOutputStream archiveOut = new ByteArrayOutputStream(); + List<String> fileNames = new ArrayList<>(); + List<Long> fileOffsets = new ArrayList<>(); + List<Long> fileLengths = new ArrayList<>(); + + // Write file count + writeInt(archiveOut, indexFiles.length); + + // First pass: write archive and track offsets + // We need to compute offsets relative to the start of the archive + // Build the archive in memory + ByteArrayOutputStream headerOut = new ByteArrayOutputStream(); + ByteArrayOutputStream dataOut = new ByteArrayOutputStream(); + + // Compute header size first + int headerSize = 4; // file count + for (File file : indexFiles) { + if (!file.isFile()) continue; + byte[] nameBytes = file.getName().getBytes("UTF-8"); + headerSize += 4 + nameBytes.length + 8; // nameLen + name + dataLen + } + + long dataOffset = headerSize; + ByteArrayOutputStream fullArchive = new ByteArrayOutputStream(); + writeInt(fullArchive, indexFiles.length); + + for (File file : indexFiles) { + if (!file.isFile()) continue; + byte[] nameBytes = file.getName().getBytes("UTF-8"); + long fileLen = file.length(); + + writeInt(fullArchive, nameBytes.length); + fullArchive.write(nameBytes); + writeLong(fullArchive, fileLen); + + fileNames.add(file.getName()); + fileOffsets.add((long) fullArchive.size()); + fileLengths.add(fileLen); + + try (FileInputStream fis = new 
FileInputStream(file)) { + byte[] buf = new byte[8192]; + int read; + while ((read = fis.read(buf)) != -1) { + fullArchive.write(buf, 0, read); + } + } + } + + byte[] archiveBytes = fullArchive.toByteArray(); + + // 3. Write archive to a file and open as RandomAccessFile-backed stream + File archiveFile = tempDir.resolve("archive.bin").toFile(); + try (java.io.FileOutputStream fos = new java.io.FileOutputStream(archiveFile)) { + fos.write(archiveBytes); + } + + // 4. Create a StreamFileInput backed by RandomAccessFile + RandomAccessFile raf = new RandomAccessFile(archiveFile, "r"); + StreamFileInput streamInput = + new StreamFileInput() { + @Override + public synchronized void seek(long position) throws IOException { + raf.seek(position); + } + + @Override + public synchronized int read(byte[] buf, int off, int len) throws IOException { + return raf.read(buf, off, len); + } + }; + + // 5. Open searcher from stream + try (TantivySearcher searcher = + new TantivySearcher( + fileNames.toArray(new String[0]), + fileOffsets.stream().mapToLong(Long::longValue).toArray(), + fileLengths.stream().mapToLong(Long::longValue).toArray(), + streamInput)) { + + SearchResult result = searcher.search("paimon", 10); + assertTrue(result.size() > 0, "Should find at least one result"); + assertEquals(2, result.size(), "Both doc 0 and doc 2 mention paimon"); + + for (int i = 0; i < result.size(); i++) { + long rowId = result.getRowIds()[i]; + assertTrue(rowId == 0L || rowId == 2L, "Unexpected rowId: " + rowId); + assertTrue(result.getScores()[i] > 0, "Score should be positive"); + } + } finally { + raf.close(); + } + } + + private static void writeInt(ByteArrayOutputStream out, int value) { + out.write((value >>> 24) & 0xFF); + out.write((value >>> 16) & 0xFF); + out.write((value >>> 8) & 0xFF); + out.write(value & 0xFF); + } + + private static void writeLong(ByteArrayOutputStream out, long value) { + writeInt(out, (int) (value >>> 32)); + writeInt(out, (int) value); + } +} diff --git 
a/paimon-tantivy/pom.xml b/paimon-tantivy/pom.xml new file mode 100644 index 0000000000..b0609f92b9 --- /dev/null +++ b/paimon-tantivy/pom.xml @@ -0,0 +1,39 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <artifactId>paimon-parent</artifactId> + <groupId>org.apache.paimon</groupId> + <version>1.5-SNAPSHOT</version> + </parent> + + <artifactId>paimon-tantivy</artifactId> + <name>Paimon : Tantivy : </name> + <packaging>pom</packaging> + + <modules> + <module>paimon-tantivy-jni</module> + <module>paimon-tantivy-index</module> + </modules> +</project> diff --git a/pom.xml b/pom.xml index 68e0c26844..7b0c3448d4 100644 --- a/pom.xml +++ b/pom.xml @@ -74,6 +74,7 @@ under the License. <module>paimon-api</module> <module>paimon-lumina</module> <module>paimon-vortex</module> + <module>paimon-tantivy</module> </modules> <properties> @@ -628,6 +629,7 @@ under the License. 
<exclude>paimon-core/src/test/resources/compatibility/**</exclude> <exclude>paimon-python/pypaimon/sample/data/**</exclude> <exclude>**/*.proto</exclude> + <exclude>**/Cargo*</exclude> </excludes> </configuration> </plugin>
