ATLAS-91 Add solr configuration and documentation (suma.shivaprasad via shwethags)
Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/48343db9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/48343db9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/48343db9 Branch: refs/heads/master Commit: 48343db999b495458409644c8b9d2fd0bd9fa99d Parents: 147242e Author: Shwetha GS <[email protected]> Authored: Wed Aug 12 11:15:02 2015 +0530 Committer: Shwetha GS <[email protected]> Committed: Wed Aug 12 11:15:02 2015 +0530 ---------------------------------------------------------------------- client/pom.xml | 6 + docs/src/site/twiki/InstallationSteps.twiki | 40 + pom.xml | 71 +- release-log.txt | 1 + repository/pom.xml | 10 + .../titan/diskstorage/solr/Solr5Index.java | 962 +++++++++++++++++++ .../repository/graph/TitanGraphProvider.java | 36 + src/conf/solr/currency.xml | 67 ++ src/conf/solr/lang/stopwords_en.txt | 54 ++ src/conf/solr/protwords.txt | 21 + src/conf/solr/schema.xml | 534 ++++++++++ src/conf/solr/solrconfig.xml | 625 ++++++++++++ src/conf/solr/stopwords.txt | 14 + src/conf/solr/synonyms.txt | 29 + 14 files changed, 2466 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/48343db9/client/pom.xml ---------------------------------------------------------------------- diff --git a/client/pom.xml b/client/pom.xml index d393b3a..2e27930 100755 --- a/client/pom.xml +++ b/client/pom.xml @@ -37,6 +37,12 @@ <artifactId>atlas-typesystem</artifactId> </dependency> + <!-- supports simple auth handler --> + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpclient</artifactId> + </dependency> + <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/48343db9/docs/src/site/twiki/InstallationSteps.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/InstallationSteps.twiki b/docs/src/site/twiki/InstallationSteps.twiki index 0391c2d..e056d17 100644 --- a/docs/src/site/twiki/InstallationSteps.twiki +++ b/docs/src/site/twiki/InstallationSteps.twiki @@ -35,6 +35,15 @@ Tar is structured as follows |- client.properties |- atlas-env.sh |- log4j.xml + |- solr + |- currency.xml + |- lang + |- stopwords_en.txt + |- protwords.txt + |- schema.xml + |- solrconfig.xml + |- stopwords.txt + |- synonyms.txt |- docs |- server |- webapp @@ -112,6 +121,37 @@ and change it to look as below export METADATA_SERVER_OPTS="-Djava.awt.headless=true -Djava.security.krb5.realm= -Djava.security.krb5.kdc=" </verbatim> +* Configuring Solr as the Indexing Backend for the Graph Repository + +By default, Atlas uses Titan as the graph repository; it is currently the only graph repository implementation available. +To configure Titan to work with Solr, follow the instructions below. +<verbatim> +* Install Solr if not already running. The supported Solr versions are 4.8.1 and 5.2.1. + +* Start Solr in cloud mode. + SolrCloud mode uses a ZooKeeper Service as a highly available, central location for cluster management. + For a small cluster, running with an existing ZooKeeper quorum should be fine. For larger clusters, you would want to run a separate ZooKeeper quorum with at least 3 servers. + Note: Atlas currently supports Solr in "cloud" mode only. "http" mode is not supported.
For more information, refer to the Solr documentation: https://cwiki.apache.org/confluence/display/solr/SolrCloud + +* Run the following commands from the SOLR_HOME directory to create the collections in Solr corresponding to the indexes that Atlas uses + bin/solr create -c vertex_index -d ATLAS_HOME/conf/solr -shards #numShards -replicationFactor #replicationFactor + bin/solr create -c edge_index -d ATLAS_HOME/conf/solr -shards #numShards -replicationFactor #replicationFactor + bin/solr create -c fulltext_index -d ATLAS_HOME/conf/solr -shards #numShards -replicationFactor #replicationFactor + + Note: If numShards and replicationFactor are not specified, they default to 1, which suffices if you are trying out Solr with Atlas on a single node instance. + Otherwise specify numShards according to the number of hosts in the Solr cluster and the maxShardsPerNode configuration. + The number of shards cannot exceed the total number of Solr nodes in your SolrCloud cluster. + +* Change the Atlas configuration to point to the Solr instance setup. Make sure the following configurations are set to the below values in ATLAS_HOME/conf/application.properties + atlas.graph.index.search.backend=<'solr' for Solr 4.8.1>/<'solr5' for Solr 5.2.1> + atlas.graph.index.search.solr.mode=cloud + atlas.graph.index.search.solr.zookeeper-url=<the ZK quorum set up for Solr, as a comma separated value> e.g. 10.1.6.4:2181,10.1.6.5:2181 + +* Restart Atlas +</verbatim> + +For more information on Titan Solr configuration, please refer to http://s3.thinkaurelius.com/docs/titan/0.5.4/solr.htm + *Starting Atlas Server* <verbatim> bin/atlas_start.py [-port <port>] http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/48343db9/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index facd539..3d729b5 100755 --- a/pom.xml +++ b/pom.xml @@ -329,8 +329,8 @@ <titan.version>0.5.4</titan.version> <hadoop.version>2.7.0</hadoop.version> <hbase.version>0.98.9-hadoop2</hbase.version> + <solr.version>5.1.0</solr.version> <kafka.version>0.8.2.0</kafka.version> - <!-- scala versions --> <scala.version>2.10.4</scala.version> <scala.binary.version>2.10</scala.binary.version> @@ -397,8 +397,8 @@ </activation> <properties> <titan.storage.backend>hbase</titan.storage.backend> - <titan.index.backend>solr</titan.index.backend> - <solr.zk.address>localhost:9983</solr.zk.address> + <titan.index.backend>solr5</titan.index.backend> + <solr.zk.address>localhost:2181</solr.zk.address> <titan.storage.hostname>localhost</titan.storage.hostname> </properties> </profile> @@ -578,6 +578,10 @@ <groupId>org.htrace</groupId> <artifactId>*</artifactId> </exclusion> + <exclusion> + <groupId>commons-httpclient</groupId> + <artifactId>*</artifactId> + </exclusion> </exclusions> </dependency> @@ -903,11 +907,49 @@ <artifactId>*</artifactId> <groupId>org.ow2.asm</groupId> </exclusion> + <exclusion> + <artifactId>*</artifactId> + <groupId>org.apache.solr</groupId> + </exclusion> </exclusions> </dependency> <dependency> + <groupId>org.apache.solr</groupId> + <artifactId>solr-core</artifactId> + <version>${solr.version}</version> + <exclusions> + <exclusion> + <artifactId>*</artifactId> + <groupId>org.eclipse.jetty</groupId> + </exclusion> + <exclusion> + <artifactId>*</artifactId> + <groupId>org.eclipse.jetty.orbit</groupId> + </exclusion> + <exclusion> + <artifactId>*</artifactId> + <groupId>org.restlet.jee</groupId> + </exclusion> + <exclusion> + <artifactId>*</artifactId> + <groupId>org.ow2.asm</groupId> +
</exclusion> + <exclusion> + <artifactId>*</artifactId> + <groupId>org.apache.lucene</groupId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.solr</groupId> + <artifactId>solr-solrj</artifactId> + <version>${solr.version}</version> + </dependency> + + <dependency> <groupId>com.thinkaurelius.titan</groupId> <artifactId>titan-lucene</artifactId> <version>${titan.version}</version> @@ -1084,7 +1126,19 @@ <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpclient</artifactId> - <version>4.2.5</version> + <version>4.4.1</version> + </dependency> + + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpcore</artifactId> + <version>4.4.1</version> + </dependency> + + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpmime</artifactId> + <version>4.4.1</version> </dependency> <!--Test dependencies--> @@ -1376,6 +1430,14 @@ </descriptors> <finalName>apache-atlas-${project.version}</finalName> </configuration> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>single</goal> + </goals> + </execution> + </executions> </plugin> <plugin> @@ -1502,6 +1564,7 @@ <exclude>**/*.iml</exclude> <exclude>**/*.json</exclude> <exclude>**/target/**</exclude> + <exclude>**/target*/**</exclude> <exclude>**/build/**</exclude> <exclude>**/*.patch</exclude> <exclude>derby.log</exclude> http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/48343db9/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 5d83baa..5de705f 100644 --- a/release-log.txt +++ b/release-log.txt @@ -8,6 +8,7 @@ ATLAS-54 Rename configs in hive hook (shwethags) ATLAS-3 Mixed Index creation fails with Date types (suma.shivaprasad via shwethags) ALL CHANGES: +ATLAS-91 Add solr configuration and documentation (suma.shivaprasad via shwethags) ATLAS-95 import-hive.sh reports illegal java parameters (shwethags) ATLAS-74 Create notification framework (shwethags) ATLAS-93 import-hive.sh reports FileNotFoundException (shwethags) http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/48343db9/repository/pom.xml ---------------------------------------------------------------------- diff --git a/repository/pom.xml b/repository/pom.xml index a2f8e08..8e4d0f3 100755 --- a/repository/pom.xml +++ b/repository/pom.xml @@ -95,6 +95,16 @@ </dependency> <dependency> + <groupId>org.apache.solr</groupId> + <artifactId>solr-core</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.solr</groupId> + <artifactId>solr-solrj</artifactId> + </dependency> + + <dependency> <groupId>com.thinkaurelius.titan</groupId> <artifactId>titan-berkeleyje</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/48343db9/repository/src/main/java/com/thinkaurelius/titan/diskstorage/solr/Solr5Index.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/com/thinkaurelius/titan/diskstorage/solr/Solr5Index.java b/repository/src/main/java/com/thinkaurelius/titan/diskstorage/solr/Solr5Index.java new file mode 100644 index 0000000..e484c18 --- /dev/null +++ b/repository/src/main/java/com/thinkaurelius/titan/diskstorage/solr/Solr5Index.java @@ -0,0 +1,962 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.thinkaurelius.titan.diskstorage.solr; + +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.collect.Sets; +import com.thinkaurelius.titan.core.Order; +import com.thinkaurelius.titan.core.TitanElement; +import com.thinkaurelius.titan.core.attribute.Cmp; +import com.thinkaurelius.titan.core.attribute.Geo; +import com.thinkaurelius.titan.core.attribute.Geoshape; +import com.thinkaurelius.titan.core.attribute.Text; +import com.thinkaurelius.titan.core.schema.Mapping; +import com.thinkaurelius.titan.diskstorage.BackendException; +import com.thinkaurelius.titan.diskstorage.BaseTransaction; +import com.thinkaurelius.titan.diskstorage.BaseTransactionConfig; +import com.thinkaurelius.titan.diskstorage.BaseTransactionConfigurable; +import com.thinkaurelius.titan.diskstorage.PermanentBackendException; +import com.thinkaurelius.titan.diskstorage.TemporaryBackendException; +import com.thinkaurelius.titan.diskstorage.configuration.ConfigNamespace; +import com.thinkaurelius.titan.diskstorage.configuration.ConfigOption; +import com.thinkaurelius.titan.diskstorage.configuration.Configuration; +import com.thinkaurelius.titan.diskstorage.indexing.IndexEntry; +import com.thinkaurelius.titan.diskstorage.indexing.IndexFeatures; +import com.thinkaurelius.titan.diskstorage.indexing.IndexMutation; +import com.thinkaurelius.titan.diskstorage.indexing.IndexProvider; +import com.thinkaurelius.titan.diskstorage.indexing.IndexQuery; +import com.thinkaurelius.titan.diskstorage.indexing.KeyInformation; +import com.thinkaurelius.titan.diskstorage.indexing.RawQuery; +import com.thinkaurelius.titan.diskstorage.solr.transform.GeoToWktConverter; +import com.thinkaurelius.titan.diskstorage.util.DefaultTransaction; +import com.thinkaurelius.titan.graphdb.configuration.PreInitializeConfigOptions; +import com.thinkaurelius.titan.graphdb.database.serialize.AttributeUtil; +import com.thinkaurelius.titan.graphdb.database.serialize.attribute.AbstractDecimal; +import com.thinkaurelius.titan.graphdb.query.TitanPredicate; +import com.thinkaurelius.titan.graphdb.query.condition.And; +import com.thinkaurelius.titan.graphdb.query.condition.Condition; +import com.thinkaurelius.titan.graphdb.query.condition.Not; +import com.thinkaurelius.titan.graphdb.query.condition.Or; +import com.thinkaurelius.titan.graphdb.query.condition.PredicateCondition; +import com.thinkaurelius.titan.graphdb.types.ParameterType; +import org.apache.commons.lang.StringUtils; +import org.apache.http.client.HttpClient; +import org.apache.solr.client.solrj.SolrClient; +import org.apache.solr.client.solrj.SolrQuery; +import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.client.solrj.impl.CloudSolrClient; +import org.apache.solr.client.solrj.impl.HttpClientUtil; +import 
org.apache.solr.client.solrj.impl.HttpSolrClient; +import org.apache.solr.client.solrj.impl.LBHttpSolrClient; +import org.apache.solr.client.solrj.request.CollectionAdminRequest; +import org.apache.solr.client.solrj.request.UpdateRequest; +import org.apache.solr.client.solrj.response.CollectionAdminResponse; +import org.apache.solr.client.solrj.response.QueryResponse; +import org.apache.solr.client.solrj.util.ClientUtils; +import org.apache.solr.common.SolrDocument; +import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.cloud.ClusterState; +import org.apache.solr.common.cloud.Replica; +import org.apache.solr.common.cloud.Slice; +import org.apache.solr.common.cloud.ZkStateReader; +import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.TimeZone; +import java.util.UUID; + +import static com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration.INDEX_MAX_RESULT_SET_SIZE; +import static com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration.INDEX_NS; + +/** + * NOTE: Copied from titan for supporting sol5. Do not change + */ +@PreInitializeConfigOptions +public class Solr5Index implements IndexProvider { + + private static final Logger logger = LoggerFactory.getLogger(Solr5Index.class); + + + private static final String DEFAULT_ID_FIELD = "id"; + + private enum Mode { + HTTP, CLOUD; + + public static Mode parse(String mode) { + for (Mode m : Mode.values()) { + if (m.toString().equalsIgnoreCase(mode)) return m; + } + throw new IllegalArgumentException("Unrecognized mode: "+mode); + } + } + + public static final ConfigNamespace SOLR_NS = + new ConfigNamespace(INDEX_NS, "solr", "Solr index configuration"); + + public static final ConfigOption<String> SOLR_MODE = new ConfigOption<String>(SOLR_NS,"mode", + "The operation mode for Solr which is either via HTTP (`http`) or using SolrCloud (`cloud`)", + ConfigOption.Type.GLOBAL_OFFLINE, "cloud"); + + public static final ConfigOption<Boolean> DYNAMIC_FIELDS = new ConfigOption<Boolean>(SOLR_NS,"dyn-fields", + "Whether to use dynamic fields (which appends the data type to the field name). If dynamic fields is disabled" + + "the user must map field names and define them explicitly in the schema.", + ConfigOption.Type.GLOBAL_OFFLINE, true); + + public static final ConfigOption<String[]> KEY_FIELD_NAMES = new ConfigOption<String[]>(SOLR_NS,"key-field-names", + "Field name that uniquely identifies each document in Solr. 
Must be specified as a list of `collection=field`.", + ConfigOption.Type.GLOBAL, String[].class); + + public static final ConfigOption<String> TTL_FIELD = new ConfigOption<String>(SOLR_NS,"ttl_field", + "Name of the TTL field for Solr collections.", + ConfigOption.Type.GLOBAL_OFFLINE, "ttl"); + + /** SolrCloud Configuration */ + + public static final ConfigOption<String> ZOOKEEPER_URL = new ConfigOption<String>(SOLR_NS,"zookeeper-url", + "URL of the Zookeeper instance coordinating the SolrCloud cluster", + ConfigOption.Type.MASKABLE, "localhost:2181"); + + public static final ConfigOption<Integer> NUM_SHARDS = new ConfigOption<Integer>(SOLR_NS,"num-shards", + "Number of shards for a collection. This applies when creating a new collection which is only supported under the SolrCloud operation mode.", + ConfigOption.Type.GLOBAL_OFFLINE, 1); + + public static final ConfigOption<Integer> MAX_SHARDS_PER_NODE = new ConfigOption<Integer>(SOLR_NS,"max-shards-per-node", + "Maximum number of shards per node. This applies when creating a new collection which is only supported under the SolrCloud operation mode.", + ConfigOption.Type.GLOBAL_OFFLINE, 1); + + public static final ConfigOption<Integer> REPLICATION_FACTOR = new ConfigOption<Integer>(SOLR_NS,"replication-factor", + "Replication factor for a collection. This applies when creating a new collection which is only supported under the SolrCloud operation mode.", + ConfigOption.Type.GLOBAL_OFFLINE, 1); + + + /** HTTP Configuration */ + + public static final ConfigOption<String[]> HTTP_URLS = new ConfigOption<String[]>(SOLR_NS,"http-urls", + "List of URLs to use to connect to Solr Servers (LBHttpSolrClient is used), don't add core or collection name to the URL.", + ConfigOption.Type.MASKABLE, new String[] { "http://localhost:8983/solr" }); + + public static final ConfigOption<Integer> HTTP_CONNECTION_TIMEOUT = new ConfigOption<Integer>(SOLR_NS,"http-connection-timeout", + "Solr HTTP connection timeout.", + ConfigOption.Type.MASKABLE, 5000); + + public static final ConfigOption<Boolean> HTTP_ALLOW_COMPRESSION = new ConfigOption<Boolean>(SOLR_NS,"http-compression", + "Enable/disable compression on the HTTP connections made to Solr.", + ConfigOption.Type.MASKABLE, false); + + public static final ConfigOption<Integer> HTTP_MAX_CONNECTIONS_PER_HOST = new ConfigOption<Integer>(SOLR_NS,"http-max-per-host", + "Maximum number of HTTP connections per Solr host.", + ConfigOption.Type.MASKABLE, 20); + + public static final ConfigOption<Integer> HTTP_GLOBAL_MAX_CONNECTIONS = new ConfigOption<Integer>(SOLR_NS,"http-max", + "Maximum number of HTTP connections in total to all Solr servers.", + ConfigOption.Type.MASKABLE, 100); + + public static final ConfigOption<Boolean> WAIT_SEARCHER = new ConfigOption<Boolean>(SOLR_NS, "wait-searcher", + "When mutating - wait for the index to reflect new mutations before returning. 
This can have a negative impact on performance.", + ConfigOption.Type.LOCAL, false); + + + + private static final IndexFeatures SOLR_FEATURES = new IndexFeatures.Builder().supportsDocumentTTL() + .setDefaultStringMapping(Mapping.TEXT).supportedStringMappings(Mapping.TEXT, Mapping.STRING).build(); + + private final SolrClient solrClient; + private final Configuration configuration; + private final Mode mode; + private final boolean dynFields; + private final Map<String, String> keyFieldIds; + private final String ttlField; + private final int maxResults; + private final boolean waitSearcher; + + public Solr5Index(final Configuration config) throws BackendException { + Preconditions.checkArgument(config!=null); + configuration = config; + + mode = Mode.parse(config.get(SOLR_MODE)); + dynFields = config.get(DYNAMIC_FIELDS); + keyFieldIds = parseKeyFieldsForCollections(config); + maxResults = config.get(INDEX_MAX_RESULT_SET_SIZE); + ttlField = config.get(TTL_FIELD); + waitSearcher = config.get(WAIT_SEARCHER); + + if (mode==Mode.CLOUD) { + String zookeeperUrl = config.get(Solr5Index.ZOOKEEPER_URL); + CloudSolrClient cloudServer = new CloudSolrClient(zookeeperUrl, true); + cloudServer.connect(); + solrClient = cloudServer; + } else if (mode==Mode.HTTP) { + HttpClient clientParams = HttpClientUtil.createClient(new ModifiableSolrParams() {{ + add(HttpClientUtil.PROP_ALLOW_COMPRESSION, config.get(HTTP_ALLOW_COMPRESSION).toString()); + add(HttpClientUtil.PROP_CONNECTION_TIMEOUT, config.get(HTTP_CONNECTION_TIMEOUT).toString()); + add(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST, config.get(HTTP_MAX_CONNECTIONS_PER_HOST).toString()); + add(HttpClientUtil.PROP_MAX_CONNECTIONS, config.get(HTTP_GLOBAL_MAX_CONNECTIONS).toString()); + }}); + + solrClient = new LBHttpSolrClient(clientParams, config.get(HTTP_URLS)); + + + } else { + throw new IllegalArgumentException("Unsupported Solr operation mode: " + mode); + } + } + + private Map<String, String> parseKeyFieldsForCollections(Configuration config) throws BackendException { + Map<String, String> keyFieldNames = new HashMap<String, String>(); + String[] collectionFieldStatements = config.has(KEY_FIELD_NAMES)?config.get(KEY_FIELD_NAMES):new String[0]; + for (String collectionFieldStatement : collectionFieldStatements) { + String[] parts = collectionFieldStatement.trim().split("="); + if (parts.length != 2) { + throw new PermanentBackendException("Unable to parse the collection name / key field name pair. It should be of the format collection=field"); + } + String collectionName = parts[0]; + String keyFieldName = parts[1]; + keyFieldNames.put(collectionName, keyFieldName); + } + return keyFieldNames; + } + + private String getKeyFieldId(String collection) { + String field = keyFieldIds.get(collection); + if (field==null) field = DEFAULT_ID_FIELD; + return field; + } + + /** + * Unlike the ElasticSearch Index, which is schema free, Solr requires a schema to + * support searching. This means that you will need to modify the solr schema with the + * appropriate field definitions in order to work properly. If you have a running instance + * of Solr and you modify its schema with new fields, don't forget to re-index! 
+ * @param store Index store + * @param key New key to register + * @param information Datatype to register for the key + * @param tx enclosing transaction + * @throws com.thinkaurelius.titan.diskstorage.BackendException + */ + @Override + public void register(String store, String key, KeyInformation information, BaseTransaction tx) throws BackendException { + if (mode==Mode.CLOUD) { + CloudSolrClient client = (CloudSolrClient) solrClient; + try { + createCollectionIfNotExists(client, configuration, store); + } catch (IOException e) { + throw new PermanentBackendException(e); + } catch (SolrServerException e) { + throw new PermanentBackendException(e); + } catch (InterruptedException e) { + throw new PermanentBackendException(e); + } catch (KeeperException e) { + throw new PermanentBackendException(e); + } + } + //Since all data types must be defined in the schema.xml, pre-registering a type does not work + } + + @Override + public void mutate(Map<String, Map<String, IndexMutation>> mutations, KeyInformation.IndexRetriever informations, BaseTransaction tx) throws BackendException { + logger.debug("Mutating SOLR"); + try { + for (Map.Entry<String, Map<String, IndexMutation>> stores : mutations.entrySet()) { + String collectionName = stores.getKey(); + String keyIdField = getKeyFieldId(collectionName); + + List<String> deleteIds = new ArrayList<String>(); + Collection<SolrInputDocument> changes = new ArrayList<SolrInputDocument>(); + + for (Map.Entry<String, IndexMutation> entry : stores.getValue().entrySet()) { + String docId = entry.getKey(); + IndexMutation mutation = entry.getValue(); + Preconditions.checkArgument(!(mutation.isNew() && mutation.isDeleted())); + Preconditions.checkArgument(!mutation.isNew() || !mutation.hasDeletions()); + Preconditions.checkArgument(!mutation.isDeleted() || !mutation.hasAdditions()); + + //Handle any deletions + if (mutation.hasDeletions()) { + if (mutation.isDeleted()) { + logger.trace("Deleting entire document {}", docId); + deleteIds.add(docId); + } else { + HashSet<IndexEntry> fieldDeletions = Sets.newHashSet(mutation.getDeletions()); + if (mutation.hasAdditions()) { + for (IndexEntry indexEntry : mutation.getAdditions()) { + fieldDeletions.remove(indexEntry); + } + } + deleteIndividualFieldsFromIndex(collectionName, keyIdField, docId, fieldDeletions); + } + } + + if (mutation.hasAdditions()) { + int ttl = mutation.determineTTL(); + + SolrInputDocument doc = new SolrInputDocument(); + doc.setField(keyIdField, docId); + + boolean isNewDoc = mutation.isNew(); + + if (isNewDoc) + logger.trace("Adding new document {}", docId); + + for (IndexEntry e : mutation.getAdditions()) { + final Object fieldValue = convertValue(e.value); + doc.setField(e.field, isNewDoc + ? 
fieldValue : new HashMap<String, Object>(1) {{ put("set", fieldValue); }}); + } + if (ttl>0) { + Preconditions.checkArgument(isNewDoc,"Solr only supports TTL on new documents [%s]",docId); + doc.setField(ttlField, String.format("+%dSECONDS", ttl)); + } + changes.add(doc); + } + } + + commitDeletes(collectionName, deleteIds); + commitDocumentChanges(collectionName, changes); + } + } catch (Exception e) { + throw storageException(e); + } + } + + private Object convertValue(Object value) throws BackendException { + if (value instanceof Geoshape) + return GeoToWktConverter.convertToWktString((Geoshape) value); + // in order to serialize/deserialize properly Solr will have to have an + // access to Titan source which has Decimal type, so for now we simply convert to + // double and let Solr do the same thing or fail. + if (value instanceof AbstractDecimal) + return ((AbstractDecimal) value).doubleValue(); + if (value instanceof UUID) + return value.toString(); + return value; + } + + @Override + public void restore(Map<String, Map<String, List<IndexEntry>>> documents, KeyInformation.IndexRetriever informations, BaseTransaction tx) throws BackendException { + try { + for (Map.Entry<String, Map<String, List<IndexEntry>>> stores : documents.entrySet()) { + final String collectionName = stores.getKey(); + + List<String> deleteIds = new ArrayList<String>(); + List<SolrInputDocument> newDocuments = new ArrayList<SolrInputDocument>(); + + for (Map.Entry<String, List<IndexEntry>> entry : stores.getValue().entrySet()) { + final String docID = entry.getKey(); + final List<IndexEntry> content = entry.getValue(); + + if (content == null || content.isEmpty()) { + if (logger.isTraceEnabled()) + logger.trace("Deleting document [{}]", docID); + + deleteIds.add(docID); + continue; + } + + newDocuments.add(new SolrInputDocument() {{ + setField(getKeyFieldId(collectionName), docID); + + for (IndexEntry addition : content) { + Object fieldValue = addition.value; + setField(addition.field, convertValue(fieldValue)); + } + }}); + } + + commitDeletes(collectionName, deleteIds); + commitDocumentChanges(collectionName, newDocuments); + } + } catch (Exception e) { + throw new TemporaryBackendException("Could not restore Solr index", e); + } + } + + private void deleteIndividualFieldsFromIndex(String collectionName, String keyIdField, String docId, HashSet<IndexEntry> fieldDeletions) throws SolrServerException, IOException { + if (fieldDeletions.isEmpty()) return; + + Map<String, String> fieldDeletes = new HashMap<String, String>(1) {{ put("set", null); }}; + + SolrInputDocument doc = new SolrInputDocument(); + doc.addField(keyIdField, docId); + StringBuilder sb = new StringBuilder(); + for (IndexEntry fieldToDelete : fieldDeletions) { + doc.addField(fieldToDelete.field, fieldDeletes); + sb.append(fieldToDelete).append(","); + } + + if (logger.isTraceEnabled()) + logger.trace("Deleting individual fields [{}] for document {}", sb.toString(), docId); + + UpdateRequest singleDocument = newUpdateRequest(); + singleDocument.add(doc); + solrClient.request(singleDocument, collectionName); + } + + private void commitDocumentChanges(String collectionName, Collection<SolrInputDocument> documents) throws SolrServerException, IOException { + if (documents.size() == 0) return; + + try { + solrClient.request(newUpdateRequest().add(documents), collectionName); + } catch (HttpSolrClient.RemoteSolrException rse) { + logger.error("Unable to save documents to Solr as one of the shape objects stored were not compatible with Solr.", rse); + 
logger.error("Details in failed document batch: "); + for (SolrInputDocument d : documents) { + Collection<String> fieldNames = d.getFieldNames(); + for (String name : fieldNames) { + logger.error(name + ":" + d.getFieldValue(name).toString()); + } + } + + throw rse; + } + } + + private void commitDeletes(String collectionName, List<String> deleteIds) throws SolrServerException, IOException { + if (deleteIds.size() == 0) return; + solrClient.request(newUpdateRequest().deleteById(deleteIds), collectionName); + } + + @Override + public List<String> query(IndexQuery query, KeyInformation.IndexRetriever informations, BaseTransaction tx) throws BackendException { + List<String> result; + String collection = query.getStore(); + String keyIdField = getKeyFieldId(collection); + SolrQuery solrQuery = new SolrQuery("*:*"); + String queryFilter = buildQueryFilter(query.getCondition(), informations.get(collection)); + solrQuery.addFilterQuery(queryFilter); + if (!query.getOrder().isEmpty()) { + List<IndexQuery.OrderEntry> orders = query.getOrder(); + for (IndexQuery.OrderEntry order1 : orders) { + String item = order1.getKey(); + SolrQuery.ORDER order = order1.getOrder() == Order.ASC ? SolrQuery.ORDER.asc : SolrQuery.ORDER.desc; + solrQuery.addSort(new SolrQuery.SortClause(item, order)); + } + } + solrQuery.setStart(0); + if (query.hasLimit()) { + solrQuery.setRows(query.getLimit()); + } else { + solrQuery.setRows(maxResults); + } + try { + QueryResponse response = solrClient.query(collection, solrQuery); + + if (logger.isDebugEnabled()) + logger.debug("Executed query [{}] in {} ms", query.getCondition(), response.getElapsedTime()); + + int totalHits = response.getResults().size(); + + if (!query.hasLimit() && totalHits >= maxResults) + logger.warn("Query result set truncated to first [{}] elements for query: {}", maxResults, query); + + result = new ArrayList<String>(totalHits); + for (SolrDocument hit : response.getResults()) { + result.add(hit.getFieldValue(keyIdField).toString()); + } + } catch (IOException e) { + logger.error("Query did not complete : ", e); + throw new PermanentBackendException(e); + } catch (SolrServerException e) { + logger.error("Unable to query Solr index.", e); + throw new PermanentBackendException(e); + } + return result; + } + + @Override + public Iterable<RawQuery.Result<String>> query(RawQuery query, KeyInformation.IndexRetriever informations, BaseTransaction tx) throws BackendException { + List<RawQuery.Result<String>> result; + String collection = query.getStore(); + String keyIdField = getKeyFieldId(collection); + SolrQuery solrQuery = new SolrQuery(query.getQuery()) + .addField(keyIdField) + .setIncludeScore(true) + .setStart(query.getOffset()) + .setRows(query.hasLimit() ? 
query.getLimit() : maxResults); + + try { + QueryResponse response = solrClient.query(collection, solrQuery); + if (logger.isDebugEnabled()) + logger.debug("Executed query [{}] in {} ms", query.getQuery(), response.getElapsedTime()); + + int totalHits = response.getResults().size(); + if (!query.hasLimit() && totalHits >= maxResults) { + logger.warn("Query result set truncated to first [{}] elements for query: {}", maxResults, query); + } + result = new ArrayList<RawQuery.Result<String>>(totalHits); + + for (SolrDocument hit : response.getResults()) { + double score = Double.parseDouble(hit.getFieldValue("score").toString()); + result.add(new RawQuery.Result<String>(hit.getFieldValue(keyIdField).toString(), score)); + } + } catch (IOException e) { + logger.error("Query did not complete : ", e); + throw new PermanentBackendException(e); + } catch (SolrServerException e) { + logger.error("Unable to query Solr index.", e); + throw new PermanentBackendException(e); + } + return result; + } + + private static String escapeValue(Object value) { + return ClientUtils.escapeQueryChars(value.toString()); + } + + public String buildQueryFilter(Condition<TitanElement> condition, KeyInformation.StoreRetriever informations) { + if (condition instanceof PredicateCondition) { + PredicateCondition<String, TitanElement> atom = (PredicateCondition<String, TitanElement>) condition; + Object value = atom.getValue(); + String key = atom.getKey(); + TitanPredicate titanPredicate = atom.getPredicate(); + + if (value instanceof Number) { + String queryValue = escapeValue(value); + Preconditions.checkArgument(titanPredicate instanceof Cmp, "Relation not supported on numeric types: " + titanPredicate); + Cmp numRel = (Cmp) titanPredicate; + switch (numRel) { + case EQUAL: + return (key + ":" + queryValue); + case NOT_EQUAL: + return ("-" + key + ":" + queryValue); + case LESS_THAN: + //use right curly to mean up to but not including value + return (key + ":[* TO " + queryValue + "}"); + case LESS_THAN_EQUAL: + return (key + ":[* TO " + queryValue + "]"); + case GREATER_THAN: + //use left curly to mean greater than but not including value + return (key + ":{" + queryValue + " TO *]"); + case GREATER_THAN_EQUAL: + return (key + ":[" + queryValue + " TO *]"); + default: throw new IllegalArgumentException("Unexpected relation: " + numRel); + } + } else if (value instanceof String) { + Mapping map = getStringMapping(informations.get(key)); + assert map==Mapping.TEXT || map==Mapping.STRING; + if (map==Mapping.TEXT && !titanPredicate.toString().startsWith("CONTAINS")) + throw new IllegalArgumentException("Text mapped string values only support CONTAINS queries and not: " + titanPredicate); + if (map==Mapping.STRING && titanPredicate.toString().startsWith("CONTAINS")) + throw new IllegalArgumentException("String mapped string values do not support CONTAINS queries: " + titanPredicate); + + //Special case + if (titanPredicate == Text.CONTAINS) { + //e.g. - if terms tomorrow and world were supplied, and fq=text:(tomorrow world) + //sample data set would return 2 documents: one where text = Tomorrow is the World, + //and the second where text = Hello World. 
Hence, we are decomposing the query string + //and building an AND query explicitly because we need AND semantics + value = ((String) value).toLowerCase(); + List<String> terms = Text.tokenize((String) value); + + if (terms.isEmpty()) { + return ""; + } else if (terms.size() == 1) { + return (key + ":(" + escapeValue(terms.get(0)) + ")"); + } else { + And<TitanElement> andTerms = new And<TitanElement>(); + for (String term : terms) { + andTerms.add(new PredicateCondition<String, TitanElement>(key, titanPredicate, term)); + } + return buildQueryFilter(andTerms, informations); + } + } + if (titanPredicate == Text.PREFIX || titanPredicate == Text.CONTAINS_PREFIX) { + return (key + ":" + escapeValue(value) + "*"); + } else if (titanPredicate == Text.REGEX || titanPredicate == Text.CONTAINS_REGEX) { + return (key + ":/" + value + "/"); + } else if (titanPredicate == Cmp.EQUAL) { + return (key + ":\"" + escapeValue(value) + "\""); + } else if (titanPredicate == Cmp.NOT_EQUAL) { + return ("-" + key + ":\"" + escapeValue(value) + "\""); + } else { + throw new IllegalArgumentException("Relation is not supported for string value: " + titanPredicate); + } + } else if (value instanceof Geoshape) { + Geoshape geo = (Geoshape)value; + if (geo.getType() == Geoshape.Type.CIRCLE) { + Geoshape.Point center = geo.getPoint(); + return ("{!geofilt sfield=" + key + + " pt=" + center.getLatitude() + "," + center.getLongitude() + + " d=" + geo.getRadius() + "} distErrPct=0"); //distance in kilometers + } else if (geo.getType() == Geoshape.Type.BOX) { + Geoshape.Point southwest = geo.getPoint(0); + Geoshape.Point northeast = geo.getPoint(1); + return (key + ":[" + southwest.getLatitude() + "," + southwest.getLongitude() + + " TO " + northeast.getLatitude() + "," + northeast.getLongitude() + "]"); + } else if (geo.getType() == Geoshape.Type.POLYGON) { + List<Geoshape.Point> coordinates = getPolygonPoints(geo); + StringBuilder poly = new StringBuilder(key + ":\"IsWithin(POLYGON(("); + for (Geoshape.Point coordinate : coordinates) { + poly.append(coordinate.getLongitude()).append(" ").append(coordinate.getLatitude()).append(", "); + } + //close the polygon with the first coordinate + poly.append(coordinates.get(0).getLongitude()).append(" ").append(coordinates.get(0).getLatitude()); + poly.append(")))\" distErrPct=0"); + return (poly.toString()); + } + } else if (value instanceof Date) { + String queryValue = escapeValue(toIsoDate((Date)value)); + Preconditions.checkArgument(titanPredicate instanceof Cmp, "Relation not supported on date types: " + titanPredicate); + Cmp numRel = (Cmp) titanPredicate; + + switch (numRel) { + case EQUAL: + return (key + ":" + queryValue); + case NOT_EQUAL: + return ("-" + key + ":" + queryValue); + case LESS_THAN: + //use right curly to mean up to but not including value + return (key + ":[* TO " + queryValue + "}"); + case LESS_THAN_EQUAL: + return (key + ":[* TO " + queryValue + "]"); + case GREATER_THAN: + //use left curly to mean greater than but not including value + return (key + ":{" + queryValue + " TO *]"); + case GREATER_THAN_EQUAL: + return (key + ":[" + queryValue + " TO *]"); + default: throw new IllegalArgumentException("Unexpected relation: " + numRel); + } + } else if (value instanceof Boolean) { + Cmp numRel = (Cmp) titanPredicate; + String queryValue = escapeValue(value); + switch (numRel) { + case EQUAL: + return (key + ":" + queryValue); + case NOT_EQUAL: + return ("-" + key + ":" + queryValue); + default: + throw new IllegalArgumentException("Boolean types only 
support EQUAL or NOT_EQUAL"); + } + } else if (value instanceof UUID) { + if (titanPredicate == Cmp.EQUAL) { + return (key + ":\"" + escapeValue(value) + "\""); + } else if (titanPredicate == Cmp.NOT_EQUAL) { + return ("-" + key + ":\"" + escapeValue(value) + "\""); + } else { + throw new IllegalArgumentException("Relation is not supported for uuid value: " + titanPredicate); + } + } else throw new IllegalArgumentException("Unsupported type: " + value); + } else if (condition instanceof Not) { + String sub = buildQueryFilter(((Not)condition).getChild(),informations); + if (StringUtils.isNotBlank(sub)) return "-("+sub+")"; + else return ""; + } else if (condition instanceof And) { + int numChildren = ((And) condition).size(); + StringBuilder sb = new StringBuilder(); + for (Condition<TitanElement> c : condition.getChildren()) { + String sub = buildQueryFilter(c, informations); + + if (StringUtils.isBlank(sub)) + continue; + + // we don't have to add "+" which means AND iff + // a. it's a NOT query, + // b. expression is a single statement in the AND. + if (!sub.startsWith("-") && numChildren > 1) + sb.append("+"); + + sb.append(sub).append(" "); + } + return sb.toString(); + } else if (condition instanceof Or) { + StringBuilder sb = new StringBuilder(); + int element=0; + for (Condition<TitanElement> c : condition.getChildren()) { + String sub = buildQueryFilter(c,informations); + if (StringUtils.isBlank(sub)) continue; + if (element==0) sb.append("("); + else sb.append(" OR "); + sb.append(sub); + element++; + } + if (element>0) sb.append(")"); + return sb.toString(); + } else { + throw new IllegalArgumentException("Invalid condition: " + condition); + } + return null; + } + + private String toIsoDate(Date value) { + TimeZone tz = TimeZone.getTimeZone("UTC"); + DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'"); + df.setTimeZone(tz); + return df.format(value); + } + + private List<Geoshape.Point> getPolygonPoints(Geoshape polygon) { + List<Geoshape.Point> locations = new ArrayList<Geoshape.Point>(); + + int index = 0; + boolean hasCoordinates = true; + while (hasCoordinates) { + try { + locations.add(polygon.getPoint(index)); + } catch (ArrayIndexOutOfBoundsException ignore) { + //just means we asked for a point past the size of the list + //of known coordinates + hasCoordinates = false; + } + } + + return locations; + } + + /** + * Solr handles all transactions on the server-side. That means all + * commit, optimize, or rollback applies since the last commit/optimize/rollback. + * Solr documentation recommends best way to update Solr is in one process to avoid + * race conditions. 
+ * + * @return New Transaction Handle + * @throws com.thinkaurelius.titan.diskstorage.BackendException + */ + @Override + public BaseTransactionConfigurable beginTransaction(BaseTransactionConfig config) throws BackendException { + return new DefaultTransaction(config); + } + + @Override + public void close() throws BackendException { + logger.trace("Shutting down connection to Solr", solrClient); + try { + solrClient.close(); + } catch (IOException e) { + throw new TemporaryBackendException(e); + } + } + + @Override + public void clearStorage() throws BackendException { + try { + if (mode!=Mode.CLOUD) throw new UnsupportedOperationException("Operation only supported for SolrCloud"); + logger.debug("Clearing storage from Solr: {}", solrClient); + ZkStateReader zkStateReader = ((CloudSolrClient) solrClient).getZkStateReader(); + zkStateReader.updateClusterState(true); + ClusterState clusterState = zkStateReader.getClusterState(); + for (String collection : clusterState.getCollections()) { + logger.debug("Clearing collection [{}] in Solr",collection); + UpdateRequest deleteAll = newUpdateRequest(); + deleteAll.deleteByQuery("*:*"); + solrClient.request(deleteAll, collection); + } + + } catch (SolrServerException e) { + logger.error("Unable to clear storage from index due to server error on Solr.", e); + throw new PermanentBackendException(e); + } catch (IOException e) { + logger.error("Unable to clear storage from index due to low-level I/O error.", e); + throw new PermanentBackendException(e); + } catch (Exception e) { + logger.error("Unable to clear storage from index due to general error.", e); + throw new PermanentBackendException(e); + } + } + + @Override + public boolean supports(KeyInformation information, TitanPredicate titanPredicate) { + Class<?> dataType = information.getDataType(); + Mapping mapping = Mapping.getMapping(information); + if (mapping!=Mapping.DEFAULT && !AttributeUtil.isString(dataType)) return false; + + if (Number.class.isAssignableFrom(dataType)) { + return titanPredicate instanceof Cmp; + } else if (dataType == Geoshape.class) { + return titanPredicate == Geo.WITHIN; + } else if (AttributeUtil.isString(dataType)) { + switch(mapping) { + case DEFAULT: + case TEXT: + return titanPredicate == Text.CONTAINS || titanPredicate == Text.CONTAINS_PREFIX || titanPredicate == Text.CONTAINS_REGEX; + case STRING: + return titanPredicate == Cmp.EQUAL || titanPredicate==Cmp.NOT_EQUAL || titanPredicate==Text.REGEX || titanPredicate==Text.PREFIX; + // case TEXTSTRING: + // return (titanPredicate instanceof Text) || titanPredicate == Cmp.EQUAL || titanPredicate==Cmp.NOT_EQUAL; + } + } else if (dataType == Date.class) { + if (titanPredicate instanceof Cmp) return true; + } else if (dataType == Boolean.class) { + return titanPredicate == Cmp.EQUAL || titanPredicate == Cmp.NOT_EQUAL; + } else if (dataType == UUID.class) { + return titanPredicate == Cmp.EQUAL || titanPredicate==Cmp.NOT_EQUAL; + } + return false; + } + + @Override + public boolean supports(KeyInformation information) { + Class<?> dataType = information.getDataType(); + Mapping mapping = Mapping.getMapping(information); + if (Number.class.isAssignableFrom(dataType) || dataType == Geoshape.class || dataType == Date.class || dataType == Boolean.class || dataType == UUID.class) { + if (mapping==Mapping.DEFAULT) return true; + } else if (AttributeUtil.isString(dataType)) { + if (mapping==Mapping.DEFAULT || mapping==Mapping.TEXT || mapping==Mapping.STRING) return true; + } + return false; + } + + @Override + public String 
mapKey2Field(String key, KeyInformation keyInfo) { + Preconditions.checkArgument(!StringUtils.containsAny(key, new char[]{' '}),"Invalid key name provided: %s",key); + if (!dynFields) return key; + if (ParameterType.MAPPED_NAME.hasParameter(keyInfo.getParameters())) return key; + String postfix; + Class datatype = keyInfo.getDataType(); + if (AttributeUtil.isString(datatype)) { + Mapping map = getStringMapping(keyInfo); + switch (map) { + case TEXT: postfix = "_t"; break; + case STRING: postfix = "_s"; break; + default: throw new IllegalArgumentException("Unsupported string mapping: " + map); + } + } else if (AttributeUtil.isWholeNumber(datatype)) { + if (datatype.equals(Long.class)) postfix = "_l"; + else postfix = "_i"; + } else if (AttributeUtil.isDecimal(datatype)) { + if (datatype.equals(Float.class)) postfix = "_f"; + else postfix = "_d"; + } else if (datatype.equals(Geoshape.class)) { + postfix = "_g"; + } else if (datatype.equals(Date.class)) { + postfix = "_dt"; + } else if (datatype.equals(Boolean.class)) { + postfix = "_b"; + } else if (datatype.equals(UUID.class)) { + postfix = "_uuid"; + } else throw new IllegalArgumentException("Unsupported data type ["+datatype+"] for field: " + key); + return key+postfix; + } + + @Override + public IndexFeatures getFeatures() { + return SOLR_FEATURES; + } + + /* + ################# UTILITY METHODS ####################### + */ + + private static Mapping getStringMapping(KeyInformation information) { + assert AttributeUtil.isString(information.getDataType()); + Mapping map = Mapping.getMapping(information); + if (map==Mapping.DEFAULT) map = Mapping.TEXT; + return map; + } + + private UpdateRequest newUpdateRequest() { + UpdateRequest req = new UpdateRequest(); + req.setAction(UpdateRequest.ACTION.COMMIT, true, true); + if (waitSearcher) { + req.setAction(UpdateRequest.ACTION.COMMIT, true, true); + } + return req; + } + + private BackendException storageException(Exception solrException) { + return new TemporaryBackendException("Unable to complete query on Solr.", solrException); + } + + private static void createCollectionIfNotExists(CloudSolrClient client, Configuration config, String collection) + throws IOException, SolrServerException, KeeperException, InterruptedException { + if (!checkIfCollectionExists(client, collection)) { + Integer numShards = config.get(NUM_SHARDS); + Integer maxShardsPerNode = config.get(MAX_SHARDS_PER_NODE); + Integer replicationFactor = config.get(REPLICATION_FACTOR); + + CollectionAdminRequest.Create createRequest = new CollectionAdminRequest.Create(); + + createRequest.setConfigName(collection); + createRequest.setCollectionName(collection); + createRequest.setNumShards(numShards); + createRequest.setMaxShardsPerNode(maxShardsPerNode); + createRequest.setReplicationFactor(replicationFactor); + + CollectionAdminResponse createResponse = createRequest.process(client); + if (createResponse.isSuccess()) { + logger.trace("Collection {} successfully created.", collection); + } else { + throw new SolrServerException(Joiner.on("\n").join(createResponse.getErrorMessages())); + } + } + + waitForRecoveriesToFinish(client, collection); + } + + /** + * Checks if the collection has already been created in Solr. 
+ */ + private static boolean checkIfCollectionExists(CloudSolrClient server, String collection) throws KeeperException, InterruptedException { + ZkStateReader zkStateReader = server.getZkStateReader(); + zkStateReader.updateClusterState(true); + ClusterState clusterState = zkStateReader.getClusterState(); + return clusterState.getCollectionOrNull(collection) != null; + } + + /** + * Wait for all the collection shards to be ready. + */ + private static void waitForRecoveriesToFinish(CloudSolrClient server, String collection) throws KeeperException, InterruptedException { + ZkStateReader zkStateReader = server.getZkStateReader(); + try { + boolean cont = true; + + while (cont) { + boolean sawLiveRecovering = false; + zkStateReader.updateClusterState(true); + ClusterState clusterState = zkStateReader.getClusterState(); + Map<String, Slice> slices = clusterState.getSlicesMap(collection); + Preconditions.checkNotNull("Could not find collection:" + collection, slices); + + for (Map.Entry<String, Slice> entry : slices.entrySet()) { + Map<String, Replica> shards = entry.getValue().getReplicasMap(); + for (Map.Entry<String, Replica> shard : shards.entrySet()) { + String state = shard.getValue().getStr(ZkStateReader.STATE_PROP); + if ((state.equals(ZkStateReader.RECOVERING) + || state.equals(ZkStateReader.SYNC) || state + .equals(ZkStateReader.DOWN)) + && clusterState.liveNodesContain(shard.getValue().getStr( + ZkStateReader.NODE_NAME_PROP))) { + sawLiveRecovering = true; + } + } + } + if (!sawLiveRecovering) { + cont = false; + } else { + Thread.sleep(1000); + } + } + } finally { + logger.info("Exiting solr wait"); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/48343db9/repository/src/main/java/org/apache/atlas/repository/graph/TitanGraphProvider.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/TitanGraphProvider.java b/repository/src/main/java/org/apache/atlas/repository/graph/TitanGraphProvider.java index 5e61b9a..6605ae7 100755 --- a/repository/src/main/java/org/apache/atlas/repository/graph/TitanGraphProvider.java +++ b/repository/src/main/java/org/apache/atlas/repository/graph/TitanGraphProvider.java @@ -18,9 +18,12 @@ package org.apache.atlas.repository.graph; +import com.google.common.collect.ImmutableMap; import com.google.inject.Provides; import com.thinkaurelius.titan.core.TitanFactory; import com.thinkaurelius.titan.core.TitanGraph; +import com.thinkaurelius.titan.diskstorage.StandardIndexProvider; +import com.thinkaurelius.titan.diskstorage.solr.Solr5Index; import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasException; import org.apache.commons.configuration.Configuration; @@ -28,6 +31,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.inject.Singleton; +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; +import java.util.HashMap; +import java.util.Map; /** * Default implementation for Graph Provider that doles out Titan Graph. @@ -48,6 +55,35 @@ public class TitanGraphProvider implements GraphProvider<TitanGraph> { return ApplicationProperties.getSubsetConfiguration(configProperties, GRAPH_PREFIX); } + static { + addSolr5Index(); + } + + /** + * Titan loads index backend name to implementation using StandardIndexProvider.ALL_MANAGER_CLASSES + * But StandardIndexProvider.ALL_MANAGER_CLASSES is a private static final ImmutableMap + * Only way to inject Solr5Index is to modify this field. 
So, using hacky reflection to add Sol5Index + */ + private static void addSolr5Index() { + try { + Field field = StandardIndexProvider.class.getDeclaredField("ALL_MANAGER_CLASSES"); + field.setAccessible(true); + + Field modifiersField = Field.class.getDeclaredField("modifiers"); + modifiersField.setAccessible(true); + modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL); + + Map<String, String> customMap = new HashMap(StandardIndexProvider.getAllProviderClasses()); + customMap.put("solr5", Solr5Index.class.getName()); + ImmutableMap<String, String> immap = ImmutableMap.copyOf(customMap); + field.set(null, immap); + + LOG.debug("Injected solr5 index - {}", Solr5Index.class.getName()); + } catch(Exception e) { + throw new RuntimeException(e); + } + } + @Override @Singleton @Provides http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/48343db9/src/conf/solr/currency.xml ---------------------------------------------------------------------- diff --git a/src/conf/solr/currency.xml b/src/conf/solr/currency.xml new file mode 100644 index 0000000..3a9c58a --- /dev/null +++ b/src/conf/solr/currency.xml @@ -0,0 +1,67 @@ +<?xml version="1.0" ?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<!-- Example exchange rates file for CurrencyField type named "currency" in example schema --> + +<currencyConfig version="1.0"> + <rates> + <!-- Updated from http://www.exchangerate.com/ at 2011-09-27 --> + <rate from="USD" to="ARS" rate="4.333871" comment="ARGENTINA Peso" /> + <rate from="USD" to="AUD" rate="1.025768" comment="AUSTRALIA Dollar" /> + <rate from="USD" to="EUR" rate="0.743676" comment="European Euro" /> + <rate from="USD" to="BRL" rate="1.881093" comment="BRAZIL Real" /> + <rate from="USD" to="CAD" rate="1.030815" comment="CANADA Dollar" /> + <rate from="USD" to="CLP" rate="519.0996" comment="CHILE Peso" /> + <rate from="USD" to="CNY" rate="6.387310" comment="CHINA Yuan" /> + <rate from="USD" to="CZK" rate="18.47134" comment="CZECH REP. Koruna" /> + <rate from="USD" to="DKK" rate="5.515436" comment="DENMARK Krone" /> + <rate from="USD" to="HKD" rate="7.801922" comment="HONG KONG Dollar" /> + <rate from="USD" to="HUF" rate="215.6169" comment="HUNGARY Forint" /> + <rate from="USD" to="ISK" rate="118.1280" comment="ICELAND Krona" /> + <rate from="USD" to="INR" rate="49.49088" comment="INDIA Rupee" /> + <rate from="USD" to="XDR" rate="0.641358" comment="INTNL MON. 
FUND SDR" /> + <rate from="USD" to="ILS" rate="3.709739" comment="ISRAEL Sheqel" /> + <rate from="USD" to="JPY" rate="76.32419" comment="JAPAN Yen" /> + <rate from="USD" to="KRW" rate="1169.173" comment="KOREA (SOUTH) Won" /> + <rate from="USD" to="KWD" rate="0.275142" comment="KUWAIT Dinar" /> + <rate from="USD" to="MXN" rate="13.85895" comment="MEXICO Peso" /> + <rate from="USD" to="NZD" rate="1.285159" comment="NEW ZEALAND Dollar" /> + <rate from="USD" to="NOK" rate="5.859035" comment="NORWAY Krone" /> + <rate from="USD" to="PKR" rate="87.57007" comment="PAKISTAN Rupee" /> + <rate from="USD" to="PEN" rate="2.730683" comment="PERU Sol" /> + <rate from="USD" to="PHP" rate="43.62039" comment="PHILIPPINES Peso" /> + <rate from="USD" to="PLN" rate="3.310139" comment="POLAND Zloty" /> + <rate from="USD" to="RON" rate="3.100932" comment="ROMANIA Leu" /> + <rate from="USD" to="RUB" rate="32.14663" comment="RUSSIA Ruble" /> + <rate from="USD" to="SAR" rate="3.750465" comment="SAUDI ARABIA Riyal" /> + <rate from="USD" to="SGD" rate="1.299352" comment="SINGAPORE Dollar" /> + <rate from="USD" to="ZAR" rate="8.329761" comment="SOUTH AFRICA Rand" /> + <rate from="USD" to="SEK" rate="6.883442" comment="SWEDEN Krona" /> + <rate from="USD" to="CHF" rate="0.906035" comment="SWITZERLAND Franc" /> + <rate from="USD" to="TWD" rate="30.40283" comment="TAIWAN Dollar" /> + <rate from="USD" to="THB" rate="30.89487" comment="THAILAND Baht" /> + <rate from="USD" to="AED" rate="3.672955" comment="U.A.E. Dirham" /> + <rate from="USD" to="UAH" rate="7.988582" comment="UKRAINE Hryvnia" /> + <rate from="USD" to="GBP" rate="0.647910" comment="UNITED KINGDOM Pound" /> + + <!-- Cross-rates for some common currencies --> + <rate from="EUR" to="GBP" rate="0.869914" /> + <rate from="EUR" to="NOK" rate="7.800095" /> + <rate from="GBP" to="NOK" rate="8.966508" /> + </rates> +</currencyConfig> http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/48343db9/src/conf/solr/lang/stopwords_en.txt ---------------------------------------------------------------------- diff --git a/src/conf/solr/lang/stopwords_en.txt b/src/conf/solr/lang/stopwords_en.txt new file mode 100644 index 0000000..2c164c0 --- /dev/null +++ b/src/conf/solr/lang/stopwords_en.txt @@ -0,0 +1,54 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +# a couple of test stopwords to test that the words are really being +# configured from this file: +stopworda +stopwordb + +# Standard english stop words taken from Lucene's StopAnalyzer +a +an +and +are +as +at +be +but +by +for +if +in +into +is +it +no +not +of +on +or +such +that +the +their +then +there +these +they +this +to +was +will +with http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/48343db9/src/conf/solr/protwords.txt ---------------------------------------------------------------------- diff --git a/src/conf/solr/protwords.txt b/src/conf/solr/protwords.txt new file mode 100644 index 0000000..1dfc0ab --- /dev/null +++ b/src/conf/solr/protwords.txt @@ -0,0 +1,21 @@ +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +#----------------------------------------------------------------------- +# Use a protected word file to protect against the stemmer reducing two +# unrelated words to the same base word. + +# Some non-words that normally won't be encountered, +# just to test that they won't be stemmed. +dontstems +zwhacky +
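----------------------------------------------------------------------

The collection-creation step in the InstallationSteps.twiki hunk above has a programmatic equivalent: Solr5Index.createCollectionIfNotExists() issues the same CollectionAdminRequest through SolrJ when a store is first registered. The sketch below shows that sequence in isolation. It is illustrative only: it assumes a SolrCloud quorum at localhost:2181, that the Atlas config set has already been uploaded to ZooKeeper under each collection's name (bin/solr create -d ATLAS_HOME/conf/solr does this), and single-shard, single-replica sizing.

import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.response.CollectionAdminResponse;

public class CreateAtlasSolrCollections {
    public static void main(String[] args) throws Exception {
        // Same value as atlas.graph.index.search.solr.zookeeper-url (assumed quorum).
        CloudSolrClient client = new CloudSolrClient("localhost:2181");
        client.connect();
        try {
            // The three indexes Atlas uses, per the documentation above.
            for (String collection : new String[]{"vertex_index", "edge_index", "fulltext_index"}) {
                CollectionAdminRequest.Create createRequest = new CollectionAdminRequest.Create();
                createRequest.setConfigName(collection);    // config set stored in ZooKeeper under the collection name
                createRequest.setCollectionName(collection);
                createRequest.setNumShards(1);              // size these for a real cluster
                createRequest.setReplicationFactor(1);
                createRequest.setMaxShardsPerNode(1);
                CollectionAdminResponse createResponse = createRequest.process(client);
                if (!createResponse.isSuccess()) {
                    throw new SolrServerException("Could not create " + collection + ": " + createResponse.getErrorMessages());
                }
            }
        } finally {
            client.close();
        }
    }
}

Unlike this sketch, Solr5Index first checks the cluster state for an existing collection and then waits for all shards to leave recovery, which is why simply restarting Atlas once the collections exist is enough.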

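----------------------------------------------------------------------

The reflection in TitanGraphProvider.addSolr5Index() above is what makes the backend name "solr5" resolvable: Titan looks index backends up in StandardIndexProvider.ALL_MANAGER_CLASSES, a private immutable map, so Solr5Index must be patched in before TitanFactory.open() runs. Below is a minimal sketch of the resulting Titan-level wiring, assuming that registration has executed (the Class.forName() call forces it) and using illustrative BerkeleyDB storage settings; the index.search.* keys are the documented atlas.graph.index.search.* properties with the atlas.graph. prefix stripped, which is what TitanGraphProvider.getConfiguration() does.

import org.apache.commons.configuration.BaseConfiguration;

import com.thinkaurelius.titan.core.TitanFactory;
import com.thinkaurelius.titan.core.TitanGraph;

public class Solr5BackendSketch {
    public static void main(String[] args) throws Exception {
        // Run TitanGraphProvider's static initializer, which injects
        // Solr5Index into Titan's index-provider registry as "solr5".
        Class.forName("org.apache.atlas.repository.graph.TitanGraphProvider");

        BaseConfiguration conf = new BaseConfiguration();
        conf.setProperty("storage.backend", "berkeleyje");         // illustrative; the Atlas pom profile defaults to hbase
        conf.setProperty("storage.directory", "/tmp/atlas-titan"); // illustrative path
        conf.setProperty("index.search.backend", "solr5");         // the key registered by addSolr5Index()
        conf.setProperty("index.search.solr.mode", "cloud");
        conf.setProperty("index.search.solr.zookeeper-url", "localhost:2181");

        TitanGraph graph = TitanFactory.open(conf);                // Titan instantiates Solr5Index for indexing
        // ... define mixed indexes backed by vertex_index / edge_index / fulltext_index ...
        graph.shutdown();
    }
}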