Repository: atlas
Updated Branches:
  refs/heads/master 9e9f024b4 -> a98d1bf0b


ATLAS-2920: Update JanusGraph Solr clients to use all zookeeper entries


Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/a98d1bf0
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/a98d1bf0
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/a98d1bf0

Branch: refs/heads/master
Commit: a98d1bf0b3c6a354535dc4900869bcb1b2dd90ca
Parents: 9e9f024
Author: Sarath Subramanian <[email protected]>
Authored: Sun Oct 14 12:55:51 2018 -0700
Committer: Sarath Subramanian <[email protected]>
Committed: Sun Oct 14 12:55:51 2018 -0700

----------------------------------------------------------------------
 .../graphdb/janus/AtlasJanusGraphDatabase.java  |   46 +-
 .../janusgraph/diskstorage/solr/Solr6Index.java | 1193 ++++++++++++++++++
 2 files changed, 1233 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/atlas/blob/a98d1bf0/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java
----------------------------------------------------------------------
diff --git 
a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java
 
b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java
index 47e561b..80e9cc3 100644
--- 
a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java
+++ 
b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java
@@ -18,6 +18,7 @@
 
 package org.apache.atlas.repository.graphdb.janus;
 
+import com.google.common.collect.ImmutableMap;
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasException;
 import org.apache.atlas.repository.graphdb.AtlasGraph;
@@ -30,19 +31,24 @@ import 
org.apache.atlas.typesystem.types.DataTypes.TypeCategory;
 import org.apache.atlas.utils.AtlasPerfTracer;
 import org.apache.commons.configuration.Configuration;
 import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONMapper;
+import org.janusgraph.core.JanusGraph;
 import org.janusgraph.core.JanusGraphException;
-import 
org.janusgraph.graphdb.database.serialize.attribute.SerializableSerializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.janusgraph.core.JanusGraphFactory;
-import org.janusgraph.core.JanusGraph;
 import org.janusgraph.core.schema.JanusGraphManagement;
+import org.janusgraph.diskstorage.StandardIndexProvider;
+import org.janusgraph.diskstorage.solr.Solr6Index;
+import 
org.janusgraph.graphdb.database.serialize.attribute.SerializableSerializer;
 import org.janusgraph.graphdb.tinkerpop.JanusGraphIoRegistry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
 
 /**
  * Default implementation for Graph Provider that doles out JanusGraph.
@@ -59,6 +65,7 @@ public class AtlasJanusGraphDatabase implements 
GraphDatabase<AtlasJanusVertex,
     public static final String GRAPH_PREFIX         = "atlas.graph";
     public static final String INDEX_BACKEND_CONF   = "index.search.backend";
     public static final String SOLR_ZOOKEEPER_URL   = 
"atlas.graph.index.search.solr.zookeeper-url";
+    public static final String SOLR_ZOOKEEPER_URLS  = 
"atlas.graph.index.search.solr.zookeeper-urls";
     public static final String INDEX_BACKEND_LUCENE = "lucene";
     public static final String INDEX_BACKEND_ES     = "elasticsearch";
 
@@ -74,7 +81,10 @@ public class AtlasJanusGraphDatabase implements 
GraphDatabase<AtlasJanusVertex,
         startLocalSolr();
 
         Configuration configProperties = ApplicationProperties.get();
-        Configuration janusConfig      = 
ApplicationProperties.getSubsetConfiguration(configProperties, GRAPH_PREFIX);
+
+        configProperties.setProperty(SOLR_ZOOKEEPER_URLS, 
configProperties.getStringArray(SOLR_ZOOKEEPER_URL));
+
+        Configuration janusConfig = 
ApplicationProperties.getSubsetConfiguration(configProperties, GRAPH_PREFIX);
 
         //add serializers for non-standard property value types that Atlas uses
         
janusConfig.addProperty("attributes.custom.attribute1.attribute-class", 
TypeCategory.class.getName());
@@ -93,6 +103,30 @@ public class AtlasJanusGraphDatabase implements 
GraphDatabase<AtlasJanusVertex,
         return janusConfig;
     }
 
+    static {
+        addSolr6Index();
+    }
+
+    private static void addSolr6Index() {
+        try {
+            Field field = 
StandardIndexProvider.class.getDeclaredField("ALL_MANAGER_CLASSES");
+            field.setAccessible(true);
+
+            Field modifiersField = Field.class.getDeclaredField("modifiers");
+            modifiersField.setAccessible(true);
+            modifiersField.setInt(field, field.getModifiers() & 
~Modifier.FINAL);
+
+            Map<String, String> customMap = new 
HashMap<>(StandardIndexProvider.getAllProviderClasses());
+            customMap.put("solr", Solr6Index.class.getName());
+            ImmutableMap<String, String> immap = 
ImmutableMap.copyOf(customMap);
+            field.set(null, immap);
+
+            LOG.debug("Injected solr6 index - {}", Solr6Index.class.getName());
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     public static JanusGraph getGraphInstance() {
         if (graphInstance == null) {
             synchronized (AtlasJanusGraphDatabase.class) {

http://git-wip-us.apache.org/repos/asf/atlas/blob/a98d1bf0/graphdb/janus/src/main/java/org/janusgraph/diskstorage/solr/Solr6Index.java
----------------------------------------------------------------------
diff --git 
a/graphdb/janus/src/main/java/org/janusgraph/diskstorage/solr/Solr6Index.java 
b/graphdb/janus/src/main/java/org/janusgraph/diskstorage/solr/Solr6Index.java
new file mode 100644
index 0000000..1795943
--- /dev/null
+++ 
b/graphdb/janus/src/main/java/org/janusgraph/diskstorage/solr/Solr6Index.java
@@ -0,0 +1,1193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.janusgraph.diskstorage.solr;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpEntityEnclosingRequest;
+import org.apache.http.HttpException;
+import org.apache.http.HttpRequest;
+import org.apache.http.HttpRequestInterceptor;
+import org.apache.http.client.HttpClient;
+import org.apache.http.entity.BufferedHttpEntity;
+import org.apache.http.impl.auth.KerberosScheme;
+import org.apache.http.protocol.HttpContext;
+import org.apache.lucene.analysis.CachingTokenFilter;
+import org.apache.lucene.analysis.Tokenizer;
+import org.apache.lucene.analysis.tokenattributes.TermToBytesRefAttribute;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.impl.HttpClientUtil;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.impl.Krb5HttpClientBuilder;
+import org.apache.solr.client.solrj.impl.LBHttpSolrClient;
+import org.apache.solr.client.solrj.impl.PreemptiveAuth;
+import org.apache.solr.client.solrj.impl.SolrHttpClientBuilder;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.CollectionAdminResponse;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.client.solrj.util.ClientUtils;
+import org.apache.solr.common.SolrDocument;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.CommonParams;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.zookeeper.KeeperException;
+import org.janusgraph.core.Cardinality;
+import org.janusgraph.core.JanusGraphElement;
+import org.janusgraph.core.attribute.Cmp;
+import org.janusgraph.core.attribute.Geo;
+import org.janusgraph.core.attribute.Geoshape;
+import org.janusgraph.core.attribute.Text;
+import org.janusgraph.core.schema.Mapping;
+import org.janusgraph.core.schema.Parameter;
+import org.janusgraph.diskstorage.BackendException;
+import org.janusgraph.diskstorage.BaseTransaction;
+import org.janusgraph.diskstorage.BaseTransactionConfig;
+import org.janusgraph.diskstorage.BaseTransactionConfigurable;
+import org.janusgraph.diskstorage.PermanentBackendException;
+import org.janusgraph.diskstorage.TemporaryBackendException;
+import org.janusgraph.diskstorage.configuration.ConfigOption;
+import org.janusgraph.diskstorage.configuration.Configuration;
+import org.janusgraph.diskstorage.indexing.IndexEntry;
+import org.janusgraph.diskstorage.indexing.IndexFeatures;
+import org.janusgraph.diskstorage.indexing.IndexMutation;
+import org.janusgraph.diskstorage.indexing.IndexProvider;
+import org.janusgraph.diskstorage.indexing.IndexQuery;
+import org.janusgraph.diskstorage.indexing.KeyInformation;
+import org.janusgraph.diskstorage.indexing.RawQuery;
+import org.janusgraph.diskstorage.solr.transform.GeoToWktConverter;
+import org.janusgraph.diskstorage.util.DefaultTransaction;
+import org.janusgraph.graphdb.configuration.PreInitializeConfigOptions;
+import org.janusgraph.graphdb.database.serialize.AttributeUtil;
+import org.janusgraph.graphdb.internal.Order;
+import org.janusgraph.graphdb.query.JanusGraphPredicate;
+import org.janusgraph.graphdb.query.condition.And;
+import org.janusgraph.graphdb.query.condition.Condition;
+import org.janusgraph.graphdb.query.condition.Not;
+import org.janusgraph.graphdb.query.condition.Or;
+import org.janusgraph.graphdb.query.condition.PredicateCondition;
+import org.janusgraph.graphdb.types.ParameterType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.io.UncheckedIOException;
+import java.lang.reflect.Constructor;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.time.Instant;
+import java.util.AbstractMap.SimpleEntry;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Spliterator;
+import java.util.Spliterators;
+import java.util.TimeZone;
+import java.util.UUID;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import static org.janusgraph.diskstorage.solr.SolrIndex.DYNAMIC_FIELDS;
+import static org.janusgraph.diskstorage.solr.SolrIndex.HTTP_ALLOW_COMPRESSION;
+import static 
org.janusgraph.diskstorage.solr.SolrIndex.HTTP_CONNECTION_TIMEOUT;
+import static 
org.janusgraph.diskstorage.solr.SolrIndex.HTTP_GLOBAL_MAX_CONNECTIONS;
+import static 
org.janusgraph.diskstorage.solr.SolrIndex.HTTP_MAX_CONNECTIONS_PER_HOST;
+import static org.janusgraph.diskstorage.solr.SolrIndex.HTTP_URLS;
+import static org.janusgraph.diskstorage.solr.SolrIndex.KERBEROS_ENABLED;
+import static org.janusgraph.diskstorage.solr.SolrIndex.KEY_FIELD_NAMES;
+import static org.janusgraph.diskstorage.solr.SolrIndex.MAX_SHARDS_PER_NODE;
+import static org.janusgraph.diskstorage.solr.SolrIndex.NUM_SHARDS;
+import static org.janusgraph.diskstorage.solr.SolrIndex.REPLICATION_FACTOR;
+import static org.janusgraph.diskstorage.solr.SolrIndex.SOLR_DEFAULT_CONFIG;
+import static org.janusgraph.diskstorage.solr.SolrIndex.SOLR_MODE;
+import static org.janusgraph.diskstorage.solr.SolrIndex.SOLR_NS;
+import static org.janusgraph.diskstorage.solr.SolrIndex.TTL_FIELD;
+import static org.janusgraph.diskstorage.solr.SolrIndex.WAIT_SEARCHER;
+import static 
org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.INDEX_MAX_RESULT_SET_SIZE;
+
+/**
+ * @author Jared Holmberg ([email protected]), Pavel Yaskevich 
([email protected])
+ */
+
+/**
+ * NOTE: Copied from JanusGraph for supporting Kerberos and adding support for 
multiple zookeeper clients. Do not change
+ * This is a copy of SolrIndex.java from org.janusgraph.diskstorage.solr
+ */
+@PreInitializeConfigOptions
+public class Solr6Index implements IndexProvider {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(Solr6Index.class);
+
+
+    private static final String DEFAULT_ID_FIELD  = "id";
+    private static final char   CHROOT_START_CHAR = '/';
+
+    private enum Mode {
+        HTTP, CLOUD;
+
+        public static Mode parse(String mode) {
+            for (final Mode m : Mode.values()) {
+                if (m.toString().equalsIgnoreCase(mode)) return m;
+            }
+            throw new IllegalArgumentException("Unrecognized mode: "+mode);
+        }
+
+    }
+
+    public static final ConfigOption<String[]> ZOOKEEPER_URLS = new 
ConfigOption<>(SOLR_NS,"zookeeper-urls",
+            "URL of the Zookeeper instance coordinating the SolrCloud cluster",
+            ConfigOption.Type.MASKABLE, new String[]{"localhost:2181"});
+
+    private static final IndexFeatures SOLR_FEATURES = new 
IndexFeatures.Builder()
+            .supportsDocumentTTL()
+            .setDefaultStringMapping(Mapping.TEXT)
+            .supportedStringMappings(Mapping.TEXT, Mapping.STRING)
+            .supportsCardinality(Cardinality.SINGLE)
+            .supportsCardinality(Cardinality.LIST)
+            .supportsCardinality(Cardinality.SET)
+            .supportsCustomAnalyzer()
+            .supportsGeoContains()
+            .build();
+
+    private static final Map<Geo, String> SPATIAL_PREDICATES = 
spatialPredicates();
+
+    private final SolrClient solrClient;
+    private final Configuration configuration;
+    private final Mode mode;
+    private final boolean dynFields;
+    private final Map<String, String> keyFieldIds;
+    private final String ttlField;
+    private final int batchSize;
+    private final boolean waitSearcher;
+    private final boolean kerberosEnabled;
+
+    public Solr6Index(final Configuration config) throws BackendException {
+        Preconditions.checkArgument(config!=null);
+        configuration = config;
+        mode = Mode.parse(config.get(SOLR_MODE));
+        kerberosEnabled = config.get(KERBEROS_ENABLED);
+        dynFields = config.get(DYNAMIC_FIELDS);
+        keyFieldIds = parseKeyFieldsForCollections(config);
+        batchSize = config.get(INDEX_MAX_RESULT_SET_SIZE);
+        ttlField = config.get(TTL_FIELD);
+        waitSearcher = config.get(WAIT_SEARCHER);
+
+        if (kerberosEnabled) {
+            logger.debug("Kerberos is enabled. Configuring SOLR for 
Kerberos.");
+            configureSolrClientsForKerberos();
+        } else {
+            logger.debug("Kerberos is NOT enabled.");
+            logger.debug("KERBEROS_ENABLED name is " + 
KERBEROS_ENABLED.getName() + " and it is" + (KERBEROS_ENABLED.isOption() ? " " 
: " not") + " an option.");
+            logger.debug("KERBEROS_ENABLED type is " + 
KERBEROS_ENABLED.getType().name());
+        }
+        final ModifiableSolrParams clientParams = new ModifiableSolrParams();
+        switch (mode) {
+            case CLOUD:
+                /* ATLAS-2920: Update JanusGraph Solr clients to use all 
zookeeper entries – start */
+                final List<String> zookeeperUrls = getZookeeperURLs(config);
+                /* ATLAS-2920: end */
+                final CloudSolrClient cloudServer = new 
CloudSolrClient.Builder()
+                        .withLBHttpSolrClientBuilder(
+                                new LBHttpSolrClient.Builder()
+                                        .withHttpSolrClientBuilder(new 
HttpSolrClient.Builder().withInvariantParams(clientParams))
+                                        
.withBaseSolrUrls(config.get(HTTP_URLS))
+                        )
+                        .withZkHost(zookeeperUrls)
+                        .sendUpdatesOnlyToShardLeaders()
+                        .build();
+                cloudServer.connect();
+                solrClient = cloudServer;
+                break;
+            case HTTP:
+                clientParams.add(HttpClientUtil.PROP_ALLOW_COMPRESSION, 
config.get(HTTP_ALLOW_COMPRESSION).toString());
+                clientParams.add(HttpClientUtil.PROP_CONNECTION_TIMEOUT, 
config.get(HTTP_CONNECTION_TIMEOUT).toString());
+                clientParams.add(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST, 
config.get(HTTP_MAX_CONNECTIONS_PER_HOST).toString());
+                clientParams.add(HttpClientUtil.PROP_MAX_CONNECTIONS, 
config.get(HTTP_GLOBAL_MAX_CONNECTIONS).toString());
+                final HttpClient client = 
HttpClientUtil.createClient(clientParams);
+                solrClient = new LBHttpSolrClient.Builder()
+                        .withHttpClient(client)
+                        .withBaseSolrUrls(config.get(HTTP_URLS))
+                        .build();
+
+
+                break;
+            default:
+                throw new IllegalArgumentException("Unsupported Solr operation 
mode: " + mode);
+        }
+    }
+
+    private void configureSolrClientsForKerberos() throws 
PermanentBackendException {
+        String kerberosConfig = 
System.getProperty("java.security.auth.login.config");
+        if(kerberosConfig == null) {
+            throw new PermanentBackendException("Unable to configure kerberos 
for solr client. System property 'java.security.auth.login.config' is not 
set.");
+        }
+        logger.debug("Using kerberos configuration file located at '{}'.", 
kerberosConfig);
+        try(Krb5HttpClientBuilder krbBuild = new Krb5HttpClientBuilder()) {
+
+            SolrHttpClientBuilder kb = krbBuild.getBuilder();
+            HttpClientUtil.setHttpClientBuilder(kb);
+            HttpRequestInterceptor bufferedEntityInterceptor = new 
HttpRequestInterceptor() {
+                @Override
+                public void process(HttpRequest request, HttpContext context) 
throws HttpException, IOException {
+                    if(request instanceof HttpEntityEnclosingRequest) {
+                        HttpEntityEnclosingRequest enclosingRequest = 
((HttpEntityEnclosingRequest) request);
+                        HttpEntity requestEntity = 
enclosingRequest.getEntity();
+                        enclosingRequest.setEntity(new 
BufferedHttpEntity(requestEntity));
+                    }
+                }
+            };
+            HttpClientUtil.addRequestInterceptor(bufferedEntityInterceptor);
+
+            HttpRequestInterceptor preemptiveAuth = new PreemptiveAuth(new 
KerberosScheme());
+            HttpClientUtil.addRequestInterceptor(preemptiveAuth);
+        }
+    }
+
+    private Map<String, String> parseKeyFieldsForCollections(Configuration 
config) throws BackendException {
+        final Map<String, String> keyFieldNames = new HashMap<>();
+        final String[] collectionFieldStatements = config.has(KEY_FIELD_NAMES) 
? config.get(KEY_FIELD_NAMES) : new String[0];
+        for (final String collectionFieldStatement : 
collectionFieldStatements) {
+            final String[] parts = collectionFieldStatement.trim().split("=");
+            if (parts.length != 2) {
+                throw new PermanentBackendException(
+                        "Unable to parse the collection name / key field name 
pair. It should be of the format collection=field");
+            }
+            final String collectionName = parts[0];
+            final String keyFieldName = parts[1];
+            keyFieldNames.put(collectionName, keyFieldName);
+        }
+        return keyFieldNames;
+    }
+
+    private String getKeyFieldId(String collection) {
+        String field = keyFieldIds.get(collection);
+        if (field==null) field = DEFAULT_ID_FIELD;
+        return field;
+    }
+
+    /**
+     * Unlike the ElasticSearch Index, which is schema free, Solr requires a 
schema to
+     * support searching. This means that you will need to modify the solr 
schema with the
+     * appropriate field definitions in order to work properly.  If you have a 
running instance
+     * of Solr and you modify its schema with new fields, don't forget to 
re-index!
+     * @param store Index store
+     * @param key New key to register
+     * @param information data type to register for the key
+     * @param tx enclosing transaction
+     * @throws BackendException in case an exception is thrown when
+     * creating a collection.
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public void register(String store, String key, KeyInformation information, 
BaseTransaction tx)
+            throws BackendException {
+        if (mode== Mode.CLOUD) {
+            final CloudSolrClient client = (CloudSolrClient) solrClient;
+            try {
+                createCollectionIfNotExists(client, configuration, store);
+            } catch (final IOException | SolrServerException | 
InterruptedException | KeeperException e) {
+                throw new PermanentBackendException(e);
+            }
+        }
+        //Since all data types must be defined in the schema.xml, 
pre-registering a type does not work
+        //But we check Analyse feature
+        String analyzer = 
ParameterType.STRING_ANALYZER.findParameter(information.getParameters(), null);
+        if (analyzer != null) {
+            //If the key have a tokenizer, we try to get it by reflection
+            try {
+                ((Constructor<Tokenizer>) 
ClassLoader.getSystemClassLoader().loadClass(analyzer)
+                        .getConstructor()).newInstance();
+            } catch (final ReflectiveOperationException e) {
+                throw new PermanentBackendException(e.getMessage(),e);
+            }
+        }
+        analyzer = 
ParameterType.TEXT_ANALYZER.findParameter(information.getParameters(), null);
+        if (analyzer != null) {
+            //If the key have a tokenizer, we try to get it by reflection
+            try {
+                ((Constructor<Tokenizer>) 
ClassLoader.getSystemClassLoader().loadClass(analyzer)
+                        .getConstructor()).newInstance();
+            } catch (final ReflectiveOperationException e) {
+                throw new PermanentBackendException(e.getMessage(),e);
+            }
+        }
+    }
+
+    @Override
+    public void mutate(Map<String, Map<String, IndexMutation>> mutations, 
KeyInformation.IndexRetriever information,
+                       BaseTransaction tx) throws BackendException {
+        logger.debug("Mutating SOLR");
+        try {
+            for (final Map.Entry<String, Map<String, IndexMutation>> stores : 
mutations.entrySet()) {
+                final String collectionName = stores.getKey();
+                final String keyIdField = getKeyFieldId(collectionName);
+
+                final List<String> deleteIds = new ArrayList<>();
+                final Collection<SolrInputDocument> changes = new 
ArrayList<>();
+
+                for (final Map.Entry<String, IndexMutation> entry : 
stores.getValue().entrySet()) {
+                    final String docId = entry.getKey();
+                    final IndexMutation mutation = entry.getValue();
+                    Preconditions.checkArgument(!(mutation.isNew() && 
mutation.isDeleted()));
+                    Preconditions.checkArgument(!mutation.isNew() || 
!mutation.hasDeletions());
+                    Preconditions.checkArgument(!mutation.isDeleted() || 
!mutation.hasAdditions());
+
+                    //Handle any deletions
+                    if (mutation.hasDeletions()) {
+                        if (mutation.isDeleted()) {
+                            logger.trace("Deleting entire document {}", docId);
+                            deleteIds.add(docId);
+                        } else {
+                            final List<IndexEntry> fieldDeletions = new 
ArrayList<>(mutation.getDeletions());
+                            if (mutation.hasAdditions()) {
+                                for (final IndexEntry indexEntry : 
mutation.getAdditions()) {
+                                    fieldDeletions.remove(indexEntry);
+                                }
+                            }
+                            handleRemovalsFromIndex(collectionName, 
keyIdField, docId, fieldDeletions, information);
+                        }
+                    }
+
+                    if (mutation.hasAdditions()) {
+                        final int ttl = mutation.determineTTL();
+
+                        final SolrInputDocument doc = new SolrInputDocument();
+                        doc.setField(keyIdField, docId);
+
+                        final boolean isNewDoc = mutation.isNew();
+
+                        if (isNewDoc)
+                            logger.trace("Adding new document {}", docId);
+                        final Map<String, Object> adds = 
collectFieldValues(mutation.getAdditions(), collectionName,
+                                information);
+                        // If cardinality is not single then we should use the 
"add" operation to update
+                        // the index so we don't overwrite existing values.
+                        adds.keySet().forEach(v-> {
+                            final KeyInformation keyInformation = 
information.get(collectionName, v);
+                            final String solrOp = 
keyInformation.getCardinality() == Cardinality.SINGLE ? "set" : "add";
+                            doc.setField(v, isNewDoc ? adds.get(v) :
+                                    new HashMap<String, Object>(1) 
{{put(solrOp, adds.get(v));}}
+                            );
+                        });
+                        if (ttl>0) {
+                            Preconditions.checkArgument(isNewDoc,
+                                    "Solr only supports TTL on new documents 
[%s]", docId);
+                            doc.setField(ttlField, String.format("+%dSECONDS", 
ttl));
+                        }
+                        changes.add(doc);
+                    }
+                }
+
+                commitDeletes(collectionName, deleteIds);
+                commitChanges(collectionName, changes);
+            }
+        } catch (final IllegalArgumentException e) {
+            throw new PermanentBackendException("Unable to complete query on 
Solr.", e);
+        } catch (final Exception e) {
+            throw storageException(e);
+        }
+    }
+
+    private void handleRemovalsFromIndex(String collectionName, String 
keyIdField, String docId,
+                                         List<IndexEntry> fieldDeletions, 
KeyInformation.IndexRetriever information)
+            throws SolrServerException, IOException, BackendException {
+        final Map<String, String> fieldDeletes = new HashMap<>(1);
+        fieldDeletes.put("set", null);
+        final SolrInputDocument doc = new SolrInputDocument();
+        doc.addField(keyIdField, docId);
+        for(final IndexEntry v: fieldDeletions) {
+            final KeyInformation keyInformation = 
information.get(collectionName, v.field);
+            // If the cardinality is a Set or List, we just need to remove the 
individual value
+            // received in the mutation and not set the field to null, but we 
still consolidate the values
+            // in the event of multiple removals in one mutation.
+            final Map<String, Object> deletes = 
collectFieldValues(fieldDeletions, collectionName, information);
+            deletes.keySet().forEach(vertex -> {
+                final Map<String, Object> remove;
+                if (keyInformation.getCardinality() == Cardinality.SINGLE) {
+                    remove = (Map) fieldDeletes;
+                } else {
+                    remove = new HashMap<>(1);
+                    remove.put("remove", deletes.get(vertex));
+                }
+                doc.setField(vertex, remove);
+            });
+        }
+
+        final UpdateRequest singleDocument = newUpdateRequest();
+        singleDocument.add(doc);
+        solrClient.request(singleDocument, collectionName);
+
+    }
+
+    private Object convertValue(Object value) throws BackendException {
+        if (value instanceof Geoshape) {
+            return GeoToWktConverter.convertToWktString((Geoshape) value);
+        }
+        if (value instanceof UUID) {
+            return value.toString();
+        }
+        if(value instanceof Instant) {
+            if(Math.floorMod(((Instant) value).getNano(), 1000000) != 0) {
+                throw new IllegalArgumentException("Solr indexes do not 
support nanoseconds");
+            }
+            return new Date(((Instant) value).toEpochMilli());
+        }
+        return value;
+    }
+
+    @Override
+    public void restore(Map<String, Map<String, List<IndexEntry>>> documents,
+                        KeyInformation.IndexRetriever information, 
BaseTransaction tx) throws BackendException {
+        try {
+            for (final Map.Entry<String, Map<String, List<IndexEntry>>> stores 
: documents.entrySet()) {
+                final String collectionName = stores.getKey();
+
+                final List<String> deleteIds = new ArrayList<>();
+                final List<SolrInputDocument> newDocuments = new ArrayList<>();
+
+                for (final Map.Entry<String, List<IndexEntry>> entry : 
stores.getValue().entrySet()) {
+                    final String docID = entry.getKey();
+                    final List<IndexEntry> content = entry.getValue();
+
+                    if (content == null || content.isEmpty()) {
+                        if (logger.isTraceEnabled())
+                            logger.trace("Deleting document [{}]", docID);
+
+                        deleteIds.add(docID);
+                        continue;
+                    }
+                    final SolrInputDocument doc = new SolrInputDocument();
+                    doc.setField(getKeyFieldId(collectionName), docID);
+                    final Map<String, Object> adds = 
collectFieldValues(content, collectionName, information);
+                    adds.forEach(doc::setField);
+                    newDocuments.add(doc);
+                }
+                commitDeletes(collectionName, deleteIds);
+                commitChanges(collectionName, newDocuments);
+            }
+        } catch (final Exception e) {
+            throw new TemporaryBackendException("Could not restore Solr 
index", e);
+        }
+    }
+
+    // This method will create a map of field ids to values.  In the case of 
multiValued fields,
+    // it will consolidate all the values into one List or Set so it can be 
updated with a single Solr operation
+    private Map<String, Object> collectFieldValues(List<IndexEntry> content, 
String collectionName,
+                                                   
KeyInformation.IndexRetriever information) throws BackendException {
+        final Map<String, Object> docs = new HashMap<>();
+        for (final IndexEntry addition: content) {
+            final KeyInformation keyInformation = 
information.get(collectionName, addition.field);
+            switch (keyInformation.getCardinality()) {
+                case SINGLE:
+                    docs.put(addition.field, convertValue(addition.value));
+                    break;
+                case SET:
+                    if (!docs.containsKey(addition.field)) {
+                        docs.put(addition.field, new HashSet<>());
+                    }
+                    ((Set<Object>) 
docs.get(addition.field)).add(convertValue(addition.value));
+                    break;
+                case LIST:
+                    if (!docs.containsKey(addition.field)) {
+                        docs.put(addition.field,  new ArrayList<>());
+                    }
+                    ((List<Object>) 
docs.get(addition.field)).add(convertValue(addition.value));
+                    break;
+            }
+        }
+        return docs;
+    }
+
+    private void commitChanges(String collectionName,
+                               Collection<SolrInputDocument> documents) throws 
SolrServerException, IOException {
+        if (documents.size() == 0) return;
+
+        try {
+            solrClient.request(newUpdateRequest().add(documents), 
collectionName);
+        } catch (final HttpSolrClient.RemoteSolrException rse) {
+            logger.error("Unable to save documents to Solr as one of the shape 
objects stored were not compatible with Solr.", rse);
+            logger.error("Details in failed document batch: ");
+            for (final SolrInputDocument d : documents) {
+                final Collection<String> fieldNames = d.getFieldNames();
+                for (final String name : fieldNames) {
+                    logger.error(name + ":" + d.getFieldValue(name));
+                }
+            }
+
+            throw rse;
+        }
+    }
+
+    private void commitDeletes(String collectionName, List<String> deleteIds) 
throws SolrServerException, IOException {
+        if (deleteIds.size() == 0) return;
+        solrClient.request(newUpdateRequest().deleteById(deleteIds), 
collectionName);
+    }
+
+    @Override
+    public Stream<String> query(IndexQuery query, 
KeyInformation.IndexRetriever information,
+                                BaseTransaction tx) throws BackendException {
+        final String collection = query.getStore();
+        final String keyIdField = getKeyFieldId(collection);
+        final SolrQuery solrQuery = new SolrQuery("*:*");
+        solrQuery.set(CommonParams.FL, keyIdField);
+        final String queryFilter = buildQueryFilter(query.getCondition(), 
information.get(collection));
+        solrQuery.addFilterQuery(queryFilter);
+        if (!query.getOrder().isEmpty()) {
+            final List<IndexQuery.OrderEntry> orders = query.getOrder();
+            for (final IndexQuery.OrderEntry order1 : orders) {
+                final String item = order1.getKey();
+                final SolrQuery.ORDER order = order1.getOrder() == Order.ASC ? 
SolrQuery.ORDER.asc : SolrQuery.ORDER.desc;
+                solrQuery.addSort(new SolrQuery.SortClause(item, order));
+            }
+        }
+        solrQuery.setStart(0);
+        if (query.hasLimit()) {
+            solrQuery.setRows(Math.min(query.getLimit(), batchSize));
+        } else {
+            solrQuery.setRows(batchSize);
+        }
+        return executeQuery(query.hasLimit() ? query.getLimit() : null, 0, 
collection, solrQuery,
+                doc -> doc.getFieldValue(keyIdField).toString());
+    }
+
+    private <E> Stream<E> executeQuery(Integer limit, int offset, String 
collection, SolrQuery solrQuery,
+                                       Function<SolrDocument, E> function) 
throws PermanentBackendException {
+        try {
+            final SolrResultIterator<E> resultIterator = new 
SolrResultIterator<>(solrClient, limit, offset,
+                    solrQuery.getRows(), collection, solrQuery, function);
+            return 
StreamSupport.stream(Spliterators.spliteratorUnknownSize(resultIterator, 
Spliterator.ORDERED),
+                    false);
+        } catch (final IOException | UncheckedIOException e) {
+            logger.error("Query did not complete : ", e);
+            throw new PermanentBackendException(e);
+        } catch (final SolrServerException | UncheckedSolrException e) {
+            logger.error("Unable to query Solr index.", e);
+            throw new PermanentBackendException(e);
+        }
+    }
+
+
+    private SolrQuery runCommonQuery(RawQuery query, 
KeyInformation.IndexRetriever information, BaseTransaction tx,
+                                     String collection, String keyIdField) 
throws BackendException {
+        final SolrQuery solrQuery = new SolrQuery(query.getQuery())
+                .addField(keyIdField)
+                .setIncludeScore(true)
+                .setStart(query.getOffset());
+        if (query.hasLimit()) {
+            solrQuery.setRows(Math.min(query.getLimit(), batchSize));
+        } else {
+            solrQuery.setRows(batchSize);
+        }
+
+        for(final Parameter parameter: query.getParameters()) {
+            if (parameter.value() instanceof String[]) {
+                solrQuery.setParam(parameter.key(), (String[]) 
parameter.value());
+            } else if (parameter.value() instanceof String) {
+                solrQuery.setParam(parameter.key(), (String) 
parameter.value());
+            }
+        }
+        return solrQuery;
+    }
+
+    @Override
+    public Stream<RawQuery.Result<String>> query(RawQuery query, 
KeyInformation.IndexRetriever information,
+                                                 BaseTransaction tx) throws 
BackendException {
+        final String collection = query.getStore();
+        final String keyIdField = getKeyFieldId(collection);
+        return executeQuery(query.hasLimit() ? query.getLimit() : null, 
query.getOffset(), collection,
+                runCommonQuery(query, information, tx, collection, 
keyIdField), doc -> {
+                    final double score = 
Double.parseDouble(doc.getFieldValue("score").toString());
+                    return new 
RawQuery.Result<>(doc.getFieldValue(keyIdField).toString(), score);
+                });
+    }
+
+    @Override
+    public Long totals(RawQuery query, KeyInformation.IndexRetriever 
information,
+                       BaseTransaction tx) throws BackendException {
+        try {
+            final String collection = query.getStore();
+            final String keyIdField = getKeyFieldId(collection);
+            final QueryResponse response = solrClient.query(collection, 
runCommonQuery(query, information, tx,
+                    collection, keyIdField));
+            logger.debug("Executed query [{}] in {} ms", query.getQuery(), 
response.getElapsedTime());
+            return response.getResults().getNumFound();
+        } catch (final IOException e) {
+            logger.error("Query did not complete : ", e);
+            throw new PermanentBackendException(e);
+        } catch (final SolrServerException e) {
+            logger.error("Unable to query Solr index.", e);
+            throw new PermanentBackendException(e);
+        }
+    }
+
+    private static String escapeValue(Object value) {
+        return ClientUtils.escapeQueryChars(value.toString());
+    }
+
+    public String buildQueryFilter(Condition<JanusGraphElement> condition, 
KeyInformation.StoreRetriever information) {
+        if (condition instanceof PredicateCondition) {
+            final PredicateCondition<String, JanusGraphElement> atom
+                    = (PredicateCondition<String, JanusGraphElement>) 
condition;
+            final Object value = atom.getValue();
+            final String key = atom.getKey();
+            final JanusGraphPredicate predicate = atom.getPredicate();
+
+            if (value instanceof Number) {
+                final String queryValue = escapeValue(value);
+                Preconditions.checkArgument(predicate instanceof Cmp,
+                        "Relation not supported on numeric types: " + 
predicate);
+                final Cmp numRel = (Cmp) predicate;
+                switch (numRel) {
+                    case EQUAL:
+                        return (key + ":" + queryValue);
+                    case NOT_EQUAL:
+                        return ("-" + key + ":" + queryValue);
+                    case LESS_THAN:
+                        //use right curly to mean up to but not including value
+                        return (key + ":[* TO " + queryValue + "}");
+                    case LESS_THAN_EQUAL:
+                        return (key + ":[* TO " + queryValue + "]");
+                    case GREATER_THAN:
+                        //use left curly to mean greater than but not 
including value
+                        return (key + ":{" + queryValue + " TO *]");
+                    case GREATER_THAN_EQUAL:
+                        return (key + ":[" + queryValue + " TO *]");
+                    default: throw new IllegalArgumentException("Unexpected 
relation: " + numRel);
+                }
+            } else if (value instanceof String) {
+                final Mapping map = getStringMapping(information.get(key));
+                assert map==Mapping.TEXT || map==Mapping.STRING;
+
+                if (map==Mapping.TEXT && 
!(Text.HAS_CONTAINS.contains(predicate) || predicate instanceof Cmp))
+                    throw new IllegalArgumentException("Text mapped string 
values only support CONTAINS and Compare queries and not: " + predicate);
+                if (map==Mapping.STRING && 
Text.HAS_CONTAINS.contains(predicate))
+                    throw new IllegalArgumentException("String mapped string 
values do not support CONTAINS queries: " + predicate);
+
+                //Special case
+                if (predicate == Text.CONTAINS) {
+                    return tokenize(information, value, key, predicate,
+                            
ParameterType.TEXT_ANALYZER.findParameter(information.get(key).getParameters(), 
null));
+                } else if (predicate == Text.PREFIX || predicate == 
Text.CONTAINS_PREFIX) {
+                    return (key + ":" + escapeValue(value) + "*");
+                } else if (predicate == Text.REGEX || predicate == 
Text.CONTAINS_REGEX) {
+                    return (key + ":/" + value + "/");
+                } else if (predicate == Cmp.EQUAL) {
+                    final String tokenizer =
+                            
ParameterType.STRING_ANALYZER.findParameter(information.get(key).getParameters(),
 null);
+                    if(tokenizer != null){
+                        return tokenize(information, value, key, 
predicate,tokenizer);
+                    } else {
+                        return (key + ":\"" + escapeValue(value) + "\"");
+                    }
+                } else if (predicate == Cmp.NOT_EQUAL) {
+                    return ("-" + key + ":\"" + escapeValue(value) + "\"");
+                } else if (predicate == Text.FUZZY || predicate == 
Text.CONTAINS_FUZZY) {
+                    return (key + ":"+escapeValue(value)+"~");
+                } else if (predicate == Cmp.LESS_THAN) {
+                    return (key + ":[* TO \"" + escapeValue(value) + "\"}");
+                } else if (predicate == Cmp.LESS_THAN_EQUAL) {
+                    return (key + ":[* TO \"" + escapeValue(value) + "\"]");
+                } else if (predicate == Cmp.GREATER_THAN) {
+                    return (key + ":{\"" + escapeValue(value) + "\" TO *]");
+                } else if (predicate == Cmp.GREATER_THAN_EQUAL) {
+                    return (key + ":[\"" + escapeValue(value) + "\" TO *]");
+                } else {
+                    throw new IllegalArgumentException("Relation is not 
supported for string value: " + predicate);
+                }
+            } else if (value instanceof Geoshape) {
+                final Mapping map = Mapping.getMapping(information.get(key));
+                Preconditions.checkArgument(predicate instanceof Geo && 
predicate != Geo.DISJOINT,
+                        "Relation not supported on geo types: " + predicate);
+                Preconditions.checkArgument(map == Mapping.PREFIX_TREE || 
predicate == Geo.WITHIN || predicate == Geo.INTERSECT,
+                        "Relation not supported on geopoint types: " + 
predicate);
+                final Geoshape geo = (Geoshape)value;
+                if (geo.getType() == Geoshape.Type.CIRCLE && (predicate == 
Geo.INTERSECT || map == Mapping.DEFAULT)) {
+                    final Geoshape.Point center = geo.getPoint();
+                    return ("{!geofilt sfield=" + key +
+                            " pt=" + center.getLatitude() + "," + 
center.getLongitude() +
+                            " d=" + geo.getRadius() + "} distErrPct=0"); 
//distance in kilometers
+                } else if (geo.getType() == Geoshape.Type.BOX && (predicate == 
Geo.INTERSECT || map == Mapping.DEFAULT)) {
+                    final Geoshape.Point southwest = geo.getPoint(0);
+                    final Geoshape.Point northeast = geo.getPoint(1);
+                    return (key + ":[" + southwest.getLatitude() + "," + 
southwest.getLongitude() +
+                            " TO " + northeast.getLatitude() + "," + 
northeast.getLongitude() + "]");
+                } else if (map == Mapping.PREFIX_TREE) {
+                    return key + ":\"" + SPATIAL_PREDICATES.get(predicate) + 
"(" + geo + ")\" distErrPct=0";
+                } else {
+                    throw new IllegalArgumentException("Unsupported or invalid 
search shape type: " + geo.getType());
+                }
+            } else if (value instanceof Date || value instanceof Instant) {
+                final String s = value.toString();
+                final String queryValue = escapeValue(value instanceof Date ? 
toIsoDate((Date) value) : value.toString());
+                Preconditions.checkArgument(predicate instanceof Cmp, 
"Relation not supported on date types: "
+                        + predicate);
+                final Cmp numRel = (Cmp) predicate;
+
+                switch (numRel) {
+                    case EQUAL:
+                        return (key + ":" + queryValue);
+                    case NOT_EQUAL:
+                        return ("-" + key + ":" + queryValue);
+                    case LESS_THAN:
+                        //use right curly to mean up to but not including value
+                        return (key + ":[* TO " + queryValue + "}");
+                    case LESS_THAN_EQUAL:
+                        return (key + ":[* TO " + queryValue + "]");
+                    case GREATER_THAN:
+                        //use left curly to mean greater than but not 
including value
+                        return (key + ":{" + queryValue + " TO *]");
+                    case GREATER_THAN_EQUAL:
+                        return (key + ":[" + queryValue + " TO *]");
+                    default: throw new IllegalArgumentException("Unexpected 
relation: " + numRel);
+                }
+            } else if (value instanceof Boolean) {
+                final Cmp numRel = (Cmp) predicate;
+                final String queryValue = escapeValue(value);
+                switch (numRel) {
+                    case EQUAL:
+                        return (key + ":" + queryValue);
+                    case NOT_EQUAL:
+                        return ("-" + key + ":" + queryValue);
+                    default:
+                        throw new IllegalArgumentException("Boolean types only 
support EQUAL or NOT_EQUAL");
+                }
+            } else if (value instanceof UUID) {
+                if (predicate == Cmp.EQUAL) {
+                    return (key + ":\"" + escapeValue(value) + "\"");
+                } else if (predicate == Cmp.NOT_EQUAL) {
+                    return ("-" + key + ":\"" + escapeValue(value) + "\"");
+                } else {
+                    throw new IllegalArgumentException("Relation is not 
supported for uuid value: " + predicate);
+                }
+            } else throw new IllegalArgumentException("Unsupported type: " + 
value);
+        } else if (condition instanceof Not) {
+            final String sub = 
buildQueryFilter(((Not)condition).getChild(),information);
+            if (StringUtils.isNotBlank(sub)) return "-("+sub+")";
+            else return "";
+        } else if (condition instanceof And) {
+            final int numChildren = ((And) condition).size();
+            final StringBuilder sb = new StringBuilder();
+            for (final Condition<JanusGraphElement> c : 
condition.getChildren()) {
+                final String sub = buildQueryFilter(c, information);
+
+                if (StringUtils.isBlank(sub))
+                    continue;
+
+                // we don't have to add "+" which means AND iff
+                // a. it's a NOT query,
+                // b. expression is a single statement in the AND.
+                if (!sub.startsWith("-") && numChildren > 1)
+                    sb.append("+");
+
+                sb.append(sub).append(" ");
+            }
+            return sb.toString();
+        } else if (condition instanceof Or) {
+            final StringBuilder sb = new StringBuilder();
+            int element=0;
+            for (final Condition<JanusGraphElement> c : 
condition.getChildren()) {
+                final String sub = buildQueryFilter(c,information);
+                if (StringUtils.isBlank(sub)) continue;
+                if (element==0) sb.append("(");
+                else sb.append(" OR ");
+                sb.append(sub);
+                element++;
+            }
+            if (element>0) sb.append(")");
+            return sb.toString();
+        } else {
+            throw new IllegalArgumentException("Invalid condition: " + 
condition);
+        }
+    }
+
+    private String tokenize(KeyInformation.StoreRetriever information, Object 
value, String key,
+                            JanusGraphPredicate janusgraphPredicate, String 
tokenizer) {
+        List<String> terms;
+        if(tokenizer != null){
+            terms = customTokenize(tokenizer, (String) value);
+        } else {
+            terms = Text.tokenize((String) value);
+        }
+        if (terms.isEmpty()) {
+            return "";
+        } else if (terms.size() == 1) {
+            return (key + ":(" + escapeValue(terms.get(0)) + ")");
+        } else {
+            final And<JanusGraphElement> andTerms = new And<>();
+            for (final String term : terms) {
+                andTerms.add(new PredicateCondition<>(key, 
janusgraphPredicate, term));
+            }
+            return buildQueryFilter(andTerms, information);
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private List<String> customTokenize(String tokenizerClass, String value){
+        CachingTokenFilter stream = null;
+        try {
+            final List<String> terms = new ArrayList<>();
+            final Tokenizer tokenizer
+                    = ((Constructor<Tokenizer>) 
ClassLoader.getSystemClassLoader().loadClass(tokenizerClass)
+                    .getConstructor()).newInstance();
+            tokenizer.setReader(new StringReader(value));
+            stream = new CachingTokenFilter(tokenizer);
+            final TermToBytesRefAttribute termAtt = 
stream.getAttribute(TermToBytesRefAttribute.class);
+            stream.reset();
+            while (stream.incrementToken()) {
+                terms.add(termAtt.getBytesRef().utf8ToString());
+            }
+            return terms;
+        } catch ( ReflectiveOperationException | IOException e) {
+            throw new IllegalArgumentException(e.getMessage(),e);
+        } finally {
+            IOUtils.closeQuietly(stream);
+        }
+    }
+
+    private String toIsoDate(Date value) {
+        final TimeZone tz = TimeZone.getTimeZone("UTC");
+        final DateFormat df = new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
+        df.setTimeZone(tz);
+        return df.format(value);
+    }
+
+    /**
+     * Solr handles all transactions on the server-side. That means all
+     * commit, optimize, or rollback applies since the last 
commit/optimize/rollback.
+     * Solr documentation recommends best way to update Solr is in one process 
to avoid
+     * race conditions.
+     *
+     * @return New Transaction Handle
+     */
+    @Override
+    public BaseTransactionConfigurable beginTransaction(BaseTransactionConfig 
config) {
+        return new DefaultTransaction(config);
+    }
+
+    @Override
+    public void close() throws BackendException {
+        logger.trace("Shutting down connection to Solr", solrClient);
+        try {
+            solrClient.close();
+        } catch (final IOException e) {
+            throw new TemporaryBackendException(e);
+        }
+    }
+
+    @Override
+    public void clearStorage() throws BackendException {
+        try {
+            if (mode!= Mode.CLOUD) {
+                logger.error("Operation only supported for SolrCloud. Cores 
must be deleted manually through the Solr API when using HTTP mode.");
+                return;
+            }
+            logger.debug("Clearing storage from Solr: {}", solrClient);
+            final ZkStateReader zkStateReader = ((CloudSolrClient) 
solrClient).getZkStateReader();
+            zkStateReader.forciblyRefreshAllClusterStateSlow();
+            final ClusterState clusterState = zkStateReader.getClusterState();
+            for (final String collection : 
clusterState.getCollectionsMap().keySet()) {
+                logger.debug("Clearing collection [{}] in Solr",collection);
+                // Collection is not dropped because it may have been created 
externally
+                final UpdateRequest deleteAll = newUpdateRequest();
+                deleteAll.deleteByQuery("*:*");
+                solrClient.request(deleteAll, collection);
+            }
+
+        } catch (final SolrServerException e) {
+            logger.error("Unable to clear storage from index due to server 
error on Solr.", e);
+            throw new PermanentBackendException(e);
+        } catch (final IOException e) {
+            logger.error("Unable to clear storage from index due to low-level 
I/O error.", e);
+            throw new PermanentBackendException(e);
+        } catch (final Exception e) {
+            logger.error("Unable to clear storage from index due to general 
error.", e);
+            throw new PermanentBackendException(e);
+        }
+    }
+
+    @Override
+    public boolean supports(KeyInformation information, JanusGraphPredicate 
predicate) {
+        final Class<?> dataType = information.getDataType();
+        final Mapping mapping = Mapping.getMapping(information);
+        if (mapping!=Mapping.DEFAULT && !AttributeUtil.isString(dataType) &&
+                !(mapping==Mapping.PREFIX_TREE && 
AttributeUtil.isGeo(dataType))) return false;
+
+        if (Number.class.isAssignableFrom(dataType)) {
+            return predicate instanceof Cmp;
+        } else if (dataType == Geoshape.class) {
+            switch(mapping) {
+                case DEFAULT:
+                    return predicate == Geo.WITHIN || predicate == 
Geo.INTERSECT;
+                case PREFIX_TREE:
+                    return predicate == Geo.INTERSECT || predicate == 
Geo.WITHIN || predicate == Geo.CONTAINS;
+            }
+        } else if (AttributeUtil.isString(dataType)) {
+            switch(mapping) {
+                case DEFAULT:
+                case TEXT:
+                    return predicate == Text.CONTAINS || predicate == 
Text.CONTAINS_PREFIX
+                            || predicate == Text.CONTAINS_REGEX || predicate 
== Text.CONTAINS_FUZZY;
+                case STRING:
+                    return predicate instanceof Cmp || predicate==Text.REGEX 
|| predicate==Text.PREFIX  || predicate == Text.FUZZY;
+//                case TEXTSTRING:
+//                    return (janusgraphPredicate instanceof Text) || 
janusgraphPredicate == Cmp.EQUAL || janusgraphPredicate==Cmp.NOT_EQUAL;
+            }
+        } else if (dataType == Date.class || dataType == Instant.class) {
+            return predicate instanceof Cmp;
+        } else if (dataType == Boolean.class) {
+            return predicate == Cmp.EQUAL || predicate == Cmp.NOT_EQUAL;
+        } else if (dataType == UUID.class) {
+            return predicate == Cmp.EQUAL || predicate==Cmp.NOT_EQUAL;
+        }
+        return false;
+    }
+
+    @Override
+    public boolean supports(KeyInformation information) {
+        final Class<?> dataType = information.getDataType();
+        final Mapping mapping = Mapping.getMapping(information);
+        if (Number.class.isAssignableFrom(dataType) || dataType == Date.class 
|| dataType == Instant.class
+                || dataType == Boolean.class || dataType == UUID.class) {
+            return mapping == Mapping.DEFAULT;
+        } else if (AttributeUtil.isString(dataType)) {
+            return mapping == Mapping.DEFAULT || mapping == Mapping.TEXT || 
mapping == Mapping.STRING;
+        } else if (AttributeUtil.isGeo(dataType)) {
+            return mapping == Mapping.DEFAULT || mapping == 
Mapping.PREFIX_TREE;
+        }
+        return false;
+    }
+
+    @Override
+    public String mapKey2Field(String key, KeyInformation keyInfo) {
+        IndexProvider.checkKeyValidity(key);
+        key = key.replace(' ', REPLACEMENT_CHAR);
+
+        if (!dynFields) return key;
+        if (ParameterType.MAPPED_NAME.hasParameter(keyInfo.getParameters())) 
return key;
+        String postfix;
+        final Class dataType = keyInfo.getDataType();
+        if (AttributeUtil.isString(dataType)) {
+            final Mapping map = getStringMapping(keyInfo);
+            switch (map) {
+                case TEXT: postfix = "_t"; break;
+                case STRING: postfix = "_s"; break;
+                default: throw new IllegalArgumentException("Unsupported 
string mapping: " + map);
+            }
+        } else if (AttributeUtil.isWholeNumber(dataType)) {
+            if (dataType.equals(Long.class)) postfix = "_l";
+            else postfix = "_i";
+        } else if (AttributeUtil.isDecimal(dataType)) {
+            if (dataType.equals(Float.class)) postfix = "_f";
+            else postfix = "_d";
+        } else if (dataType.equals(Geoshape.class)) {
+            postfix = "_g";
+        } else if (dataType.equals(Date.class) || 
dataType.equals(Instant.class)) {
+            postfix = "_dt";
+        } else if (dataType.equals(Boolean.class)) {
+            postfix = "_b";
+        } else if (dataType.equals(UUID.class)) {
+            postfix = "_uuid";
+        } else throw new IllegalArgumentException("Unsupported data type 
["+dataType+"] for field: " + key);
+        if (keyInfo.getCardinality() == Cardinality.SET || 
keyInfo.getCardinality() == Cardinality.LIST) {
+            postfix += "s";
+        }
+        return key+postfix;
+    }
+
+    @Override
+    public IndexFeatures getFeatures() {
+        return SOLR_FEATURES;
+    }
+
+    @Override
+    public boolean exists() throws BackendException {
+        if (mode!= Mode.CLOUD) throw new 
UnsupportedOperationException("Operation only supported for SolrCloud");
+        final CloudSolrClient server = (CloudSolrClient) solrClient;
+        try {
+            final ZkStateReader zkStateReader = server.getZkStateReader();
+            zkStateReader.forciblyRefreshAllClusterStateSlow();
+            final ClusterState clusterState = zkStateReader.getClusterState();
+            final Map<String, DocCollection> collections = 
clusterState.getCollectionsMap();
+            return collections != null && !collections.isEmpty();
+        } catch (KeeperException | InterruptedException e) {
+            throw new PermanentBackendException("Unable to check if index 
exists", e);
+        }
+    }
+
+    /*
+    ################# UTILITY METHODS #######################
+     */
+
+    private static Mapping getStringMapping(KeyInformation information) {
+        assert AttributeUtil.isString(information.getDataType());
+        Mapping map = Mapping.getMapping(information);
+        if (map==Mapping.DEFAULT) map = Mapping.TEXT;
+        return map;
+    }
+
+    private static Map<Geo, String> spatialPredicates() {
+        return Collections.unmodifiableMap(Stream.of(
+                new SimpleEntry<>(Geo.WITHIN, "IsWithin"),
+                new SimpleEntry<>(Geo.CONTAINS, "Contains"),
+                new SimpleEntry<>(Geo.INTERSECT, "Intersects"),
+                new SimpleEntry<>(Geo.DISJOINT, "IsDisjointTo"))
+                .collect(Collectors.toMap(SimpleEntry::getKey, 
SimpleEntry::getValue)));
+    }
+
+    private UpdateRequest newUpdateRequest() {
+        final UpdateRequest req = new UpdateRequest();
+        if(waitSearcher) {
+            req.setAction(UpdateRequest.ACTION.COMMIT, true, true);
+        }
+        return req;
+    }
+
+    private BackendException storageException(Exception solrException) {
+        return new TemporaryBackendException("Unable to complete query on 
Solr.", solrException);
+    }
+
+    private static void createCollectionIfNotExists(CloudSolrClient client, 
Configuration config, String collection)
+            throws IOException, SolrServerException, KeeperException, 
InterruptedException {
+        if (!checkIfCollectionExists(client, collection)) {
+            final Integer numShards = config.get(NUM_SHARDS);
+            final Integer maxShardsPerNode = config.get(MAX_SHARDS_PER_NODE);
+            final Integer replicationFactor = config.get(REPLICATION_FACTOR);
+
+
+            // Ideally this property used so a new configset is not uploaded 
for every single
+            // index (collection) created in solr.
+            // if a generic configSet is not set, make the configset name the 
same as the collection.
+            // This was the default behavior before a default configSet could 
be specified
+            final String  genericConfigSet = config.has(SOLR_DEFAULT_CONFIG) ? 
config.get(SOLR_DEFAULT_CONFIG):collection;
+
+            final CollectionAdminRequest.Create createRequest = 
CollectionAdminRequest.createCollection(collection, genericConfigSet, 
numShards, replicationFactor);
+            createRequest.setMaxShardsPerNode(maxShardsPerNode);
+
+            final CollectionAdminResponse createResponse = 
createRequest.process(client);
+            if (createResponse.isSuccess()) {
+                logger.trace("Collection {} successfully created.", 
collection);
+            } else {
+                throw new 
SolrServerException(Joiner.on("\n").join(createResponse.getErrorMessages()));
+            }
+        }
+
+        waitForRecoveriesToFinish(client, collection);
+    }
+
+    /**
+     * Checks if the collection has already been created in Solr.
+     */
+    private static boolean checkIfCollectionExists(CloudSolrClient server, 
String collection) throws KeeperException, InterruptedException {
+        final ZkStateReader zkStateReader = server.getZkStateReader();
+        zkStateReader.forceUpdateCollection(collection);
+        final ClusterState clusterState = zkStateReader.getClusterState();
+        return clusterState.getCollectionOrNull(collection) != null;
+    }
+
+    /**
+     * Wait for all the collection shards to be ready.
+     */
+    private static void waitForRecoveriesToFinish(CloudSolrClient server, 
String collection) throws KeeperException, InterruptedException {
+        final ZkStateReader zkStateReader = server.getZkStateReader();
+        try {
+            boolean cont = true;
+
+            while (cont) {
+                boolean sawLiveRecovering = false;
+                zkStateReader.forceUpdateCollection(collection);
+                final ClusterState clusterState = 
zkStateReader.getClusterState();
+                final Map<String, Slice> slices = 
clusterState.getCollection(collection).getSlicesMap();
+                Preconditions.checkNotNull(slices, "Could not find 
collection:" + collection);
+
+                // change paths for Replica.State per Solr refactoring
+                // remove SYNC state per: http://tinyurl.com/pag6rwt
+                for (final Map.Entry<String, Slice> entry : slices.entrySet()) 
{
+                    final Map<String, Replica> shards = 
entry.getValue().getReplicasMap();
+                    for (final Map.Entry<String, Replica> shard : 
shards.entrySet()) {
+                        final String state = 
shard.getValue().getStr(ZkStateReader.STATE_PROP).toUpperCase();
+                        if ((Replica.State.RECOVERING.name().equals(state) || 
Replica.State.DOWN.name().equals(state))
+                                && 
clusterState.liveNodesContain(shard.getValue().getStr(
+                                ZkStateReader.NODE_NAME_PROP))) {
+                            sawLiveRecovering = true;
+                        }
+                    }
+                }
+
+
+                if (!sawLiveRecovering) {
+                    cont = false;
+                } else {
+                    Thread.sleep(1000);
+                }
+            }
+        } finally {
+            logger.info("Exiting solr wait");
+        }
+    }
+
+    /* ATLAS-2920: Update JanusGraph Solr clients to use all zookeeper entries 
– start */
+    private List<String> getZookeeperURLs(Configuration config) {
+        List<String> ret     = null;
+        String[]     zkHosts = config.get(ZOOKEEPER_URLS);
+
+        if (zkHosts != null) {
+            ret = new ArrayList<>(zkHosts.length);
+
+            for (int i = 0; i < zkHosts.length; i++) {
+                String zkHost = zkHosts[i];
+
+                if (StringUtils.isEmpty(zkHost)) {
+                    continue;
+                }
+
+                int idxSlash = zkHost.indexOf(CHROOT_START_CHAR);
+
+                if (idxSlash != -1 && i < zkHosts.length - 1) {
+                    zkHost = zkHost.substring(0, idxSlash);
+                }
+
+                if (StringUtils.isNotEmpty(zkHost)) {
+                    ret.add(zkHost);
+                }
+            }
+        }
+
+        return ret;
+    }
+    /* ATLAS-2920 – end */
+}
\ No newline at end of file

Reply via email to