[ 
https://issues.apache.org/jira/browse/JENA-1305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15925990#comment-15925990
 ] 

ASF GitHub Bot commented on JENA-1305:
--------------------------------------

Github user osma commented on a diff in the pull request:

    https://github.com/apache/jena/pull/227#discussion_r106146503
  
    --- Diff: 
jena-text/src/main/java/org/apache/jena/query/text/TextIndexES.java ---
    @@ -0,0 +1,427 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.jena.query.text;
    +
    +import org.apache.jena.graph.Node;
    +import org.apache.jena.graph.NodeFactory;
    +import org.apache.jena.sparql.util.NodeFactoryExtra;
    +import 
org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
    +import 
org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
    +import org.elasticsearch.action.get.GetResponse;
    +import org.elasticsearch.action.index.IndexRequest;
    +import org.elasticsearch.action.search.SearchResponse;
    +import org.elasticsearch.action.update.UpdateRequest;
    +import org.elasticsearch.action.update.UpdateResponse;
    +import org.elasticsearch.client.Client;
    +import org.elasticsearch.client.transport.TransportClient;
    +import org.elasticsearch.common.settings.Settings;
    +import org.elasticsearch.common.transport.InetSocketTransportAddress;
    +import org.elasticsearch.common.xcontent.XContentBuilder;
    +import org.elasticsearch.index.get.GetField;
    +import org.elasticsearch.index.query.QueryBuilders;
    +import org.elasticsearch.script.Script;
    +import org.elasticsearch.search.SearchHit;
    +import org.elasticsearch.transport.client.PreBuiltTransportClient;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.net.InetAddress;
    +import java.util.*;
    +
    +import static 
org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
    +
    +/**
    + * Elastic Search Implementation of {@link TextIndex}
    + *
    + */
    +public class TextIndexES implements TextIndex {
    +
    +    /**
    +     * The definition of the Entity we are trying to Index
    +     */
    +    private final EntityDefinition docDef ;
    +
    +    /**
    +     * Thread safe ElasticSearch Java Client to perform Index operations
    +     */
    +    private static Client client;
    +
    +    /**
    +     * The name of the index. Defaults to 'test'
    +     */
    +    private final String INDEX_NAME;
    +
    +    static final String CLUSTER_NAME = "cluster.name";
    +
    +    static final String NUM_OF_SHARDS = "number_of_shards";
    +
    +    static final String NUM_OF_REPLICAS = "number_of_replicas";
    +
    +    private boolean isMultilingual ;
    +
    +    private static final Logger LOGGER      = 
LoggerFactory.getLogger(TextIndexES.class) ;
    +
    +    public TextIndexES(TextIndexConfig config, ESSettings esSettings) 
throws Exception{
    +
    +        this.INDEX_NAME = esSettings.getIndexName();
    +        this.docDef = config.getEntDef();
    +
    +
    +        this.isMultilingual = config.isMultilingualSupport();
    +        if (this.isMultilingual &&  config.getEntDef().getLangField() == 
null) {
    +            //multilingual index cannot work without lang field
    +            docDef.setLangField("lang");
    +        }
    +        if(client == null) {
    +
    +            LOGGER.debug("Initializing the Elastic Search Java Client with 
settings: " + esSettings);
    +            Settings settings = Settings.builder()
    +                    .put(CLUSTER_NAME, 
esSettings.getClusterName()).build();
    +            List<InetSocketTransportAddress> addresses = new ArrayList<>();
    +            for(String host: esSettings.getHostToPortMapping().keySet()) {
    +                InetSocketTransportAddress addr = new 
InetSocketTransportAddress(InetAddress.getByName(host), 
esSettings.getHostToPortMapping().get(host));
    +                addresses.add(addr);
    +            }
    +
    +            InetSocketTransportAddress socketAddresses[] = new 
InetSocketTransportAddress[addresses.size()];
    +            client = new 
PreBuiltTransportClient(settings).addTransportAddresses(addresses.toArray(socketAddresses));
    +            LOGGER.debug("Successfully initialized the client");
    +        }
    +
    +
    +        IndicesExistsResponse exists = client.admin().indices().exists(new 
IndicesExistsRequest(INDEX_NAME)).get();
    +        if(!exists.isExists()) {
    +            Settings indexSettings = Settings.builder()
    +                    .put(NUM_OF_SHARDS, esSettings.getShards())
    +                    .put(NUM_OF_REPLICAS, esSettings.getReplicas())
    +                    .build();
    +            LOGGER.debug("Index with name " + INDEX_NAME + " does not 
exist yet. Creating one with settings: " + indexSettings.toString());
    +            
client.admin().indices().prepareCreate(INDEX_NAME).setSettings(indexSettings).get();
    +        }
    +
    +
    +
    +    }
    +
    +
    +    /**
    +     * Constructor used mainly for performing Integration tests
    +     * @param config an instance of {@link TextIndexConfig}
    +     * @param client an instance of {@link TransportClient}. The client 
should already have been initialized with an index
    +     */
    +    public TextIndexES(TextIndexConfig config, Client client, String 
indexName) {
    +        this.docDef = config.getEntDef();
    +        this.isMultilingual = true;
    +        this.client = client;
    +        this.INDEX_NAME = indexName;
    +    }
    +
    +    /**
    +     * We do not have any specific logic to perform before committing
    +     */
    +    @Override
    +    public void prepareCommit() {
    +        //Do Nothing
    +
    +    }
    +
    +    /**
    +     * Commit happens in the individual get/add/delete operations
    +     */
    +    @Override
    +    public void commit() {
    +        // Do Nothing
    +    }
    +
    +    /**
    +     * not really sure what we need to roll back.
    +     */
    +    @Override
    +    public void rollback() {
    +       //Not sure what to do here
    +
    +    }
    +
    +    /**
    +     * We don't have resources that need to be closed explicitely
    +     */
    +    @Override
    +    public void close() {
    +        // Do Nothing
    +
    +    }
    +
    +    /**
    +     * Update an Entity. Since we are doing Upserts in add entity anyways, 
we simply call {@link #addEntity(Entity)}
    +     * method that takes care of updating the Entity as well.
    +     * @param entity the entity to update.
    +     */
    +    @Override
    +    public void updateEntity(Entity entity) {
    +        //Since Add entity also updates the indexed document in case it 
already exists,
    +        // we can simply call the addEntity from here.
    +        addEntity(entity);
    +    }
    +
    +
    +    /**
    +     * Add an Entity to the ElasticSearch Index.
    +     * The entity will be added as a new document in ES, if it does not 
already exists.
    +     * If the Entity exists, then the entity will simply be updated.
    +     * The entity will never be replaced.
    +     * @param entity the entity to add
    +     */
    +    @Override
    +    public void addEntity(Entity entity) {
    +        LOGGER.debug("Adding/Updating the entity in ES");
    +
    +        //The field that has a not null value in the current Entity 
instance.
    +        //Required, mainly for building a script for the update command.
    +        String fieldToAdd = null;
    +        String fieldValueToAdd = "";
    +        try {
    +            XContentBuilder builder = jsonBuilder()
    +                    .startObject();
    +
    +            //Currently ignoring Graph field based indexing
    +//            if (docDef.getGraphField() != null) {
    +//                builder = builder.field(docDef.getGraphField(), 
entity.getGraph());
    +//            }
    +
    +            for(String field: docDef.fields()) {
    +                if(entity.get(field) != null) {
    +                    if(entity.getLanguage() != null && 
!entity.getLanguage().isEmpty() && isMultilingual) {
    +                        fieldToAdd = field + "_" + entity.getLanguage();
    +                    } else {
    +                        fieldToAdd = field;
    +                    }
    +
    +                    fieldValueToAdd = (String) entity.get(field);
    +                    builder = builder.field(fieldToAdd, 
Arrays.asList(fieldValueToAdd));
    +                    break;
    +                } else {
    +                    //We are making sure that the field is at-least added 
to the index.
    +                    //This will help us tremendously when we are appending 
the data later in an already indexed document.
    +                    builder = builder.field(field, 
Collections.emptyList());
    +                }
    +
    +            }
    +
    +            builder = builder.endObject();
    +            IndexRequest indexRequest = new IndexRequest(INDEX_NAME, 
docDef.getEntityField(), entity.getId())
    +                    .source(builder);
    +
    +            /**
    +             * We are creating an upsert request here instead of a simple 
insert request.
    +             * The reason is we want to add a document if it does not 
exist with the given Subject Id (URI).
    +             * But if the document exists with the same Subject Id, we 
want to do an update to it instead of deleting it and
    +             * then creating it with only the latest field values.
    +             * This functionality is called Upsert functionality and more 
can be learned about it here:
    +             * 
https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update.html#upserts
    +             */
    +
    +            //First Search of the field exists or not
    +            SearchResponse existsResponse = 
client.prepareSearch(INDEX_NAME)
    +                    .setTypes(docDef.getEntityField())
    +                    .setQuery(QueryBuilders.existsQuery(fieldToAdd))
    +                    .get();
    +            String script;
    +            if(existsResponse != null && existsResponse.getHits() != null 
&& existsResponse.getHits().totalHits() > 0) {
    +                //This means field already exists and therefore we should 
append to it
    +                script = "ctx._source." + fieldToAdd+".add('"+ 
fieldValueToAdd + "')";
    +            } else {
    +                //The field does not exists. so we create one
    +                script = "ctx._source." + fieldToAdd+" =['"+ 
fieldValueToAdd + "']";
    +            }
    +
    +
    +
    +            UpdateRequest upReq = new UpdateRequest(INDEX_NAME, 
docDef.getEntityField(), entity.getId())
    +                    .script(new Script(script))
    +                    .upsert(indexRequest);
    +
    +            UpdateResponse response = client.update(upReq).get();
    +
    +            LOGGER.debug("Received the following Update response : " + 
response + " for the following entity: " + entity);
    +
    +        } catch(Exception e) {
    +            throw new TextIndexException("Unable to Index the Entity in 
ElasticSearch.", e);
    +        }
    +
    +
    +    }
    +
    +    /**
    +     * Delete an entity.
    +     * Since we are storing different predicate values within the same 
indexed document,
    +     * deleting the document using entity Id is sufficient to delete all 
the related contents for a given entity.
    +     * @param entity entity to delete
    +     */
    +    @Override
    +    public void deleteEntity(Entity entity) {
    +
    +        String fieldToRemove = null;
    +        String valueToRemove = null;
    +        for(String field : docDef.fields()) {
    +            if(entity.get(field) != null) {
    +                fieldToRemove = field;
    +                valueToRemove = (String)entity.get(field);
    +                break;
    +            }
    +        }
    +        //First Search of the field exists or not
    +        SearchResponse existsResponse = client.prepareSearch(INDEX_NAME)
    +                .setTypes(docDef.getEntityField())
    +                .setQuery(QueryBuilders.existsQuery(fieldToRemove))
    +                .get();
    +
    +        String script = null;
    +        if(existsResponse != null && existsResponse.getHits() != null && 
existsResponse.getHits().totalHits() > 0) {
    --- End diff --
    
    Similar to the update operation, this may cause a race condition if 
multiple deletes happen simultaneously. Would it be possible to do an atomic 
delete here, scripted in such a way that there is no error thrown even if the 
field value to delete doesn't exist in the index?


> Elastic Search Support for Apache Jena Text 
> --------------------------------------------
>
>                 Key: JENA-1305
>                 URL: https://issues.apache.org/jira/browse/JENA-1305
>             Project: Apache Jena
>          Issue Type: New Feature
>          Components: Text
>    Affects Versions: Jena 3.2.0
>            Reporter: Anuj Kumar
>            Assignee: Osma Suominen
>              Labels: elasticsearch
>   Original Estimate: 240h
>  Remaining Estimate: 240h
>
> This Jira tracks the development of Jena Text ElasticSearch Implementation.
> The goal is to extend Jena Text capability to index, at scale, in 
> ElasticSearch. This implementation would be similar to the Lucene and Solr 
> implementations.
> We will use ES version 5.2.1 for the implementation.
> The following functionalities would be supported:
> * Indexing Literal values
> * Updating indexed values
> * Deleting Indexed values
> * Custom Analyzer Support
> * Configuration using Assembler as well as Java techniques.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to