http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/org/apache/rya/accumulo/documentIndex/DocumentIndexIntersectingIterator.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/accumulo/documentIndex/DocumentIndexIntersectingIterator.java b/extras/indexing/src/main/java/org/apache/rya/accumulo/documentIndex/DocumentIndexIntersectingIterator.java new file mode 100644 index 0000000..ad38b2b --- /dev/null +++ b/extras/indexing/src/main/java/org/apache/rya/accumulo/documentIndex/DocumentIndexIntersectingIterator.java @@ -0,0 +1,850 @@ +package mvm.rya.accumulo.documentIndex; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; + +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.data.ArrayByteSequence; +import org.apache.accumulo.core.data.ByteSequence; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.PartialKey; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.IteratorEnvironment; +import org.apache.accumulo.core.iterators.SortedKeyValueIterator; +import org.apache.accumulo.core.tabletserver.thrift.TabletClientService; +import org.apache.accumulo.core.util.TextUtil; +import org.apache.commons.codec.binary.Base64; +import org.apache.hadoop.io.Text; +import org.apache.log4j.Logger; + +/** + * This iterator facilitates document-partitioned indexing. It involves grouping a set of documents together and indexing those documents into a single row of + * an Accumulo table. This allows a tablet server to perform boolean AND operations on terms in the index. + * + * The table structure should have the following form: + * + * row: shardID, colfam: term, colqual: docID + * + * When you configure this iterator with a set of terms (column families), it will return only the docIDs that appear with all of the specified terms. The + * result will have an empty column family, as follows: + * + * row: shardID, colfam: (empty), colqual: docID + * + * This iterator is commonly used with BatchScanner or AccumuloInputFormat, to parallelize the search over all shardIDs. + * + * This iterator will *ignore* any columnFamilies passed to {@link #seek(Range, Collection, boolean)} as it performs intersections over terms. Extending classes + * should override the {@link TermSource#seekColfams} in their implementation's {@link #init(SortedKeyValueIterator, Map, IteratorEnvironment)} method. + * + * README.shard in docs/examples shows an example of using the IntersectingIterator. 
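 + *
 + * For example (an illustrative sketch, not part of this commit): if documents 42 and 57 in shard s0 both
 + * contain the terms red and round, the index holds the entries (s0, red, 42), (s0, red, 57), (s0, round, 42),
 + * and (s0, round, 57); configuring this iterator with the terms {red, round} then returns docIDs 42 and 57.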
+ */ +public class DocumentIndexIntersectingIterator implements SortedKeyValueIterator<Key,Value> { + + + + + protected Text nullText = new Text(); + + protected Text getRow(Key key) { + return key.getRow(); + } + + protected Text getTerm(Key key) { + return key.getColumnFamily(); + } + + protected Text getTermCond(Key key) { + return key.getColumnQualifier(); + } + + protected Key buildKey(Text row, TextColumn column) { + return new Key(row, (column.getColumnFamily() == null) ? nullText: column.getColumnFamily(), column.getColumnQualifier()); + } + + protected Key buildKey(Text row, Text term) { + return new Key(row, (term == null) ? nullText : term); + } + + protected Key buildKey(Text row, Text term, Text termCond) { + return new Key(row, (term == null) ? nullText : term, termCond); + } + + protected Key buildFollowRowKey(Key key, Text term, Text termCond) { + return new Key(getRow(key.followingKey(PartialKey.ROW)),(term == null) ? nullText : term, termCond); + } + + protected static final Logger log = Logger.getLogger(DocumentIndexIntersectingIterator.class); + + public static class TermSource { + public SortedKeyValueIterator<Key, Value> iter; + public Text term; + public Text termCond; + public Collection<ByteSequence> seekColfams; + public TextColumn column; + public boolean isPrefix; + public Key top ; + public Key next ; + public Text currentCQ; + private boolean seeked = false; + + public TermSource(TermSource other) { + + this.iter = other.iter; + this.term = other.term; + this.termCond = other.termCond; + this.seekColfams = other.seekColfams; + this.column = other.column; + this.top = other.top; + this.next = other.next; + this.currentCQ = other.currentCQ; + this.isPrefix = other.isPrefix; + } + + + public TermSource(SortedKeyValueIterator<Key, Value> iter, TextColumn column) { + + this.iter = iter; + this.column = column; + this.term = column.getColumnFamily(); + this.termCond = column.getColumnQualifier(); + this.currentCQ = new Text(emptyByteArray); + this.seekColfams = Collections.<ByteSequence> singletonList(new ArrayByteSequence(term + .getBytes(), 0, term.getLength())); + + } + + + + public void seek(Range r) throws IOException { + + if (seeked) { + + if (next != null && !r.beforeStartKey(next)) { + if (next.getColumnFamily().equals(term)) { + this.updateTop(); + } + } else if (iter.hasTop()) { + iter.seek(r, seekColfams, true); + this.updateTopNext(); + } else { + top = null; + next = null; + + } + } else { + + iter.seek(r, seekColfams, true); + this.updateTopNext(); + seeked = true; + } + + } + + + public void next() throws IOException { + + this.updateTop(); + } + + public void updateTop() throws IOException { + + top = next; + if (next != null) { + iter.next(); + if (iter.hasTop()) { + next = iter.getTopKey(); + } else { + next = null; + } + } + + } + + public void updateTopNext() throws IOException { + + if (iter.hasTop()) { + top = iter.getTopKey(); + } else { + top = null; + next = null; + return; + } + + iter.next(); + + if(iter.hasTop()) { + next = iter.getTopKey(); + } else { + next = null; + } + } + + public boolean hasTop() { + return top != null; + } + + + public String getTermString() { + return (this.term == null) ? 
new String("Iterator") : this.term.toString(); + } + } + + TermSource[] sources; + int sourcesCount = 0; + Range overallRange; + + // query-time settings + protected Text currentRow = null; + protected Text currentTermCond = new Text(emptyByteArray); + static final byte[] emptyByteArray = new byte[0]; + + protected Key topKey = null; + protected Value value = new Value(emptyByteArray); + protected String ctxt = null; + protected boolean hasContext = false; + protected boolean termCondSet = false; + + public DocumentIndexIntersectingIterator() {} + + @Override + public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) { + //log.info("Calling deep copy on " + this); + return new DocumentIndexIntersectingIterator(this, env); + } + + private DocumentIndexIntersectingIterator(DocumentIndexIntersectingIterator other, IteratorEnvironment env) { + if (other.sources != null) { + sourcesCount = other.sourcesCount; + sources = new TermSource[sourcesCount]; + for (int i = 0; i < sourcesCount; i++) { + sources[i] = new TermSource(other.sources[i].iter.deepCopy(env), other.sources[i].column); + } + } + } + + @Override + public Key getTopKey() { + + return topKey; + } + + @Override + public Value getTopValue() { + // we don't really care about values + return value; + } + + @Override + public boolean hasTop() { + return currentRow != null; + } + + // precondition: currentRow is not null + private boolean seekOneSource(int sourceID) throws IOException { + // find the next key in the appropriate column family that is at or + // beyond the cursor (currentRow, currentCQ) + // advance the cursor if this source goes beyond it + // return whether we advanced the cursor + + // within this loop progress must be made in one of the following forms: + // - currentRow or currentCQ must be increased + // - the given source must advance its iterator + // this loop will end when any of the following criteria are met + // - the iterator for the given source is pointing to the key + // (currentRow, columnFamilies[sourceID], currentCQ) + // - the given source is out of data and currentRow is set to null + // - the given source has advanced beyond the endRow and currentRow is + // set to null + boolean advancedCursor = false; + + + + + + while (true) { + +// if(currentRow.toString().equals(s)) { +// log.info("Source id is " + sourceID); +// if (sources[sourceID].top != null) { +// log.info("Top row is " + getRow(sources[sourceID].top)); +// log.info("Top cq is " + getTermCond(sources[sourceID].top)); +// } +// if (sources[sourceID].next != null) { +// log.info("Next row is " + getRow(sources[sourceID].next)); +// log.info("Next termCond is " + getTermCond(sources[sourceID].next)); +// } +// } + + if (sources[sourceID].hasTop() == false) { + currentRow = null; + // setting currentRow to null counts as advancing the cursor + return true; + } + // check if we're past the end key + int endCompare = -1; + // we should compare the row to the end of the range + + if (overallRange.getEndKey() != null) { + endCompare = overallRange.getEndKey().getRow().compareTo(sources[sourceID].top.getRow()); + if ((!overallRange.isEndKeyInclusive() && endCompare <= 0) || endCompare < 0) { + currentRow = null; + // setting currentRow to null counts as advancing the cursor + return true; + } + } + + + + int rowCompare = currentRow.compareTo(getRow(sources[sourceID].top)); + // check if this source is already at or beyond currentRow + // if not, then seek to at least the current row + + + + if (rowCompare > 0) { + // seek to at 
least the currentRow + Key seekKey = buildKey(currentRow, sources[sourceID].term); + sources[sourceID].seek(new Range(seekKey, true, null, false)); + + continue; + } + // check if this source has gone beyond currentRow + // if so, advance currentRow + if (rowCompare < 0) { + currentRow.set(getRow(sources[sourceID].top)); + //log.info("Current row is " + currentRow); + advancedCursor = true; + continue; + } + // we have verified that the current source is positioned in + // currentRow + // now we must make sure we're in the right columnFamily in the + // current row + // Note: Iterators are auto-magically set to the correct + // columnFamily + + if (sources[sourceID].column.isValid()) { + + boolean isPrefix = false; + boolean contextEqual = false; + String tempContext = ""; + + int termCompare; + + String[] cQ = getTermCond(sources[sourceID].top).toString().split("\u0000"); + tempContext = cQ[0]; + + if (!hasContext && ctxt == null) { + ctxt = cQ[0]; + } + + contextEqual = ctxt.equals(cQ[0]); + + String s1 = sources[sourceID].termCond.toString(); + String s2 = cQ[1] + "\u0000" + cQ[2]; + + if (sources[sourceID].isPrefix) { + isPrefix = s2.startsWith(s1 + "\u0000"); + } else { + isPrefix = s2.startsWith(s1); + } + + termCompare = (contextEqual && isPrefix) ? 0 : (ctxt + "\u0000" + s1).compareTo(cQ[0] + "\u0000" + s2); + + // if(currentRow.toString().equals(s)) { + // log.info("Term compare is " + termCompare); + // } + + // check if this source is already on the right columnFamily + // if not, then seek forwards to the right columnFamily + if (termCompare > 0) { + Key seekKey = buildKey(currentRow, sources[sourceID].term, new Text(ctxt + + "\u0000" + sources[sourceID].termCond.toString())); + sources[sourceID].seek(new Range(seekKey, true, null, false)); + + continue; + } + // check if this source is beyond the right columnFamily + // if so, then seek to the next row + if (termCompare < 0) { + // we're out of entries in the current row, so seek to the + // next one + + if (endCompare == 0) { + // we're done + currentRow = null; + // setting currentRow to null counts as advancing the + // cursor + return true; + } + + + + //advance to next row if context set - all entries in given row exhausted + if (hasContext || tempContext.length() == 0) { + Key seekKey = buildFollowRowKey(sources[sourceID].top, sources[sourceID].term, + new Text(ctxt + "\u0000" + sources[sourceID].termCond.toString())); + sources[sourceID].seek(new Range(seekKey, true, null, false)); + } else { + + if(contextEqual && !isPrefix) { + Key seekKey = buildKey(currentRow, sources[sourceID].term, new Text(ctxt + "\u0001")); + sources[sourceID].seek(new Range(seekKey, true, null, false)); + if(sources[sourceID].top != null) { + ctxt = getTermCond(sources[sourceID].top).toString().split("\u0000")[0]; + } + } else { + Key seekKey = buildKey(currentRow, sources[sourceID].term, new Text(tempContext + + "\u0000" + sources[sourceID].termCond.toString())); + sources[sourceID].seek(new Range(seekKey, true, null, false)); + if(sources[sourceID].top != null) { + ctxt = getTermCond(sources[sourceID].top).toString().split("\u0000")[0]; + } + } + + } + + +// if(currentRow.toString().equals(s)) { +// log.info("current term cond is " + currentTermCond); +// +// } + + + continue; + } + } + + + + + + + + + + + //set currentTermCond -- gets appended to end of currentKey column qualifier + //used to determine which term iterator to advance when a new iterator is created + + sources[sourceID].currentCQ.set(getTermCond(sources[sourceID].top)); + + if 
(sources[sourceID].next != null) { + + //is hasContext, only consider sourceID with next having designated context + //otherwise don't set currentTermCond + if (!termCondSet && hasContext) { + if (sources[sourceID].next.getRow().equals(currentRow) + && sources[sourceID].next.getColumnQualifier().toString() + .startsWith(ctxt + "\u0000" + sources[sourceID].termCond.toString())) { + currentTermCond.set(new Text(Integer.toString(sourceID))); + termCondSet = true; + } + } else if(!termCondSet){ + String[] cq = getTermCond(sources[sourceID].next).toString().split("\u0000"); + + //set currentTermCond with preference given to sourceID having next with same context + //otherwise set currentTermCond sourceID with next having termCond as prefix + if (sources[sourceID].next.getRow().equals(currentRow)) { + if (sources[sourceID].next.getColumnQualifier().toString() + .startsWith(ctxt + "\u0000" + sources[sourceID].termCond.toString())) { + currentTermCond.set(new Text(Integer.toString(sourceID))); + termCondSet = true; + } else if ((cq[1] + "\u0000" + cq[2]).startsWith(sources[sourceID].termCond.toString())) { + currentTermCond.set(new Text(Integer.toString(sourceID))); + } + } + } + } + + + break; + } + + return advancedCursor; + } + + @Override + public void next() throws IOException { + if (currentRow == null) { + return; + } + + + + if(currentTermCond.getLength() != 0) { + + int id = Integer.parseInt(currentTermCond.toString()); + + sources[id].next(); + currentTermCond.set(emptyByteArray); + termCondSet = false; + if(sources[id].top != null && !hasContext) { + ctxt = getTermCond(sources[id].top).toString().split("\u0000")[0]; + } + advanceToIntersection(); + return; + } + + sources[0].next(); + if(sources[0].top != null && !hasContext) { + ctxt = getTermCond(sources[0].top).toString().split("\u0000")[0]; + } + advanceToIntersection(); + } + + protected void advanceToIntersection() throws IOException { + boolean cursorChanged = true; + while (cursorChanged) { + // seek all of the sources to at least the highest seen column qualifier in the current row + cursorChanged = false; + for (int i = 0; i < sourcesCount; i++) { +// log.info("New sourceID is " + i); + if (currentRow == null) { + topKey = null; + return; + } + if (seekOneSource(i)) { + currentTermCond.set(emptyByteArray); + termCondSet = false; + cursorChanged = true; + break; + } + } + } + String cq = ""; + for(int i = 0; i < sourcesCount; i++) { + cq = cq + sources[i].currentCQ.toString() + DocIndexIteratorUtil.DOC_ID_INDEX_DELIM; + } + + if (currentTermCond.getLength() == 0) { + topKey = buildKey(currentRow, nullText, new Text(cq + -1)); + } else { + topKey = buildKey(currentRow, nullText, new Text(cq + currentTermCond.toString())); + } + } + + public static String stringTopKey(SortedKeyValueIterator<Key,Value> iter) { + if (iter.hasTop()) + return iter.getTopKey().toString(); + return ""; + } + + private static final String columnOptionName = "columns"; + private static final String columnPrefix = "prefixes"; + private static final String context = "context"; + + + + protected static String encodeColumns(TextColumn[] columns) { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < columns.length; i++) { + sb.append(new String(Base64.encodeBase64(TextUtil.getBytes(columns[i].getColumnFamily())))); + sb.append('\n'); + sb.append(new String(Base64.encodeBase64(TextUtil.getBytes(columns[i].getColumnQualifier())))); + sb.append('\u0001'); + } + return sb.toString(); + } + + + + protected static TextColumn[] decodeColumns(String 
columns) { + String[] columnStrings = columns.split("\u0001"); + TextColumn[] columnTexts = new TextColumn[columnStrings.length]; + for (int i = 0; i < columnStrings.length; i++) { + String[] columnComponents = columnStrings[i].split("\n"); + columnTexts[i] = new TextColumn(new Text(Base64.decodeBase64(columnComponents[0].getBytes())), + new Text(Base64.decodeBase64(columnComponents[1].getBytes()))); + } + return columnTexts; + } + + + + + + /** + * @param context + * @return encoded context + */ + protected static String encodeContext(String context) { + + return new String(Base64.encodeBase64(context.getBytes())); + } + + + + /** + * @param context + * @return decoded context + */ + protected static String decodeContext(String context) { + + if (context == null) { + return null; + } else { + return new String(Base64.decodeBase64(context.getBytes())); + } + } + + + + + + protected static String encodeBooleans(boolean[] prefixes) { + byte[] bytes = new byte[prefixes.length]; + for (int i = 0; i < prefixes.length; i++) { + if (prefixes[i]) + bytes[i] = 1; + else + bytes[i] = 0; + } + return new String(Base64.encodeBase64(bytes)); + } + + /** + * @param flags + * @return decoded flags + */ + protected static boolean[] decodeBooleans(String prefixes) { + // return null of there were no flags + if (prefixes == null) + return null; + + byte[] bytes = Base64.decodeBase64(prefixes.getBytes()); + boolean[] bFlags = new boolean[bytes.length]; + for (int i = 0; i < bytes.length; i++) { + if (bytes[i] == 1) + bFlags[i] = true; + else + bFlags[i] = false; + } + return bFlags; + } + + + + + + + + + @Override + public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException { + TextColumn[] terms = decodeColumns(options.get(columnOptionName)); + boolean[] prefixes = decodeBooleans(options.get(columnPrefix)); + ctxt = decodeContext(options.get(context)); + + if(ctxt != null) { + hasContext = true; + } + + + + if (terms.length < 2) { + throw new IllegalArgumentException("IntersectionIterator requires two or more columns families"); + } + + sources = new TermSource[terms.length]; + sources[0] = new TermSource(source, terms[0]); + for (int i = 1; i < terms.length; i++) { + //log.info("For decoded column " + i + " column family is " + terms[i].getColumnFamily() + " and qualifier is " + terms[i].getColumnQualifier()); + sources[i] = new TermSource(source.deepCopy(env), terms[i]); + sources[i].isPrefix = prefixes[i]; + } + sourcesCount = terms.length; + } + + @Override + public void seek(Range range, Collection<ByteSequence> seekColumnFamilies, boolean inclusive) throws IOException { + overallRange = new Range(range); + currentRow = new Text(); + currentTermCond.set(emptyByteArray); + termCondSet = false; + + + +// log.info("Calling seek with range " + range); + + // seek each of the sources to the right column family within the row + // given by key + + Key sourceKey; + + if (rangeCqValid(range)) { + + String[] cqInfo = cqParser(range.getStartKey().getColumnQualifier()); + int id = Integer.parseInt(cqInfo[1]); + + + + if (id >= 0) { + for (int i = 0; i < sourcesCount; i++) { + + if (i == id) { + sourceKey = buildKey(getRow(range.getStartKey()), sources[i].term, new Text(cqInfo[0])); + sources[i].seek(new Range(sourceKey, true, null, false)); + sources[i].next(); + if(!hasContext && sources[i].hasTop()) { + ctxt = getTermCond(sources[i].top).toString().split("\u0000")[0]; + } + } else { + sourceKey = buildKey(getRow(range.getStartKey()), 
sources[i].term); + sources[i].seek(new Range(sourceKey, true, null, false)); + } + } + } else { + + + for (int i = 0; i < sourcesCount; i++) { + sourceKey = buildKey(getRow(range.getStartKey()), sources[i].term, range.getStartKey() + .getColumnQualifier()); + sources[i].seek(new Range(sourceKey, true, null, false)); + } + } + + + } else { + +// log.info("Range is invalid."); + for (int i = 0; i < sourcesCount; i++) { + + if (range.getStartKey() != null) { + + sourceKey = buildKey(getRow(range.getStartKey()), sources[i].term); + + // Seek only to the term for this source as a column family + sources[i].seek(new Range(sourceKey, true, null, false)); + } else { + // Seek only to the term for this source as a column family + + sources[i].seek(range); + } + } + } + + advanceToIntersection(); + + } + + + private String[] cqParser(Text cq) { + + String cQ = cq.toString(); + String[] cqComponents = cQ.split(DocIndexIteratorUtil.DOC_ID_INDEX_DELIM); + int id = -1; + String[] valPos = new String[2]; + + + + + if(cqComponents.length > 1) { + id = Integer.parseInt(cqComponents[cqComponents.length-1]); + if (id >= 0) { + valPos[0] = cqComponents[id].toString(); + valPos[1] = "" + id; + } else { + valPos[0] = cqComponents[0].toString(); + valPos[1] = "" + id; + } + } else { + valPos[0] = cq.toString(); + valPos[1] = "" + -1; + } + + return valPos; + + } + + + private boolean rangeCqValid(Range range) { + return (range.getStartKey() != null) && (range.getStartKey().getColumnQualifier() != null); + } + + + + public void addSource(SortedKeyValueIterator<Key,Value> source, IteratorEnvironment env, TextColumn column) { + // Check if we have space for the added Source + if (sources == null) { + sources = new TermSource[1]; + } else { + // allocate space for node, and copy current tree. + // TODO: Should we change this to an ArrayList so that we can just add() ? - ACCUMULO-1309 + TermSource[] localSources = new TermSource[sources.length + 1]; + int currSource = 0; + for (TermSource myTerm : sources) { + // TODO: Do I need to call new here? or can I just re-use the term? - ACCUMULO-1309 + localSources[currSource] = new TermSource(myTerm); + currSource++; + } + sources = localSources; + } + sources[sourcesCount] = new TermSource(source.deepCopy(env), column); + sourcesCount++; + } + + /** + * Encode the columns to be used when iterating. + * + * @param cfg + * @param columns + */ + public static void setColumnFamilies(IteratorSetting cfg, TextColumn[] columns) { + if (columns.length < 2) + throw new IllegalArgumentException("Must supply at least two terms to intersect"); + + boolean[] prefix = new boolean[columns.length]; + + for(int i = 0; i < columns.length; i++) { + prefix[i] = columns[i].isPrefix(); + } + + + + cfg.addOption(DocumentIndexIntersectingIterator.columnPrefix, DocumentIndexIntersectingIterator.encodeBooleans(prefix)); + cfg.addOption(DocumentIndexIntersectingIterator.columnOptionName, DocumentIndexIntersectingIterator.encodeColumns(columns)); + } + + + + + + public static void setContext(IteratorSetting cfg, String context) { + + cfg.addOption(DocumentIndexIntersectingIterator.context, DocumentIndexIntersectingIterator.encodeContext(context)); + + } + + + + + + + + + + + + + +}
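A minimal sketch of how a client might configure the iterator above and attach it to a scan (not part of this commit; the table name, terms, priority, and authorizations are illustrative assumptions):

import java.util.Collections;

import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.hadoop.io.Text;

class DocIndexScanExample {
    static BatchScanner configureIntersectingScan(Connector connector) throws Exception {
        // Two terms to intersect; setColumnFamilies requires at least two.
        TextColumn[] columns = new TextColumn[] {
                new TextColumn(new Text("urn:worksAt"), new Text("urn:Acme")),
                new TextColumn(new Text("urn:hasRole"), new Text("urn:Engineer")) };
        // Treat the second term's qualifier as a prefix match.
        columns[1].setIsPrefix(true);

        IteratorSetting setting = new IteratorSetting(30, "docIntersect", DocumentIndexIntersectingIterator.class);
        DocumentIndexIntersectingIterator.setColumnFamilies(setting, columns);
        // Optional: restrict the intersection to a single context.
        DocumentIndexIntersectingIterator.setContext(setting, "urn:context1");

        // A BatchScanner parallelizes the intersection across shard rows, as the class javadoc suggests.
        BatchScanner scanner = connector.createBatchScanner("doc_index_table", new Authorizations(), 10);
        scanner.setRanges(Collections.singleton(new Range()));
        scanner.addScanIterator(setting);
        return scanner;
    }
}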
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/org/apache/rya/accumulo/documentIndex/TextColumn.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/accumulo/documentIndex/TextColumn.java b/extras/indexing/src/main/java/org/apache/rya/accumulo/documentIndex/TextColumn.java new file mode 100644 index 0000000..661f62b --- /dev/null +++ b/extras/indexing/src/main/java/org/apache/rya/accumulo/documentIndex/TextColumn.java @@ -0,0 +1,108 @@ +package mvm.rya.accumulo.documentIndex; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +import org.apache.hadoop.io.Text; + +public class TextColumn { + + + private Text columnFamily; + private Text columnQualifier; + private boolean isPrefix = false; + + + + public TextColumn(Text columnFamily, Text columnQualifier) { + this.columnFamily = columnFamily; + this.columnQualifier = columnQualifier; + } + + + public TextColumn(TextColumn other) { + + this.columnFamily = new Text(other.columnFamily); + this.columnQualifier = new Text(other.columnQualifier); + this.isPrefix = other.isPrefix; + + } + + + public Text getColumnFamily() { + return columnFamily; + } + + + public boolean isPrefix() { + return isPrefix; + } + + + public void setIsPrefix(boolean isPrefix) { + this.isPrefix = isPrefix; + } + + + public boolean isValid() { + return (columnFamily != null && columnQualifier != null); + } + + + + public Text getColumnQualifier() { + return columnQualifier; + } + + + public void setColumnFamily(Text cf) { + this.columnFamily = cf; + } + + public void setColumnQualifier(Text cq) { + this.columnQualifier = cq; + } + + public String toString() { + + return columnFamily.toString() + ", " + columnQualifier.toString() + ", prefix:" + isPrefix; + } + + @Override + public boolean equals(Object other) { + + if(other == null) { + return false; + } + + if(!(other instanceof TextColumn)) { + return false; + } + + TextColumn tc = (TextColumn) other; + + return this.columnFamily.equals(tc.columnFamily) && this.columnQualifier.equals(tc.columnQualifier) && this.isPrefix == tc.isPrefix; + + + + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/org/apache/rya/accumulo/pcj/iterators/BindingSetHashJoinIterator.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/accumulo/pcj/iterators/BindingSetHashJoinIterator.java b/extras/indexing/src/main/java/org/apache/rya/accumulo/pcj/iterators/BindingSetHashJoinIterator.java new file mode 100644 index 0000000..0966903 --- /dev/null +++ 
b/extras/indexing/src/main/java/org/apache/rya/accumulo/pcj/iterators/BindingSetHashJoinIterator.java @@ -0,0 +1,324 @@ +package mvm.rya.accumulo.pcj.iterators; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import info.aduna.iteration.CloseableIteration; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.NoSuchElementException; +import java.util.Set; + +import mvm.rya.indexing.external.tupleSet.ExternalTupleSet; + +import org.openrdf.query.BindingSet; +import org.openrdf.query.QueryEvaluationException; +import org.openrdf.query.algebra.evaluation.QueryBindingSet; + +import com.google.common.collect.Multimap; +import com.google.common.collect.Sets; + +/** + * This {@link CloseableIteration} performs a hash join by joining each + * {@link Map.Entry<String, BindingSet>} with all corresponding + * {@link BindingSet} in a Multimap with the same String key. + * + */ +public class BindingSetHashJoinIterator implements + CloseableIteration<BindingSet, QueryEvaluationException> { + + //BindingSets passed to the PCJ, hashed according to the values + //of the variables they share with the PCJ table + private Multimap<String, BindingSet> bindingJoinVarHash; + //BindingSets taken from PCJ table + private CloseableIteration<Map.Entry<String, BindingSet>, QueryEvaluationException> joinIter; + private Iterator<BindingSet> joinedBindingSets = Collections + .emptyIterator(); + //If the PCJ contains a LeftJoin, this is the set of variables appearing only in the LeftJoin. Used when performing the join. + private Set<String> unAssuredVariables; + //indicates whether the HashJoin is formed over a single, fixed collection of join variables or + //whether the size and collection of join variables vary -- this is used to optimize the join process + private HashJoinType type; + private final BindingSet EMPTY_BINDINGSET = new QueryBindingSet(); + private BindingSet next; + private boolean hasNextCalled = false; + private boolean isEmpty = false; + + /** + * Enum type to indicate whether HashJoin will be performed over a fixed + * subset of variables common to each {@link BindingSet}, or if there is a + * collection of variable subsets over which to join.
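 + * For example (illustrative values): with CONSTANT_JOIN_VAR every entry joins on one fixed key such as
 + * "urn:A" + ExternalTupleSet.VALUE_DELIM + "urn:B"; with VARIABLE_JOIN_VAR each delimited prefix of the
 + * key (here "urn:A") is also tried, as computed by getValueOrders below.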
+ * + */ + public enum HashJoinType { + CONSTANT_JOIN_VAR, VARIABLE_JOIN_VAR + }; + + public BindingSetHashJoinIterator( + Multimap<String, BindingSet> bindingJoinVarHash, + CloseableIteration<Map.Entry<String, BindingSet>, QueryEvaluationException> joinIter, + Set<String> unAssuredVariables, HashJoinType type) { + this.bindingJoinVarHash = bindingJoinVarHash; + this.joinIter = joinIter; + this.type = type; + this.unAssuredVariables = unAssuredVariables; + } + + @Override + public boolean hasNext() throws QueryEvaluationException { + if (!hasNextCalled && !isEmpty) { + while (joinedBindingSets.hasNext() || joinIter.hasNext()) { + if (!joinedBindingSets.hasNext()) { + Entry<String, BindingSet> entry = joinIter.next(); + joinedBindingSets = joinBindingSetEntry(entry); + } + if (!joinedBindingSets.hasNext()) { + continue; + } + next = joinedBindingSets.next(); + hasNextCalled = true; + return true; + } + + isEmpty = true; + return false; + } else if (isEmpty) { + return false; + } else { + return true; + } + } + + @Override + public BindingSet next() throws QueryEvaluationException { + if (hasNextCalled) { + hasNextCalled = false; + } else if (isEmpty) { + throw new NoSuchElementException(); + } else { + if (this.hasNext()) { + hasNextCalled = false; + } else { + throw new NoSuchElementException(); + } + } + return next; + } + + @Override + public void remove() throws QueryEvaluationException { + throw new UnsupportedOperationException(); + } + + @Override + public void close() throws QueryEvaluationException { + joinIter.close(); + } + + /** + * This method takes the valOrderString, which is a key used for computing + * hash joins, and generates multiple keys by pulling off one delimiter + * separated component at a time. This is used when the size of the join key + * varies from {@link Map.Entry} to Entry. It allows the BindingSet to be + * joined using all prefixes of the key. + * + * @param valOrderString + * - key used for hash join + * @return + */ + private List<String> getValueOrders(String valOrderString) { + + List<String> valueOrders = new ArrayList<>(); + String[] splitValOrderString = valOrderString + .split(ExternalTupleSet.VALUE_DELIM); + StringBuffer buffer = new StringBuffer(); + buffer.append(splitValOrderString[0]); + valueOrders.add(buffer.substring(0)); + + for (int i = 1; i < splitValOrderString.length; i++) { + buffer.append(ExternalTupleSet.VALUE_DELIM + splitValOrderString[i]); + valueOrders.add(buffer.substring(0)); + } + + return valueOrders; + } + + /** + * This method verifies that all common variables have a common value and + * then joins the BindingSets together. In the case that the PCJ contains a + * LeftJoin, if the leftBs and rightBs have a common variable with distinct + * values and that common variable is unassured (only appears in LeftJoin), + * this method uses the value corresponding to leftBs. + * + * @param leftBs + * - BindingSet passed into PCJ + * @param rightBs + * - PCJ BindingSet + * @return - joined BindingSet + */ + private BindingSet joinBindingSets(BindingSet leftBs, BindingSet rightBs) { + + Set<String> commonVars = Sets.intersection(leftBs.getBindingNames(), + rightBs.getBindingNames()); + // compare values associated with common variables to make sure + // BindingSets can be joined. Possible for leftBs and rightBs + // to have a common unAssuredVariable in event PCJ contains LeftJoin. 
+ // if the values corresponding to a common unAssuredVariable do not agree, + // the value corresponding to leftBs is used + for (String s : commonVars) { + if (!leftBs.getValue(s).equals(rightBs.getValue(s)) + && !unAssuredVariables.contains(s)) { + return EMPTY_BINDINGSET; + } + } + QueryBindingSet bs = new QueryBindingSet(removeConstants(leftBs)); + + rightBs = removeConstants(rightBs); + // only add Bindings corresponding to variables that have no value + // assigned. This takes into account the case where leftBs and rightBs + // share a common, unAssuredVariable. In this case, use the value + // corresponding + // to leftBs, which effectively performs a LeftJoin. + for (String s : rightBs.getBindingNames()) { + if (bs.getValue(s) == null) { + bs.addBinding(s, rightBs.getValue(s)); + } + } + + return bs; + } + + private BindingSet removeConstants(BindingSet bs) { + QueryBindingSet bSet = new QueryBindingSet(); + for (String s : bs.getBindingNames()) { + if (!s.startsWith(ExternalTupleSet.CONST_PREFIX)) { + bSet.addBinding(bs.getBinding(s)); + } + } + return bSet; + } + + /** + * This method returns an Iterator that joins the given Entry's BindingSet + * to all BindingSets that match the Entry's key. + * + * @param entry - entry to be joined + * @return - Iterator over joined BindingSets + */ + private Iterator<BindingSet> joinBindingSetEntry( + Map.Entry<String, BindingSet> entry) { + + List<Collection<BindingSet>> matches = new ArrayList<>(); + if (type == HashJoinType.CONSTANT_JOIN_VAR) { + if (bindingJoinVarHash.containsKey(entry.getKey())) { + matches.add(bindingJoinVarHash.get(entry.getKey())); + } + } else { + List<String> valOrders = getValueOrders(entry.getKey()); + for (String s : valOrders) { + if (bindingJoinVarHash.containsKey(s)) { + matches.add(bindingJoinVarHash.get(s)); + } + } + } + + if (matches.size() == 0) { + return Collections.emptyIterator(); + } else { + return new BindingSetCollectionsJoinIterator(entry.getValue(), + matches); + } + + } + + /** + * Given a BindingSet and a List of Collections of BindingSets, this + * Iterator joins the BindingSet with the BindingSets in each Collection. + * + */ + private class BindingSetCollectionsJoinIterator implements + Iterator<BindingSet> { + + private Iterator<Collection<BindingSet>> collectionIter; + private Iterator<BindingSet> bsIter = Collections.emptyIterator(); + private BindingSet next; + private BindingSet joinBs; + private boolean hasNextCalled = false; + private boolean isEmpty = false; + + public BindingSetCollectionsJoinIterator(BindingSet bs, + List<Collection<BindingSet>> collection) { + this.collectionIter = collection.iterator(); + this.joinBs = bs; + } + + @Override + public boolean hasNext() { + + if (!hasNextCalled && !isEmpty) { + while (bsIter.hasNext() || collectionIter.hasNext()) { + if (!bsIter.hasNext()) { + bsIter = collectionIter.next().iterator(); + } + next = joinBindingSets(bsIter.next(), joinBs); + if (next == EMPTY_BINDINGSET) { + continue; + } + hasNextCalled = true; + return true; + } + isEmpty = true; + return false; + } else if (isEmpty) { + return false; + } else { + return true; + } + } + + @Override + public BindingSet next() { + if (hasNextCalled) { + hasNextCalled = false; + } else if (isEmpty) { + throw new NoSuchElementException(); + } else { + if (this.hasNext()) { + hasNextCalled = false; + } else { + throw new NoSuchElementException(); + } + } + return next; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + + } + +}
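A sketch of how this hash join might be driven (not part of this commit; the binding, the join key "urn:A", and the empty stand-in for the PCJ-side iteration are illustrative assumptions -- in practice the PCJ side comes from a PCJKeyToJoinBindingSetIterator over a table scan):

import java.util.Collections;
import java.util.Map;

import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;

import info.aduna.iteration.EmptyIteration;

import org.openrdf.model.impl.URIImpl;
import org.openrdf.query.BindingSet;
import org.openrdf.query.QueryEvaluationException;
import org.openrdf.query.algebra.evaluation.QueryBindingSet;

import mvm.rya.accumulo.pcj.iterators.BindingSetHashJoinIterator;
import mvm.rya.accumulo.pcj.iterators.BindingSetHashJoinIterator.HashJoinType;

class HashJoinExample {
    static void drive() throws QueryEvaluationException {
        // Hash the caller's BindingSets by the values of their join variables.
        Multimap<String, BindingSet> hash = HashMultimap.create();
        QueryBindingSet left = new QueryBindingSet();
        left.addBinding("x", new URIImpl("urn:A"));
        hash.put("urn:A", left);

        // An empty iteration stands in for the PCJ-side results here.
        BindingSetHashJoinIterator join = new BindingSetHashJoinIterator(hash,
                new EmptyIteration<Map.Entry<String, BindingSet>, QueryEvaluationException>(),
                Collections.<String> emptySet(), HashJoinType.CONSTANT_JOIN_VAR);
        while (join.hasNext()) {
            // Each result merges the left bindings with a matching PCJ BindingSet.
            System.out.println(join.next());
        }
        join.close();
    }
}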
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/org/apache/rya/accumulo/pcj/iterators/IteratorCombiner.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/accumulo/pcj/iterators/IteratorCombiner.java b/extras/indexing/src/main/java/org/apache/rya/accumulo/pcj/iterators/IteratorCombiner.java new file mode 100644 index 0000000..2407865 --- /dev/null +++ b/extras/indexing/src/main/java/org/apache/rya/accumulo/pcj/iterators/IteratorCombiner.java @@ -0,0 +1,107 @@ +package mvm.rya.accumulo.pcj.iterators; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import info.aduna.iteration.CloseableIteration; + +import java.util.Collection; +import java.util.Iterator; +import java.util.NoSuchElementException; + +import org.openrdf.query.BindingSet; +import org.openrdf.query.QueryEvaluationException; + +import com.google.common.base.Preconditions; + +/** + * This {@link CloseableIteration} takes in a list of CloseableIterations + * and merges them together into a single CloseableIteration. 
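 + * For example (illustrative), combining iterations that yield {a}, {b, c}, and {d} produces a single
 + * iteration that yields a, b, c, d, draining each underlying iteration in turn.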
+ * + */ +public class IteratorCombiner implements + CloseableIteration<BindingSet, QueryEvaluationException> { + + + private Collection<CloseableIteration<BindingSet, QueryEvaluationException>> iterators; + private Iterator<CloseableIteration<BindingSet, QueryEvaluationException>> iteratorIterator; + private CloseableIteration<BindingSet, QueryEvaluationException> currIter; + private boolean isEmpty = false; + private boolean hasNextCalled = false; + private BindingSet next; + + public IteratorCombiner(Collection<CloseableIteration<BindingSet, QueryEvaluationException>> iterators) { + Preconditions.checkArgument(iterators.size() > 0); + this.iterators = iterators; + iteratorIterator = iterators.iterator(); + currIter = iteratorIterator.next(); + } + + @Override + public boolean hasNext() throws QueryEvaluationException { + if (!hasNextCalled && !isEmpty) { + while (currIter.hasNext() || iteratorIterator.hasNext()) { + if(!currIter.hasNext()) { + currIter = iteratorIterator.next(); + } + if(!currIter.hasNext()) { + continue; + } + next = currIter.next(); + hasNextCalled = true; + return true; + } + isEmpty = true; + return false; + } else if (isEmpty) { + return false; + } else { + return true; + } + } + + @Override + public BindingSet next() throws QueryEvaluationException { + if (hasNextCalled) { + hasNextCalled = false; + } else if (isEmpty) { + throw new NoSuchElementException(); + } else { + if (this.hasNext()) { + hasNextCalled = false; + } else { + throw new NoSuchElementException(); + } + } + return next; + } + + @Override + public void remove() throws QueryEvaluationException { + throw new UnsupportedOperationException(); + } + + @Override + public void close() throws QueryEvaluationException { + for(CloseableIteration<BindingSet, QueryEvaluationException> iterator: iterators) { + iterator.close(); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/org/apache/rya/accumulo/pcj/iterators/PCJKeyToCrossProductBindingSetIterator.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/accumulo/pcj/iterators/PCJKeyToCrossProductBindingSetIterator.java b/extras/indexing/src/main/java/org/apache/rya/accumulo/pcj/iterators/PCJKeyToCrossProductBindingSetIterator.java new file mode 100644 index 0000000..0c7369c --- /dev/null +++ b/extras/indexing/src/main/java/org/apache/rya/accumulo/pcj/iterators/PCJKeyToCrossProductBindingSetIterator.java @@ -0,0 +1,267 @@ +package mvm.rya.accumulo.pcj.iterators; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import info.aduna.iteration.CloseableIteration; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Set; + +import mvm.rya.indexing.external.tupleSet.ExternalTupleSet; + +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.data.Key; +import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjSerializer; +import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException; +import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; +import org.openrdf.model.Value; +import org.openrdf.query.BindingSet; +import org.openrdf.query.QueryEvaluationException; +import org.openrdf.query.algebra.evaluation.QueryBindingSet; + +import com.google.common.collect.HashBiMap; + +/** + * This class takes in a {@link Scanner} and a Collection of BindingSets, + * deserializes each {@link Map.Entry<Key,Value>} taken from the Scanner into + * a {@link BindingSet}, and performs a cross product on the BindingSet with + * each BindingSet in the provided Collection. The user can also specify a + * {@link Map<String, Value>} of constant constraints that can be used to filter. + * + */ +public class PCJKeyToCrossProductBindingSetIterator implements + CloseableIteration<BindingSet, QueryEvaluationException> { + + //BindingSets passed to PCJ used to form cross product + private List<BindingSet> crossProductBs; + //Scanner over PCJ table + private Scanner scanner; + //Iterator over PCJ scanner + private Iterator<Map.Entry<Key, org.apache.accumulo.core.data.Value>> iterator; + //Map of PCJ variables in table to variable in query + private Map<String, String> pcjVarMap; + //if PCJ contains LeftJoin, this is a set of variables that only appear in + //LeftJoin. Used when performing the cross product. 
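 + //(e.g., variables bound only inside an OPTIONAL clause, which may be absent from some results)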
+ private Set<String> unAssuredVariables; + private final AccumuloPcjSerializer converter = new AccumuloPcjSerializer(); + private final BindingSet EMPTY_BINDINGSET = new QueryBindingSet(); + private Iterator<BindingSet> crossProductIter = Collections.emptyIterator(); + private Map<String, Value> constantConstraints; + private BindingSet next; + private boolean hasNextCalled = false; + private boolean isEmpty = false; + private boolean crossProductBsExist = false; + private boolean constantConstraintsExist = false; + + public PCJKeyToCrossProductBindingSetIterator(Scanner scanner, + List<BindingSet> crossProductBs, + Map<String, Value> constantConstraints, Set<String> unAssuredVariables, + Map<String, String> pcjVarMap) { + this.crossProductBs = crossProductBs; + this.scanner = scanner; + this.iterator = scanner.iterator(); + this.pcjVarMap = HashBiMap.create(pcjVarMap).inverse(); + this.constantConstraints = constantConstraints; + this.crossProductBsExist = crossProductBs.size() > 0; + this.constantConstraintsExist = constantConstraints.size() > 0; + this.unAssuredVariables = unAssuredVariables; + } + + @Override + public boolean hasNext() throws QueryEvaluationException { + if (!hasNextCalled && !isEmpty) { + if (crossProductBsExist) { + while (crossProductIter.hasNext() || iterator.hasNext()) { + if (!crossProductIter.hasNext()) { + Key key = iterator.next().getKey(); + try { + crossProductIter = getCrossProducts(getBindingSet(key)); + } catch (BindingSetConversionException e) { + throw new QueryEvaluationException(e); + } + } + if (!crossProductIter.hasNext()) { + continue; + } + next = crossProductIter.next(); + hasNextCalled = true; + return true; + } + } else { + while (iterator.hasNext()) { + Key key = iterator.next().getKey(); + try { + next = getBindingSet(key); + } catch (BindingSetConversionException e) { + throw new QueryEvaluationException(e); + } + //BindingSet cannot be deserialized or is filtered + //out by constant constraints + if (next == null || next == EMPTY_BINDINGSET) { + continue; + } + hasNextCalled = true; + return true; + } + } + isEmpty = true; + return false; + } else if (isEmpty) { + return false; + } else { + return true; + } + } + + @Override + public BindingSet next() throws QueryEvaluationException { + if (hasNextCalled) { + hasNextCalled = false; + } else if (isEmpty) { + throw new NoSuchElementException(); + } else { + if (this.hasNext()) { + hasNextCalled = false; + } else { + throw new NoSuchElementException(); + } + } + return next; + } + + @Override + public void remove() throws QueryEvaluationException { + throw new UnsupportedOperationException(); + } + + @Override + public void close() throws QueryEvaluationException { + scanner.close(); + } + + /** + * + * @param key + * - Accumulo key obtained from scan + * @return - BindingSet satisfying any specified constant constraints + * @throws BindingSetConversionException + * @throws QueryEvaluationException + */ + private BindingSet getBindingSet(Key key) + throws BindingSetConversionException, QueryEvaluationException { + byte[] row = key.getRow().getBytes(); + String[] varOrder = key.getColumnFamily().toString() + .split(ExternalTupleSet.VAR_ORDER_DELIM); + + BindingSet bindingSet = converter.convert(row, new VariableOrder( + varOrder)); + + QueryBindingSet bs = new QueryBindingSet(); + for (String var : bindingSet.getBindingNames()) { + String mappedVar = null; + if(pcjVarMap.containsKey(var)) { + mappedVar = pcjVarMap.get(var); + } else { + throw new QueryEvaluationException("PCJ Variable has no 
mapping to query variable."); + } + if (constantConstraintsExist) { + if (mappedVar.startsWith(ExternalTupleSet.CONST_PREFIX) + && constantConstraints.containsKey(mappedVar) + && !constantConstraints.get(mappedVar).equals( + bindingSet.getValue(var))) { + return EMPTY_BINDINGSET; + } + } + + if (!mappedVar.startsWith(ExternalTupleSet.CONST_PREFIX)) { + bs.addBinding(mappedVar, bindingSet.getValue(var)); + } + } + return bs; + } + + /** + * This method forms the cross-product between an input BindingSet and the + * BindingSets contained in crossProductBs. + * + * @param bs + * - {@link BindingSet} used to form cross product with + * cross-product BindingSets + * @return - Iterator over resulting cross-product + */ + private Iterator<BindingSet> getCrossProducts(BindingSet bs) { + Set<BindingSet> crossProducts = new HashSet<BindingSet>(); + + for (BindingSet bSet : crossProductBs) { + BindingSet prod = takeCrossProduct(bSet, bs); + if (prod != EMPTY_BINDINGSET) { + crossProducts.add(prod); + } + } + + return crossProducts.iterator(); + + } + + /** + * This method computes the cross product of the BindingSet passed to the PCJ + * and the PCJ BindingSet. It verifies that the only common variables are unassured + * variables, and if leftBs and rightBs have distinct values for a given variable, + * this method uses the value from leftBs in the cross product BindingSet - this + * effectively performs a LeftJoin. + * + * @param leftBs - BindingSet passed to PCJ + * @param rightBs - PCJ BindingSet + * @return - cross product BindingSet + */ + private BindingSet takeCrossProduct(BindingSet leftBs, BindingSet rightBs) { + if (bindingSetsIntersect(leftBs, rightBs)) { + return EMPTY_BINDINGSET; + } + QueryBindingSet bs = new QueryBindingSet(leftBs); + + //only add Bindings corresponding to variables that have no value + //assigned. This takes into account the case where leftBs and rightBs + //share a common, unAssuredVariable. In this case, use the value corresponding + //to leftBs, which effectively performs a LeftJoin. + for(String s: rightBs.getBindingNames()) { + if(bs.getValue(s) == null) { + bs.addBinding(s, rightBs.getValue(s)); + } + } + return bs; + } + + private boolean bindingSetsIntersect(BindingSet bs1, BindingSet bs2) { + + for(String s: bs1.getBindingNames()) { + if(bs2.getValue(s) != null && !unAssuredVariables.contains(s)) { + return true; + } + } + return false; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/org/apache/rya/accumulo/pcj/iterators/PCJKeyToJoinBindingSetIterator.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/accumulo/pcj/iterators/PCJKeyToJoinBindingSetIterator.java b/extras/indexing/src/main/java/org/apache/rya/accumulo/pcj/iterators/PCJKeyToJoinBindingSetIterator.java new file mode 100644 index 0000000..1b821d4 --- /dev/null +++ b/extras/indexing/src/main/java/org/apache/rya/accumulo/pcj/iterators/PCJKeyToJoinBindingSetIterator.java @@ -0,0 +1,199 @@ +package mvm.rya.accumulo.pcj.iterators; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import info.aduna.iteration.CloseableIteration; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.NoSuchElementException; + +import mvm.rya.api.RdfCloudTripleStoreUtils; +import mvm.rya.indexing.external.tupleSet.ExternalTupleSet; + +import org.apache.accumulo.core.client.BatchScanner; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.data.Key; +import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjSerializer; +import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException; +import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; +import org.openrdf.model.Value; +import org.openrdf.query.BindingSet; +import org.openrdf.query.QueryEvaluationException; +import org.openrdf.query.algebra.evaluation.QueryBindingSet; + +import com.google.common.base.Preconditions; +import com.google.common.collect.HashBiMap; + +/** + * This class takes in a {@link Scanner} and a Collection of BindingSets, + * deserializes each {@link Map.Entry<Key,Value>} taken from the Scanner into a + * {@link BindingSet}, and creates a {@link Map.Entry<String, BindingSet>} + * object to perform as hash join. The user can also specify a {@link Map + * <String, Value>} of constant constraints that can be used to filter. + * + */ +public class PCJKeyToJoinBindingSetIterator + implements + CloseableIteration<Map.Entry<String, BindingSet>, QueryEvaluationException> { + + //map of variables as they appear in PCJ table to query variables + private Map<String, String> pcjVarMap; + //constant constraints used for filtering + private Map<String, Value> constantConstraints; + //max number of variables an entry in the batch of BindingSets had in common with PCJ + //this is used for constructing hash join key. 
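 + //e.g. (illustrative): with maxPrefixLen = 2 and variable order [v1, v2, v3], the join key built below is
 + //value(v1) + ExternalTupleSet.VALUE_DELIM + value(v2)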
+ private int maxPrefixLen; + private static final AccumuloPcjSerializer converter = new AccumuloPcjSerializer(); + private final Map.Entry<String, BindingSet> EMPTY_ENTRY = new RdfCloudTripleStoreUtils.CustomEntry<String, BindingSet>( + "", new QueryBindingSet()); + private Iterator<Entry<Key, org.apache.accumulo.core.data.Value>> iterator; + private boolean hasNextCalled = false; + private boolean isEmpty = false; + private Map.Entry<String, BindingSet> next; + private BatchScanner scanner; + + public PCJKeyToJoinBindingSetIterator(BatchScanner scanner, + Map<String, String> pcjVarMap, + Map<String, Value> constantConstraints, int maxPrefixLen) { + Preconditions.checkNotNull(scanner); + Preconditions.checkArgument(pcjVarMap.size() > 0, + "Variable map must contain at least one variable!"); + Preconditions.checkNotNull(constantConstraints, + "Constant constraints cannot be null."); + Preconditions.checkArgument(maxPrefixLen > 0, + "Max prefix length must be greater than 0."); + Preconditions + .checkArgument(maxPrefixLen <= pcjVarMap.size(), + "Max prefix length must be less than total number of binding names."); + this.scanner = scanner; + this.pcjVarMap = HashBiMap.create(pcjVarMap).inverse(); + this.constantConstraints = constantConstraints; + this.maxPrefixLen = maxPrefixLen; + this.iterator = scanner.iterator(); + + } + + public PCJKeyToJoinBindingSetIterator(BatchScanner scanner, + Map<String, String> pcjVarMap, int maxPrefixLen) { + this(scanner, pcjVarMap, new HashMap<String, Value>(), maxPrefixLen); + } + + @Override + public boolean hasNext() throws QueryEvaluationException { + + if (!hasNextCalled && !isEmpty) { + while (iterator.hasNext()) { + Key key = iterator.next().getKey(); + // get bindings from scan without values associated with + // constant constraints + try { + next = getBindingSetEntryAndMatchConstants(key); + } catch (BindingSetConversionException e) { + throw new QueryEvaluationException( + "Could not deserialize PCJ BindingSet."); + } + // skip key if constant constraint don't match + if (next == EMPTY_ENTRY) { + continue; + } + hasNextCalled = true; + return true; + } + isEmpty = true; + return false; + } else if (isEmpty) { + return false; + } else { + return true; + } + } + + @Override + public Entry<String, BindingSet> next() throws QueryEvaluationException { + if (hasNextCalled) { + hasNextCalled = false; + } else if (isEmpty) { + throw new NoSuchElementException(); + } else { + if (this.hasNext()) { + hasNextCalled = false; + } else { + throw new NoSuchElementException(); + } + } + return next; + } + + @Override + public void remove() throws QueryEvaluationException { + throw new UnsupportedOperationException(); + } + + @Override + public void close() throws QueryEvaluationException { + scanner.close(); + } + + /** + * + * @param key + * - Accumulo key obtained from scan + * @return - Entry<String,BindingSet> satisfying the constant constraints + * @throws BindingSetConversionException + */ + private Map.Entry<String, BindingSet> getBindingSetEntryAndMatchConstants( + Key key) throws BindingSetConversionException { + byte[] row = key.getRow().getBytes(); + String[] varOrder = key.getColumnFamily().toString() + .split(ExternalTupleSet.VAR_ORDER_DELIM); + + BindingSet bindingSet = converter.convert(row, new VariableOrder( + varOrder)); + + QueryBindingSet bs = new QueryBindingSet(); + for (String var : bindingSet.getBindingNames()) { + String mappedVar = pcjVarMap.get(var); + if (mappedVar.startsWith(ExternalTupleSet.CONST_PREFIX) + && 
constantConstraints.containsKey(mappedVar) + && !constantConstraints.get(mappedVar).equals( + bindingSet.getValue(var))) { + return EMPTY_ENTRY; + } else { + bs.addBinding(mappedVar, bindingSet.getValue(var)); + } + } + + final StringBuilder orderedValueString = new StringBuilder( + bindingSet.getValue(varOrder[0]).toString()); + for (int i = 1; i < maxPrefixLen; i++) { + Value value = bindingSet.getValue(varOrder[i]); + if (value != null) { + orderedValueString.append(ExternalTupleSet.VALUE_DELIM).append( + value.toString()); + } + } + + return new RdfCloudTripleStoreUtils.CustomEntry<String, BindingSet>( + orderedValueString.toString(), bs); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloBatchUpdatePCJ.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloBatchUpdatePCJ.java b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloBatchUpdatePCJ.java new file mode 100644 index 0000000..53f29f4 --- /dev/null +++ b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloBatchUpdatePCJ.java @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package mvm.rya.api.client.accumulo; + +import static java.util.Objects.requireNonNull; + +import java.util.ArrayList; +import java.util.Date; +import java.util.List; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; +import org.apache.log4j.Logger; +import org.apache.rya.indexing.pcj.storage.PcjMetadata; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException; +import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; +import org.openrdf.query.BindingSet; +import org.openrdf.query.MalformedQueryException; +import org.openrdf.query.QueryEvaluationException; +import org.openrdf.query.parser.ParsedQuery; +import org.openrdf.query.parser.sparql.SPARQLParser; +import org.openrdf.sail.Sail; +import org.openrdf.sail.SailConnection; +import org.openrdf.sail.SailException; + +import com.google.common.base.Optional; + +import info.aduna.iteration.CloseableIteration; +import mvm.rya.accumulo.AccumuloRdfConfiguration; +import mvm.rya.accumulo.instance.AccumuloRyaInstanceDetailsRepository; +import mvm.rya.api.client.BatchUpdatePCJ; +import mvm.rya.api.client.InstanceDoesNotExistException; +import mvm.rya.api.client.PCJDoesNotExistException; +import mvm.rya.api.client.RyaClientException; +import mvm.rya.api.instance.RyaDetails; +import mvm.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails; +import mvm.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails.PCJUpdateStrategy; +import mvm.rya.api.instance.RyaDetailsRepository; +import mvm.rya.api.instance.RyaDetailsRepository.NotInitializedException; +import mvm.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException; +import mvm.rya.api.instance.RyaDetailsUpdater; +import mvm.rya.api.instance.RyaDetailsUpdater.RyaDetailsMutator; +import mvm.rya.api.instance.RyaDetailsUpdater.RyaDetailsMutator.CouldNotApplyMutationException; +import mvm.rya.api.persist.RyaDAOException; +import mvm.rya.indexing.accumulo.ConfigUtils; +import mvm.rya.rdftriplestore.inference.InferenceEngineException; +import mvm.rya.sail.config.RyaSailFactory; + +/** + * Uses an in-memory Rya Client to batch update a PCJ index. + */ +public class AccumuloBatchUpdatePCJ extends AccumuloCommand implements BatchUpdatePCJ { + + private static final Logger log = Logger.getLogger(AccumuloBatchUpdatePCJ.class); + + /** + * Constructs an instance of {@link AccumuloBatchUpdatePCJ}. + * + * @param connectionDetails - Details about the values that were used to create the connector to the cluster. (not null) + * @param connector - Provides programmatic access to the instance of Accumulo that hosts the Rya instance.
(not null) + */ + public AccumuloBatchUpdatePCJ(final AccumuloConnectionDetails connectionDetails, final Connector connector) { + super(connectionDetails, connector); + } + + @Override + public void batchUpdate(final String ryaInstanceName, final String pcjId) throws InstanceDoesNotExistException, PCJDoesNotExistException, RyaClientException { + requireNonNull(ryaInstanceName); + requireNonNull(pcjId); + verifyPCJState(ryaInstanceName, pcjId); + updatePCJResults(ryaInstanceName, pcjId); + updatePCJMetadata(ryaInstanceName, pcjId); + } + + private void verifyPCJState(final String ryaInstanceName, final String pcjId) throws RyaClientException { + try { + // Fetch the Rya instance's details. + final RyaDetailsRepository detailsRepo = new AccumuloRyaInstanceDetailsRepository(super.getConnector(), ryaInstanceName); + final RyaDetails ryaDetails = detailsRepo.getRyaInstanceDetails(); + + // Ensure PCJs are enabled. + if(!ryaDetails.getPCJIndexDetails().isEnabled()) { + throw new RyaClientException("PCJs are not enabled for the Rya instance named '" + ryaInstanceName + "'."); + } + + // Ensure the PCJ exists. + if(!ryaDetails.getPCJIndexDetails().getPCJDetails().containsKey(pcjId)) { + throw new PCJDoesNotExistException("The PCJ with id '" + pcjId + "' does not exist within Rya instance '" + ryaInstanceName + "'."); + } + + // Ensure the PCJ is not already being incrementally updated. + final PCJDetails pcjDetails = ryaDetails.getPCJIndexDetails().getPCJDetails().get(pcjId); + final Optional<PCJUpdateStrategy> updateStrategy = pcjDetails.getUpdateStrategy(); + if(updateStrategy.isPresent() && updateStrategy.get() == PCJUpdateStrategy.INCREMENTAL) { + throw new RyaClientException("The PCJ with id '" + pcjId + "' is already being updated incrementally."); + } + } catch(final NotInitializedException e) { + throw new InstanceDoesNotExistException("No RyaDetails are initialized for the Rya instance named '" + ryaInstanceName + "'.", e); + } catch (final RyaDetailsRepositoryException e) { + throw new RyaClientException("Could not fetch the RyaDetails for the Rya instance named '" + ryaInstanceName + "'.", e); + } + } + + private void updatePCJResults(final String ryaInstanceName, final String pcjId) throws InstanceDoesNotExistException, PCJDoesNotExistException, RyaClientException { + // Things that have to be closed before we exit. + Sail sail = null; + SailConnection sailConn = null; + CloseableIteration<? extends BindingSet, QueryEvaluationException> results = null; + + try { + // Create an instance of Sail backed by the Rya instance. + sail = connectToRya(ryaInstanceName); + + // Purge the old results from the PCJ. + final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(super.getConnector(), ryaInstanceName); + try { + pcjStorage.purge(pcjId); + } catch (final PCJStorageException e) { + throw new RyaClientException("Could not batch update PCJ with ID '" + pcjId + "' because the old " + + "results could not be purged from it.", e); + } + + try { + // Parse the PCJ's SPARQL query. + final PcjMetadata metadata = pcjStorage.getPcjMetadata(pcjId); + final String sparql = metadata.getSparql(); + final SPARQLParser parser = new SPARQLParser(); + final ParsedQuery parsedQuery = parser.parseQuery(sparql, null); + + // Execute the query. + sailConn = sail.getConnection(); + results = sailConn.evaluate(parsedQuery.getTupleExpr(), null, null, false); + + // Load the results into the PCJ table. 
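+ // Note: results are buffered and written to the PCJ storage in batches
+ // of 1,000 BindingSets (see the loop below), so the full result set is
+ // never held in memory at once; the final partial batch is flushed
+ // after the loop completes.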
+ final List<VisibilityBindingSet> batch = new ArrayList<>(1000); + + while(results.hasNext()) { + final VisibilityBindingSet result = new VisibilityBindingSet(results.next(), ""); + batch.add(result); + + if(batch.size() == 1000) { + pcjStorage.addResults(pcjId, batch); + batch.clear(); + } + } + + if(!batch.isEmpty()) { + pcjStorage.addResults(pcjId, batch); + batch.clear(); + } + } catch(final MalformedQueryException | PCJStorageException | SailException | QueryEvaluationException e) { + throw new RyaClientException("Failed to batch load new results into the PCJ with ID '" + pcjId + "'.", e); + } + } finally { + if(results != null) { + try { + results.close(); + } catch (final QueryEvaluationException e) { + log.warn(e.getMessage(), e); + } + } + + if(sailConn != null) { + try { + sailConn.close(); + } catch (final SailException e) { + log.warn(e.getMessage(), e); + } + } + + if(sail != null) { + try { + sail.shutDown(); + } catch (final SailException e) { + log.warn(e.getMessage(), e); + } + } + } + } + + private Sail connectToRya(final String ryaInstanceName) throws RyaClientException { + try { + final AccumuloConnectionDetails connectionDetails = super.getAccumuloConnectionDetails(); + + final AccumuloRdfConfiguration ryaConf = new AccumuloRdfConfiguration(); + ryaConf.setTablePrefix(ryaInstanceName); + ryaConf.set(ConfigUtils.CLOUDBASE_USER, connectionDetails.getUsername()); + ryaConf.set(ConfigUtils.CLOUDBASE_PASSWORD, new String(connectionDetails.getPassword())); + ryaConf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, connectionDetails.getZookeepers()); + ryaConf.set(ConfigUtils.CLOUDBASE_INSTANCE, connectionDetails.getInstanceName()); + + // Turn PCJs off so that we will only scan the core Rya tables while building the PCJ results. + ryaConf.set(ConfigUtils.USE_PCJ, "false"); + + return RyaSailFactory.getInstance(ryaConf); + } catch (SailException | AccumuloException | AccumuloSecurityException | RyaDAOException | InferenceEngineException e) { + throw new RyaClientException("Could not connect to the Rya instance named '" + ryaInstanceName + "'.", e); + } + } + + private void updatePCJMetadata(final String ryaInstanceName, final String pcjId) throws RyaClientException { + // Update the PCJ's metadata to indicate it was just batch updated. + try { + final RyaDetailsRepository detailsRepo = new AccumuloRyaInstanceDetailsRepository(super.getConnector(), ryaInstanceName); + + new RyaDetailsUpdater(detailsRepo).update(new RyaDetailsMutator() { + @Override + public RyaDetails mutate(final RyaDetails originalDetails) throws CouldNotApplyMutationException { + // Update the original PCJ Details to indicate they were batch updated. + final PCJDetails originalPCJDetails = originalDetails.getPCJIndexDetails().getPCJDetails().get(pcjId); + final PCJDetails.Builder mutatedPCJDetails = PCJDetails.builder( originalPCJDetails ) + .setUpdateStrategy( PCJUpdateStrategy.BATCH ) + .setLastUpdateTime( new Date()); + + // Replace the old PCJ Details with the updated ones.
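+ // Note: the details object is rebuilt through its builder rather than
+ // modified in place; the builder is seeded with the original details
+ // and the updated PCJDetails are swapped in before the new instance
+ // is returned.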
+ final RyaDetails.Builder builder = RyaDetails.builder(originalDetails); + builder.getPCJIndexDetails().addPCJDetails( mutatedPCJDetails ); + return builder.build(); + } + }); + } catch (final RyaDetailsRepositoryException | CouldNotApplyMutationException e) { + throw new RyaClientException("Could not update the PCJ's metadata.", e); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCommand.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCommand.java b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCommand.java new file mode 100644 index 0000000..078e985 --- /dev/null +++ b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCommand.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package mvm.rya.api.client.accumulo; + +import static java.util.Objects.requireNonNull; + +import javax.annotation.ParametersAreNonnullByDefault; + +import org.apache.accumulo.core.client.Connector; + +/** + * An abstract class that holds onto Accumulo access information. Extend this + * when implementing a command that interacts with Accumulo. + */ +@ParametersAreNonnullByDefault +public abstract class AccumuloCommand { + + private final AccumuloConnectionDetails connectionDetails; + private final Connector connector; + + /** + * Constructs an instance of {@link AccumuloCommand}. + * + * @param connectionDetails - Details about the values that were used to create the connector to the cluster. (not null) + * @param connector - Provides programmatic access to the instance of Accumulo + * that hosts the Rya instance. (not null) + */ + public AccumuloCommand( + final AccumuloConnectionDetails connectionDetails, + final Connector connector) { + this.connectionDetails = requireNonNull( connectionDetails ); + this.connector = requireNonNull(connector); + } + + /** + * @return Details about the values that were used to create the connector to the cluster. (not null) + */ + public AccumuloConnectionDetails getAccumuloConnectionDetails() { + return connectionDetails; + } + + /** + * @return Provides programmatic access to the instance of Accumulo that hosts the Rya instance.
+ */ + public Connector getConnector() { + return connector; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloConnectionDetails.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloConnectionDetails.java b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloConnectionDetails.java new file mode 100644 index 0000000..c0a7be7 --- /dev/null +++ b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloConnectionDetails.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package mvm.rya.api.client.accumulo; + +import static java.util.Objects.requireNonNull; + +import javax.annotation.ParametersAreNonnullByDefault; +import javax.annotation.concurrent.Immutable; + +/** + * The information that the shell used to connect to Accumulo. + */ +@Immutable +@ParametersAreNonnullByDefault +public class AccumuloConnectionDetails { + private final String username; + private final char[] password; + private final String instanceName; + private final String zookeepers; + + /** + * Constructs an instance of {@link AccumuloConnectionDetails}. + * + * @param username - The username that was used to establish the connection. (not null) + * @param password - The password that was used to establish the connection. (not null) + * @param instanceName - The Accumulo instance name that was used to establish the connection. (not null) + * @param zookeepers - The list of Zookeeper hostnames that were used to establish the connection. (not null) + */ + public AccumuloConnectionDetails( + final String username, + final char[] password, + final String instanceName, + final String zookeepers) { + this.username = requireNonNull(username); + this.password = requireNonNull(password); + this.instanceName = requireNonNull(instanceName); + this.zookeepers = requireNonNull(zookeepers); + } + + /** + * @return The username that was used to establish the connection. + */ + public String getUsername() { + return username; + } + + /** + * @return The password that was used to establish the connection. + */ + public char[] getPassword() { + return password; + } + + /** + * @return The Accumulo instance name that was used to establish the connection. + */ + public String getInstanceName() { + return instanceName; + } + + /** + * @return The list of Zookeeper hostnames that were used to establish the connection. + */ + public String getZookeepers() { + return zookeepers; + }
+} \ No newline at end of file
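Taken together, these classes let a caller purge and fully recompute one PCJ's results from the core Rya tables. A minimal usage sketch follows; it is not part of the patch, and the connection values ("root", "secret", "dev", "zoo1:2181"), the Rya instance name "rya_", and the PCJ id "pcj_1" are hypothetical stand-ins for a real deployment.

    import org.apache.accumulo.core.client.Connector;
    import org.apache.accumulo.core.client.ZooKeeperInstance;
    import org.apache.accumulo.core.client.security.tokens.PasswordToken;

    import mvm.rya.api.client.accumulo.AccumuloBatchUpdatePCJ;
    import mvm.rya.api.client.accumulo.AccumuloConnectionDetails;

    public class BatchUpdatePcjExample {
        public static void main(final String[] args) throws Exception {
            // Hypothetical cluster coordinates; substitute real values.
            final String username = "root";
            final char[] password = "secret".toCharArray();
            final String instanceName = "dev";
            final String zookeepers = "zoo1:2181";

            // Connect to the Accumulo instance that hosts the Rya tables.
            final Connector connector = new ZooKeeperInstance(instanceName, zookeepers)
                    .getConnector(username, new PasswordToken(new String(password)));

            // Capture the values used to build the connector so the command can reuse them.
            final AccumuloConnectionDetails details =
                    new AccumuloConnectionDetails(username, password, instanceName, zookeepers);

            // Purge and fully recompute the results of one PCJ of the named Rya instance.
            new AccumuloBatchUpdatePCJ(details, connector).batchUpdate("rya_", "pcj_1");
        }
    }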
