[ https://issues.apache.org/jira/browse/GORA-266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16400607#comment-16400607 ]
ASF GitHub Bot commented on GORA-266:
-------------------------------------

Github user renato2099 commented on a diff in the pull request:

https://github.com/apache/gora/pull/131#discussion_r174723758

--- Diff: gora-lucene/src/main/java/org/apache/gora/lucene/store/LuceneStore.java ---
@@ -0,0 +1,475 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.lucene.store;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.util.Utf8;
+import org.apache.gora.lucene.query.LuceneQuery;
+import org.apache.gora.lucene.query.LuceneResult;
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.apache.gora.query.PartitionQuery;
+import org.apache.gora.query.Query;
+import org.apache.gora.query.Result;
+import org.apache.gora.query.impl.FileSplitPartitionQuery;
+import org.apache.gora.store.DataStore;
+import org.apache.gora.store.DataStoreFactory;
+import org.apache.gora.store.impl.FileBackedDataStoreBase;
+import org.apache.gora.util.AvroUtils;
+import org.apache.gora.util.GoraException;
+import org.apache.gora.util.IOUtils;
+import org.apache.gora.util.OperationNotSupportedException;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field.Store;
+import org.apache.lucene.document.StoredField;
+import org.apache.lucene.document.StringField;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.IndexWriterConfig.OpenMode;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.ScoreDoc;
+import org.apache.lucene.search.SearcherFactory;
+import org.apache.lucene.search.SearcherManager;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FSDirectory;
+import org.apache.lucene.util.BytesRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.w3c.dom.Element;
+import org.w3c.dom.NodeList;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.FileSystems;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+public class LuceneStore<K, T extends PersistentBase> extends FileBackedDataStoreBase<K, T> implements Configurable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(LuceneStore.class);
+
+  private static final String DEFAULT_MAPPING_FILE = "gora-lucene-mapping.xml";
+  private static final String LUCENE_VERSION_KEY = "gora.lucene.index.version";
+  private static final String DEFAULT_LUCENE_VERSION = "LATEST";
+  private static final String LUCENE_RAM_BUFFER_KEY = "gora.lucene.index.writer.rambuffer";
+  private static final String DEFAULT_LUCENE_RAMBUFFER = "16";
+
+  private LuceneMapping mapping;
+  private IndexWriter writer;
+  private SearcherManager searcherManager;
+
+  @Override
+  public void initialize(Class<K> keyClass, Class<T> persistentClass, Properties properties) {
+    try {
+      super.initialize(keyClass, persistentClass, properties);
+    } catch (GoraException e1) {
+      LOG.error("Error initializing LuceneStore", e1);
+    }
+
+    String mappingFile = null;
+    try {
+      mappingFile = DataStoreFactory.getMappingFile(
+          properties, (DataStore<?, ?>) this, DEFAULT_MAPPING_FILE);
+    } catch (IOException e1) {
+      LOG.error("Error locating the mapping file", e1);
+    }
+    String luceneVersion = properties.getProperty(
+        LUCENE_VERSION_KEY, DEFAULT_LUCENE_VERSION);
+    String ramBuffer = properties.getProperty(
+        LUCENE_RAM_BUFFER_KEY, DEFAULT_LUCENE_RAMBUFFER);
+
+    LOG.debug("Lucene index version: {}", luceneVersion);
+    LOG.debug("Lucene index writer RAM buffer size: {}", ramBuffer);
+
+    try {
+      mapping = readMapping(mappingFile);
+    } catch (IOException e1) {
+      LOG.error("Error reading the mapping file", e1);
+    }
+    try {
+      // The Directory must stay open for the lifetime of the IndexWriter,
+      // so it is not opened with try-with-resources (that would close it
+      // as soon as initialization finished); it should be closed in close().
+      Directory dir = FSDirectory.open(FileSystems.getDefault().getPath(outputPath));
+
+      Analyzer analyzer = new StandardAnalyzer();
+      IndexWriterConfig iwc = new IndexWriterConfig(analyzer);
+
+      iwc.setOpenMode(OpenMode.CREATE_OR_APPEND);
+      iwc.setRAMBufferSizeMB(Double.parseDouble(ramBuffer));
+
+      writer = new IndexWriter(dir, iwc);
+      // Do we definitely want all past deletions to be applied?
+      searcherManager = new SearcherManager(writer, true, true, new SearcherFactory());
+    } catch (IOException e) {
+      LOG.error("Error opening {} with Lucene FSDirectory.", outputPath, e);
+    }
+  }
+
+  LuceneMapping readMapping(String filename) throws IOException {
+    try {
+      LuceneMapping mapping = new LuceneMapping();
+
+      DocumentBuilder db = DocumentBuilderFactory.newInstance().newDocumentBuilder();
+      org.w3c.dom.Document dom = db.parse(getClass().getClassLoader().getResourceAsStream(filename));
+
+      Element root = dom.getDocumentElement();
+
+      NodeList nl = root.getElementsByTagName("class");
+      for (int i = 0; i < nl.getLength(); i++) {
+
+        Element classElement = (Element) nl.item(i);
+        if (classElement.getAttribute("keyClass").equals(keyClass.getCanonicalName())
+            && classElement.getAttribute("name").equals(persistentClass.getCanonicalName())) {
+
+          NodeList fields;
+          Element fe;
+
+          fields = classElement.getElementsByTagName("primarykey");
+          for (int j = 0; j < fields.getLength(); j++) {
+            fe = (Element) fields.item(j);
+            mapping.setPrimaryKey(fe.getAttribute("column"));
+          }
+
+          fields = classElement.getElementsByTagName("field");
+          for (int j = 0; j < fields.getLength(); j++) {
+            fe = (Element) fields.item(j);
+
+            String name = fe.getAttribute("name");
+            String column = fe.getAttribute("column");
+
+            mapping.addField(name, column);
+          }
+        }
+      }
+      return mapping;
+    } catch (Exception ex) {
+      throw new IOException("Unable to read " + filename, ex);
+    }
+  }
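  // Editor's note (illustrative only, not part of this patch): judging from
  // the getElementsByTagName()/getAttribute() calls above, readMapping()
  // expects a mapping file shaped roughly like the sketch below. The root
  // element name is not checked by the code, and the class/field names here
  // are hypothetical:
  //
  //   <gora-lucene>
  //     <class name="org.example.generated.Employee" keyClass="java.lang.String">
  //       <primarykey column="id"/>
  //       <field name="name" column="name"/>
  //       <field name="salary" column="salary"/>
  //     </class>
  //   </gora-lucene>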
"gora-lucene-mapping.xml"; + private static final String LUCENE_VERSION_KEY = "gora.lucene.index.version"; + private static final String DEFAULT_LUCENE_VERSION = "LATEST"; + private static final String LUCENE_RAM_BUFFER_KEY = "gora.lucene.index.writer.rambuffer"; + private static final String DEFAULT_LUCENE_RAMBUFFER = "16"; + + private LuceneMapping mapping; + private IndexWriter writer; + private SearcherManager searcherManager; + + @Override + public void initialize(Class<K> keyClass, Class<T> persistentClass, Properties properties) { + try { + super.initialize(keyClass, persistentClass, properties); + } catch (GoraException e1) { + e1.printStackTrace(); + } + + String mappingFile = null; + try { + mappingFile = DataStoreFactory.getMappingFile( + properties, (DataStore<?, ?>) this, DEFAULT_MAPPING_FILE ); + } catch (IOException e1) { + e1.printStackTrace(); + } + String luceneVersion = properties.getProperty( + LUCENE_VERSION_KEY, DEFAULT_LUCENE_VERSION); + String ramBuffer = properties.getProperty( + LUCENE_RAM_BUFFER_KEY, DEFAULT_LUCENE_RAMBUFFER); + + LOG.debug("Lucene index version: {}", luceneVersion); + LOG.debug("Lucene index writer RAM buffer size: {}", ramBuffer); + + try { + mapping = readMapping(mappingFile); + } catch (IOException e1) { + e1.printStackTrace(); + } + try(Directory dir = FSDirectory.open(FileSystems.getDefault().getPath(outputPath))) { + + Analyzer analyzer = new StandardAnalyzer(); + IndexWriterConfig iwc = new IndexWriterConfig(analyzer); + + iwc.setOpenMode(OpenMode.CREATE_OR_APPEND); + iwc.setRAMBufferSizeMB(Double.parseDouble(ramBuffer)); + + writer = new IndexWriter(dir, iwc); + //do we definately want all past deletions to be applied. + searcherManager = new SearcherManager(writer, true, true, new SearcherFactory()); + } catch (IOException e ) { + LOG.error("Error opening {} with Lucene FSDirectory.", outputPath, e); + } + } + + LuceneMapping readMapping(String filename) throws IOException { + try { + + LuceneMapping mapping = new LuceneMapping(); + + DocumentBuilder db = DocumentBuilderFactory.newInstance().newDocumentBuilder(); + org.w3c.dom.Document dom = db.parse(getClass().getClassLoader().getResourceAsStream(filename)); + + Element root = dom.getDocumentElement(); + + NodeList nl = root.getElementsByTagName("class"); + for (int i = 0; i < nl.getLength(); i++) { + + Element classElement = (Element) nl.item(i); + if (classElement.getAttribute("keyClass").equals(keyClass.getCanonicalName()) + && classElement.getAttribute("name").equals(persistentClass.getCanonicalName())) { + + NodeList fields; + Element fe; + + fields = classElement.getElementsByTagName("primarykey"); + for (int j = 0; j < fields.getLength(); j++) { + fe = (Element) fields.item(j); + mapping.setPrimaryKey(fe.getAttribute("column")); + } + + fields = classElement.getElementsByTagName("field"); + for (int j = 0; j < fields.getLength(); j++) { + fe = (Element) fields.item(j); + + String name = fe.getAttribute("name"); + String column = fe.getAttribute("column"); + + mapping.addField(name, column); + } + } + } + return mapping; + } catch (Exception ex) { + throw new IOException("Unable to read " + filename, ex); + } + } + + + @Override + public boolean delete(K key) { + try { + writer.deleteDocuments(new Term(mapping.getPrimaryKey(), key.toString())); + searcherManager.maybeRefresh(); + return true; + } catch (IOException e) { + LOG.error("Unable to delete key: {}", key.toString(), e); + } + return false; + } + + + @Override + public long deleteByQuery(Query<K,T> query) { + try { + // 
+
+  @Override
+  public void deleteSchema() {
+    try {
+      writer.deleteAll();
+      searcherManager.maybeRefresh();
+    } catch (IOException e) {
+      LOG.error("Unable to deleteAll", e);
+    }
+  }
+
+  @Override
+  public T get(K key, String[] fieldsToLoad) {
+    Set<String> fields;
+    if (fieldsToLoad != null) {
+      fields = new HashSet<>(fieldsToLoad.length);
+      fields.addAll(Arrays.asList(fieldsToLoad));
+    } else {
+      fields = new HashSet<>();
+      fields.addAll(mapping.getLuceneFields());
+    }
+    try {
+      final IndexSearcher s = searcherManager.acquire();
+      try {
+        TermQuery q = new TermQuery(new Term(mapping.getPrimaryKey(), key.toString()));
+        ScoreDoc[] hits = s.search(q, 2).scoreDocs;
+        if (hits.length > 0) {
+          Document doc = s.doc(hits[0].doc, fields);
+          LOG.debug("get:Document: {}", doc);
+          String[] a = {};
+          return newInstance(doc, fields.toArray(a));
+        }
+      } finally {
+        // Release the searcher on every path, including the early return above.
+        searcherManager.release(s);
+      }
+    } catch (IOException e) {
+      LOG.error("Error in get", e);
+    }
+    return null;
+  }
+
+  public T newInstance(Document doc, String[] fields) throws IOException {
+    T persistent = newPersistent();
+    if (fields == null) {
+      fields = fieldMap.keySet().toArray(new String[fieldMap.size()]);
+    }
+    String pk = mapping.getPrimaryKey();
+
+    for (String f : fields) {
+      org.apache.avro.Schema.Field field = fieldMap.get(f);
+      Schema fieldSchema = field.schema();
+      String sf;
+      if (pk.equals(f)) {
+        sf = f;
+      } else {
+        sf = mapping.getLuceneField(f);
+      }
+      Object v;
+      Object sv;
+      switch (fieldSchema.getType()) {
+        case MAP:
+        case ARRAY:
+        case RECORD:
+        case UNION:
+          sv = doc.getBinaryValue(sf);
+          if (sv == null) {
+            continue;
+          }
+          BytesRef b = (BytesRef) sv;
+          v = IOUtils.deserialize(b.bytes, datumReader, (T) persistent.get(field.pos()));
+          persistent.put(field.pos(), v);
+          break;
+        case ENUM:
+          sv = doc.get(sf);
+          if (sv == null) {
+            continue;
+          }
+          v = AvroUtils.getEnumValue(fieldSchema, (String) sv);
+          persistent.put(field.pos(), v);
+          break;
+        default:
+          sv = doc.get(sf);
+          if (sv == null) {
+            continue;
+          }
+          put(persistent, field.pos(), fieldSchema.getType(), sv);
+      }
+      persistent.setDirty(field.pos());
+    }
+    persistent.clearDirty();
+    return persistent;
+  }
+
+  void put(T p, int pos, Type t, Object o) {
+    switch (t) {
+      case FIXED:
+        // Could we combine this with the BYTES section below and
+        // either fix the size of the array or not, depending on the type?
+        // This might be a buffer copy. Do we need to pad if the
+        // fixed-size data is smaller than the type? Do we truncate
+        // if the data is larger than the type?
+        LOG.error("Fixed-size fields are not supported yet");
+        break;
+      case BYTES:
+        p.put(pos, ByteBuffer.wrap((byte[]) o));
+        break;
+      case BOOLEAN:
+        p.put(pos, Boolean.parseBoolean((String) o));
+        break;
+      case DOUBLE:
+        p.put(pos, Double.parseDouble((String) o));
+        break;
+      case FLOAT:
+        p.put(pos, Float.parseFloat((String) o));
+        break;
+      case INT:
+        p.put(pos, Integer.parseInt((String) o));
+        break;
+      case LONG:
+        p.put(pos, Long.parseLong((String) o));
+        break;
+      case STRING:
+        p.put(pos, new Utf8(o.toString()));
+        break;
+      default:
+        LOG.error("Unknown field type: {}", t);
+    }
+  }
+
+  @Override
+  public String getSchemaName() {
+    return "default";
+  }
+
+  @Override
+  public Query<K, T> newQuery() {
+    return new LuceneQuery<>(this);
+  }
+
+  @Override
+  public void put(K key, T persistent) {
+    Schema schema = persistent.getSchema();
+    Document doc = new Document();
+
+    // populate the doc
+    List<org.apache.avro.Schema.Field> fields = schema.getFields();
+    for (org.apache.avro.Schema.Field field : fields) {
+      if (!persistent.isDirty(field.name())) {
+        continue;
+      }
+      String sf = mapping.getLuceneField(field.name());
+      if (sf == null) {
+        continue;
+      }
+      Schema fieldSchema = field.schema();
+      Object v = persistent.get(field.pos());
+      if (v == null) {
+        continue;
+      }
+      switch (fieldSchema.getType()) {
+        case MAP: //TODO: These should be handled better
+        case ARRAY:
+        case RECORD:
+        case UNION:
--- End diff --

Clarification question: what are the other options for storing complex data types, besides storing the complex data type as-is?
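For example (a rough sketch only; the helper name addMapEntries, the "column_key" field-naming scheme, and the string-valued map are all hypothetical, not taken from this patch), a map could be flattened into one Lucene field per entry so individual entries stay indexed and searchable, instead of serializing the whole value into a single binary field:

    // Hypothetical alternative: flatten an Avro map into one Lucene field
    // per entry. Relies on the StringField/Store imports already in the file,
    // plus java.util.Map.
    @SuppressWarnings("unchecked")
    private void addMapEntries(Document doc, String column, Object value) {
      java.util.Map<CharSequence, CharSequence> m =
          (java.util.Map<CharSequence, CharSequence>) value;
      for (java.util.Map.Entry<CharSequence, CharSequence> e : m.entrySet()) {
        // StringField indexes the exact (untokenized) value; Store.YES keeps
        // it retrievable so the map can be rebuilt on read.
        doc.add(new StringField(column + "_" + e.getKey(),
            e.getValue().toString(), Store.YES));
      }
    }

The trade-off: reads would have to reassemble the map from the document's fields, and non-string value types would need per-type handling, whereas storing the serialized blob keeps round-tripping simple at the cost of opaque, unsearchable values.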
+ LOG.error( "Fixed-sized fields are not supported yet" ); + break; + case BYTES: + p.put( pos, ByteBuffer.wrap( (byte[]) o ) ); + break; + case BOOLEAN: + p.put( pos, Boolean.parseBoolean((String)o) ); + break; + case DOUBLE: + p.put( pos, Double.parseDouble((String)o) ); + break; + case FLOAT: + p.put( pos, Float.parseFloat((String)o) ); + break; + case INT: + p.put( pos, Integer.parseInt((String)o) ); + break; + case LONG: + p.put( pos, Long.parseLong((String)o) ); + break; + case STRING: + p.put( pos, new Utf8( o.toString() ) ); + break; + default: + LOG.error( "Unknown field type: {}", t ); + } + } + + + @Override + public String getSchemaName() { + return "default"; + } + + @Override + public Query<K,T> newQuery() { + return new LuceneQuery<>( this ); + } + + @Override + public void put(K key, T persistent) { + Schema schema = persistent.getSchema(); + Document doc = new Document(); + + // populate the doc + List<org.apache.avro.Schema.Field> fields = schema.getFields(); + for ( org.apache.avro.Schema.Field field : fields ) { + if (!persistent.isDirty( field.name() )) { + continue; + } + String sf = mapping.getLuceneField( field.name() ); + if ( sf == null ) { + continue; + } + Schema fieldSchema = field.schema(); + Object v = persistent.get( field.pos() ); + if ( v == null ) { + continue; + } + switch ( fieldSchema.getType() ) { + case MAP: //TODO: These should be handled better + case ARRAY: + case RECORD: + case UNION: --- End diff -- Clarification question: So what are the other options for storing complex data types? I mean besides storing the complex data type as is. > Lucene datastore for Gora > ------------------------- > > Key: GORA-266 > URL: https://issues.apache.org/jira/browse/GORA-266 > Project: Apache Gora > Issue Type: New Feature > Components: gora-lucene > Reporter: Lewis John McGibbney > Assignee: Lewis John McGibbney > Priority: Minor > Labels: features > Fix For: 0.9 > > Attachments: GORA-266.patch > > > Sometimes we don't need to set up, administer, maintain and configure a > running solr server(s). > Sometime we just want to write to Lucene index files and read directly from > them. -- This message was sent by Atlassian JIRA (v7.6.3#76005)