http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/mvm/rya/indexing/TemporalInstant.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/TemporalInstant.java b/extras/indexing/src/main/java/mvm/rya/indexing/TemporalInstant.java deleted file mode 100644 index f4e6d95..0000000 --- a/extras/indexing/src/main/java/mvm/rya/indexing/TemporalInstant.java +++ /dev/null @@ -1,83 +0,0 @@ -package mvm.rya.indexing; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - -import java.io.Serializable; - -import org.joda.time.DateTime; -import org.joda.time.DateTimeZone; - -/** - * Time and date interface for building intervals. - * - *Implementations: - * Implementation should have a factory method for TemporalInterval since TemporalIntervals reference only this - * interface for begin & end, so it injects an implementation. - * public static TemporalInterval parseInterval(String dateTimeInterval) - * - * The following are notes and may not have been implemented. 
- * - * = rfc3339 - *https://www.ietf.org/rfc/rfc3339.txt - * a subset of ISO-8601 - * YYYY-MM-DDThh:mm:ss.fffZ - * Limits: - *All dates and times are assumed to be in the "current era", - somewhere between 0000AD and 9999AD. - * resolution: to the second, or millisecond if the optional fraction is used. - * - * = epoch - * 32bit or 64bit integer specifying the number of seconds since a standard date-time (1970) - * 32bit is good until 2038. - * 64bit is good until after the heat death of our universe - * - */ -public interface TemporalInstant extends Comparable<TemporalInstant>, Serializable { - @Override - public boolean equals(Object obj) ; - - @Override - public int compareTo(TemporalInstant o) ; - - @Override - public int hashCode() ; - /** - * Get the date as a byte array. - */ - public byte[] getAsKeyBytes(); - /** - * Get the date as a String. - */ - public String getAsKeyString(); - /** - * Get the date as a human readable for reporting with timeZone. - */ - public String getAsReadable(DateTimeZone tz); - /** - * Get the date as a human readable for reporting, timeZone is implementation specific. - */ - public String getAsReadable(); - /** - * Get the date as a Joda/Java v8 DateTime. - */ - public DateTime getAsDateTime(); - -}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/mvm/rya/indexing/TemporalInstantRfc3339.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/TemporalInstantRfc3339.java b/extras/indexing/src/main/java/mvm/rya/indexing/TemporalInstantRfc3339.java deleted file mode 100644 index f47bb92..0000000 --- a/extras/indexing/src/main/java/mvm/rya/indexing/TemporalInstantRfc3339.java +++ /dev/null @@ -1,219 +0,0 @@ -/** - * - */ -package mvm.rya.indexing; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import org.apache.commons.codec.binary.StringUtils; -import org.joda.time.DateTime; -import org.joda.time.DateTimeZone; -import org.joda.time.format.DateTimeFormatter; -import org.joda.time.format.ISODateTimeFormat; - -/** - * Immutable date and time instance returning a human readable key. - * Preserves the Time zone, but not stored in the key. - * Converts fields (hours, etc) correctly for tz=Zulu when stored, - * so the original timezone is not preserved when retrieved. 
- * - * Uses rfc 3339, which looks like: YYYY-MM-DDThh:mm:ssZ a subset - * of ISO-8601 : https://www.ietf.org/rfc/rfc3339.txt - * - * Limits: All dates and times are assumed to be in the "current era", no BC, - * somewhere between 0000AD and 9999AD. - * - * Resolution: to the second, or millisecond if the optional fraction is used. - * - * This is really a wrapper for Joda DateTime. if you need functionality from - * that wonderful class, simply use t.getAsDateTime(). - * - */ -public class TemporalInstantRfc3339 implements TemporalInstant { - - private static final long serialVersionUID = -7790000399142290309L; - - private final DateTime dateTime; - /** - * Format key like this: YYYY-MM-DDThh:mm:ssZ - */ - public final static DateTimeFormatter FORMATTER = ISODateTimeFormat.dateTimeNoMillis(); - - public static final Pattern PATTERN = Pattern.compile("\\[(.*)\\,(.*)\\].*"); - - /** - * New date assumed UTC time zone. - * - * @param year - * @param month - * @param day - * @param hour - * @param minute - * @param second - */ - public TemporalInstantRfc3339(final int year, final int month, final int day, final int hour, final int minute, final int second) { - dateTime = new DateTime(year, month, day, hour, minute, second, DateTimeZone.UTC); - } - - /** - * Construct with a Joda/java v8 DateTime; - * TZ is preserved, but not in the key. - * - * @param dateTime - * initialize with this date time. Converted to zulu time zone for key generation. - * @return - */ - public TemporalInstantRfc3339(final DateTime datetime) { - dateTime = datetime; - } - /** - * Get an interval setting beginning and end with this implementation of {@link TemporalInstant}. - * beginning must be less than end. 
- * - * @param dateTimeInterval String in the form [dateTime1,dateTime2] - */ - public static TemporalInterval parseInterval(final String dateTimeInterval) { - - final Matcher matcher = PATTERN.matcher(dateTimeInterval); - if (matcher.find()) { - // Got a date time pair, parse into an interval. - return new TemporalInterval( - new TemporalInstantRfc3339(new DateTime(matcher.group(1))), - new TemporalInstantRfc3339(new DateTime(matcher.group(2)))); - } - throw new IllegalArgumentException("Can't parse interval, expecting '[ISO8601dateTime1,ISO8601dateTime2]', actual: "+dateTimeInterval); - } - - /** - * if this is older returns -1, equal 0, else 1 - * - */ - @Override - public int compareTo(final TemporalInstant that) { - return getAsKeyString().compareTo(that.getAsKeyString()); - } - - @Override - public byte[] getAsKeyBytes() { - return StringUtils.getBytesUtf8(getAsKeyString()); - } - - @Override - public String getAsKeyString() { - return dateTime.withZone(DateTimeZone.UTC).toString(FORMATTER); - } - - /** - * Readable string, formated local time at {@link DateTimeZone}. - * If the timezone is UTC (Z), it was probably a key from the database. - * If the server and client are in different Time zone, should probably use the client timezone. - * - * Time at specified time zone: - * instant.getAsReadable(DateTimeZone.forID("-05:00"))); - * instant.getAsReadable(DateTimeZone.getDefault())); - * - * Use original time zone set in the constructor: - * instant.getAsDateTime().toString(TemporalInstantRfc3339.FORMATTER)); - * - */ - @Override - public String getAsReadable(final DateTimeZone dateTimeZone) { - return dateTime.withZone(dateTimeZone).toString(FORMATTER); - } - - /** - * Use original time zone set in the constructor, or UTC if from parsing the key. - */ - @Override - public String getAsReadable() { - return dateTime.toString(FORMATTER); - } - - /** - * default toString, same as getAsReadable(). 
- */ - @Override - public String toString() { - return getAsReadable(); - } - - /** - * Show readable time converted to the default timezone. - */ - @Override - public DateTime getAsDateTime() { - return dateTime; - } - - /** - * Minimum Date, used for infinitely past. - */ - private static final TemporalInstant MINIMUM = new TemporalInstantRfc3339(new DateTime(Long.MIN_VALUE)); - /** - * maximum date/time is used for infinitely in the future. - */ - private static final TemporalInstant MAXIMUM = new TemporalInstantRfc3339(new DateTime(Long.MAX_VALUE)); - - /** - * infinite past date. - * @return an instant that will compare as NEWER than anything but itself. - */ - public static TemporalInstant getMinimumInstance() { - return MINIMUM; - } - /** - * infinite future date. - * @return an instant that will compare as OLDER than anything but itself - */ - - public static TemporalInstant getMaximumInstance() { - return MAXIMUM; - } - - /* (non-Javadoc) - * @see java.lang.Object#hashCode() - */ - @Override - public int hashCode() { - return getAsKeyString().hashCode(); - } - - /* (non-Javadoc) - * @see java.lang.Object#equals(java.lang.Object) - */ - @Override - public boolean equals(final Object obj) { - if (this == obj) { - return true; - } - if (obj == null) { - return false; - } - if (getClass() != obj.getClass()) { - return false; - } - final TemporalInstantRfc3339 other = (TemporalInstantRfc3339) obj; - return (getAsKeyString().equals(other.getAsKeyString())); - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/mvm/rya/indexing/TemporalInterval.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/TemporalInterval.java b/extras/indexing/src/main/java/mvm/rya/indexing/TemporalInterval.java deleted file mode 100644 index b23b99c..0000000 --- a/extras/indexing/src/main/java/mvm/rya/indexing/TemporalInterval.java +++ /dev/null @@ -1,181 
package mvm.rya.indexing;

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

import java.nio.charset.StandardCharsets;
import java.util.Objects;

/**
 * A span of time with a beginning and an end, either of which may be indefinitely in the
 * past or future. Used for reading and writing Rya's temporal index.
 * <p>
 * Both ends are final references. The class is immutable, and therefore thread-safe,
 * as long as the {@link TemporalInstant}s it holds are immutable.
 * <p>
 * A {@code null} end stands for infinity in {@link #compareTo}, {@link #equals} and
 * {@link #hashCode}: a {@code null} beginning is infinitely past, and a {@code null} end
 * is infinitely future. The key and formatting methods require both ends to be non-null.
 * A fully unbounded span can also be built from sentinel instants, e.g.
 * {@code new TemporalInterval(TemporalInstantRfc3339.getMinimumInstance(),
 * TemporalInstantRfc3339.getMaximumInstance())}.
 */
public class TemporalInterval implements Comparable<TemporalInterval> {

    // The beginning and end. Read-only: final references to (presumably) immutable objects.
    private final TemporalInstant hasBeginning;
    private final TemporalInstant hasEnd;

    /**
     * Separates the beginning and end in keys.
     * Chosen because the Joda time library's interval uses this separator.
     * TODO: Move this down to the TemporalInterval implementation.
     * TODO: Then add a TemporalInterval.keyConcatenate().
     */
    public static final String DELIMITER = "/";

    /**
     * Creates an interval from two instants.
     *
     * @param hasBeginning the beginning, or {@code null} for infinitely past
     * @param hasEnd       the end, or {@code null} for infinitely future
     * @throws IllegalArgumentException if both ends are set and the beginning compares
     *         greater than the end
     */
    public TemporalInterval(final TemporalInstant hasBeginning, final TemporalInstant hasEnd) {
        if (hasBeginning != null && hasEnd != null && hasBeginning.compareTo(hasEnd) > 0) {
            throw new IllegalArgumentException("The Beginning instance must not compare greater than the end.");
        }
        this.hasBeginning = hasBeginning;
        this.hasEnd = hasEnd;
    }

    /**
     * @return the beginning, or {@code null} if infinitely past
     */
    public TemporalInstant getHasBeginning() {
        return hasBeginning;
    }

    /**
     * @return the end, or {@code null} if infinitely future
     */
    public TemporalInstant getHasEnd() {
        return hasEnd;
    }

    /**
     * Returns true when {@code other} is a {@link TemporalInterval} and
     * {@link #compareTo} returns 0.
     */
    @Override
    public boolean equals(final Object other) {
        return other instanceof TemporalInterval
                && this.compareTo((TemporalInterval) other) == 0;
    }

    /**
     * Compares beginnings first and, if they are equal, compares ends.
     * A {@code null} beginning sorts before any set beginning, and a {@code null} end
     * sorts after any set end.
     */
    @Override
    public int compareTo(final TemporalInterval other) {
        final int byBeginning = compareEnds(this.hasBeginning, other.hasBeginning, -1);
        if (byBeginning != 0) {
            return byBeginning;
        }
        return compareEnds(this.hasEnd, other.hasEnd, 1);
    }

    /**
     * Compares two interval ends, treating {@code null} as infinity.
     *
     * @param a        first end, may be {@code null}
     * @param b        second end, may be {@code null}
     * @param nullSign -1 when {@code null} means infinitely past (beginnings),
     *                 +1 when it means infinitely future (ends)
     * @return negative, zero or positive as {@code a} is before, equal to or after {@code b}
     */
    private static int compareEnds(final TemporalInstant a, final TemporalInstant b, final int nullSign) {
        if (a == null) {
            return b == null ? 0 : nullSign;
        }
        if (b == null) {
            return -nullSign;
        }
        return a.compareTo(b);
    }

    /**
     * Hash code combining the hashes of both ends; a {@code null} end contributes 0.
     */
    @Override
    public int hashCode() {
        // Objects.hashCode(null) == 0, which reproduces the earlier values:
        // both null -> 0, only the end set -> the end's hash, both set -> their sum.
        return hashboth(Objects.hashCode(hasBeginning), Objects.hashCode(hasEnd));
    }

    /**
     * Combines two hash codes by addition.
     */
    protected static int hashboth(final int i1, final int i2) {
        // Overflow simply wraps around, which is harmless for a hash code.
        return i1 + i2;
    }

    /**
     * Returns the row key used to index the beginning of this interval:
     * {@code beginningKey + DELIMITER + endKey}, encoded as US-ASCII. The key strings are
     * expected to be ASCII; the encoder replaces any other character with {@code '?'}.
     *
     * @return the beginning-first key bytes
     * @throws NullPointerException if either end is {@code null}
     */
    public byte[] getAsKeyBeginning() {
        return (hasBeginning.getAsKeyString() + DELIMITER + hasEnd.getAsKeyString())
                .getBytes(StandardCharsets.US_ASCII);
    }

    /**
     * Returns the row key used to index the end of this interval:
     * {@code endKey + DELIMITER + beginningKey}, encoded as US-ASCII. The key strings are
     * expected to be ASCII; the encoder replaces any other character with {@code '?'}.
     *
     * @return the end-first key bytes
     * @throws NullPointerException if either end is {@code null}
     */
    public byte[] getAsKeyEnd() {
        return (hasEnd.getAsKeyString() + DELIMITER + hasBeginning.getAsKeyString())
                .getBytes(StandardCharsets.US_ASCII);
    }

    /**
     * Formats the interval as a "period", as used in this paper (not a formal standard):
     * http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.298.8948&rep=rep1&type=pdf
     * Also consider the typed-literal syntax {@code "[2010-01-01,2010-01-31]"^^xs:period}.
     *
     * @return {@code [begindate,enddate]}, for example {@code [2010-01-01,2010-01-31]}
     * @throws NullPointerException if either end is {@code null}
     */
    public String getAsPair() {
        return "["+hasBeginning.getAsReadable() + "," + hasEnd.getAsReadable() + "]";
    }

    /**
     * Same as {@link #getAsPair()}.
     */
    @Override
    public String toString() {
        return getAsPair();
    }
}
// Next file in the diff (deleted file mode 100644, index 1677fed..0000000, hunk @@ -1,287 +0,0 @@):
//   http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/mvm/rya/indexing/TemporalTupleSet.java
package mvm.rya.indexing;

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

import java.util.Map;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.joda.time.DateTime;
import org.openrdf.model.Statement;
import org.openrdf.model.URI;
import org.openrdf.model.impl.URIImpl;
import org.openrdf.query.BindingSet;
import org.openrdf.query.QueryEvaluationException;
import org.openrdf.query.algebra.QueryModelVisitor;

import com.google.common.base.Joiner;
import com.google.common.collect.Maps;

import info.aduna.iteration.CloseableIteration;
import mvm.rya.indexing.external.tupleSet.ExternalTupleSet;

/**
 * Indexing node for temporal expressions. It is inserted into the query execution plan to
 * delegate the temporal portion of a query to the temporal index ({@link TemporalIndexer}).
 */
public class TemporalTupleSet extends ExternalTupleSet {

    private final Configuration conf;
    private final TemporalIndexer temporalIndexer;
    private final IndexingExpr filterInfo;

    /**
     * @param filterInfo      the temporal filter expression evaluated by this node
     * @param temporalIndexer the indexer that answers it; its configuration is captured here
     */
    public TemporalTupleSet(final IndexingExpr filterInfo, final TemporalIndexer temporalIndexer) {
        this.filterInfo = filterInfo;
        this.temporalIndexer = temporalIndexer;
        conf = temporalIndexer.getConf();
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public Set<String> getBindingNames() {
        return filterInfo.getBindingNames();
    }

    /**
     * {@inheritDoc}
     * <p>
     * Returns a new node that shares the same filter expression and indexer.
     * NOTE(review): anything that can be altered during optimization via
     * {@link #visitChildren(QueryModelVisitor)} needs a deep copy; this is a shallow copy,
     * so confirm that neither field is mutated that way.
     */
    @Override
    public TemporalTupleSet clone() {
        return new TemporalTupleSet(filterInfo, temporalIndexer);
    }

    /**
     * @return always 0.0; no cardinality estimate is available for this node
     */
    @Override
    public double cardinality() {
        return 0.0;
    }

    @Override
    public String getSignature() {
        return "(TemporalTuple Projection) " + "variables: " + Joiner.on(", ").join(getBindingNames()).replaceAll("\\s+", " ");
    }

    @Override
    public boolean equals(final Object other) {
        if (other == this) {
            return true;
        }
        if (!(other instanceof TemporalTupleSet)) {
            return false;
        }
        final TemporalTupleSet arg = (TemporalTupleSet) other;
        return filterInfo.equals(arg.filterInfo);
    }

    @Override
    public int hashCode() {
        int result = 17;
        result = 31*result + filterInfo.hashCode();
        return result;
    }

    /**
     * Returns an iteration over the result set of the contained {@link IndexingExpr}.
     * <p>
     * Some query evaluators may invoke this method concurrently, so it must stay
     * thread-safe. It only reads final fields and builds a fresh function factory per call.
     *
     * @param bindings bindings supplied by the enclosing query
     * @return the matching binding sets
     * @throws QueryEvaluationException if the filter function is not a registered temporal
     *         search function
     * @throws IllegalArgumentException if the expression does not have exactly one argument
     */
    @Override
    public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(final BindingSet bindings)
            throws QueryEvaluationException {
        final URI funcURI = filterInfo.getFunction();
        final SearchFunction searchFunction = new TemporalSearchFunctionFactory(conf).getSearchFunction(funcURI);

        // The query text is read from argument 0 below, so exactly one argument is required.
        final int argumentCount = filterInfo.getArguments().length;
        if (argumentCount != 1) {
            throw new IllegalArgumentException(
                    "Index functions support exactly one argument, but got " + argumentCount + ".");
        }

        final String queryText = filterInfo.getArguments()[0].stringValue();
        return IteratorFactory.getIterator(filterInfo.getSpConstraint(), bindings, queryText, searchFunction);
    }

    /**
     * Maps temporal filter-function URIs to the {@link SearchFunction}s that query the
     * temporal index through {@link TemporalIndexer}.
     */
    private class TemporalSearchFunctionFactory {
        private final Map<URI, SearchFunction> searchFunctionMap = Maps.newHashMap();
        private final Configuration conf;

        private final SearchFunction TEMPORAL_InstantAfterInstant = new SearchFunction() {
            @Override
            public CloseableIteration<Statement, QueryEvaluationException> performSearch(final String searchTerms,
                    final StatementConstraints constraints) throws QueryEvaluationException {
                final TemporalInstant queryInstant = new TemporalInstantRfc3339(DateTime.parse(searchTerms));
                return temporalIndexer.queryInstantAfterInstant(queryInstant, constraints);
            }

            @Override
            public String toString() {
                return "TEMPORAL_InstantAfterInstant";
            }
        };

        private final SearchFunction TEMPORAL_InstantBeforeInstant = new SearchFunction() {
            @Override
            public CloseableIteration<Statement, QueryEvaluationException> performSearch(final String searchTerms,
                    final StatementConstraints constraints) throws QueryEvaluationException {
                final TemporalInstant queryInstant = new TemporalInstantRfc3339(DateTime.parse(searchTerms));
                return temporalIndexer.queryInstantBeforeInstant(queryInstant, constraints);
            }

            @Override
            public String toString() {
                return "TEMPORAL_InstantBeforeInstant";
            }
        };

        private final SearchFunction TEMPORAL_InstantEqualsInstant = new SearchFunction() {
            @Override
            public CloseableIteration<Statement, QueryEvaluationException> performSearch(final String searchTerms,
                    final StatementConstraints constraints) throws QueryEvaluationException {
                final TemporalInstant queryInstant = new TemporalInstantRfc3339(DateTime.parse(searchTerms));
                return temporalIndexer.queryInstantEqualsInstant(queryInstant, constraints);
            }

            @Override
            public String toString() {
                return "TEMPORAL_InstantEqualsInstant";
            }
        };

        private final SearchFunction TEMPORAL_InstantAfterInterval = new SearchFunction() {
            @Override
            public CloseableIteration<Statement, QueryEvaluationException> performSearch(final String searchTerms,
                    final StatementConstraints constraints) throws QueryEvaluationException {
                final TemporalInterval queryInterval = TemporalInstantRfc3339.parseInterval(searchTerms);
                return temporalIndexer.queryInstantAfterInterval(queryInterval, constraints);
            }

            @Override
            public String toString() {
                return "TEMPORAL_InstantAfterInterval";
            }
        };

        private final SearchFunction TEMPORAL_InstantBeforeInterval = new SearchFunction() {
            @Override
            public CloseableIteration<Statement, QueryEvaluationException> performSearch(final String searchTerms,
                    final StatementConstraints constraints) throws QueryEvaluationException {
                final TemporalInterval queryInterval = TemporalInstantRfc3339.parseInterval(searchTerms);
                return temporalIndexer.queryInstantBeforeInterval(queryInterval, constraints);
            }

            @Override
            public String toString() {
                return "TEMPORAL_InstantBeforeInterval";
            }
        };

        private final SearchFunction TEMPORAL_InstantInsideInterval = new SearchFunction() {
            @Override
            public CloseableIteration<Statement, QueryEvaluationException> performSearch(final String searchTerms,
                    final StatementConstraints constraints) throws QueryEvaluationException {
                final TemporalInterval queryInterval = TemporalInstantRfc3339.parseInterval(searchTerms);
                return temporalIndexer.queryInstantInsideInterval(queryInterval, constraints);
            }

            @Override
            public String toString() {
                return "TEMPORAL_InstantInsideInterval";
            }
        };

        private final SearchFunction TEMPORAL_InstantHasBeginningInterval = new SearchFunction() {
            @Override
            public CloseableIteration<Statement, QueryEvaluationException> performSearch(final String searchTerms,
                    final StatementConstraints constraints) throws QueryEvaluationException {
                final TemporalInterval queryInterval = TemporalInstantRfc3339.parseInterval(searchTerms);
                return temporalIndexer.queryInstantHasBeginningInterval(queryInterval, constraints);
            }

            @Override
            public String toString() {
                return "TEMPORAL_InstantHasBeginningInterval";
            }
        };

        private final SearchFunction TEMPORAL_InstantHasEndInterval = new SearchFunction() {
            @Override
            public CloseableIteration<Statement, QueryEvaluationException> performSearch(final String searchTerms,
                    final StatementConstraints constraints) throws QueryEvaluationException {
                final TemporalInterval queryInterval = TemporalInstantRfc3339.parseInterval(searchTerms);
                return temporalIndexer.queryInstantHasEndInterval(queryInterval, constraints);
            }

            @Override
            public String toString() {
                return "TEMPORAL_InstantHasEndInterval";
            }
        };

        /**
         * Registers every temporal search function under its URI in the temporal namespace.
         * Done in the constructor so that all function fields are already initialized.
         *
         * @param conf the indexing configuration (stored; not otherwise used here)
         */
        public TemporalSearchFunctionFactory(final Configuration conf) {
            this.conf = conf;

            final String temporalNs = "tag:rya-rdf.org,2015:temporal#";

            searchFunctionMap.put(new URIImpl(temporalNs+"after"), TEMPORAL_InstantAfterInstant);
            searchFunctionMap.put(new URIImpl(temporalNs+"before"), TEMPORAL_InstantBeforeInstant);
            searchFunctionMap.put(new URIImpl(temporalNs+"equals"), TEMPORAL_InstantEqualsInstant);

            searchFunctionMap.put(new URIImpl(temporalNs+"beforeInterval"), TEMPORAL_InstantBeforeInterval);
            searchFunctionMap.put(new URIImpl(temporalNs+"afterInterval"), TEMPORAL_InstantAfterInterval);
            searchFunctionMap.put(new URIImpl(temporalNs+"insideInterval"), TEMPORAL_InstantInsideInterval);
            searchFunctionMap.put(new URIImpl(temporalNs+"hasBeginningInterval"),
                    TEMPORAL_InstantHasBeginningInterval);
            searchFunctionMap.put(new URIImpl(temporalNs+"hasEndInterval"), TEMPORAL_InstantHasEndInterval);
        }

        /**
         * Looks up the {@link SearchFunction} registered for a URI.
         *
         * @param searchFunction the filter-function URI
         * @return the registered search function, never {@code null}
         * @throws QueryEvaluationException if no temporal search function is registered for
         *         the URI
         */
        public SearchFunction getSearchFunction(final URI searchFunction) throws QueryEvaluationException {
            final SearchFunction function = searchFunctionMap.get(searchFunction);
            if (function == null) {
                throw new QueryEvaluationException("Unknown Search Function: " + searchFunction.stringValue());
            }
            return function;
        }
    }
}
file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/ConfigUtils.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/ConfigUtils.java b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/ConfigUtils.java deleted file mode 100644 index 7c608de..0000000 --- a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/ConfigUtils.java +++ /dev/null @@ -1,418 +0,0 @@ -package mvm.rya.indexing.accumulo; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -import static java.util.Objects.requireNonNull; - -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -import org.apache.accumulo.core.client.AccumuloException; -import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.BatchScanner; -import org.apache.accumulo.core.client.BatchWriter; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.Instance; -import org.apache.accumulo.core.client.MultiTableBatchWriter; -import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.client.TableExistsException; -import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.client.ZooKeeperInstance; -import org.apache.accumulo.core.client.admin.TableOperations; -import org.apache.accumulo.core.client.mock.MockInstance; -import org.apache.accumulo.core.security.Authorizations; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.util.ReflectionUtils; -import org.apache.log4j.Logger; -import org.openrdf.model.URI; -import org.openrdf.model.impl.URIImpl; - -import com.google.common.base.Optional; -import com.google.common.collect.Lists; - -import mvm.rya.accumulo.AccumuloRdfConfiguration; -import mvm.rya.api.RdfCloudTripleStoreConfiguration; -import mvm.rya.api.instance.RyaDetails; -import mvm.rya.indexing.FilterFunctionOptimizer; -import mvm.rya.indexing.accumulo.entity.EntityCentricIndex; -import mvm.rya.indexing.accumulo.entity.EntityOptimizer; -import mvm.rya.indexing.accumulo.freetext.AccumuloFreeTextIndexer; -import mvm.rya.indexing.accumulo.freetext.LuceneTokenizer; -import mvm.rya.indexing.accumulo.freetext.Tokenizer; -import mvm.rya.indexing.accumulo.temporal.AccumuloTemporalIndexer; -import mvm.rya.indexing.external.PrecomputedJoinIndexer; -import mvm.rya.indexing.mongodb.freetext.MongoFreeTextIndexer; -import 
mvm.rya.indexing.pcj.matching.PCJOptimizer; - -/** - * A set of configuration utils to read a Hadoop {@link Configuration} object and create Cloudbase/Accumulo objects. - * Soon will deprecate this class. Use installer for the set methods, use {@link RyaDetails} for the get methods. - * New code must separate parameters that are set at Rya install time from that which is specific to the client. - * Also Accumulo index tables are pushed down to the implementation and not configured in conf. - */ -public class ConfigUtils { - private static final Logger logger = Logger.getLogger(ConfigUtils.class); - - public static final String CLOUDBASE_TBL_PREFIX = "sc.cloudbase.tableprefix"; - public static final String CLOUDBASE_AUTHS = "sc.cloudbase.authorizations"; - public static final String CLOUDBASE_INSTANCE = "sc.cloudbase.instancename"; - public static final String CLOUDBASE_ZOOKEEPERS = "sc.cloudbase.zookeepers"; - public static final String CLOUDBASE_USER = "sc.cloudbase.username"; - public static final String CLOUDBASE_PASSWORD = "sc.cloudbase.password"; - - public static final String CLOUDBASE_WRITER_MAX_WRITE_THREADS = "sc.cloudbase.writer.maxwritethreads"; - public static final String CLOUDBASE_WRITER_MAX_LATENCY = "sc.cloudbase.writer.maxlatency"; - public static final String CLOUDBASE_WRITER_MAX_MEMORY = "sc.cloudbase.writer.maxmemory"; - - public static final String FREE_TEXT_QUERY_TERM_LIMIT = "sc.freetext.querytermlimit"; - - public static final String USE_FREETEXT = "sc.use_freetext"; - public static final String USE_TEMPORAL = "sc.use_temporal"; - public static final String USE_ENTITY = "sc.use_entity"; - public static final String USE_PCJ = "sc.use_pcj"; - public static final String USE_OPTIMAL_PCJ = "sc.use.optimal.pcj"; - public static final String USE_PCJ_UPDATER_INDEX = "sc.use.updater"; - - public static final String FLUO_APP_NAME = "rya.indexing.pcj.fluo.fluoAppName"; - public static final String USE_PCJ_FLUO_UPDATER = "rya.indexing.pcj.updater.fluo"; 
- public static final String PCJ_STORAGE_TYPE = "rya.indexing.pcj.storageType"; - public static final String PCJ_UPDATER_TYPE = "rya.indexing.pcj.updaterType"; - - - public static final String USE_INDEXING_SAIL = "sc.use.indexing.sail"; - public static final String USE_EXTERNAL_SAIL = "sc.use.external.sail"; - - public static final String USE_MOCK_INSTANCE = ".useMockInstance"; - - public static final String NUM_PARTITIONS = "sc.cloudbase.numPartitions"; - - private static final int WRITER_MAX_WRITE_THREADS = 1; - private static final long WRITER_MAX_LATNECY = Long.MAX_VALUE; - private static final long WRITER_MAX_MEMORY = 10000L; - - public static final String DISPLAY_QUERY_PLAN = "query.printqueryplan"; - - public static final String FREETEXT_PREDICATES_LIST = "sc.freetext.predicates"; - public static final String FREETEXT_DOC_NUM_PARTITIONS = "sc.freetext.numPartitions.text"; - public static final String FREETEXT_TERM_NUM_PARTITIONS = "sc.freetext.numPartitions.term"; - - public static final String TOKENIZER_CLASS = "sc.freetext.tokenizer.class"; - - public static final String GEO_PREDICATES_LIST = "sc.geo.predicates"; - - public static final String TEMPORAL_PREDICATES_LIST = "sc.temporal.predicates"; - - public static final String USE_MONGO = "sc.useMongo"; - - public static boolean isDisplayQueryPlan(final Configuration conf){ - return conf.getBoolean(DISPLAY_QUERY_PLAN, false); - } - - /** - * get a value from the configuration file and throw an exception if the value does not exist. 
- * - * @param conf - * @param key - * @return - */ - private static String getStringCheckSet(final Configuration conf, final String key) { - final String value = conf.get(key); - requireNonNull(value, key + " not set"); - return value; - } - - /** - * @param conf - * @param tablename - * @return if the table was created - * @throws AccumuloException - * @throws AccumuloSecurityException - * @throws TableExistsException - */ - public static boolean createTableIfNotExists(final Configuration conf, final String tablename) throws AccumuloException, AccumuloSecurityException, - TableExistsException { - final TableOperations tops = getConnector(conf).tableOperations(); - if (!tops.exists(tablename)) { - logger.info("Creating table: " + tablename); - tops.create(tablename); - return true; - } - return false; - } - - /** - * Lookup the table name prefix in the conf and throw an error if it is null. - * Future, get table prefix from RyaDetails -- the Rya instance name - * -- also getting info from the RyaDetails should happen within RyaSailFactory and not ConfigUtils. - * @param conf Rya configuration map where it extracts the prefix (instance name) - * @return index table prefix corresponding to this Rya instance - */ - public static String getTablePrefix(final Configuration conf) { - final String tablePrefix; - tablePrefix = conf.get(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX); - requireNonNull(tablePrefix, "Configuration key: " + RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX - + " not set. 
Cannot generate table name."); - return tablePrefix; - } - - public static int getFreeTextTermLimit(final Configuration conf) { - return conf.getInt(FREE_TEXT_QUERY_TERM_LIMIT, 100); - } - - public static Set<URI> getFreeTextPredicates(final Configuration conf) { - return getPredicates(conf, FREETEXT_PREDICATES_LIST); - } - - public static Set<URI> getGeoPredicates(final Configuration conf) { - return getPredicates(conf, GEO_PREDICATES_LIST); - } - /** - * Used for indexing statements about date & time instances and intervals. - * @param conf - * @return Set of predicate URI's whose objects should be date time literals. - */ - public static Set<URI> getTemporalPredicates(final Configuration conf) { - return getPredicates(conf, TEMPORAL_PREDICATES_LIST); - } - - protected static Set<URI> getPredicates(final Configuration conf, final String confName) { - final String[] validPredicateStrings = conf.getStrings(confName, new String[] {}); - final Set<URI> predicates = new HashSet<URI>(); - for (final String prediateString : validPredicateStrings) { - predicates.add(new URIImpl(prediateString)); - } - return predicates; - } - - public static Tokenizer getFreeTextTokenizer(final Configuration conf) { - final Class<? 
extends Tokenizer> c = conf.getClass(TOKENIZER_CLASS, LuceneTokenizer.class, Tokenizer.class); - return ReflectionUtils.newInstance(c, conf); - } - - public static BatchWriter createDefaultBatchWriter(final String tablename, final Configuration conf) throws TableNotFoundException, - AccumuloException, AccumuloSecurityException { - final Long DEFAULT_MAX_MEMORY = getWriterMaxMemory(conf); - final Long DEFAULT_MAX_LATENCY = getWriterMaxLatency(conf); - final Integer DEFAULT_MAX_WRITE_THREADS = getWriterMaxWriteThreads(conf); - final Connector connector = ConfigUtils.getConnector(conf); - return connector.createBatchWriter(tablename, DEFAULT_MAX_MEMORY, DEFAULT_MAX_LATENCY, DEFAULT_MAX_WRITE_THREADS); - } - - public static MultiTableBatchWriter createMultitableBatchWriter(final Configuration conf) throws AccumuloException, AccumuloSecurityException { - final Long DEFAULT_MAX_MEMORY = getWriterMaxMemory(conf); - final Long DEFAULT_MAX_LATENCY = getWriterMaxLatency(conf); - final Integer DEFAULT_MAX_WRITE_THREADS = getWriterMaxWriteThreads(conf); - final Connector connector = ConfigUtils.getConnector(conf); - return connector.createMultiTableBatchWriter(DEFAULT_MAX_MEMORY, DEFAULT_MAX_LATENCY, DEFAULT_MAX_WRITE_THREADS); - } - - public static Scanner createScanner(final String tablename, final Configuration conf) throws AccumuloException, AccumuloSecurityException, - TableNotFoundException { - final Connector connector = ConfigUtils.getConnector(conf); - final Authorizations auths = ConfigUtils.getAuthorizations(conf); - return connector.createScanner(tablename, auths); - - } - - public static BatchScanner createBatchScanner(final String tablename, final Configuration conf) throws AccumuloException, AccumuloSecurityException, - TableNotFoundException { - final Connector connector = ConfigUtils.getConnector(conf); - final Authorizations auths = ConfigUtils.getAuthorizations(conf); - Integer numThreads = null; - if (conf instanceof RdfCloudTripleStoreConfiguration) { - 
numThreads = ((RdfCloudTripleStoreConfiguration) conf).getNumThreads(); - } else { - numThreads = conf.getInt(RdfCloudTripleStoreConfiguration.CONF_NUM_THREADS, 2); - } - return connector.createBatchScanner(tablename, auths, numThreads); - } - - public static int getWriterMaxWriteThreads(final Configuration conf) { - return conf.getInt(CLOUDBASE_WRITER_MAX_WRITE_THREADS, WRITER_MAX_WRITE_THREADS); - } - - public static long getWriterMaxLatency(final Configuration conf) { - return conf.getLong(CLOUDBASE_WRITER_MAX_LATENCY, WRITER_MAX_LATNECY); - } - - public static long getWriterMaxMemory(final Configuration conf) { - return conf.getLong(CLOUDBASE_WRITER_MAX_MEMORY, WRITER_MAX_MEMORY); - } - - public static String getUsername(final JobContext job) { - return getUsername(job.getConfiguration()); - } - - public static String getUsername(final Configuration conf) { - return conf.get(CLOUDBASE_USER); - } - - public static Authorizations getAuthorizations(final JobContext job) { - return getAuthorizations(job.getConfiguration()); - } - - public static Authorizations getAuthorizations(final Configuration conf) { - final String authString = conf.get(CLOUDBASE_AUTHS, ""); - if (authString.isEmpty()) { - return new Authorizations(); - } - return new Authorizations(authString.split(",")); - } - - public static Instance getInstance(final JobContext job) { - return getInstance(job.getConfiguration()); - } - - public static Instance getInstance(final Configuration conf) { - if (useMockInstance(conf)) { - return new MockInstance(conf.get(CLOUDBASE_INSTANCE)); - } - return new ZooKeeperInstance(conf.get(CLOUDBASE_INSTANCE), conf.get(CLOUDBASE_ZOOKEEPERS)); - } - - public static String getPassword(final JobContext job) { - return getPassword(job.getConfiguration()); - } - - public static String getPassword(final Configuration conf) { - return conf.get(CLOUDBASE_PASSWORD, ""); - } - - public static Connector getConnector(final JobContext job) throws AccumuloException, 
AccumuloSecurityException { - return getConnector(job.getConfiguration()); - } - - public static Connector getConnector(final Configuration conf) throws AccumuloException, AccumuloSecurityException { - final Instance instance = ConfigUtils.getInstance(conf); - - return instance.getConnector(getUsername(conf), getPassword(conf)); - } - - public static boolean useMockInstance(final Configuration conf) { - return conf.getBoolean(USE_MOCK_INSTANCE, false); - } - - protected static int getNumPartitions(final Configuration conf) { - return conf.getInt(NUM_PARTITIONS, 25); - } - - public static int getFreeTextDocNumPartitions(final Configuration conf) { - return conf.getInt(FREETEXT_DOC_NUM_PARTITIONS, getNumPartitions(conf)); - } - - public static int getFreeTextTermNumPartitions(final Configuration conf) { - return conf.getInt(FREETEXT_TERM_NUM_PARTITIONS, getNumPartitions(conf)); - } - - public static boolean getUseFreeText(final Configuration conf) { - return conf.getBoolean(USE_FREETEXT, false); - } - - public static boolean getUseTemporal(final Configuration conf) { - return conf.getBoolean(USE_TEMPORAL, false); - } - - public static boolean getUseEntity(final Configuration conf) { - return conf.getBoolean(USE_ENTITY, false); - } - - public static boolean getUsePCJ(final Configuration conf) { - return conf.getBoolean(USE_PCJ, false); - } - - public static boolean getUseOptimalPCJ(final Configuration conf) { - return conf.getBoolean(USE_OPTIMAL_PCJ, false); - } - - public static boolean getUsePcjUpdaterIndex(final Configuration conf) { - return conf.getBoolean(USE_PCJ_UPDATER_INDEX, false); - } - - - /** - * @return The name of the Fluo Application this instance of RYA is - * using to incrementally update PCJs. 
- */ - //TODO delete this eventually and use Details table - public Optional<String> getFluoAppName(Configuration conf) { - return Optional.fromNullable(conf.get(FLUO_APP_NAME)); - } - - - public static boolean getUseMongo(final Configuration conf) { - return conf.getBoolean(USE_MONGO, false); - } - - - public static void setIndexers(final RdfCloudTripleStoreConfiguration conf) { - - final List<String> indexList = Lists.newArrayList(); - final List<String> optimizers = Lists.newArrayList(); - - boolean useFilterIndex = false; - - if (ConfigUtils.getUseMongo(conf)) { - if (getUseFreeText(conf)) { - indexList.add(MongoFreeTextIndexer.class.getName()); - useFilterIndex = true; - } - } else { - - if (getUsePCJ(conf) || getUseOptimalPCJ(conf)) { - conf.setPcjOptimizer(PCJOptimizer.class); - } - - if(getUsePcjUpdaterIndex(conf)) { - indexList.add(PrecomputedJoinIndexer.class.getName()); - } - - - if (getUseFreeText(conf)) { - indexList.add(AccumuloFreeTextIndexer.class.getName()); - useFilterIndex = true; - } - - if (getUseTemporal(conf)) { - indexList.add(AccumuloTemporalIndexer.class.getName()); - useFilterIndex = true; - } - - } - - if (useFilterIndex) { - optimizers.add(FilterFunctionOptimizer.class.getName()); - } - - if (getUseEntity(conf)) { - indexList.add(EntityCentricIndex.class.getName()); - optimizers.add(EntityOptimizer.class.getName()); - - } - - conf.setStrings(AccumuloRdfConfiguration.CONF_ADDITIONAL_INDEXERS, indexList.toArray(new String[]{})); - conf.setStrings(RdfCloudTripleStoreConfiguration.CONF_OPTIMIZERS, optimizers.toArray(new String[]{})); - - } - - - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/AccumuloDocIdIndexer.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/AccumuloDocIdIndexer.java 
b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/AccumuloDocIdIndexer.java deleted file mode 100644 index 4c1a3ad..0000000 --- a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/AccumuloDocIdIndexer.java +++ /dev/null @@ -1,443 +0,0 @@ -package mvm.rya.indexing.accumulo.entity; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - - -import static mvm.rya.api.RdfCloudTripleStoreConstants.DELIM_BYTE; -import static mvm.rya.api.RdfCloudTripleStoreConstants.TYPE_DELIM_BYTE; -import info.aduna.iteration.CloseableIteration; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.Map.Entry; -import java.util.NoSuchElementException; -import java.util.Set; - -import mvm.rya.accumulo.AccumuloRdfConfiguration; -import mvm.rya.accumulo.documentIndex.DocIndexIteratorUtil; -import mvm.rya.accumulo.documentIndex.DocumentIndexIntersectingIterator; -import mvm.rya.api.RdfCloudTripleStoreConfiguration; -import mvm.rya.api.domain.RyaURI; -import mvm.rya.api.resolver.RyaContext; -import mvm.rya.api.resolver.RyaToRdfConversions; -import mvm.rya.api.resolver.RyaTypeResolverException; -import mvm.rya.indexing.DocIdIndexer; -import mvm.rya.indexing.accumulo.ConfigUtils; - -import org.apache.accumulo.core.client.AccumuloException; -import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.BatchScanner; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.IteratorSetting; -import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.security.Authorizations; -import org.apache.hadoop.io.Text; -import org.openrdf.query.BindingSet; -import org.openrdf.query.MalformedQueryException; -import org.openrdf.query.QueryEvaluationException; -import org.openrdf.query.algebra.StatementPattern; -import org.openrdf.query.algebra.TupleExpr; -import org.openrdf.query.algebra.evaluation.QueryBindingSet; -import org.openrdf.query.algebra.helpers.StatementPatternCollector; -import org.openrdf.query.parser.ParsedQuery; 
-import org.openrdf.query.parser.sparql.SPARQLParser; - -import com.google.common.base.Preconditions; -import com.google.common.collect.HashMultimap; -import com.google.common.collect.Sets; -import com.google.common.primitives.Bytes; - -public class AccumuloDocIdIndexer implements DocIdIndexer { - - - - private BatchScanner bs; - private AccumuloRdfConfiguration conf; - - public AccumuloDocIdIndexer(RdfCloudTripleStoreConfiguration conf) throws AccumuloException, AccumuloSecurityException { - Preconditions.checkArgument(conf instanceof RdfCloudTripleStoreConfiguration, "conf must be isntance of RdfCloudTripleStoreConfiguration"); - this.conf = (AccumuloRdfConfiguration) conf; - //Connector conn = ConfigUtils.getConnector(conf); - } - - - - - public CloseableIteration<BindingSet, QueryEvaluationException> queryDocIndex(String sparqlQuery, - Collection<BindingSet> constraints) throws TableNotFoundException, QueryEvaluationException { - - SPARQLParser parser = new SPARQLParser(); - ParsedQuery pq1 = null; - try { - pq1 = parser.parseQuery(sparqlQuery, null); - } catch (MalformedQueryException e) { - e.printStackTrace(); - } - - TupleExpr te1 = pq1.getTupleExpr(); - List<StatementPattern> spList1 = StatementPatternCollector.process(te1); - - if(StarQuery.isValidStarQuery(spList1)) { - StarQuery sq1 = new StarQuery(spList1); - return queryDocIndex(sq1, constraints); - } else { - throw new IllegalArgumentException("Invalid star query!"); - } - - } - - - - - @Override - public CloseableIteration<BindingSet, QueryEvaluationException> queryDocIndex(StarQuery query, - Collection<BindingSet> constraints) throws TableNotFoundException, QueryEvaluationException { - - final StarQuery starQ = query; - final Iterator<BindingSet> bs = constraints.iterator(); - final Iterator<BindingSet> bs2 = constraints.iterator(); - final Set<String> unCommonVarNames; - final Set<String> commonVarNames; - if (bs2.hasNext()) { - BindingSet currBs = bs2.next(); - commonVarNames = 
StarQuery.getCommonVars(query, currBs); - unCommonVarNames = Sets.difference(currBs.getBindingNames(), commonVarNames); - } else { - commonVarNames = Sets.newHashSet(); - unCommonVarNames = Sets.newHashSet(); - } - - if( commonVarNames.size() == 1 && !query.commonVarConstant() && commonVarNames.contains(query.getCommonVarName())) { - - final HashMultimap<String, BindingSet> map = HashMultimap.create(); - final String commonVar = starQ.getCommonVarName(); - final Iterator<Entry<Key, Value>> intersections; - final BatchScanner scan; - Set<Range> ranges = Sets.newHashSet(); - - while(bs.hasNext()) { - - BindingSet currentBs = bs.next(); - - if(currentBs.getBinding(commonVar) == null) { - continue; - } - - String row = currentBs.getBinding(commonVar).getValue().stringValue(); - ranges.add(new Range(row)); - map.put(row, currentBs); - - } - scan = runQuery(starQ, ranges); - intersections = scan.iterator(); - - - return new CloseableIteration<BindingSet, QueryEvaluationException>() { - - - private QueryBindingSet currentSolutionBs = null; - private boolean hasNextCalled = false; - private boolean isEmpty = false; - private Iterator<BindingSet> inputSet = new ArrayList<BindingSet>().iterator(); - private BindingSet currentBs; - private Key key; - - - - @Override - public boolean hasNext() throws QueryEvaluationException { - if (!hasNextCalled && !isEmpty) { - while (inputSet.hasNext() || intersections.hasNext()) { - if (!inputSet.hasNext()) { - key = intersections.next().getKey(); - inputSet = map.get(key.getRow().toString()).iterator(); - } - currentBs = inputSet.next(); - currentSolutionBs = deserializeKey(key, starQ, currentBs, unCommonVarNames); - - if (currentSolutionBs.size() == unCommonVarNames.size() + starQ.getUnCommonVars().size() +1) { - hasNextCalled = true; - return true; - } - - } - - isEmpty = true; - return false; - - } else if (isEmpty) { - return false; - } else { - return true; - } - - } - - - @Override - public BindingSet next() throws 
QueryEvaluationException { - - if (hasNextCalled) { - hasNextCalled = false; - } else if (isEmpty) { - throw new NoSuchElementException(); - } else { - if (this.hasNext()) { - hasNextCalled = false; - } else { - throw new NoSuchElementException(); - } - } - - return currentSolutionBs; - } - - @Override - public void remove() throws QueryEvaluationException { - throw new UnsupportedOperationException(); - } - - @Override - public void close() throws QueryEvaluationException { - scan.close(); - } - - }; - - - } else { - - return new CloseableIteration<BindingSet, QueryEvaluationException>() { - - @Override - public void remove() throws QueryEvaluationException { - throw new UnsupportedOperationException(); - } - - private Iterator<Entry<Key, Value>> intersections = null; - private QueryBindingSet currentSolutionBs = null; - private boolean hasNextCalled = false; - private boolean isEmpty = false; - private boolean init = false; - private BindingSet currentBs; - private StarQuery sq = new StarQuery(starQ); - private Set<Range> emptyRangeSet = Sets.newHashSet(); - private BatchScanner scan; - - @Override - public BindingSet next() throws QueryEvaluationException { - if (hasNextCalled) { - hasNextCalled = false; - } else if (isEmpty) { - throw new NoSuchElementException(); - } else { - if (this.hasNext()) { - hasNextCalled = false; - } else { - throw new NoSuchElementException(); - } - } - return currentSolutionBs; - } - - @Override - public boolean hasNext() throws QueryEvaluationException { - - if (!init) { - if (intersections == null && bs.hasNext()) { - currentBs = bs.next(); - sq = StarQuery.getConstrainedStarQuery(sq, currentBs); - scan = runQuery(sq,emptyRangeSet); - intersections = scan.iterator(); - // binding set empty - } else if (intersections == null && !bs.hasNext()) { - currentBs = new QueryBindingSet(); - scan = runQuery(starQ,emptyRangeSet); - intersections = scan.iterator(); - } - - init = true; - } - - if (!hasNextCalled && !isEmpty) { - while 
(intersections.hasNext() || bs.hasNext()) { - if (!intersections.hasNext()) { - scan.close(); - currentBs = bs.next(); - sq = StarQuery.getConstrainedStarQuery(sq, currentBs); - scan = runQuery(sq,emptyRangeSet); - intersections = scan.iterator(); - } - if (intersections.hasNext()) { - currentSolutionBs = deserializeKey(intersections.next().getKey(), sq, currentBs, - unCommonVarNames); - } else { - continue; - } - - if (sq.commonVarConstant() && currentSolutionBs.size() == unCommonVarNames.size() + sq.getUnCommonVars().size()) { - hasNextCalled = true; - return true; - } else if(currentSolutionBs.size() == unCommonVarNames.size() + sq.getUnCommonVars().size() + 1) { - hasNextCalled = true; - return true; - } - } - - isEmpty = true; - return false; - - } else if (isEmpty) { - return false; - } else { - return true; - } - } - - @Override - public void close() throws QueryEvaluationException { - scan.close(); - } - }; - } - } - - private QueryBindingSet deserializeKey(Key key, StarQuery sq, BindingSet currentBs, Set<String> unCommonVar) { - - - QueryBindingSet currentSolutionBs = new QueryBindingSet(); - - Text row = key.getRow(); - Text cq = key.getColumnQualifier(); - - - String[] cqArray = cq.toString().split(DocIndexIteratorUtil.DOC_ID_INDEX_DELIM); - - boolean commonVarSet = false; - - //if common Var is constant there is no common variable to assign a value to - if(sq.commonVarConstant()) { - commonVarSet = true; - } - - if (!commonVarSet && sq.isCommonVarURI()) { - RyaURI rURI = new RyaURI(row.toString()); - currentSolutionBs.addBinding(sq.getCommonVarName(), - RyaToRdfConversions.convertValue(rURI)); - commonVarSet = true; - } - - for (String s : sq.getUnCommonVars()) { - - byte[] cqBytes = cqArray[sq.getVarPos().get(s)].getBytes(); - int firstIndex = Bytes.indexOf(cqBytes, DELIM_BYTE); - int secondIndex = Bytes.lastIndexOf(cqBytes, DELIM_BYTE); - int typeIndex = Bytes.indexOf(cqBytes, TYPE_DELIM_BYTE); - byte[] tripleComponent = Arrays.copyOfRange(cqBytes, 
firstIndex + 1, secondIndex); - byte[] cqContent = Arrays.copyOfRange(cqBytes, secondIndex + 1, typeIndex); - byte[] objType = Arrays.copyOfRange(cqBytes, typeIndex, cqBytes.length); - - if (new String(tripleComponent).equals("object")) { - byte[] object = Bytes.concat(cqContent, objType); - org.openrdf.model.Value v = null; - try { - v = RyaToRdfConversions.convertValue(RyaContext.getInstance().deserialize( - object)); - } catch (RyaTypeResolverException e) { - e.printStackTrace(); - } - currentSolutionBs.addBinding(s, v); - - } else if (new String(tripleComponent).equals("subject")) { - if (!commonVarSet) { - byte[] object = Bytes.concat(row.getBytes(), objType); - org.openrdf.model.Value v = null; - try { - v = RyaToRdfConversions.convertValue(RyaContext.getInstance().deserialize( - object)); - } catch (RyaTypeResolverException e) { - e.printStackTrace(); - } - currentSolutionBs.addBinding(sq.getCommonVarName(), v); - commonVarSet = true; - } - RyaURI rURI = new RyaURI(new String(cqContent)); - currentSolutionBs.addBinding(s, RyaToRdfConversions.convertValue(rURI)); - } else { - throw new IllegalArgumentException("Invalid row."); - } - } - for (String s : unCommonVar) { - currentSolutionBs.addBinding(s, currentBs.getValue(s)); - } - return currentSolutionBs; - } - - private BatchScanner runQuery(StarQuery query, Collection<Range> ranges) throws QueryEvaluationException { - - try { - if (ranges.size() == 0) { - String rangeText = query.getCommonVarValue(); - Range r; - if (rangeText != null) { - r = new Range(new Text(query.getCommonVarValue())); - } else { - r = new Range(); - } - ranges = Collections.singleton(r); - } - - Connector accCon = ConfigUtils.getConnector(conf); - IteratorSetting is = new IteratorSetting(30, "fii", DocumentIndexIntersectingIterator.class); - - DocumentIndexIntersectingIterator.setColumnFamilies(is, query.getColumnCond()); - - if (query.hasContext()) { - DocumentIndexIntersectingIterator.setContext(is, query.getContextURI()); - } - bs 
= accCon.createBatchScanner(EntityCentricIndex.getTableName(conf), - new Authorizations(conf.get(ConfigUtils.CLOUDBASE_AUTHS)), 15); - bs.addScanIterator(is); - bs.setRanges(ranges); - - return bs; - - } catch (TableNotFoundException | AccumuloException | AccumuloSecurityException e) { - throw new QueryEvaluationException(e); - } - } - - @Override - public void close() throws IOException { - //TODO generate an exception when BS passed in -- scanner closed -// if (bs != null) { -// bs.close(); -// } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/EntityCentricIndex.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/EntityCentricIndex.java b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/EntityCentricIndex.java deleted file mode 100644 index 9a9daa5..0000000 --- a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/EntityCentricIndex.java +++ /dev/null @@ -1,327 +0,0 @@ -package mvm.rya.indexing.accumulo.entity; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - - -import static mvm.rya.accumulo.AccumuloRdfConstants.EMPTY_CV; -import static mvm.rya.accumulo.AccumuloRdfConstants.EMPTY_VALUE; -import static mvm.rya.api.RdfCloudTripleStoreConstants.DELIM_BYTES; -import static mvm.rya.api.RdfCloudTripleStoreConstants.EMPTY_BYTES; -import static mvm.rya.api.RdfCloudTripleStoreConstants.EMPTY_TEXT; - -import java.io.IOException; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; -import java.util.Set; - -import org.apache.accumulo.core.client.AccumuloException; -import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.BatchWriter; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.MultiTableBatchWriter; -import org.apache.accumulo.core.client.MutationsRejectedException; -import org.apache.accumulo.core.client.TableExistsException; -import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.security.ColumnVisibility; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.Text; -import org.apache.log4j.Logger; -import org.openrdf.model.URI; - -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.primitives.Bytes; - -import mvm.rya.accumulo.AccumuloRdfConfiguration; -import mvm.rya.accumulo.experimental.AbstractAccumuloIndexer; -import mvm.rya.api.RdfCloudTripleStoreConfiguration; -import mvm.rya.api.domain.RyaStatement; -import mvm.rya.api.domain.RyaType; -import mvm.rya.api.domain.RyaURI; -import mvm.rya.api.resolver.RyaContext; -import mvm.rya.api.resolver.RyaTypeResolverException; -import mvm.rya.api.resolver.triple.TripleRow; -import mvm.rya.indexing.accumulo.ConfigUtils; - -public class EntityCentricIndex extends AbstractAccumuloIndexer { - - private 
static final Logger logger = Logger.getLogger(EntityCentricIndex.class); - private static final String TABLE_SUFFIX = "EntityCentricIndex"; - - private AccumuloRdfConfiguration conf; - private BatchWriter writer; - private boolean isInit = false; - - private void initInternal() throws AccumuloException, AccumuloSecurityException, TableNotFoundException, IOException, - TableExistsException { - ConfigUtils.createTableIfNotExists(conf, getTableName()); - } - - @Override - public Configuration getConf() { - return conf; - } - - //initialization occurs in setConf because index is created using reflection - @Override - public void setConf(final Configuration conf) { - if (conf instanceof AccumuloRdfConfiguration) { - this.conf = (AccumuloRdfConfiguration) conf; - } else { - this.conf = new AccumuloRdfConfiguration(conf); - } - if (!isInit) { - try { - initInternal(); - isInit = true; - } catch (final AccumuloException e) { - logger.warn("Unable to initialize index. Throwing Runtime Exception. ", e); - throw new RuntimeException(e); - } catch (final AccumuloSecurityException e) { - logger.warn("Unable to initialize index. Throwing Runtime Exception. ", e); - throw new RuntimeException(e); - } catch (final TableNotFoundException e) { - logger.warn("Unable to initialize index. Throwing Runtime Exception. ", e); - throw new RuntimeException(e); - } catch (final TableExistsException e) { - logger.warn("Unable to initialize index. Throwing Runtime Exception. ", e); - throw new RuntimeException(e); - } catch (final IOException e) { - logger.warn("Unable to initialize index. Throwing Runtime Exception. ", e); - throw new RuntimeException(e); - } - } - } - - /** - * Get the Accumulo table used by this index. - * @return table used by instances of this index - */ - @Override - public String getTableName() { - return getTableName(conf); - } - - /** - * Get the Accumulo table that will be used by this index. 
- * @param conf - * @return table name guaranteed to be used by instances of this index - */ - public static String getTableName(Configuration conf) { - return ConfigUtils.getTablePrefix(conf) + TABLE_SUFFIX; - } - - @Override - public void setMultiTableBatchWriter(final MultiTableBatchWriter writer) throws IOException { - try { - this.writer = writer.getBatchWriter(getTableName()); - } catch (final AccumuloException e) { - throw new IOException(e); - } catch (final AccumuloSecurityException e) { - throw new IOException(e); - } catch (final TableNotFoundException e) { - throw new IOException(e); - } - } - - @Override - public void storeStatement(final RyaStatement stmt) throws IOException { - Preconditions.checkNotNull(writer, "BatchWriter not Set"); - try { - for (final TripleRow row : serializeStatement(stmt)) { - writer.addMutation(createMutation(row)); - } - } catch (final MutationsRejectedException e) { - throw new IOException(e); - } catch (final RyaTypeResolverException e) { - throw new IOException(e); - } - } - - @Override - public void deleteStatement(final RyaStatement stmt) throws IOException { - Preconditions.checkNotNull(writer, "BatchWriter not Set"); - try { - for (final TripleRow row : serializeStatement(stmt)) { - writer.addMutation(deleteMutation(row)); - } - } catch (final MutationsRejectedException e) { - throw new IOException(e); - } catch (final RyaTypeResolverException e) { - throw new IOException(e); - } - } - - protected Mutation deleteMutation(final TripleRow tripleRow) { - final Mutation m = new Mutation(new Text(tripleRow.getRow())); - - final byte[] columnFamily = tripleRow.getColumnFamily(); - final Text cfText = columnFamily == null ? EMPTY_TEXT : new Text(columnFamily); - - final byte[] columnQualifier = tripleRow.getColumnQualifier(); - final Text cqText = columnQualifier == null ? 
EMPTY_TEXT : new Text(columnQualifier); - - final byte[] columnVisibility = tripleRow.getColumnVisibility(); - final ColumnVisibility cv = columnVisibility == null ? EMPTY_CV : new ColumnVisibility(columnVisibility); - - m.putDelete(cfText, cqText, cv, tripleRow.getTimestamp()); - return m; - } - - public static Collection<Mutation> createMutations(final RyaStatement stmt) throws RyaTypeResolverException{ - final Collection<Mutation> m = Lists.newArrayList(); - for (final TripleRow tr : serializeStatement(stmt)){ - m.add(createMutation(tr)); - } - return m; - } - - private static Mutation createMutation(final TripleRow tripleRow) { - final Mutation mutation = new Mutation(new Text(tripleRow.getRow())); - final byte[] columnVisibility = tripleRow.getColumnVisibility(); - final ColumnVisibility cv = columnVisibility == null ? EMPTY_CV : new ColumnVisibility(columnVisibility); - final Long timestamp = tripleRow.getTimestamp(); - final byte[] value = tripleRow.getValue(); - final Value v = value == null ? EMPTY_VALUE : new Value(value); - final byte[] columnQualifier = tripleRow.getColumnQualifier(); - final Text cqText = columnQualifier == null ? EMPTY_TEXT : new Text(columnQualifier); - final byte[] columnFamily = tripleRow.getColumnFamily(); - final Text cfText = columnFamily == null ? EMPTY_TEXT : new Text(columnFamily); - - mutation.put(cfText, cqText, cv, timestamp, v); - return mutation; - } - - private static List<TripleRow> serializeStatement(final RyaStatement stmt) throws RyaTypeResolverException { - final RyaURI subject = stmt.getSubject(); - final RyaURI predicate = stmt.getPredicate(); - final RyaType object = stmt.getObject(); - final RyaURI context = stmt.getContext(); - final Long timestamp = stmt.getTimestamp(); - final byte[] columnVisibility = stmt.getColumnVisibility(); - final byte[] value = stmt.getValue(); - assert subject != null && predicate != null && object != null; - final byte[] cf = (context == null) ? 
EMPTY_BYTES : context.getData().getBytes(); - final byte[] subjBytes = subject.getData().getBytes(); - final byte[] predBytes = predicate.getData().getBytes(); - final byte[][] objBytes = RyaContext.getInstance().serializeType(object); - - return Lists.newArrayList(new TripleRow(subjBytes, - predBytes, - Bytes.concat(cf, DELIM_BYTES, - "object".getBytes(), DELIM_BYTES, - objBytes[0], objBytes[1]), - timestamp, - columnVisibility, - value), - new TripleRow(objBytes[0], - predBytes, - Bytes.concat(cf, DELIM_BYTES, - "subject".getBytes(), DELIM_BYTES, - subjBytes, objBytes[1]), - timestamp, - columnVisibility, - value)); - } - - /** - * Deserialize a row from the entity-centric index. - * @param key Row key, contains statement data - * @param value Row value - * @return The statement represented by the row - * @throws IOException if edge direction can't be extracted as expected. - * @throws RyaTypeResolverException if a type error occurs deserializing the statement's object. - */ - public static RyaStatement deserializeStatement(Key key, Value value) throws RyaTypeResolverException, IOException { - assert key != null; - assert value != null; - byte[] entityBytes = key.getRowData().toArray(); - byte[] predicateBytes = key.getColumnFamilyData().toArray(); - byte[] data = key.getColumnQualifierData().toArray(); - long timestamp = key.getTimestamp(); - byte[] columnVisibility = key.getColumnVisibilityData().toArray(); - byte[] valueBytes = value.get(); - - // main entity is either the subject or object - // data contains: column family , var name of other node , data of other node + datatype of object - int split = Bytes.indexOf(data, DELIM_BYTES); - byte[] columnFamily = Arrays.copyOf(data, split); - byte[] edgeBytes = Arrays.copyOfRange(data, split + DELIM_BYTES.length, data.length); - split = Bytes.indexOf(edgeBytes, DELIM_BYTES); - String otherNodeVar = new String(Arrays.copyOf(edgeBytes, split)); - byte[] otherNodeBytes = Arrays.copyOfRange(edgeBytes, split + 
DELIM_BYTES.length, edgeBytes.length - 2); - byte[] typeBytes = Arrays.copyOfRange(edgeBytes, edgeBytes.length - 2, edgeBytes.length); - byte[] objectBytes; - RyaURI subject; - RyaURI predicate = new RyaURI(new String(predicateBytes)); - RyaType object; - RyaURI context = null; - // Expect either: entity=subject.data, otherNodeVar="object", otherNodeBytes={object.data, object.datatype_marker} - // or: entity=object.data, otherNodeVar="subject", otherNodeBytes={subject.data, object.datatype_marker} - switch (otherNodeVar) { - case "subject": - subject = new RyaURI(new String(otherNodeBytes)); - objectBytes = Bytes.concat(entityBytes, typeBytes); - break; - case "object": - subject = new RyaURI(new String(entityBytes)); - objectBytes = Bytes.concat(otherNodeBytes, typeBytes); - break; - default: - throw new IOException("Failed to deserialize entity-centric index row. " - + "Expected 'subject' or 'object', encountered: '" + otherNodeVar + "'"); - } - object = RyaContext.getInstance().deserialize(objectBytes); - if (columnFamily != null && columnFamily.length > 0) { - context = new RyaURI(new String(columnFamily)); - } - return new RyaStatement(subject, predicate, object, context, - null, columnVisibility, valueBytes, timestamp); - } - - @Override - public void init() { - } - - @Override - public void setConnector(final Connector connector) { - } - - @Override - public void destroy() { - } - - @Override - public void purge(final RdfCloudTripleStoreConfiguration configuration) { - } - - @Override - public void dropAndDestroy() { - } - - @Override - public Set<URI> getIndexablePredicates() { - return null; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/EntityLocalityGroupSetter.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/EntityLocalityGroupSetter.java 
b/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/EntityLocalityGroupSetter.java deleted file mode 100644 index 2030e58..0000000 --- a/extras/indexing/src/main/java/mvm/rya/indexing/accumulo/entity/EntityLocalityGroupSetter.java +++ /dev/null @@ -1,171 +0,0 @@ -package mvm.rya.indexing.accumulo.entity; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - - -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.Map.Entry; -import java.util.NoSuchElementException; -import java.util.Set; -import mvm.rya.indexing.accumulo.ConfigUtils; -import org.apache.accumulo.core.client.AccumuloException; -import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.BatchScanner; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.security.Authorizations; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.Text; - -public class EntityLocalityGroupSetter { - - - String tablePrefix; - Connector conn; - Configuration conf; - - public EntityLocalityGroupSetter(String tablePrefix, Connector conn, Configuration conf) { - this.conn = conn; - this.tablePrefix = tablePrefix; - this.conf = conf; - } - - - - private Iterator<String> getPredicates() { - - String auths = conf.get(ConfigUtils.CLOUDBASE_AUTHS); - BatchScanner bs = null; - try { - bs = conn.createBatchScanner(tablePrefix + "prospects", new Authorizations(auths), 10); - } catch (TableNotFoundException e) { - e.printStackTrace(); - } - bs.setRanges(Collections.singleton(Range.prefix(new Text("predicate" + "\u0000")))); - final Iterator<Entry<Key,Value>> iter = bs.iterator(); - - return new Iterator<String>() { - - private String next = null; - private boolean hasNextCalled = false; - private boolean isEmpty = false; - - @Override - public boolean hasNext() { - - if (!hasNextCalled && !isEmpty) { - while (iter.hasNext()) { - Entry<Key,Value> temp = iter.next(); - String row = temp.getKey().getRow().toString(); - String[] rowArray = row.split("\u0000"); - next = rowArray[1]; - - hasNextCalled = true; - return 
true; - } - isEmpty = true; - return false; - } else if(isEmpty) { - return false; - }else { - return true; - } - } - - @Override - public String next() { - - if (hasNextCalled) { - hasNextCalled = false; - return next; - } else if(isEmpty) { - throw new NoSuchElementException(); - }else { - if (this.hasNext()) { - hasNextCalled = false; - return next; - } else { - throw new NoSuchElementException(); - } - } - } - - @Override - public void remove() { - - throw new UnsupportedOperationException("Cannot delete from iterator!"); - - } - - }; - } - - - - - - - - - public void setLocalityGroups() { - - HashMap<String, Set<Text>> localityGroups = new HashMap<String, Set<Text>>(); - Iterator<String> groups = getPredicates(); - - int i = 1; - - while(groups.hasNext()) { - HashSet<Text> tempColumn = new HashSet<Text>(); - String temp = groups.next(); - tempColumn.add(new Text(temp)); - String groupName = "predicate" + i; - localityGroups.put(groupName, tempColumn); - i++; - } - - - try { - conn.tableOperations().setLocalityGroups(tablePrefix + "doc_partitioned_index", localityGroups); - //conn.tableOperations().compact(tablePrefix + "doc_partitioned_index", null, null, true, true); - } catch (AccumuloException e) { - e.printStackTrace(); - } catch (AccumuloSecurityException e) { - e.printStackTrace(); - } catch (TableNotFoundException e) { - e.printStackTrace(); - } - - - - } - - - - - - - -}
