modules\indexing\src\main\java\org\apache\ignite\internal\processors\query\h2\opt\GridLuceneIndex.java
```java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.query.h2.opt;
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheObjectContext;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.query.GridQueryIndexDescriptor;
import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
import org.apache.ignite.internal.util.GridAtomicLong;
import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
import org.apache.ignite.internal.util.lang.GridCloseableIterator;
import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.spi.indexing.IndexingQueryFilter;
import org.apache.ignite.spi.indexing.IndexingQueryCacheFilter;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.LongField;
import org.apache.lucene.document.StoredField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.Term;
import org.apache.lucene.queryparser.classic.MultiFieldQueryParser;
import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.NumericRangeQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.util.BytesRef;
import org.h2.util.JdbcUtils;
import org.jetbrains.annotations.Nullable;
import static
org.apache.ignite.internal.processors.query.QueryUtils.KEY_FIELD_NAME;
import static
org.apache.ignite.internal.processors.query.QueryUtils.VAL_FIELD_NAME;
/**
* Lucene fulltext index.
*/
public class GridLuceneIndex implements AutoCloseable {
/** Field name for string representation of value. */
public static final String VAL_STR_FIELD_NAME = "_gg_val_str__";
/** Field name for value version. */
public static final String VER_FIELD_NAME = "_gg_ver__";
/** Field name for value expiration time. */
public static final String EXPIRATION_TIME_FIELD_NAME = "_gg_expires__";
/** */
private final String cacheName;
/** */
private final GridQueryTypeDescriptor type;
/** */
private final IndexWriter writer;
/** */
private final String[] idxdFields;
/** */
private final AtomicLong updateCntr = new GridAtomicLong();
/** */
private final GridLuceneDirectory dir;
/** */
private final GridKernalContext ctx;
/**
* Constructor.
*
* @param ctx Kernal context.
* @param cacheName Cache name.
* @param type Type descriptor.
* @throws IgniteCheckedException If failed.
*/
public GridLuceneIndex(GridKernalContext ctx, @Nullable String
cacheName, GridQueryTypeDescriptor type)
throws IgniteCheckedException {
this.ctx = ctx;
this.cacheName = cacheName;
this.type = type;
dir = new GridLuceneDirectory(new GridUnsafeMemory(0));
try {
writer = new IndexWriter(dir, new IndexWriterConfig(new
StandardAnalyzer()));
}
catch (IOException e) {
throw new IgniteCheckedException(e);
}
GridQueryIndexDescriptor idx = type.textIndex();
if (idx != null) {
Collection<String> fields = idx.fields();
idxdFields = new String[fields.size() + 1];
fields.toArray(idxdFields);
}
else {
assert type.valueTextIndex() || type.valueClass() ==
String.class;
idxdFields = new String[1];
}
idxdFields[idxdFields.length - 1] = VAL_STR_FIELD_NAME;
}
/**
* @return Cache object context.
*/
private CacheObjectContext objectContext() {
if (ctx == null)
return null;
return
ctx.cache().internalCache(cacheName).context().cacheObjectContext();
}
/**
* Stores given data in this fulltext index.
*
* @param k Key.
* @param v Value.
* @param ver Version.
* @param expires Expiration time.
* @throws IgniteCheckedException If failed.
*/
@SuppressWarnings("ConstantConditions")
public void store(CacheObject k, CacheObject v, GridCacheVersion ver,
long expires) throws IgniteCheckedException {
CacheObjectContext coctx = objectContext();
Object key = k.isPlatformType() ? k.value(coctx, false) : k;
Object val = v.isPlatformType() ? v.value(coctx, false) : v;
Document doc = new Document();
boolean stringsFound = false;
if (type.valueTextIndex() || type.valueClass() == String.class) {
doc.add(new TextField(VAL_STR_FIELD_NAME, val.toString(),
Field.Store.YES));
stringsFound = true;
}
for (int i = 0, last = idxdFields.length - 1; i < last; i++) {
Object fieldVal = type.value(idxdFields[i], key, val);
if (fieldVal != null) {
doc.add(new TextField(idxdFields[i], fieldVal.toString(),
Field.Store.YES));
stringsFound = true;
}
}
BytesRef keyByteRef = new BytesRef(k.valueBytes(coctx));
try {
final Term term = new Term(KEY_FIELD_NAME, keyByteRef);
if (!stringsFound) {
writer.deleteDocuments(term);
return; // We did not find any strings to be indexed, will
not store data at all.
}
doc.add(new StringField(KEY_FIELD_NAME, keyByteRef,
Field.Store.YES));
if (type.valueClass() != String.class)
doc.add(new StoredField(VAL_FIELD_NAME,
v.valueBytes(coctx)));
doc.add(new StoredField(VER_FIELD_NAME,
ver.toString().getBytes()));
doc.add(new LongField(EXPIRATION_TIME_FIELD_NAME, expires,
Field.Store.YES));
// Next implies remove than add atomically operation.
writer.updateDocument(term, doc);
}
catch (IOException e) {
throw new IgniteCheckedException(e);
}
finally {
updateCntr.incrementAndGet();
}
}
/**
* Removes entry for given key from this index.
*
* @param key Key.
* @throws IgniteCheckedException If failed.
*/
public void remove(CacheObject key) throws IgniteCheckedException {
try {
writer.deleteDocuments(new Term(KEY_FIELD_NAME,
new BytesRef(key.valueBytes(objectContext()))));
}
catch (IOException e) {
throw new IgniteCheckedException(e);
}
finally {
updateCntr.incrementAndGet();
}
}
/**
* Runs lucene fulltext query over this index.
*
* @param qry Query.
* @param filters Filters over result.
* @param pageSize Size of batch
* @return Query result.
* @throws IgniteCheckedException If failed.
*/
public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> query(String
qry, IndexingQueryFilter filters, int pageSize) throws
IgniteCheckedException {
IndexReader reader;
try {
long updates = updateCntr.get();
if (updates != 0) {
writer.commit();
updateCntr.addAndGet(-updates);
}
//We can cache reader\searcher and change this to
'openIfChanged'
reader = DirectoryReader.open(writer, true);
}
catch (IOException e) {
throw new IgniteCheckedException(e);
}
IndexSearcher searcher;
Query query;
try {
searcher = new IndexSearcher(reader);
MultiFieldQueryParser parser = new
MultiFieldQueryParser(idxdFields,
writer.getAnalyzer());
// parser.setAllowLeadingWildcard(true);
// Filter expired items.
Query filter =
NumericRangeQuery.newLongRange(EXPIRATION_TIME_FIELD_NAME,
U.currentTimeMillis(),
null, false, false);
query = new BooleanQuery.Builder()
.add(parser.parse(qry), BooleanClause.Occur.MUST)
.add(filter, BooleanClause.Occur.FILTER)
.build();
}
catch (Exception e) {
U.closeQuiet(reader);
throw new IgniteCheckedException(e);
}
IndexingQueryCacheFilter fltr = null;
if (filters != null)
fltr = filters.forCache(cacheName);
return new It<>(reader, searcher, query, fltr, pageSize);
}
/** {@inheritDoc} */
@Override public void close() {
U.closeQuiet(writer);
U.close(dir, ctx.log(GridLuceneIndex.class));
}
/**
* Key-value iterator over fulltext search result.
*/
private class It<K, V> extends
GridCloseableIteratorAdapter<IgniteBiTuple<K, V>> {
private final int BatchPosBeforeHead = -1;
/** */
private static final long serialVersionUID = 0L;
/** */
private final int pageSize;
/** */
private final IndexReader reader;
/** */
private final Query query;
/** */
private final IndexSearcher searcher;
/** current batch docs*/
private ScoreDoc[] batch;
/** current position in batch*/
private int batchPos = BatchPosBeforeHead;
/** */
private final IndexingQueryCacheFilter filters;
/** */
private IgniteBiTuple<K, V> curr;
/** */
private CacheObjectContext coctx;
/**
* Constructor.
*
* @param reader Reader.
* @param searcher Searcher.
* @param filters Filters over result.
* @throws IgniteCheckedException if failed.
*/
private It(IndexReader reader, IndexSearcher searcher, Query query,
IndexingQueryCacheFilter filters, int pageSize)
throws IgniteCheckedException {
this.reader = reader;
this.searcher = searcher;
this.filters = filters;
this.query = query;
this.pageSize = pageSize;
coctx = objectContext();
findNext();
}
/**
* @param bytes Bytes.
* @param ldr Class loader.
* @return Object.
* @throws IgniteCheckedException If failed.
*/
@SuppressWarnings("unchecked")
private <Z> Z unmarshall(byte[] bytes, ClassLoader ldr) throws
IgniteCheckedException {
if (coctx == null) // For tests.
return (Z)JdbcUtils.deserialize(bytes, null);
return (Z)coctx.kernalContext().cacheObjects().unmarshal(coctx,
bytes, ldr);
}
/**
* Finds next element.
*
* @throws IgniteCheckedException If failed.
*/
@SuppressWarnings("unchecked")
private void findNext() throws IgniteCheckedException {
curr = null;
if(isClosed())
throw new IgniteCheckedException("Iterator already closed");
if (shouldRequestNextBatch()) {
try {
requestNextBatch();
} catch (IOException e) {
close();
throw new IgniteCheckedException(e);
}
}
if(batch == null)
return;
while (batchPos < batch.length) {
Document doc;
ScoreDoc scoreDoc =batch[batchPos++];
try {
doc = searcher.doc(scoreDoc.doc);
}
catch (IOException e) {
throw new IgniteCheckedException(e);
}
ClassLoader ldr = null;
if (ctx != null && ctx.deploy().enabled())
ldr =
ctx.cache().internalCache(cacheName).context().deploy().globalLoader();
K k = unmarshall(doc.getBinaryValue(KEY_FIELD_NAME).bytes,
ldr);
if (filters != null && !filters.apply(k))
continue;
V v = type.valueClass() == String.class ?
(V)doc.get(VAL_STR_FIELD_NAME) :
this.<V>unmarshall(doc.getBinaryValue(VAL_FIELD_NAME).bytes, ldr);
assert v != null;
curr = new IgniteBiTuple<>(k, v);
break;
}
}
private boolean shouldRequestNextBatch() {
if(batch == null){
// should request for first batch
return (batchPos == BatchPosBeforeHead) ;
} else {
// should request when reached to the end of batch
return (batchPos == batch.length);
}
}
private void requestNextBatch() throws IOException {
TopDocs docs;
if (batch == null) {
docs = searcher.search(query, pageSize);
} else {
docs = searcher.searchAfter(batch[batch.length - 1], query,
pageSize);
}
if(docs.scoreDocs.length ==0) {
batch = null;
}else{
batch = docs.scoreDocs;
}
batchPos = 0;
}
/** {@inheritDoc} */
@Override protected IgniteBiTuple<K, V> onNext() throws
IgniteCheckedException {
IgniteBiTuple<K, V> res = curr;
findNext();
return res;
}
/** {@inheritDoc} */
@Override protected boolean onHasNext() throws
IgniteCheckedException {
return curr != null;
}
/** {@inheritDoc} */
@Override protected void onClose() throws IgniteCheckedException {
U.closeQuiet(reader);
}
}
}
```
On Fri, Sep 7, 2018 at 9:02 AM Tâm Nguyễn Mạnh <[email protected]>
wrote:
> Hi,
>
> I tried to implement iterator for GridLuceneInde, could you please help to
> review ?
>
> --
> Thanks & Best Regards
>
> Tam, Nguyen Manh
>
>
--
Thanks & Best Regards
Tam, Nguyen Manh