
ASF GitHub Bot commented on GORA-266:

Github user renato2099 commented on a diff in the pull request:

    --- Diff: 
gora-lucene/src/main/java/org/apache/gora/lucene/store/LuceneStore.java ---
    @@ -0,0 +1,475 @@
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.gora.lucene.store;
    +import org.apache.avro.Schema;
    +import org.apache.avro.Schema.Type;
    +import org.apache.avro.util.Utf8;
    +import org.apache.gora.lucene.query.LuceneQuery;
    +import org.apache.gora.lucene.query.LuceneResult;
    +import org.apache.gora.persistency.impl.PersistentBase;
    +import org.apache.gora.query.PartitionQuery;
    +import org.apache.gora.query.Query;
    +import org.apache.gora.query.Result;
    +import org.apache.gora.query.impl.FileSplitPartitionQuery;
    +import org.apache.gora.store.DataStore;
    +import org.apache.gora.store.DataStoreFactory;
    +import org.apache.gora.store.impl.FileBackedDataStoreBase;
    +import org.apache.gora.util.AvroUtils;
    +import org.apache.gora.util.GoraException;
    +import org.apache.gora.util.IOUtils;
    +import org.apache.gora.util.OperationNotSupportedException;
    +import org.apache.hadoop.conf.Configurable;
    +import org.apache.lucene.analysis.Analyzer;
    +import org.apache.lucene.analysis.standard.StandardAnalyzer;
    +import org.apache.lucene.document.Document;
    +import org.apache.lucene.document.Field.Store;
    +import org.apache.lucene.document.StoredField;
    +import org.apache.lucene.document.StringField;
    +import org.apache.lucene.index.IndexWriter;
    +import org.apache.lucene.index.IndexWriterConfig;
    +import org.apache.lucene.index.IndexWriterConfig.OpenMode;
    +import org.apache.lucene.index.Term;
    +import org.apache.lucene.search.IndexSearcher;
    +import org.apache.lucene.search.ScoreDoc;
    +import org.apache.lucene.search.SearcherFactory;
    +import org.apache.lucene.search.SearcherManager;
    +import org.apache.lucene.search.TermQuery;
    +import org.apache.lucene.store.Directory;
    +import org.apache.lucene.store.FSDirectory;
    +import org.apache.lucene.util.BytesRef;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import org.w3c.dom.Element;
    +import org.w3c.dom.NodeList;
    +import javax.xml.parsers.DocumentBuilder;
    +import javax.xml.parsers.DocumentBuilderFactory;
    +import java.io.IOException;
    +import java.nio.ByteBuffer;
    +import java.nio.file.FileSystems;
    +import java.util.Arrays;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Properties;
    +import java.util.Set;
    +public class LuceneStore<K, T extends PersistentBase> extends 
FileBackedDataStoreBase<K, T> implements Configurable {
    +  private static final Logger LOG = LoggerFactory.getLogger( 
LuceneStore.class );
    +  private static final String DEFAULT_MAPPING_FILE = 
    +  private static final String LUCENE_VERSION_KEY = 
    +  private static final String DEFAULT_LUCENE_VERSION = "LATEST";
    +  private static final String LUCENE_RAM_BUFFER_KEY = 
    +  private static final String DEFAULT_LUCENE_RAMBUFFER = "16";
    +  private LuceneMapping mapping;
    +  private IndexWriter writer;
    +  private SearcherManager searcherManager;
    +  @Override
    +  public void initialize(Class<K> keyClass, Class<T> persistentClass, 
Properties properties) {
    +    try {
    +      super.initialize(keyClass, persistentClass, properties);
    +    } catch (GoraException e1) {
    +      e1.printStackTrace();
    +    }
    +    String mappingFile = null;
    +    try {
    +      mappingFile = DataStoreFactory.getMappingFile(
    +              properties, (DataStore<?, ?>) this, DEFAULT_MAPPING_FILE );
    +    } catch (IOException e1) {
    +      e1.printStackTrace();
    +    }
    +    String luceneVersion = properties.getProperty(
    +    String ramBuffer = properties.getProperty(
    +    LOG.debug("Lucene index version: {}", luceneVersion);
    +    LOG.debug("Lucene index writer RAM buffer size: {}", ramBuffer);
    +    try {
    +      mapping = readMapping(mappingFile);
    +    } catch (IOException e1) {
    +      e1.printStackTrace();
    +    }
    +    try(Directory dir = 
FSDirectory.open(FileSystems.getDefault().getPath(outputPath))) {
    +      Analyzer analyzer = new StandardAnalyzer();
    +      IndexWriterConfig iwc = new IndexWriterConfig(analyzer);
    +      iwc.setOpenMode(OpenMode.CREATE_OR_APPEND);
    +      iwc.setRAMBufferSizeMB(Double.parseDouble(ramBuffer));
    +      writer = new IndexWriter(dir, iwc);
    +      //do we definately want all past deletions to be applied.
    +      searcherManager = new SearcherManager(writer, true, true, new 
    +    } catch (IOException e ) {
    +      LOG.error("Error opening {} with Lucene FSDirectory.", outputPath, 
    +    }
    +  }
    +  LuceneMapping readMapping(String filename) throws IOException {
    +    try {
    +      LuceneMapping mapping = new LuceneMapping();
    +      DocumentBuilder db = 
    +      org.w3c.dom.Document dom = 
    +      Element root = dom.getDocumentElement();
    +      NodeList nl = root.getElementsByTagName("class");
    +      for (int i = 0; i < nl.getLength(); i++) {
    +        Element classElement = (Element) nl.item(i);
    +        if 
    +                && 
classElement.getAttribute("name").equals(persistentClass.getCanonicalName())) {
    +          NodeList fields;
    +          Element fe;
    +          fields = classElement.getElementsByTagName("primarykey");
    +          for (int j = 0; j < fields.getLength(); j++) {
    +            fe = (Element) fields.item(j);
    +            mapping.setPrimaryKey(fe.getAttribute("column"));
    +          }
    +          fields = classElement.getElementsByTagName("field");
    +          for (int j = 0; j < fields.getLength(); j++) {
    +            fe = (Element) fields.item(j);
    +            String name = fe.getAttribute("name");
    +            String column = fe.getAttribute("column");
    +            mapping.addField(name, column);
    +          }
    +        }
    +      }
    +      return mapping;
    +    } catch (Exception ex) {
    +      throw new IOException("Unable to read " + filename, ex);
    +    }
    +  }
    +  @Override
    +  public boolean delete(K key) {
    +    try {
    +      writer.deleteDocuments(new Term(mapping.getPrimaryKey(), 
    +      searcherManager.maybeRefresh();
    +      return true;
    +    } catch (IOException e) {
    +      LOG.error("Unable to delete key: {}", key.toString(), e);
    +    }
    +    return false;
    +  }
    +  @Override
    +  public long deleteByQuery(Query<K,T> query) {
    +    try {
    +      // Figure out how many were there before
    +      LuceneQuery<K,T> q = (LuceneQuery<K,T>)query;
    +      LuceneResult<K,T> r = (LuceneResult<K,T>)q.execute();
    +      int before = r.getScoreDocs().length;
    +      // Delete them
    +      writer.deleteDocuments(q.toLuceneQuery());
    +      searcherManager.maybeRefresh();
    +      // Figure out how many there are after
    +      r = (LuceneResult<K,T>)q.execute();
    +      int after = r.getScoreDocs().length;
    +      return before - after;
    +    } catch (IOException e) {
    +      LOG.error("Unable to deleteByQuery: {}", query.toString(), e);
    +    }
    +    return 0;
    +  }
    +  @Override
    +  public void deleteSchema() {
    +    try {
    +      writer.deleteAll();
    +      searcherManager.maybeRefresh();
    +    } catch (IOException e) {
    +      LOG.error("Unable to deleteAll: {}", e);
    +    }
    +  }
    +  @Override
    +  public T get(K key, String[] fieldsToLoad) {
    +    Set<String> fields;
    +    if (fieldsToLoad != null) {
    +      fields = new HashSet<>(fieldsToLoad.length);
    +      fields.addAll(Arrays.asList(fieldsToLoad));
    +    }
    +    else {
    +      fields = new HashSet<>();
    +      fields.addAll(mapping.getLuceneFields());
    +    }
    +    try {
    +      final IndexSearcher s = searcherManager.acquire();
    +      TermQuery q = new TermQuery(new Term(mapping.getPrimaryKey(), 
    +      ScoreDoc[] hits = s.search(q, 2).scoreDocs;
    +      if (hits.length > 0) {
    +        Document doc = s.doc(hits[0].doc, fields);
    +        LOG.debug("get:Document: {}", doc.toString());
    +        String[] a = {};
    +        return newInstance( doc, fields.toArray(a) );
    +      }
    +      searcherManager.release(s);
    +    } catch (IOException e) {
    +      LOG.error("Error in get: {}", e);
    +    }
    +    return null;
    +  }
    +  public T newInstance(Document doc, String[] fields) throws IOException {
    +    T persistent = newPersistent();
    +    if ( fields == null ) {
    +      fields = fieldMap.keySet().toArray( new String[fieldMap.size()] );
    +    }
    +    String pk = mapping.getPrimaryKey();
    +    for ( String f : fields ) {
    +      org.apache.avro.Schema.Field field = fieldMap.get( f );
    +      Schema fieldSchema = field.schema();
    +      String sf;
    +      if ( pk.equals( f ) ) {
    +        sf = f;
    +      } else {
    +        sf = mapping.getLuceneField(f);
    +      }
    +      Object v;
    +      Object sv;
    +      switch ( fieldSchema.getType() ) {
    +        case MAP:
    +        case ARRAY:
    +        case RECORD:
    +        case UNION:
    +        sv = doc.getBinaryValue(sf);
    +        if (sv == null)
    +          continue;
    +        BytesRef b = (BytesRef)sv;
    +        v = IOUtils.deserialize(b.bytes, datumReader, (T) persistent.get( 
field.pos() ));
    +        persistent.put( field.pos(), v );
    +        break;
    +        case ENUM:
    +        sv = doc.get(sf);
    +        if (sv == null)
    +          continue;
    +        v = AvroUtils.getEnumValue( fieldSchema, (String) sv );
    +        persistent.put( field.pos(), v );
    +        break;
    +        default:
    +        sv = doc.get(sf);
    +        if (sv == null)
    +          continue;
    +        put(persistent, field.pos(), fieldSchema.getType(), sv);
    +      }
    +      persistent.setDirty( field.pos() );
    +    }
    +    persistent.clearDirty();
    +    return persistent;
    +  }
    +  void put(T p, int pos, Type t, Object o) {
    +    switch(t) {
    +      case FIXED:
    +      // Could we combine this with the BYTES section below and
    +      // either fix the size of the array or not depending on Type?
    +      // This might be a buffer copy. Do we need to pad if the
    +      // fixed sized data is smaller than the type? Do we truncate
    +      // if the data is larger than the type?
    +      LOG.error( "Fixed-sized fields are not supported yet" );
    +      break;
    +      case BYTES:
    +      p.put( pos, ByteBuffer.wrap( (byte[]) o ) );
    +      break;
    +      case BOOLEAN:
    +      p.put( pos, Boolean.parseBoolean((String)o) );
    +      break;
    +      case DOUBLE:
    +      p.put( pos, Double.parseDouble((String)o) );
    +      break;
    +      case FLOAT:
    +      p.put( pos, Float.parseFloat((String)o) );
    +      break;
    +      case INT:
    +      p.put( pos, Integer.parseInt((String)o) );
    +      break;
    +      case LONG:
    +      p.put( pos, Long.parseLong((String)o) );
    +      break;
    +      case STRING:
    +      p.put( pos, new Utf8( o.toString() ) );
    +      break;
    +      default:
    +      LOG.error( "Unknown field type: {}", t );
    +    }
    +  }
    +  @Override
    +  public String getSchemaName() {
    +    return "default";
    +  }
    +  @Override
    +  public Query<K,T> newQuery() {
    +    return new LuceneQuery<>( this );
    +  }
    +  @Override
    +  public void put(K key, T persistent) {
    +    Schema schema = persistent.getSchema();
    +    Document doc = new Document();
    +    // populate the doc
    +    List<org.apache.avro.Schema.Field> fields = schema.getFields();
    +    for ( org.apache.avro.Schema.Field field : fields ) {
    +      if (!persistent.isDirty( field.name() )) {
    +        continue;
    +      }
    +      String sf = mapping.getLuceneField( field.name() );
    +      if ( sf == null ) {
    +        continue;
    +      }
    +      Schema fieldSchema = field.schema();
    +      Object v = persistent.get( field.pos() );
    +      if ( v == null ) {
    +        continue;
    +      }
    +      switch ( fieldSchema.getType() ) {
    +        case MAP:   //TODO: These should be handled better
    +        case ARRAY:
    +        case RECORD:
    +        case UNION:
    --- End diff --
    Clarification question:
    So what are the other options for storing complex data types? I mean 
besides storing the complex data type as is.

> Lucene datastore for Gora
> -------------------------
>                 Key: GORA-266
>                 URL: https://issues.apache.org/jira/browse/GORA-266
>             Project: Apache Gora
>          Issue Type: New Feature
>          Components: gora-lucene
>            Reporter: Lewis John McGibbney
>            Assignee: Lewis John McGibbney
>            Priority: Minor
>              Labels: features
>             Fix For: 0.9
>         Attachments: GORA-266.patch
> Sometimes we don't need to set up, administer, maintain and configure a 
> running solr server(s).
> Sometime we just want to write to Lucene index files and read directly from 
> them. 

This message was sent by Atlassian JIRA

Reply via email to