http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/919120f6/repository/src/main/java/com/thinkaurelius/titan/diskstorage/solr/Solr5Index.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/com/thinkaurelius/titan/diskstorage/solr/Solr5Index.java b/repository/src/main/java/com/thinkaurelius/titan/diskstorage/solr/Solr5Index.java deleted file mode 100644 index 4a33221..0000000 --- a/repository/src/main/java/com/thinkaurelius/titan/diskstorage/solr/Solr5Index.java +++ /dev/null @@ -1,972 +0,0 @@ -/* - * Copyright 2012-2013 Aurelius LLC - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.thinkaurelius.titan.diskstorage.solr; - -import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; -import com.google.common.collect.Sets; -import com.thinkaurelius.titan.core.Order; -import com.thinkaurelius.titan.core.TitanElement; -import com.thinkaurelius.titan.core.attribute.Cmp; -import com.thinkaurelius.titan.core.attribute.Geo; -import com.thinkaurelius.titan.core.attribute.Geoshape; -import com.thinkaurelius.titan.core.attribute.Text; -import com.thinkaurelius.titan.core.schema.Mapping; -import com.thinkaurelius.titan.diskstorage.BackendException; -import com.thinkaurelius.titan.diskstorage.BaseTransaction; -import com.thinkaurelius.titan.diskstorage.BaseTransactionConfig; -import com.thinkaurelius.titan.diskstorage.BaseTransactionConfigurable; -import com.thinkaurelius.titan.diskstorage.PermanentBackendException; -import com.thinkaurelius.titan.diskstorage.TemporaryBackendException; -import com.thinkaurelius.titan.diskstorage.configuration.ConfigNamespace; -import com.thinkaurelius.titan.diskstorage.configuration.ConfigOption; -import com.thinkaurelius.titan.diskstorage.configuration.Configuration; -import com.thinkaurelius.titan.diskstorage.indexing.IndexEntry; -import com.thinkaurelius.titan.diskstorage.indexing.IndexFeatures; -import com.thinkaurelius.titan.diskstorage.indexing.IndexMutation; -import com.thinkaurelius.titan.diskstorage.indexing.IndexProvider; -import com.thinkaurelius.titan.diskstorage.indexing.IndexQuery; -import com.thinkaurelius.titan.diskstorage.indexing.KeyInformation; -import com.thinkaurelius.titan.diskstorage.indexing.RawQuery; -import com.thinkaurelius.titan.diskstorage.util.DefaultTransaction; -import com.thinkaurelius.titan.graphdb.configuration.PreInitializeConfigOptions; -import com.thinkaurelius.titan.graphdb.database.serialize.AttributeUtil; -import com.thinkaurelius.titan.graphdb.database.serialize.attribute.AbstractDecimal; -import com.thinkaurelius.titan.graphdb.query.TitanPredicate; -import com.thinkaurelius.titan.graphdb.query.condition.And; -import com.thinkaurelius.titan.graphdb.query.condition.Condition; -import com.thinkaurelius.titan.graphdb.query.condition.Not; -import com.thinkaurelius.titan.graphdb.query.condition.Or; -import com.thinkaurelius.titan.graphdb.query.condition.PredicateCondition; -import 
com.thinkaurelius.titan.graphdb.types.ParameterType; -import org.apache.commons.lang.StringUtils; -import org.apache.http.client.HttpClient; -import org.apache.solr.client.solrj.SolrClient; -import org.apache.solr.client.solrj.SolrQuery; -import org.apache.solr.client.solrj.SolrServerException; -import org.apache.solr.client.solrj.impl.CloudSolrClient; -import org.apache.solr.client.solrj.impl.HttpClientUtil; -import org.apache.solr.client.solrj.impl.HttpSolrClient; -import org.apache.solr.client.solrj.impl.LBHttpSolrClient; -import org.apache.solr.client.solrj.request.CollectionAdminRequest; -import org.apache.solr.client.solrj.request.UpdateRequest; -import org.apache.solr.client.solrj.response.CollectionAdminResponse; -import org.apache.solr.client.solrj.response.QueryResponse; -import org.apache.solr.client.solrj.util.ClientUtils; -import org.apache.solr.common.SolrDocument; -import org.apache.solr.common.SolrInputDocument; -import org.apache.solr.common.cloud.ClusterState; -import org.apache.solr.common.cloud.Replica; -import org.apache.solr.common.cloud.Slice; -import org.apache.solr.common.cloud.ZkStateReader; -import org.apache.solr.common.params.ModifiableSolrParams; -import org.apache.zookeeper.KeeperException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.text.DateFormat; -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.TimeZone; -import java.util.UUID; - -import static com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration.INDEX_MAX_RESULT_SET_SIZE; -import static com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration.INDEX_NS; - -/** - * NOTE: Copied from Titan to support Solr 5. Do not change. - */ -@PreInitializeConfigOptions -public class Solr5Index implements IndexProvider { - - private static final Logger logger = LoggerFactory.getLogger(Solr5Index.class); - - - private static final String DEFAULT_ID_FIELD = "id"; - - private enum Mode { - HTTP, CLOUD; - - public static Mode parse(String mode) { - for (Mode m : Mode.values()) { - if (m.toString().equalsIgnoreCase(mode)) return m; - } - throw new IllegalArgumentException("Unrecognized mode: "+mode); - } - } - - public static final ConfigNamespace SOLR_NS = - new ConfigNamespace(INDEX_NS, "solr", "Solr index configuration"); - - public static final ConfigOption<String> SOLR_MODE = new ConfigOption<String>(SOLR_NS,"mode", - "The operation mode for Solr which is either via HTTP (`http`) or using SolrCloud (`cloud`)", - ConfigOption.Type.GLOBAL_OFFLINE, "cloud"); - - public static final ConfigOption<Boolean> DYNAMIC_FIELDS = new ConfigOption<Boolean>(SOLR_NS,"dyn-fields", - "Whether to use dynamic fields (which appends the data type to the field name). If dynamic fields are disabled, " + - "the user must map field names and define them explicitly in the schema.", - ConfigOption.Type.GLOBAL_OFFLINE, true); - - public static final ConfigOption<String[]> KEY_FIELD_NAMES = new ConfigOption<String[]>(SOLR_NS,"key-field-names", - "Field name that uniquely identifies each document in Solr. 
Must be specified as a list of `collection=field`.", - ConfigOption.Type.GLOBAL, String[].class); - - public static final ConfigOption<String> TTL_FIELD = new ConfigOption<String>(SOLR_NS,"ttl_field", - "Name of the TTL field for Solr collections.", - ConfigOption.Type.GLOBAL_OFFLINE, "ttl"); - - /** SolrCloud Configuration */ - - public static final ConfigOption<String> ZOOKEEPER_URL = new ConfigOption<String>(SOLR_NS,"zookeeper-url", - "URL of the Zookeeper instance coordinating the SolrCloud cluster", - ConfigOption.Type.MASKABLE, "localhost:2181"); - - public static final ConfigOption<Integer> NUM_SHARDS = new ConfigOption<Integer>(SOLR_NS,"num-shards", - "Number of shards for a collection. This applies when creating a new collection which is only supported under the SolrCloud operation mode.", - ConfigOption.Type.GLOBAL_OFFLINE, 1); - - public static final ConfigOption<Integer> MAX_SHARDS_PER_NODE = new ConfigOption<Integer>(SOLR_NS,"max-shards-per-node", - "Maximum number of shards per node. This applies when creating a new collection which is only supported under the SolrCloud operation mode.", - ConfigOption.Type.GLOBAL_OFFLINE, 1); - - public static final ConfigOption<Integer> REPLICATION_FACTOR = new ConfigOption<Integer>(SOLR_NS,"replication-factor", - "Replication factor for a collection. This applies when creating a new collection which is only supported under the SolrCloud operation mode.", - ConfigOption.Type.GLOBAL_OFFLINE, 1); - - - /** HTTP Configuration */ - - public static final ConfigOption<String[]> HTTP_URLS = new ConfigOption<String[]>(SOLR_NS,"http-urls", - "List of URLs to use to connect to Solr Servers (LBHttpSolrClient is used), don't add core or collection name to the URL.", - ConfigOption.Type.MASKABLE, new String[] { "http://localhost:8983/solr" }); - - public static final ConfigOption<Integer> HTTP_CONNECTION_TIMEOUT = new ConfigOption<Integer>(SOLR_NS,"http-connection-timeout", - "Solr HTTP connection timeout.", - ConfigOption.Type.MASKABLE, 5000); - - public static final ConfigOption<Boolean> HTTP_ALLOW_COMPRESSION = new ConfigOption<Boolean>(SOLR_NS,"http-compression", - "Enable/disable compression on the HTTP connections made to Solr.", - ConfigOption.Type.MASKABLE, false); - - public static final ConfigOption<Integer> HTTP_MAX_CONNECTIONS_PER_HOST = new ConfigOption<Integer>(SOLR_NS,"http-max-per-host", - "Maximum number of HTTP connections per Solr host.", - ConfigOption.Type.MASKABLE, 20); - - public static final ConfigOption<Integer> HTTP_GLOBAL_MAX_CONNECTIONS = new ConfigOption<Integer>(SOLR_NS,"http-max", - "Maximum number of HTTP connections in total to all Solr servers.", - ConfigOption.Type.MASKABLE, 100); - - public static final ConfigOption<Boolean> WAIT_SEARCHER = new ConfigOption<Boolean>(SOLR_NS, "wait-searcher", - "When mutating - wait for the index to reflect new mutations before returning. 
This can have a negative impact on performance.", - ConfigOption.Type.LOCAL, false); - - - - private static final IndexFeatures SOLR_FEATURES = new IndexFeatures.Builder().supportsDocumentTTL() - .setDefaultStringMapping(Mapping.TEXT).supportedStringMappings(Mapping.TEXT, Mapping.STRING).build(); - - private final SolrClient solrClient; - private final Configuration configuration; - private final Mode mode; - private final boolean dynFields; - private final Map<String, String> keyFieldIds; - private final String ttlField; - private final int maxResults; - private final boolean waitSearcher; - - public Solr5Index(final Configuration config) throws BackendException { - Preconditions.checkArgument(config!=null); - configuration = config; - - mode = Mode.parse(config.get(SOLR_MODE)); - dynFields = config.get(DYNAMIC_FIELDS); - keyFieldIds = parseKeyFieldsForCollections(config); - maxResults = config.get(INDEX_MAX_RESULT_SET_SIZE); - ttlField = config.get(TTL_FIELD); - waitSearcher = config.get(WAIT_SEARCHER); - - if (mode==Mode.CLOUD) { - String zookeeperUrl = config.get(Solr5Index.ZOOKEEPER_URL); - CloudSolrClient cloudServer = new CloudSolrClient(zookeeperUrl, true); - cloudServer.connect(); - solrClient = cloudServer; - } else if (mode==Mode.HTTP) { - HttpClient clientParams = HttpClientUtil.createClient(new ModifiableSolrParams() {{ - add(HttpClientUtil.PROP_ALLOW_COMPRESSION, config.get(HTTP_ALLOW_COMPRESSION).toString()); - add(HttpClientUtil.PROP_CONNECTION_TIMEOUT, config.get(HTTP_CONNECTION_TIMEOUT).toString()); - add(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST, config.get(HTTP_MAX_CONNECTIONS_PER_HOST).toString()); - add(HttpClientUtil.PROP_MAX_CONNECTIONS, config.get(HTTP_GLOBAL_MAX_CONNECTIONS).toString()); - }}); - - solrClient = new LBHttpSolrClient(clientParams, config.get(HTTP_URLS)); - - - } else { - throw new IllegalArgumentException("Unsupported Solr operation mode: " + mode); - } - } - - private Map<String, String> parseKeyFieldsForCollections(Configuration config) throws BackendException { - Map<String, String> keyFieldNames = new HashMap<String, String>(); - String[] collectionFieldStatements = config.has(KEY_FIELD_NAMES)?config.get(KEY_FIELD_NAMES):new String[0]; - for (String collectionFieldStatement : collectionFieldStatements) { - String[] parts = collectionFieldStatement.trim().split("="); - if (parts.length != 2) { - throw new PermanentBackendException("Unable to parse the collection name / key field name pair. It should be of the format collection=field"); - } - String collectionName = parts[0]; - String keyFieldName = parts[1]; - keyFieldNames.put(collectionName, keyFieldName); - } - return keyFieldNames; - } - - private String getKeyFieldId(String collection) { - String field = keyFieldIds.get(collection); - if (field==null) field = DEFAULT_ID_FIELD; - return field; - } - - /** - * Unlike the ElasticSearch Index, which is schema free, Solr requires a schema to - * support searching. This means that you will need to modify the solr schema with the - * appropriate field definitions in order to work properly. If you have a running instance - * of Solr and you modify its schema with new fields, don't forget to re-index! 
- * @param store Index store - * @param key New key to register - * @param information Datatype to register for the key - * @param tx enclosing transaction - * @throws com.thinkaurelius.titan.diskstorage.BackendException - */ - @Override - public void register(String store, String key, KeyInformation information, BaseTransaction tx) throws BackendException { - if (mode==Mode.CLOUD) { - CloudSolrClient client = (CloudSolrClient) solrClient; - try { - createCollectionIfNotExists(client, configuration, store); - } catch (IOException e) { - throw new PermanentBackendException(e); - } catch (SolrServerException e) { - throw new PermanentBackendException(e); - } catch (InterruptedException e) { - throw new PermanentBackendException(e); - } catch (KeeperException e) { - throw new PermanentBackendException(e); - } - } - //Since all data types must be defined in the schema.xml, pre-registering a type does not work - } - - @Override - public void mutate(Map<String, Map<String, IndexMutation>> mutations, KeyInformation.IndexRetriever informations, BaseTransaction tx) throws BackendException { - logger.debug("Mutating SOLR"); - try { - for (Map.Entry<String, Map<String, IndexMutation>> stores : mutations.entrySet()) { - String collectionName = stores.getKey(); - String keyIdField = getKeyFieldId(collectionName); - - List<String> deleteIds = new ArrayList<String>(); - Collection<SolrInputDocument> changes = new ArrayList<SolrInputDocument>(); - - for (Map.Entry<String, IndexMutation> entry : stores.getValue().entrySet()) { - String docId = entry.getKey(); - IndexMutation mutation = entry.getValue(); - Preconditions.checkArgument(!(mutation.isNew() && mutation.isDeleted())); - Preconditions.checkArgument(!mutation.isNew() || !mutation.hasDeletions()); - Preconditions.checkArgument(!mutation.isDeleted() || !mutation.hasAdditions()); - - //Handle any deletions - if (mutation.hasDeletions()) { - if (mutation.isDeleted()) { - logger.trace("Deleting entire document {}", docId); - deleteIds.add(docId); - } else { - HashSet<IndexEntry> fieldDeletions = Sets.newHashSet(mutation.getDeletions()); - if (mutation.hasAdditions()) { - for (IndexEntry indexEntry : mutation.getAdditions()) { - fieldDeletions.remove(indexEntry); - } - } - deleteIndividualFieldsFromIndex(collectionName, keyIdField, docId, fieldDeletions); - } - } - - if (mutation.hasAdditions()) { - int ttl = mutation.determineTTL(); - - SolrInputDocument doc = new SolrInputDocument(); - doc.setField(keyIdField, docId); - - boolean isNewDoc = mutation.isNew(); - - if (isNewDoc) - logger.trace("Adding new document {}", docId); - - for (IndexEntry e : mutation.getAdditions()) { - final Object fieldValue = convertValue(e.value); - doc.setField(e.field, isNewDoc - ? 
fieldValue : new HashMap<String, Object>(1) {{ put("set", fieldValue); }}); - } - if (ttl>0) { - Preconditions.checkArgument(isNewDoc,"Solr only supports TTL on new documents [%s]",docId); - doc.setField(ttlField, String.format("+%dSECONDS", ttl)); - } - changes.add(doc); - } - } - - commitDeletes(collectionName, deleteIds); - commitDocumentChanges(collectionName, changes); - } - } catch (Exception e) { - throw storageException(e); - } - } - - private Object convertValue(Object value) throws BackendException { - if (value instanceof Geoshape) - return GeoToWktConverter.convertToWktString((Geoshape) value); - // To serialize/deserialize properly, Solr would need access to the Titan - // sources, which define the Decimal type; for now we simply convert to - // double and let Solr do the same or fail. - if (value instanceof AbstractDecimal) - return ((AbstractDecimal) value).doubleValue(); - if (value instanceof UUID) - return value.toString(); - return value; - } - - @Override - public void restore(Map<String, Map<String, List<IndexEntry>>> documents, KeyInformation.IndexRetriever informations, BaseTransaction tx) throws BackendException { - try { - for (Map.Entry<String, Map<String, List<IndexEntry>>> stores : documents.entrySet()) { - final String collectionName = stores.getKey(); - - List<String> deleteIds = new ArrayList<String>(); - List<SolrInputDocument> newDocuments = new ArrayList<SolrInputDocument>(); - - for (Map.Entry<String, List<IndexEntry>> entry : stores.getValue().entrySet()) { - final String docID = entry.getKey(); - final List<IndexEntry> content = entry.getValue(); - - if (content == null || content.isEmpty()) { - if (logger.isTraceEnabled()) - logger.trace("Deleting document [{}]", docID); - - deleteIds.add(docID); - continue; - } - - newDocuments.add(new SolrInputDocument() {{ - setField(getKeyFieldId(collectionName), docID); - - for (IndexEntry addition : content) { - Object fieldValue = addition.value; - setField(addition.field, convertValue(fieldValue)); - } - }}); - } - - commitDeletes(collectionName, deleteIds); - commitDocumentChanges(collectionName, newDocuments); - } - } catch (Exception e) { - throw new TemporaryBackendException("Could not restore Solr index", e); - } - } - - private void deleteIndividualFieldsFromIndex(String collectionName, String keyIdField, String docId, HashSet<IndexEntry> fieldDeletions) throws SolrServerException, IOException { - if (fieldDeletions.isEmpty()) return; - - Map<String, String> fieldDeletes = new HashMap<String, String>(1) {{ put("set", null); }}; - - SolrInputDocument doc = new SolrInputDocument(); - doc.addField(keyIdField, docId); - StringBuilder sb = new StringBuilder(); - for (IndexEntry fieldToDelete : fieldDeletions) { - doc.addField(fieldToDelete.field, fieldDeletes); - sb.append(fieldToDelete).append(","); - } - - if (logger.isTraceEnabled()) - logger.trace("Deleting individual fields [{}] for document {}", sb.toString(), docId); - - UpdateRequest singleDocument = newUpdateRequest(); - singleDocument.add(doc); - solrClient.request(singleDocument, collectionName); - } - - private void commitDocumentChanges(String collectionName, Collection<SolrInputDocument> documents) throws SolrServerException, IOException { - if (documents.size() == 0) return; - - try { - solrClient.request(newUpdateRequest().add(documents), collectionName); - } catch (HttpSolrClient.RemoteSolrException rse) { - logger.error("Unable to save documents to Solr as one of the shape objects stored was not compatible with Solr.", rse); - 
logger.error("Details in failed document batch: "); - for (SolrInputDocument d : documents) { - Collection<String> fieldNames = d.getFieldNames(); - for (String name : fieldNames) { - logger.error(name + ":" + d.getFieldValue(name).toString()); - } - } - - throw rse; - } - } - - private void commitDeletes(String collectionName, List<String> deleteIds) throws SolrServerException, IOException { - if (deleteIds.size() == 0) return; - solrClient.request(newUpdateRequest().deleteById(deleteIds), collectionName); - } - - @Override - public List<String> query(IndexQuery query, KeyInformation.IndexRetriever informations, BaseTransaction tx) throws BackendException { - List<String> result; - String collection = query.getStore(); - String keyIdField = getKeyFieldId(collection); - SolrQuery solrQuery = new SolrQuery("*:*"); - String queryFilter = buildQueryFilter(query.getCondition(), informations.get(collection)); - solrQuery.addFilterQuery(queryFilter); - if (!query.getOrder().isEmpty()) { - List<IndexQuery.OrderEntry> orders = query.getOrder(); - for (IndexQuery.OrderEntry order1 : orders) { - String item = order1.getKey(); - SolrQuery.ORDER order = order1.getOrder() == Order.ASC ? SolrQuery.ORDER.asc : SolrQuery.ORDER.desc; - solrQuery.addSort(new SolrQuery.SortClause(item, order)); - } - } - solrQuery.setStart(0); - if (query.hasLimit()) { - solrQuery.setRows(query.getLimit()); - } else { - solrQuery.setRows(maxResults); - } - try { - QueryResponse response = solrClient.query(collection, solrQuery); - - if (logger.isDebugEnabled()) - logger.debug("Executed query [{}] in {} ms", query.getCondition(), response.getElapsedTime()); - - int totalHits = response.getResults().size(); - - if (!query.hasLimit() && totalHits >= maxResults) - logger.warn("Query result set truncated to first [{}] elements for query: {}", maxResults, query); - - result = new ArrayList<String>(totalHits); - for (SolrDocument hit : response.getResults()) { - result.add(hit.getFieldValue(keyIdField).toString()); - } - } catch (IOException e) { - logger.error("Query did not complete : ", e); - throw new PermanentBackendException(e); - } catch (SolrServerException e) { - logger.error("Unable to query Solr index.", e); - throw new PermanentBackendException(e); - } - return result; - } - - @Override - public Iterable<RawQuery.Result<String>> query(RawQuery query, KeyInformation.IndexRetriever informations, BaseTransaction tx) throws BackendException { - List<RawQuery.Result<String>> result; - String collection = query.getStore(); - String keyIdField = getKeyFieldId(collection); - SolrQuery solrQuery = new SolrQuery(query.getQuery()) - .addField(keyIdField) - .setIncludeScore(true) - .setStart(query.getOffset()) - .setRows(query.hasLimit() ? 
query.getLimit() : maxResults); - - try { - QueryResponse response = solrClient.query(collection, solrQuery); - if (logger.isDebugEnabled()) - logger.debug("Executed query [{}] in {} ms", query.getQuery(), response.getElapsedTime()); - - int totalHits = response.getResults().size(); - if (!query.hasLimit() && totalHits >= maxResults) { - logger.warn("Query result set truncated to first [{}] elements for query: {}", maxResults, query); - } - result = new ArrayList<RawQuery.Result<String>>(totalHits); - - for (SolrDocument hit : response.getResults()) { - double score = Double.parseDouble(hit.getFieldValue("score").toString()); - result.add(new RawQuery.Result<String>(hit.getFieldValue(keyIdField).toString(), score)); - } - } catch (IOException e) { - logger.error("Query did not complete : ", e); - throw new PermanentBackendException(e); - } catch (SolrServerException e) { - logger.error("Unable to query Solr index.", e); - throw new PermanentBackendException(e); - } - return result; - } - - private static String escapeValue(Object value) { - return ClientUtils.escapeQueryChars(value.toString()); - } - - public String buildQueryFilter(Condition<TitanElement> condition, KeyInformation.StoreRetriever informations) { - if (condition instanceof PredicateCondition) { - PredicateCondition<String, TitanElement> atom = (PredicateCondition<String, TitanElement>) condition; - Object value = atom.getValue(); - String key = atom.getKey(); - TitanPredicate titanPredicate = atom.getPredicate(); - - if (value instanceof Number) { - String queryValue = escapeValue(value); - Preconditions.checkArgument(titanPredicate instanceof Cmp, "Relation not supported on numeric types: " + titanPredicate); - Cmp numRel = (Cmp) titanPredicate; - switch (numRel) { - case EQUAL: - return (key + ":" + queryValue); - case NOT_EQUAL: - return ("-" + key + ":" + queryValue); - case LESS_THAN: - //use right curly to mean up to but not including value - return (key + ":[* TO " + queryValue + "}"); - case LESS_THAN_EQUAL: - return (key + ":[* TO " + queryValue + "]"); - case GREATER_THAN: - //use left curly to mean greater than but not including value - return (key + ":{" + queryValue + " TO *]"); - case GREATER_THAN_EQUAL: - return (key + ":[" + queryValue + " TO *]"); - default: throw new IllegalArgumentException("Unexpected relation: " + numRel); - } - } else if (value instanceof String) { - Mapping map = getStringMapping(informations.get(key)); - assert map==Mapping.TEXT || map==Mapping.STRING; - if (map==Mapping.TEXT && !titanPredicate.toString().startsWith("CONTAINS")) - throw new IllegalArgumentException("Text mapped string values only support CONTAINS queries and not: " + titanPredicate); - if (map==Mapping.STRING && titanPredicate.toString().startsWith("CONTAINS")) - throw new IllegalArgumentException("String mapped string values do not support CONTAINS queries: " + titanPredicate); - - //Special case - if (titanPredicate == Text.CONTAINS) { - //e.g. - if terms tomorrow and world were supplied, and fq=text:(tomorrow world) - //sample data set would return 2 documents: one where text = Tomorrow is the World, - //and the second where text = Hello World. 
Hence, we are decomposing the query string - //and building an AND query explicitly because we need AND semantics - value = ((String) value).toLowerCase(); - List<String> terms = Text.tokenize((String) value); - - if (terms.isEmpty()) { - return ""; - } else if (terms.size() == 1) { - return (key + ":(" + escapeValue(terms.get(0)) + ")"); - } else { - And<TitanElement> andTerms = new And<TitanElement>(); - for (String term : terms) { - andTerms.add(new PredicateCondition<String, TitanElement>(key, titanPredicate, term)); - } - return buildQueryFilter(andTerms, informations); - } - } - if (titanPredicate == Text.PREFIX || titanPredicate == Text.CONTAINS_PREFIX) { - return (key + ":" + escapeValue(value) + "*"); - } else if (titanPredicate == Text.REGEX || titanPredicate == Text.CONTAINS_REGEX) { - return (key + ":/" + value + "/"); - } else if (titanPredicate == Cmp.EQUAL) { - return (key + ":\"" + escapeValue(value) + "\""); - } else if (titanPredicate == Cmp.NOT_EQUAL) { - return ("-" + key + ":\"" + escapeValue(value) + "\""); - } else { - throw new IllegalArgumentException("Relation is not supported for string value: " + titanPredicate); - } - } else if (value instanceof Geoshape) { - Geoshape geo = (Geoshape)value; - if (geo.getType() == Geoshape.Type.CIRCLE) { - Geoshape.Point center = geo.getPoint(); - return ("{!geofilt sfield=" + key + - " pt=" + center.getLatitude() + "," + center.getLongitude() + - " d=" + geo.getRadius() + "} distErrPct=0"); //distance in kilometers - } else if (geo.getType() == Geoshape.Type.BOX) { - Geoshape.Point southwest = geo.getPoint(0); - Geoshape.Point northeast = geo.getPoint(1); - return (key + ":[" + southwest.getLatitude() + "," + southwest.getLongitude() + - " TO " + northeast.getLatitude() + "," + northeast.getLongitude() + "]"); - } else if (geo.getType() == Geoshape.Type.POLYGON) { - List<Geoshape.Point> coordinates = getPolygonPoints(geo); - StringBuilder poly = new StringBuilder(key + ":\"IsWithin(POLYGON(("); - for (Geoshape.Point coordinate : coordinates) { - poly.append(coordinate.getLongitude()).append(" ").append(coordinate.getLatitude()).append(", "); - } - //close the polygon with the first coordinate - poly.append(coordinates.get(0).getLongitude()).append(" ").append(coordinates.get(0).getLatitude()); - poly.append(")))\" distErrPct=0"); - return (poly.toString()); - } - } else if (value instanceof Date) { - String queryValue = escapeValue(toIsoDate((Date)value)); - Preconditions.checkArgument(titanPredicate instanceof Cmp, "Relation not supported on date types: " + titanPredicate); - Cmp numRel = (Cmp) titanPredicate; - - switch (numRel) { - case EQUAL: - return (key + ":" + queryValue); - case NOT_EQUAL: - return ("-" + key + ":" + queryValue); - case LESS_THAN: - //use right curly to mean up to but not including value - return (key + ":[* TO " + queryValue + "}"); - case LESS_THAN_EQUAL: - return (key + ":[* TO " + queryValue + "]"); - case GREATER_THAN: - //use left curly to mean greater than but not including value - return (key + ":{" + queryValue + " TO *]"); - case GREATER_THAN_EQUAL: - return (key + ":[" + queryValue + " TO *]"); - default: throw new IllegalArgumentException("Unexpected relation: " + numRel); - } - } else if (value instanceof Boolean) { - Cmp numRel = (Cmp) titanPredicate; - String queryValue = escapeValue(value); - switch (numRel) { - case EQUAL: - return (key + ":" + queryValue); - case NOT_EQUAL: - return ("-" + key + ":" + queryValue); - default: - throw new IllegalArgumentException("Boolean types only 
support EQUAL or NOT_EQUAL"); - } - } else if (value instanceof UUID) { - if (titanPredicate == Cmp.EQUAL) { - return (key + ":\"" + escapeValue(value) + "\""); - } else if (titanPredicate == Cmp.NOT_EQUAL) { - return ("-" + key + ":\"" + escapeValue(value) + "\""); - } else { - throw new IllegalArgumentException("Relation is not supported for uuid value: " + titanPredicate); - } - } else throw new IllegalArgumentException("Unsupported type: " + value); - } else if (condition instanceof Not) { - String sub = buildQueryFilter(((Not)condition).getChild(),informations); - if (StringUtils.isNotBlank(sub)) return "-("+sub+")"; - else return ""; - } else if (condition instanceof And) { - int numChildren = ((And) condition).size(); - StringBuilder sb = new StringBuilder(); - for (Condition<TitanElement> c : condition.getChildren()) { - String sub = buildQueryFilter(c, informations); - - if (StringUtils.isBlank(sub)) - continue; - - // we can omit the "+" prefix (which means AND) when: - // a. it's a NOT query, or - // b. the expression is the only statement in the AND. - if (!sub.startsWith("-") && numChildren > 1) - sb.append("+"); - - sb.append(sub).append(" "); - } - return sb.toString(); - } else if (condition instanceof Or) { - StringBuilder sb = new StringBuilder(); - int element=0; - for (Condition<TitanElement> c : condition.getChildren()) { - String sub = buildQueryFilter(c,informations); - if (StringUtils.isBlank(sub)) continue; - if (element==0) sb.append("("); - else sb.append(" OR "); - sb.append(sub); - element++; - } - if (element>0) sb.append(")"); - return sb.toString(); - } else { - throw new IllegalArgumentException("Invalid condition: " + condition); - } - return null; - } - - private String toIsoDate(Date value) { - TimeZone tz = TimeZone.getTimeZone("UTC"); - DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'"); - df.setTimeZone(tz); - return df.format(value); - } - - private List<Geoshape.Point> getPolygonPoints(Geoshape polygon) { - List<Geoshape.Point> locations = new ArrayList<Geoshape.Point>(); - - int index = 0; - boolean hasCoordinates = true; - while (hasCoordinates) { - try { - locations.add(polygon.getPoint(index++)); //increment, or the loop never terminates - } catch (ArrayIndexOutOfBoundsException ignore) { - //just means we asked for a point past the size of the list - //of known coordinates - hasCoordinates = false; - } - } - - return locations; - } - - /** - * Solr handles all transactions on the server-side. That means a - * commit, optimize, or rollback applies to all changes since the last commit/optimize/rollback. - * The Solr documentation recommends updating Solr from a single process to avoid - * race conditions. 
- * - * @return New Transaction Handle - * @throws com.thinkaurelius.titan.diskstorage.BackendException - */ - @Override - public BaseTransactionConfigurable beginTransaction(BaseTransactionConfig config) throws BackendException { - return new DefaultTransaction(config); - } - - @Override - public void close() throws BackendException { - logger.trace("Shutting down connection to Solr", solrClient); - try { - solrClient.close(); - } catch (IOException e) { - throw new TemporaryBackendException(e); - } - } - - @Override - public void clearStorage() throws BackendException { - try { - if (mode!=Mode.CLOUD) throw new UnsupportedOperationException("Operation only supported for SolrCloud"); - logger.debug("Clearing storage from Solr: {}", solrClient); - ZkStateReader zkStateReader = ((CloudSolrClient) solrClient).getZkStateReader(); - zkStateReader.updateClusterState(true); - ClusterState clusterState = zkStateReader.getClusterState(); - for (String collection : clusterState.getCollections()) { - logger.debug("Clearing collection [{}] in Solr",collection); - UpdateRequest deleteAll = newUpdateRequest(); - deleteAll.deleteByQuery("*:*"); - solrClient.request(deleteAll, collection); - } - - } catch (SolrServerException e) { - logger.error("Unable to clear storage from index due to server error on Solr.", e); - throw new PermanentBackendException(e); - } catch (IOException e) { - logger.error("Unable to clear storage from index due to low-level I/O error.", e); - throw new PermanentBackendException(e); - } catch (Exception e) { - logger.error("Unable to clear storage from index due to general error.", e); - throw new PermanentBackendException(e); - } - } - - @Override - public boolean supports(KeyInformation information, TitanPredicate titanPredicate) { - Class<?> dataType = information.getDataType(); - Mapping mapping = Mapping.getMapping(information); - if (mapping!=Mapping.DEFAULT && !AttributeUtil.isString(dataType)) return false; - - if (Number.class.isAssignableFrom(dataType)) { - return titanPredicate instanceof Cmp; - } else if (dataType == Geoshape.class) { - return titanPredicate == Geo.WITHIN; - } else if (AttributeUtil.isString(dataType)) { - switch(mapping) { - case DEFAULT: - case TEXT: - return titanPredicate == Text.CONTAINS || titanPredicate == Text.CONTAINS_PREFIX || titanPredicate == Text.CONTAINS_REGEX; - case STRING: - return titanPredicate == Cmp.EQUAL || titanPredicate==Cmp.NOT_EQUAL || titanPredicate==Text.REGEX || titanPredicate==Text.PREFIX; - // case TEXTSTRING: - // return (titanPredicate instanceof Text) || titanPredicate == Cmp.EQUAL || titanPredicate==Cmp.NOT_EQUAL; - } - } else if (dataType == Date.class) { - if (titanPredicate instanceof Cmp) return true; - } else if (dataType == Boolean.class) { - return titanPredicate == Cmp.EQUAL || titanPredicate == Cmp.NOT_EQUAL; - } else if (dataType == UUID.class) { - return titanPredicate == Cmp.EQUAL || titanPredicate==Cmp.NOT_EQUAL; - } - return false; - } - - @Override - public boolean supports(KeyInformation information) { - Class<?> dataType = information.getDataType(); - Mapping mapping = Mapping.getMapping(information); - if (Number.class.isAssignableFrom(dataType) || dataType == Geoshape.class || dataType == Date.class || dataType == Boolean.class || dataType == UUID.class) { - if (mapping==Mapping.DEFAULT) return true; - } else if (AttributeUtil.isString(dataType)) { - if (mapping==Mapping.DEFAULT || mapping==Mapping.TEXT || mapping==Mapping.STRING) return true; - } - return false; - } - - @Override - public String 
mapKey2Field(String key, KeyInformation keyInfo) { - Preconditions.checkArgument(!StringUtils.containsAny(key, new char[]{' '}),"Invalid key name provided: %s",key); - if (!dynFields) return key; - if (ParameterType.MAPPED_NAME.hasParameter(keyInfo.getParameters())) return key; - String postfix; - Class datatype = keyInfo.getDataType(); - if (AttributeUtil.isString(datatype)) { - Mapping map = getStringMapping(keyInfo); - switch (map) { - case TEXT: postfix = "_t"; break; - case STRING: postfix = "_s"; break; - default: throw new IllegalArgumentException("Unsupported string mapping: " + map); - } - } else if (AttributeUtil.isWholeNumber(datatype)) { - if (datatype.equals(Long.class)) postfix = "_l"; - else postfix = "_i"; - } else if (AttributeUtil.isDecimal(datatype)) { - if (datatype.equals(Float.class)) postfix = "_f"; - else postfix = "_d"; - } else if (datatype.equals(Geoshape.class)) { - postfix = "_g"; - } else if (datatype.equals(Date.class)) { - postfix = "_dt"; - } else if (datatype.equals(Boolean.class)) { - postfix = "_b"; - } else if (datatype.equals(UUID.class)) { - postfix = "_uuid"; - } else throw new IllegalArgumentException("Unsupported data type ["+datatype+"] for field: " + key); - return key+postfix; - } - - @Override - public IndexFeatures getFeatures() { - return SOLR_FEATURES; - } - - /* - ################# UTILITY METHODS ####################### - */ - - private static Mapping getStringMapping(KeyInformation information) { - assert AttributeUtil.isString(information.getDataType()); - Mapping map = Mapping.getMapping(information); - if (map==Mapping.DEFAULT) map = Mapping.TEXT; - return map; - } - - private UpdateRequest newUpdateRequest() { - UpdateRequest req = new UpdateRequest(); - req.setAction(UpdateRequest.ACTION.COMMIT, true, true); - if (waitSearcher) { - req.setAction(UpdateRequest.ACTION.COMMIT, true, true); - } - return req; - } - - private BackendException storageException(Exception solrException) { - return new TemporaryBackendException("Unable to complete query on Solr.", solrException); - } - - private static void createCollectionIfNotExists(CloudSolrClient client, Configuration config, String collection) - throws IOException, SolrServerException, KeeperException, InterruptedException { - if (!checkIfCollectionExists(client, collection)) { - Integer numShards = config.get(NUM_SHARDS); - Integer maxShardsPerNode = config.get(MAX_SHARDS_PER_NODE); - Integer replicationFactor = config.get(REPLICATION_FACTOR); - - CollectionAdminRequest.Create createRequest = new CollectionAdminRequest.Create(); - - createRequest.setConfigName(collection); - createRequest.setCollectionName(collection); - createRequest.setNumShards(numShards); - createRequest.setMaxShardsPerNode(maxShardsPerNode); - createRequest.setReplicationFactor(replicationFactor); - - CollectionAdminResponse createResponse = createRequest.process(client); - if (createResponse.isSuccess()) { - logger.trace("Collection {} successfully created.", collection); - } else { - throw new SolrServerException(Joiner.on("\n").join(createResponse.getErrorMessages())); - } - } - - waitForRecoveriesToFinish(client, collection); - } - - /** - * Checks if the collection has already been created in Solr. 
- */ - private static boolean checkIfCollectionExists(CloudSolrClient server, String collection) throws KeeperException, InterruptedException { - ZkStateReader zkStateReader = server.getZkStateReader(); - zkStateReader.updateClusterState(true); - ClusterState clusterState = zkStateReader.getClusterState(); - return clusterState.getCollectionOrNull(collection) != null; - } - - /** - * Wait for all the collection shards to be ready. - */ - private static void waitForRecoveriesToFinish(CloudSolrClient server, String collection) throws KeeperException, InterruptedException { - ZkStateReader zkStateReader = server.getZkStateReader(); - try { - boolean cont = true; - - while (cont) { - boolean sawLiveRecovering = false; - zkStateReader.updateClusterState(true); - ClusterState clusterState = zkStateReader.getClusterState(); - Map<String, Slice> slices = clusterState.getSlicesMap(collection); - Preconditions.checkNotNull(slices, "Could not find collection: " + collection); - - for (Map.Entry<String, Slice> entry : slices.entrySet()) { - Map<String, Replica> shards = entry.getValue().getReplicasMap(); - for (Map.Entry<String, Replica> shard : shards.entrySet()) { - String state = shard.getValue().getStr(ZkStateReader.STATE_PROP); - if ((state.equals(ZkStateReader.RECOVERING) - || state.equals(ZkStateReader.SYNC) || state - .equals(ZkStateReader.DOWN)) - && clusterState.liveNodesContain(shard.getValue().getStr( - ZkStateReader.NODE_NAME_PROP))) { - sawLiveRecovering = true; - } - } - } - if (!sawLiveRecovering) { - cont = false; - } else { - Thread.sleep(1000); - } - } - } finally { - logger.info("Exiting solr wait"); - } - } - - private static class GeoToWktConverter { - /** - * {@link com.thinkaurelius.titan.core.attribute.Geoshape} stores Points in the String format: point[X.0,Y.0]. - * Solr needs it to be in Well-Known Text format: POINT(X.0 Y.0) - */ - static String convertToWktString(Geoshape fieldValue) throws BackendException { - if (fieldValue.getType() == Geoshape.Type.POINT) { - Geoshape.Point point = fieldValue.getPoint(); - return "POINT(" + point.getLongitude() + " " + point.getLatitude() + ")"; - } else { - throw new PermanentBackendException("Cannot index " + fieldValue.getType()); - } - } - } - -}
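The buildQueryFilter method in the deleted Solr5Index above maps Titan's Cmp predicates onto Solr range syntax, where square brackets mark inclusive bounds and curly braces exclusive ones. A minimal, self-contained sketch of that numeric mapping follows; the SolrFilterSketch class and its names are illustrative only and not part of this commit:

    // Illustrative reconstruction of the numeric branch of buildQueryFilter above.
    public class SolrFilterSketch {
        enum Cmp { EQUAL, NOT_EQUAL, LESS_THAN, LESS_THAN_EQUAL, GREATER_THAN, GREATER_THAN_EQUAL }

        static String numericFilter(String key, Cmp rel, Number value) {
            switch (rel) {
                case EQUAL:              return key + ":" + value;
                case NOT_EQUAL:          return "-" + key + ":" + value;
                // right curly brace: up to but not including the value
                case LESS_THAN:          return key + ":[* TO " + value + "}";
                case LESS_THAN_EQUAL:    return key + ":[* TO " + value + "]";
                // left curly brace: greater than but not including the value
                case GREATER_THAN:       return key + ":{" + value + " TO *]";
                case GREATER_THAN_EQUAL: return key + ":[" + value + " TO *]";
                default: throw new IllegalArgumentException("Unexpected relation: " + rel);
            }
        }

        public static void main(String[] args) {
            System.out.println(numericFilter("age", Cmp.LESS_THAN, 30));          // age:[* TO 30}
            System.out.println(numericFilter("age", Cmp.GREATER_THAN_EQUAL, 18)); // age:[18 TO *]
        }
    }

These are the same filter strings the index provider hands to Solr via solrQuery.addFilterQuery(...) in the query method above.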
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/919120f6/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java b/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java index 34f2ba3..b12c9a8 100755 --- a/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java +++ b/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java @@ -27,6 +27,7 @@ import org.aopalliance.intercept.MethodInterceptor; import org.apache.atlas.discovery.DiscoveryService; import org.apache.atlas.discovery.HiveLineageService; import org.apache.atlas.discovery.LineageService; +import org.apache.atlas.discovery.SearchIndexer; import org.apache.atlas.discovery.graph.GraphBackedDiscoveryService; import org.apache.atlas.listener.TypesChangeListener; import org.apache.atlas.repository.MetadataRepository; http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/919120f6/repository/src/test/java/org/apache/atlas/repository/graph/GraphRepoMapperScaleTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/repository/graph/GraphRepoMapperScaleTest.java b/repository/src/test/java/org/apache/atlas/repository/graph/GraphRepoMapperScaleTest.java index 85ab738..11e8219 100755 --- a/repository/src/test/java/org/apache/atlas/repository/graph/GraphRepoMapperScaleTest.java +++ b/repository/src/test/java/org/apache/atlas/repository/graph/GraphRepoMapperScaleTest.java @@ -18,9 +18,11 @@ package org.apache.atlas.repository.graph; +import com.google.inject.Inject; import com.thinkaurelius.titan.core.TitanFactory; import com.thinkaurelius.titan.core.TitanGraph; import com.thinkaurelius.titan.core.TitanIndexQuery; +import com.thinkaurelius.titan.core.util.TitanCleanup; import com.thinkaurelius.titan.diskstorage.BackendException; import com.thinkaurelius.titan.diskstorage.configuration.ReadConfiguration; import com.thinkaurelius.titan.diskstorage.configuration.backend.CommonsConfiguration; @@ -30,6 +32,7 @@ import com.tinkerpop.blueprints.GraphQuery; import com.tinkerpop.blueprints.Predicate; import com.tinkerpop.blueprints.Vertex; import org.apache.atlas.GraphTransaction; +import org.apache.atlas.RepositoryMetadataModule; import org.apache.atlas.TestUtils; import org.apache.atlas.repository.Constants; import org.apache.atlas.typesystem.ITypedReferenceableInstance; @@ -43,6 +46,7 @@ import org.apache.commons.io.FileUtils; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; +import org.testng.annotations.Guice; import org.testng.annotations.Test; import java.io.File; @@ -53,6 +57,7 @@ import java.util.Date; import java.util.Random; @Test +@Guice(modules = RepositoryMetadataModule.class) public class GraphRepoMapperScaleTest { private static final String DATABASE_NAME = "foo"; @@ -61,50 +66,21 @@ public class GraphRepoMapperScaleTest { private static final String INDEX_DIR = System.getProperty("java.io.tmpdir", "/tmp") + "/atlas-test" + new Random().nextLong(); - private GraphProvider<TitanGraph> graphProvider = new GraphProvider<TitanGraph>() { - - private TitanGraph graph = null; - - //Ensure separate directory for graph provider to avoid issues with index merging - @Override - public TitanGraph get() { - try { - if (graph == null) { - synchronized 
(GraphRepoMapperScaleTest.class) { - if (graph == null) { - ReadConfiguration config = new CommonsConfiguration() {{ - set("storage.backend", "inmemory"); - set("index.search.directory", INDEX_DIR); - set("index.search.backend", "elasticsearch"); - set("index.search.elasticsearch.local-mode", "true"); - set("index.search.elasticsearch.client-only", "false"); - }}; - GraphDatabaseConfiguration graphconfig = new GraphDatabaseConfiguration(config); - graphconfig.getBackend().clearStorage(); - graph = TitanFactory.open(config); - } - } - } - } catch (BackendException e) { - e.printStackTrace(); - } - return graph; - } - }; + @Inject + GraphProvider<TitanGraph> graphProvider; + @Inject private GraphBackedMetadataRepository repositoryService; private GraphBackedSearchIndexer searchIndexer; private TypeSystem typeSystem = TypeSystem.getInstance(); + private String dbGUID; @BeforeClass @GraphTransaction public void setUp() throws Exception { - //Make sure we can cleanup the index directory - repositoryService = new GraphBackedMetadataRepository(graphProvider); - searchIndexer = new GraphBackedSearchIndexer(graphProvider); Collection<IDataType> typesAdded = TestUtils.createHiveTypes(typeSystem); searchIndexer.onAdd(typesAdded); @@ -112,11 +88,17 @@ public class GraphRepoMapperScaleTest { @AfterClass public void tearDown() throws Exception { - graphProvider.get().shutdown(); + TypeSystem.getInstance().reset(); try { - FileUtils.deleteDirectory(new File(INDEX_DIR)); - } catch (IOException ioe) { - System.err.println("Failed to cleanup index directory"); + //TODO - Fix failure during shutdown while using BDB + graphProvider.get().shutdown(); + } catch (Exception e) { + e.printStackTrace(); + } + try { + TitanCleanup.clear(graphProvider.get()); + } catch (Exception e) { + e.printStackTrace(); } } @@ -142,6 +124,10 @@ public class GraphRepoMapperScaleTest { @Test(dependsOnMethods = "testSubmitEntity") public void testSearchIndex() throws Exception { + + //Elasticsearch requires some time before index is updated + Thread.sleep(5000); + searchWithOutIndex(Constants.GUID_PROPERTY_KEY, dbGUID); searchWithOutIndex(Constants.ENTITY_TYPE_PROPERTY_KEY, "column_type"); searchWithOutIndex(Constants.ENTITY_TYPE_PROPERTY_KEY, TestUtils.TABLE_TYPE); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/919120f6/titan/pom.xml ---------------------------------------------------------------------- diff --git a/titan/pom.xml b/titan/pom.xml new file mode 100644 index 0000000..d22b607 --- /dev/null +++ b/titan/pom.xml @@ -0,0 +1,105 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. 
+ --> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>apache-atlas</artifactId> + <groupId>org.apache.atlas</groupId> + <version>0.6-incubating-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + <artifactId>atlas-titan</artifactId> + <description>Apache Atlas Titan Overrides</description> + <name>Apache Atlas Titan</name> + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>com.thinkaurelius.titan</groupId> + <artifactId>titan-core</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-client</artifactId> + </dependency> + + <dependency> + <groupId>com.vividsolutions</groupId> + <artifactId>jts</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.solr</groupId> + <artifactId>solr-core</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.solr</groupId> + <artifactId>solr-solrj</artifactId> + </dependency> + + <dependency> + <groupId>com.thinkaurelius.titan</groupId> + <artifactId>titan-es</artifactId> + </dependency> + + <dependency> + <groupId>com.thinkaurelius.titan</groupId> + <artifactId>titan-berkeleyje</artifactId> + </dependency> + + <dependency> + <groupId>com.thinkaurelius.titan</groupId> + <artifactId>titan-lucene</artifactId> + </dependency> + + <dependency> + <groupId>org.testng</groupId> + <artifactId>testng</artifactId> + </dependency> + + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + </dependency> + + </dependencies> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <version>2.4</version> + <configuration> + <excludes> + <exclude>**/log4j.xml</exclude> + </excludes> + </configuration> + </plugin> + <plugin> + <groupId>net.alchim31.maven</groupId> + <artifactId>scala-maven-plugin</artifactId> + </plugin> + </plugins> + </build> + + +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/919120f6/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/AdminMask.java ---------------------------------------------------------------------- diff --git a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/AdminMask.java b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/AdminMask.java new file mode 100644 index 0000000..e255f1b --- /dev/null +++ b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/AdminMask.java @@ -0,0 +1,62 @@ +/* + * Copyright 2012-2013 Aurelius LLC + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.thinkaurelius.titan.diskstorage.hbase; + +import java.io.Closeable; +import java.io.IOException; + +import org.apache.hadoop.hbase.ClusterStatus; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.hbase.client.HBaseAdmin; + +/** + * This interface hides ABI/API breaking changes that HBase has made to its Admin/HBaseAdmin over the course + * of development from 0.94 to 1.0 and beyond. + */ +public interface AdminMask extends Closeable +{ + + void clearTable(String tableName, long timestamp) throws IOException; + + HTableDescriptor getTableDescriptor(String tableName) throws TableNotFoundException, IOException; + + boolean tableExists(String tableName) throws IOException; + + void createTable(HTableDescriptor desc) throws IOException; + + void createTable(HTableDescriptor desc, byte[] startKey, byte[] endKey, int numRegions) throws IOException; + + /** + * Estimate the number of regionservers in the HBase cluster. + * + * This is usually implemented by calling + * {@link HBaseAdmin#getClusterStatus()} and then + * {@link ClusterStatus#getServers()} and finally {@code size()} on the + * returned server list. + * + * @return the number of servers in the cluster or -1 if it could not be determined + */ + int getEstimatedRegionServerCount(); + + void disableTable(String tableName) throws IOException; + + void enableTable(String tableName) throws IOException; + + boolean isTableDisabled(String tableName) throws IOException; + + void addColumn(String tableName, HColumnDescriptor columnDescriptor) throws IOException; +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/919120f6/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/ConnectionMask.java ---------------------------------------------------------------------- diff --git a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/ConnectionMask.java b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/ConnectionMask.java new file mode 100644 index 0000000..feb578b --- /dev/null +++ b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/ConnectionMask.java @@ -0,0 +1,30 @@ +/* + * Copyright 2012-2013 Aurelius LLC + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.thinkaurelius.titan.diskstorage.hbase; + +import java.io.Closeable; +import java.io.IOException; + +/** + * This interface hides ABI/API breaking changes that HBase has made to its (H)Connection class over the course + * of development from 0.94 to 1.0 and beyond. 
+ */ +public interface ConnectionMask extends Closeable +{ + + TableMask getTable(String name) throws IOException; + + AdminMask getAdmin() throws IOException; +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/919120f6/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseAdmin0_98.java ---------------------------------------------------------------------- diff --git a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseAdmin0_98.java b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseAdmin0_98.java new file mode 100644 index 0000000..0cd4795 --- /dev/null +++ b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseAdmin0_98.java @@ -0,0 +1,152 @@ +/* + * Copyright 2012-2013 Aurelius LLC + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.thinkaurelius.titan.diskstorage.hbase; + +import java.io.IOException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.thinkaurelius.titan.util.system.IOUtils; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; + +public class HBaseAdmin0_98 implements AdminMask +{ + + private static final Logger log = LoggerFactory.getLogger(HBaseAdmin0_98.class); + + private final HBaseAdmin adm; + + public HBaseAdmin0_98(HBaseAdmin adm) + { + this.adm = adm; + } + + @Override + public void clearTable(String tableName, long timestamp) throws IOException + { + if (!adm.tableExists(tableName)) { + log.debug("clearStorage() called before table {} was created, skipping.", tableName); + return; + } + + // Unfortunately, linear scanning and deleting tables is faster in HBase < 1 when running integration tests than + // disabling and deleting tables. 
+ HTable table = null; + + try { + table = new HTable(adm.getConfiguration(), tableName); + + Scan scan = new Scan(); + scan.setBatch(100); + scan.setCacheBlocks(false); + scan.setCaching(2000); + scan.setTimeRange(0, Long.MAX_VALUE); + scan.setMaxVersions(1); + + ResultScanner scanner = null; + + try { + scanner = table.getScanner(scan); + + for (Result res : scanner) { + Delete d = new Delete(res.getRow()); + + d.setTimestamp(timestamp); + table.delete(d); + } + } finally { + IOUtils.closeQuietly(scanner); + } + } finally { + IOUtils.closeQuietly(table); + } + } + + @Override + public HTableDescriptor getTableDescriptor(String tableName) throws TableNotFoundException, IOException + { + return adm.getTableDescriptor(tableName.getBytes()); + } + + @Override + public boolean tableExists(String tableName) throws IOException + { + return adm.tableExists(tableName); + } + + @Override + public void createTable(HTableDescriptor desc) throws IOException + { + adm.createTable(desc); + } + + @Override + public void createTable(HTableDescriptor desc, byte[] startKey, byte[] endKey, int numRegions) throws IOException + { + adm.createTable(desc, startKey, endKey, numRegions); + } + + @Override + public int getEstimatedRegionServerCount() + { + int serverCount = -1; + try { + serverCount = adm.getClusterStatus().getServers().size(); + log.debug("Read {} servers from HBase ClusterStatus", serverCount); + } catch (IOException e) { + log.debug("Unable to retrieve HBase cluster status", e); + } + return serverCount; + } + + @Override + public void disableTable(String tableName) throws IOException + { + adm.disableTable(tableName); + } + + @Override + public void enableTable(String tableName) throws IOException + { + adm.enableTable(tableName); + } + + @Override + public boolean isTableDisabled(String tableName) throws IOException + { + return adm.isTableDisabled(tableName); + } + + @Override + public void addColumn(String tableName, HColumnDescriptor columnDescriptor) throws IOException + { + adm.addColumn(tableName, columnDescriptor); + } + + @Override + public void close() throws IOException + { + adm.close(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/919120f6/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseAdmin1_0.java ---------------------------------------------------------------------- diff --git a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseAdmin1_0.java b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseAdmin1_0.java new file mode 100644 index 0000000..7e8f72d --- /dev/null +++ b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseAdmin1_0.java @@ -0,0 +1,135 @@ +/* + * Copyright 2012-2013 Aurelius LLC + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/919120f6/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseAdmin1_0.java ---------------------------------------------------------------------- diff --git a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseAdmin1_0.java b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseAdmin1_0.java new file mode 100644 index 0000000..7e8f72d --- /dev/null +++ b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseAdmin1_0.java @@ -0,0 +1,135 @@ +/* + * Copyright 2012-2013 Aurelius LLC + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.thinkaurelius.titan.diskstorage.hbase; + +import java.io.IOException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.TableNotDisabledException; +import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.HBaseAdmin; + +public class HBaseAdmin1_0 implements AdminMask +{ + + private static final Logger log = LoggerFactory.getLogger(HBaseAdmin1_0.class); + + private final Admin adm; + + public HBaseAdmin1_0(HBaseAdmin adm) + { + this.adm = adm; + } + + @Override + public void clearTable(String tableString, long timestamp) throws IOException + { + TableName tableName = TableName.valueOf(tableString); + + if (!adm.tableExists(tableName)) { + log.debug("Attempted to clear table {} before it exists (noop)", tableString); + return; + } + + if (!adm.isTableDisabled(tableName)) + adm.disableTable(tableName); + + if (!adm.isTableDisabled(tableName)) + throw new RuntimeException("Unable to disable table " + tableName); + + // This API call appears to both truncate and re-enable the table. + log.info("Truncating table {}", tableName); + adm.truncateTable(tableName, true /* preserve splits */); + + try { + adm.enableTable(tableName); + } catch (TableNotDisabledException e) { + // In testing with HBase 1.0.2, this exception seems to be thrown every time. + log.debug("Table automatically reenabled by truncation: {}", tableName, e); + } + } + + @Override + public HTableDescriptor getTableDescriptor(String tableString) throws TableNotFoundException, IOException + { + return adm.getTableDescriptor(TableName.valueOf(tableString)); + } + + @Override + public boolean tableExists(String tableString) throws IOException + { + return adm.tableExists(TableName.valueOf(tableString)); + } + + @Override + public void createTable(HTableDescriptor desc) throws IOException + { + adm.createTable(desc); + } + + @Override + public void createTable(HTableDescriptor desc, byte[] startKey, byte[] endKey, int numRegions) throws IOException + { + adm.createTable(desc, startKey, endKey, numRegions); + } + + @Override + public int getEstimatedRegionServerCount() + { + int serverCount = -1; + try { + serverCount = adm.getClusterStatus().getServers().size(); + log.debug("Read {} servers from HBase ClusterStatus", serverCount); + } catch (IOException e) { + log.debug("Unable to retrieve HBase cluster status", e); + } + return serverCount; + } + + @Override + public void disableTable(String tableString) throws IOException + { + adm.disableTable(TableName.valueOf(tableString)); + } + + @Override + public void enableTable(String tableString) throws IOException + { + adm.enableTable(TableName.valueOf(tableString)); + } + + @Override + public boolean isTableDisabled(String tableString) throws IOException + { + return adm.isTableDisabled(TableName.valueOf(tableString)); + } + + @Override + public void addColumn(String tableString, HColumnDescriptor columnDescriptor) throws IOException + { + adm.addColumn(TableName.valueOf(tableString), columnDescriptor); + } + + @Override + public void close() throws IOException + { + adm.close(); + } +}
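Where HBaseAdmin0_98.clearTable honors the timestamp argument (it deletes only cells at or below it), the 1.0 implementation ignores it: Admin.truncateTable removes everything and, as the catch block above notes, tends to re-enable the table on its own. Caller code stays identical either way, which is the point of the mask; continuing the earlier sketch:

    // The same two lines drive either HBaseAdmin0_98 or HBaseAdmin1_0.
    AdminMask admin = cnx.getAdmin();          // cnx as in the previous sketch
    if (admin.tableExists("titan")) {          // "titan" remains a placeholder name
        admin.clearTable("titan", System.currentTimeMillis());
    }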
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/919120f6/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat.java ---------------------------------------------------------------------- diff --git a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat.java b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat.java new file mode 100644 index 0000000..c9b03aa --- /dev/null +++ b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat.java @@ -0,0 +1,60 @@ +/* + * Copyright 2012-2013 Aurelius LLC + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.thinkaurelius.titan.diskstorage.hbase; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.client.Delete; + +public interface HBaseCompat { + + /** + * Configure the compression scheme {@code algo} on a column family + * descriptor {@code cd}. The {@code algo} parameter is a string value + * corresponding to one of the values of HBase's Compression enum. The + * Compression enum has moved between packages as HBase has evolved, which + * is why this method has a String argument in the signature instead of the + * enum itself. + * + * @param cd + * column family to configure + * @param algo + * compression type to use + */ + public void setCompression(HColumnDescriptor cd, String algo); + + /** + * Create and return an HTableDescriptor instance with the given name. The + * constructor used by this method has remained stable over HBase development + * so far, but the old HTableDescriptor(String) constructor and its byte[] friends + * are now marked deprecated and may eventually be removed in favor of the + * HTableDescriptor(TableName) constructor. That constructor (and the + * TableName type) exists only in newer HBase versions; hence this method. + * + * @param tableName + * HBase table name + * @return a new table descriptor instance + */ + public HTableDescriptor newTableDescriptor(String tableName); + + ConnectionMask createConnection(Configuration conf) throws IOException; + + void addColumnFamilyToTableDescriptor(HTableDescriptor tdesc, HColumnDescriptor cdesc); + + void setTimestamp(Delete d, long timestamp); +}
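Together these methods cover the table-creation path, where the HBase client API drifted the most between releases. A rough sketch of how a store manager might drive them, assuming a hypothetical table "titan", a hypothetical column family "e", and the GZ codec (the string must name a Compression.Algorithm constant, per the setCompression contract):

    // Sketch: create a table with one compressed column family through the shim.
    HBaseCompat compat = HBaseCompatLoader.getCompat(null);
    HTableDescriptor desc = compat.newTableDescriptor("titan");
    HColumnDescriptor cf = new HColumnDescriptor("e");
    compat.setCompression(cf, "GZ");           // e.g. GZ, SNAPPY, NONE
    compat.addColumnFamilyToTableDescriptor(desc, cf);
    try (ConnectionMask cnx = compat.createConnection(HBaseConfiguration.create())) {
        cnx.getAdmin().createTable(desc);
    }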
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/919120f6/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat0_98.java ---------------------------------------------------------------------- diff --git a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat0_98.java b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat0_98.java new file mode 100644 index 0000000..2c0f3b4 --- /dev/null +++ b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat0_98.java @@ -0,0 +1,58 @@ +/* + * Copyright 2012-2013 Aurelius LLC + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.thinkaurelius.titan.diskstorage.hbase; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.HConnectionManager; +import org.apache.hadoop.hbase.io.compress.Compression; + +public class HBaseCompat0_98 implements HBaseCompat { + + @Override + public void setCompression(HColumnDescriptor cd, String algo) { + cd.setCompressionType(Compression.Algorithm.valueOf(algo)); + } + + @Override + public HTableDescriptor newTableDescriptor(String tableName) { + TableName tn = TableName.valueOf(tableName); + return new HTableDescriptor(tn); + } + + @Override + public ConnectionMask createConnection(Configuration conf) throws IOException + { + return new HConnection0_98(HConnectionManager.createConnection(conf)); + } + + @Override + public void addColumnFamilyToTableDescriptor(HTableDescriptor tdesc, HColumnDescriptor cdesc) + { + tdesc.addFamily(cdesc); + } + + @Override + public void setTimestamp(Delete d, long timestamp) + { + d.setTimestamp(timestamp); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/919120f6/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat1_0.java ---------------------------------------------------------------------- diff --git a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat1_0.java b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat1_0.java new file mode 100644 index 0000000..bb3fb3b --- /dev/null +++ b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat1_0.java @@ -0,0 +1,58 @@ +/* + * Copyright 2012-2013 Aurelius LLC + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package com.thinkaurelius.titan.diskstorage.hbase; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.io.compress.Compression; + +public class HBaseCompat1_0 implements HBaseCompat { + + @Override + public void setCompression(HColumnDescriptor cd, String algo) { + cd.setCompressionType(Compression.Algorithm.valueOf(algo)); + } + + @Override + public HTableDescriptor newTableDescriptor(String tableName) { + TableName tn = TableName.valueOf(tableName); + return new HTableDescriptor(tn); + } + + @Override + public ConnectionMask createConnection(Configuration conf) throws IOException + { + return new HConnection1_0(ConnectionFactory.createConnection(conf)); + } + + @Override + public void addColumnFamilyToTableDescriptor(HTableDescriptor tdesc, HColumnDescriptor cdesc) + { + tdesc.addFamily(cdesc); + } + + @Override + public void setTimestamp(Delete d, long timestamp) + { + d.setTimestamp(timestamp); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/919120f6/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat1_1.java ---------------------------------------------------------------------- diff --git a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat1_1.java b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat1_1.java new file mode 100644 index 0000000..e5c3d31 --- /dev/null +++ b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat1_1.java @@ -0,0 +1,58 @@ +/* + * Copyright 2012-2013 Aurelius LLC + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.thinkaurelius.titan.diskstorage.hbase; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.io.compress.Compression; + +import java.io.IOException; + +public class HBaseCompat1_1 implements HBaseCompat { + + @Override + public void setCompression(HColumnDescriptor cd, String algo) { + cd.setCompressionType(Compression.Algorithm.valueOf(algo)); + } + + @Override + public HTableDescriptor newTableDescriptor(String tableName) { + TableName tn = TableName.valueOf(tableName); + return new HTableDescriptor(tn); + } + + @Override + public ConnectionMask createConnection(Configuration conf) throws IOException + { + return new HConnection1_0(ConnectionFactory.createConnection(conf)); + } + + @Override + public void addColumnFamilyToTableDescriptor(HTableDescriptor tdesc, HColumnDescriptor cdesc) + { + tdesc.addFamily(cdesc); + } + + @Override + public void setTimestamp(Delete d, long timestamp) + { + d.setTimestamp(timestamp); + } + +}
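The three HBaseCompat implementations differ only in createConnection: 0.98 wraps HConnectionManager.createConnection in an HConnection0_98, while 1.0 and 1.1 both wrap ConnectionFactory.createConnection in the same HConnection1_0. The other four methods are copied verbatim across all three classes, so they could be hoisted into a shared base; a sketch of that refactoring (the class name is hypothetical, not part of this patch):

    // Hypothetical base class: everything except the connection factory is shared.
    public abstract class AbstractHBaseCompat implements HBaseCompat {
        @Override
        public void setCompression(HColumnDescriptor cd, String algo) {
            cd.setCompressionType(Compression.Algorithm.valueOf(algo));
        }
        @Override
        public HTableDescriptor newTableDescriptor(String tableName) {
            return new HTableDescriptor(TableName.valueOf(tableName));
        }
        @Override
        public void addColumnFamilyToTableDescriptor(HTableDescriptor tdesc, HColumnDescriptor cdesc) {
            tdesc.addFamily(cdesc);
        }
        @Override
        public void setTimestamp(Delete d, long timestamp) {
            d.setTimestamp(timestamp);
        }
        // createConnection(Configuration) stays abstract: one override per HBase line.
    }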
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/919120f6/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompatLoader.java ---------------------------------------------------------------------- diff --git a/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompatLoader.java b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompatLoader.java new file mode 100644 index 0000000..2c0d6fe --- /dev/null +++ b/titan/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompatLoader.java @@ -0,0 +1,80 @@ +/* + * Copyright 2012-2013 Aurelius LLC + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.thinkaurelius.titan.diskstorage.hbase; + +import java.util.Arrays; + +import org.apache.hadoop.hbase.util.VersionInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class HBaseCompatLoader { + + private static final Logger log = LoggerFactory.getLogger(HBaseCompatLoader.class); + + private static final String DEFAULT_HBASE_COMPAT_VERSION = "1.1"; + + private static final String DEFAULT_HBASE_CLASS_NAME = "com.thinkaurelius.titan.diskstorage.hbase.HBaseCompat1_1"; + + private static HBaseCompat cachedCompat; + + public static synchronized HBaseCompat getCompat(String classOverride) { + + if (null != cachedCompat) { + log.debug("Returning cached HBase compatibility layer: {}", cachedCompat); + return cachedCompat; + } + + HBaseCompat compat; + String className = null; + String classNameSource = null; + + if (null != classOverride) { + className = classOverride; + classNameSource = "from explicit configuration"; + } else { + String hbaseVersion = VersionInfo.getVersion(); + for (String supportedVersion : Arrays.asList("0.94", "0.96", "0.98", "1.0", "1.1")) { + if (hbaseVersion.startsWith(supportedVersion + ".")) { + className = "com.thinkaurelius.titan.diskstorage.hbase.HBaseCompat" + supportedVersion.replaceAll("\\.", "_"); + classNameSource = "supporting runtime HBase version " + hbaseVersion; + break; + } + } + if (null == className) { + log.info("The HBase version {} is not explicitly supported by Titan. " + + "Loading Titan's compatibility layer for its most recent supported HBase version ({})", + hbaseVersion, DEFAULT_HBASE_COMPAT_VERSION); + className = DEFAULT_HBASE_CLASS_NAME; + classNameSource = "by default"; + } + } + + final String errTemplate = " when instantiating HBase compatibility class " + className; + + try { + compat = (HBaseCompat)Class.forName(className).newInstance(); + log.info("Instantiated HBase compatibility layer {}: {}", classNameSource, compat.getClass().getCanonicalName()); + } catch (IllegalAccessException e) { + throw new RuntimeException(e.getClass().getSimpleName() + errTemplate, e); + } catch (InstantiationException e) { + throw new RuntimeException(e.getClass().getSimpleName() + errTemplate, e); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e.getClass().getSimpleName() + errTemplate, e); + } + + return cachedCompat = compat; + } +}
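Resolution proceeds in three steps: an explicit classOverride wins outright; otherwise the runtime version reported by VersionInfo.getVersion() is prefix-matched against the supported list; anything else falls back to the 1.1 shim. Note that the list probes 0.94 and 0.96 even though this commit ships only the 0.98, 1.0, and 1.1 classes, so a match on those two versions would end in a wrapped ClassNotFoundException. The chosen instance is cached for the life of the JVM, so the first caller pins the class for everyone. Typical invocations:

    // Common case: let the loader sniff the runtime HBase version.
    HBaseCompat compat = HBaseCompatLoader.getCompat(null);

    // Or force a specific shim (honored only on the first call;
    // afterwards the cached instance is returned regardless):
    HBaseCompat forced = HBaseCompatLoader.getCompat(
            "com.thinkaurelius.titan.diskstorage.hbase.HBaseCompat1_0");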
