http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/extras/indexing/pom.xml ---------------------------------------------------------------------- diff --git a/extras/indexing/pom.xml b/extras/indexing/pom.xml deleted file mode 100644 index f484916..0000000 --- a/extras/indexing/pom.xml +++ /dev/null @@ -1,128 +0,0 @@ -<?xml version='1.0'?> - -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. ---> - -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>org.apache.rya</groupId> - <artifactId>rya.extras</artifactId> - <version>3.2.10-SNAPSHOT</version> - </parent> - - <artifactId>rya.indexing</artifactId> - <name>Apache Rya Secondary Indexing</name> - - <dependencies> - <dependency> - <groupId>org.apache.rya</groupId> - <artifactId>rya.sail</artifactId> - <exclusions> - <exclusion> - <artifactId>hsqldb</artifactId> - <groupId>hsqldb</groupId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>org.apache.rya</groupId> - <artifactId>accumulo.rya</artifactId> - </dependency> - <dependency> - <groupId>org.apache.rya</groupId> - <artifactId>mongodb.rya</artifactId> - </dependency> - <dependency> - <groupId>org.apache.rya</groupId> - <artifactId>rya.prospector</artifactId> - </dependency> - - <!-- Free Text Indexing --> - <dependency> - <groupId>org.apache.lucene</groupId> - <artifactId>lucene-core</artifactId> - </dependency> - <dependency> - <groupId>org.apache.lucene</groupId> - <artifactId>lucene-analyzers</artifactId> - </dependency> - - <dependency> - <groupId>commons-codec</groupId> - <artifactId>commons-codec</artifactId> - </dependency> - - <!-- Geo Indexing --> - <dependency> - <groupId>org.locationtech.geomesa</groupId> - <artifactId>geomesa-accumulo-datastore</artifactId> - </dependency> - - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <scope>test</scope> - </dependency> - </dependencies> - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-shade-plugin</artifactId> - <executions> - <execution> - <configuration> - <shadedArtifactAttached>true</shadedArtifactAttached> - <shadedClassifierName>map-reduce</shadedClassifierName> - <transformers> - <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" /> - </transformers> - </configuration> - </execution> - <execution> - <id>accumulo-server</id> - <phase>package</phase> - <goals> - <goal>shade</goal> - </goals> - <configuration> - <shadedArtifactAttached>true</shadedArtifactAttached> - <shadedClassifierName>accumulo-server</shadedClassifierName> - <artifactSet> - 
<excludes> - <exclude>org.locationtech.geomesa:*</exclude> - <exclude>scala:*</exclude> - <exclude>org.apache.accumulo:*</exclude> - <exclude>org.apache.thrift:*</exclude> - <exclude>org.apache.hadoop:*</exclude> - <exclude>org.apache.zookeeper:*</exclude> - </excludes> - </artifactSet> - <transformers> - <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" /> - </transformers> - </configuration> - </execution> - </executions> - </plugin> - </plugins> - </build> -</project>
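
The shade configuration above produces two classified jars in addition to the primary artifact: a map-reduce jar whose ServicesResourceTransformer merges META-INF/services entries from the bundled dependencies, and an accumulo-server jar that additionally excludes the GeoMesa, Scala, Accumulo, Thrift, Hadoop, and ZooKeeper classes a tablet server already provides on its classpath. A downstream pom would select one of these by classifier; a minimal sketch, assuming the coordinates declared in the pom above:

    <dependency>
      <groupId>org.apache.rya</groupId>
      <artifactId>rya.indexing</artifactId>
      <version>3.2.10-SNAPSHOT</version>
      <classifier>accumulo-server</classifier>
    </dependency>
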
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/extras/indexing/src/main/java/mvm/rya/accumulo/documentIndex/DocIndexIteratorUtil.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/accumulo/documentIndex/DocIndexIteratorUtil.java b/extras/indexing/src/main/java/mvm/rya/accumulo/documentIndex/DocIndexIteratorUtil.java deleted file mode 100644 index fefd651..0000000 --- a/extras/indexing/src/main/java/mvm/rya/accumulo/documentIndex/DocIndexIteratorUtil.java +++ /dev/null @@ -1,31 +0,0 @@ -package mvm.rya.accumulo.documentIndex; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - -public class DocIndexIteratorUtil { - - - - public static final String DOC_ID_INDEX_DELIM = "\u001D" + "\u001E"; - - - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/extras/indexing/src/main/java/mvm/rya/accumulo/documentIndex/DocumentIndexIntersectingIterator.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/accumulo/documentIndex/DocumentIndexIntersectingIterator.java b/extras/indexing/src/main/java/mvm/rya/accumulo/documentIndex/DocumentIndexIntersectingIterator.java deleted file mode 100644 index ad38b2b..0000000 --- a/extras/indexing/src/main/java/mvm/rya/accumulo/documentIndex/DocumentIndexIntersectingIterator.java +++ /dev/null @@ -1,850 +0,0 @@ -package mvm.rya.accumulo.documentIndex; - -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -import java.io.IOException; -import java.util.Collection; -import java.util.Collections; -import java.util.Map; - -import org.apache.accumulo.core.client.IteratorSetting; -import org.apache.accumulo.core.data.ArrayByteSequence; -import org.apache.accumulo.core.data.ByteSequence; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.PartialKey; -import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.iterators.IteratorEnvironment; -import org.apache.accumulo.core.iterators.SortedKeyValueIterator; -import org.apache.accumulo.core.tabletserver.thrift.TabletClientService; -import org.apache.accumulo.core.util.TextUtil; -import org.apache.commons.codec.binary.Base64; -import org.apache.hadoop.io.Text; -import org.apache.log4j.Logger; - -/** - * This iterator facilitates document-partitioned indexing. It involves grouping a set of documents together and indexing those documents into a single row of - * an Accumulo table. This allows a tablet server to perform boolean AND operations on terms in the index. - * - * The table structure should have the following form: - * - * row: shardID, colfam: term, colqual: docID - * - * When you configure this iterator with a set of terms (column families), it will return only the docIDs that appear with all of the specified terms. The - * result will have an empty column family, as follows: - * - * row: shardID, colfam: (empty), colqual: docID - * - * This iterator is commonly used with BatchScanner or AccumuloInputFormat, to parallelize the search over all shardIDs. - * - * This iterator will *ignore* any columnFamilies passed to {@link #seek(Range, Collection, boolean)} as it performs intersections over terms. Extending classes - * should override the {@link TermSource#seekColfams} in their implementation's {@link #init(SortedKeyValueIterator, Map, IteratorEnvironment)} method. - * - * README.shard in docs/examples shows an example of using the IntersectingIterator. - */ -public class DocumentIndexIntersectingIterator implements SortedKeyValueIterator<Key,Value> { - - - - - protected Text nullText = new Text(); - - protected Text getRow(Key key) { - return key.getRow(); - } - - protected Text getTerm(Key key) { - return key.getColumnFamily(); - } - - protected Text getTermCond(Key key) { - return key.getColumnQualifier(); - } - - protected Key buildKey(Text row, TextColumn column) { - return new Key(row, (column.getColumnFamily() == null) ? nullText: column.getColumnFamily(), column.getColumnQualifier()); - } - - protected Key buildKey(Text row, Text term) { - return new Key(row, (term == null) ? nullText : term); - } - - protected Key buildKey(Text row, Text term, Text termCond) { - return new Key(row, (term == null) ? nullText : term, termCond); - } - - protected Key buildFollowRowKey(Key key, Text term, Text termCond) { - return new Key(getRow(key.followingKey(PartialKey.ROW)),(term == null) ? 
nullText : term, termCond); - } - - protected static final Logger log = Logger.getLogger(DocumentIndexIntersectingIterator.class); - - public static class TermSource { - public SortedKeyValueIterator<Key, Value> iter; - public Text term; - public Text termCond; - public Collection<ByteSequence> seekColfams; - public TextColumn column; - public boolean isPrefix; - public Key top ; - public Key next ; - public Text currentCQ; - private boolean seeked = false; - - public TermSource(TermSource other) { - - this.iter = other.iter; - this.term = other.term; - this.termCond = other.termCond; - this.seekColfams = other.seekColfams; - this.column = other.column; - this.top = other.top; - this.next = other.next; - this.currentCQ = other.currentCQ; - this.isPrefix = other.isPrefix; - } - - - public TermSource(SortedKeyValueIterator<Key, Value> iter, TextColumn column) { - - this.iter = iter; - this.column = column; - this.term = column.getColumnFamily(); - this.termCond = column.getColumnQualifier(); - this.currentCQ = new Text(emptyByteArray); - this.seekColfams = Collections.<ByteSequence> singletonList(new ArrayByteSequence(term - .getBytes(), 0, term.getLength())); - - } - - - - public void seek(Range r) throws IOException { - - if (seeked) { - - if (next != null && !r.beforeStartKey(next)) { - if (next.getColumnFamily().equals(term)) { - this.updateTop(); - } - } else if (iter.hasTop()) { - iter.seek(r, seekColfams, true); - this.updateTopNext(); - } else { - top = null; - next = null; - - } - } else { - - iter.seek(r, seekColfams, true); - this.updateTopNext(); - seeked = true; - } - - } - - - public void next() throws IOException { - - this.updateTop(); - } - - public void updateTop() throws IOException { - - top = next; - if (next != null) { - iter.next(); - if (iter.hasTop()) { - next = iter.getTopKey(); - } else { - next = null; - } - } - - } - - public void updateTopNext() throws IOException { - - if (iter.hasTop()) { - top = iter.getTopKey(); - } else { - top = null; - next = null; - return; - } - - iter.next(); - - if(iter.hasTop()) { - next = iter.getTopKey(); - } else { - next = null; - } - } - - public boolean hasTop() { - return top != null; - } - - - public String getTermString() { - return (this.term == null) ? 
new String("Iterator") : this.term.toString(); - } - } - - TermSource[] sources; - int sourcesCount = 0; - Range overallRange; - - // query-time settings - protected Text currentRow = null; - protected Text currentTermCond = new Text(emptyByteArray); - static final byte[] emptyByteArray = new byte[0]; - - protected Key topKey = null; - protected Value value = new Value(emptyByteArray); - protected String ctxt = null; - protected boolean hasContext = false; - protected boolean termCondSet = false; - - public DocumentIndexIntersectingIterator() {} - - @Override - public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) { - //log.info("Calling deep copy on " + this); - return new DocumentIndexIntersectingIterator(this, env); - } - - private DocumentIndexIntersectingIterator(DocumentIndexIntersectingIterator other, IteratorEnvironment env) { - if (other.sources != null) { - sourcesCount = other.sourcesCount; - sources = new TermSource[sourcesCount]; - for (int i = 0; i < sourcesCount; i++) { - sources[i] = new TermSource(other.sources[i].iter.deepCopy(env), other.sources[i].column); - } - } - } - - @Override - public Key getTopKey() { - - return topKey; - } - - @Override - public Value getTopValue() { - // we don't really care about values - return value; - } - - @Override - public boolean hasTop() { - return currentRow != null; - } - - // precondition: currentRow is not null - private boolean seekOneSource(int sourceID) throws IOException { - // find the next key in the appropriate column family that is at or - // beyond the cursor (currentRow, currentCQ) - // advance the cursor if this source goes beyond it - // return whether we advanced the cursor - - // within this loop progress must be made in one of the following forms: - // - currentRow or currentCQ must be increased - // - the given source must advance its iterator - // this loop will end when any of the following criteria are met - // - the iterator for the given source is pointing to the key - // (currentRow, columnFamilies[sourceID], currentCQ) - // - the given source is out of data and currentRow is set to null - // - the given source has advanced beyond the endRow and currentRow is - // set to null - boolean advancedCursor = false; - - - - - - while (true) { - -// if(currentRow.toString().equals(s)) { -// log.info("Source id is " + sourceID); -// if (sources[sourceID].top != null) { -// log.info("Top row is " + getRow(sources[sourceID].top)); -// log.info("Top cq is " + getTermCond(sources[sourceID].top)); -// } -// if (sources[sourceID].next != null) { -// log.info("Next row is " + getRow(sources[sourceID].next)); -// log.info("Next termCond is " + getTermCond(sources[sourceID].next)); -// } -// } - - if (sources[sourceID].hasTop() == false) { - currentRow = null; - // setting currentRow to null counts as advancing the cursor - return true; - } - // check if we're past the end key - int endCompare = -1; - // we should compare the row to the end of the range - - if (overallRange.getEndKey() != null) { - endCompare = overallRange.getEndKey().getRow().compareTo(sources[sourceID].top.getRow()); - if ((!overallRange.isEndKeyInclusive() && endCompare <= 0) || endCompare < 0) { - currentRow = null; - // setting currentRow to null counts as advancing the cursor - return true; - } - } - - - - int rowCompare = currentRow.compareTo(getRow(sources[sourceID].top)); - // check if this source is already at or beyond currentRow - // if not, then seek to at least the current row - - - - if (rowCompare > 0) { - // seek to at 
least the currentRow - Key seekKey = buildKey(currentRow, sources[sourceID].term); - sources[sourceID].seek(new Range(seekKey, true, null, false)); - - continue; - } - // check if this source has gone beyond currentRow - // if so, advance currentRow - if (rowCompare < 0) { - currentRow.set(getRow(sources[sourceID].top)); - //log.info("Current row is " + currentRow); - advancedCursor = true; - continue; - } - // we have verified that the current source is positioned in - // currentRow - // now we must make sure we're in the right columnFamily in the - // current row - // Note: Iterators are auto-magically set to the correct - // columnFamily - - if (sources[sourceID].column.isValid()) { - - boolean isPrefix = false; - boolean contextEqual = false; - String tempContext = ""; - - int termCompare; - - String[] cQ = getTermCond(sources[sourceID].top).toString().split("\u0000"); - tempContext = cQ[0]; - - if (!hasContext && ctxt == null) { - ctxt = cQ[0]; - } - - contextEqual = ctxt.equals(cQ[0]); - - String s1 = sources[sourceID].termCond.toString(); - String s2 = cQ[1] + "\u0000" + cQ[2]; - - if (sources[sourceID].isPrefix) { - isPrefix = s2.startsWith(s1 + "\u0000"); - } else { - isPrefix = s2.startsWith(s1); - } - - termCompare = (contextEqual && isPrefix) ? 0 : (ctxt + "\u0000" + s1).compareTo(cQ[0] + "\u0000" + s2); - - // if(currentRow.toString().equals(s)) { - // log.info("Term compare is " + termCompare); - // } - - // check if this source is already on the right columnFamily - // if not, then seek forwards to the right columnFamily - if (termCompare > 0) { - Key seekKey = buildKey(currentRow, sources[sourceID].term, new Text(ctxt + - "\u0000" + sources[sourceID].termCond.toString())); - sources[sourceID].seek(new Range(seekKey, true, null, false)); - - continue; - } - // check if this source is beyond the right columnFamily - // if so, then seek to the next row - if (termCompare < 0) { - // we're out of entries in the current row, so seek to the - // next one - - if (endCompare == 0) { - // we're done - currentRow = null; - // setting currentRow to null counts as advancing the - // cursor - return true; - } - - - - //advance to next row if context set - all entries in given row exhausted - if (hasContext || tempContext.length() == 0) { - Key seekKey = buildFollowRowKey(sources[sourceID].top, sources[sourceID].term, - new Text(ctxt + "\u0000" + sources[sourceID].termCond.toString())); - sources[sourceID].seek(new Range(seekKey, true, null, false)); - } else { - - if(contextEqual && !isPrefix) { - Key seekKey = buildKey(currentRow, sources[sourceID].term, new Text(ctxt + "\u0001")); - sources[sourceID].seek(new Range(seekKey, true, null, false)); - if(sources[sourceID].top != null) { - ctxt = getTermCond(sources[sourceID].top).toString().split("\u0000")[0]; - } - } else { - Key seekKey = buildKey(currentRow, sources[sourceID].term, new Text(tempContext + - "\u0000" + sources[sourceID].termCond.toString())); - sources[sourceID].seek(new Range(seekKey, true, null, false)); - if(sources[sourceID].top != null) { - ctxt = getTermCond(sources[sourceID].top).toString().split("\u0000")[0]; - } - } - - } - - -// if(currentRow.toString().equals(s)) { -// log.info("current term cond is " + currentTermCond); -// -// } - - - continue; - } - } - - - - - - - - - - - //set currentTermCond -- gets appended to end of currentKey column qualifier - //used to determine which term iterator to advance when a new iterator is created - - sources[sourceID].currentCQ.set(getTermCond(sources[sourceID].top)); - - if 
(sources[sourceID].next != null) { - - //if hasContext, only consider sourceID with next having designated context - //otherwise don't set currentTermCond - if (!termCondSet && hasContext) { - if (sources[sourceID].next.getRow().equals(currentRow) - && sources[sourceID].next.getColumnQualifier().toString() - .startsWith(ctxt + "\u0000" + sources[sourceID].termCond.toString())) { - currentTermCond.set(new Text(Integer.toString(sourceID))); - termCondSet = true; - } - } else if(!termCondSet){ - String[] cq = getTermCond(sources[sourceID].next).toString().split("\u0000"); - - //set currentTermCond with preference given to sourceID having next with same context - //otherwise set currentTermCond to the sourceID with next having termCond as prefix - if (sources[sourceID].next.getRow().equals(currentRow)) { - if (sources[sourceID].next.getColumnQualifier().toString() - .startsWith(ctxt + "\u0000" + sources[sourceID].termCond.toString())) { - currentTermCond.set(new Text(Integer.toString(sourceID))); - termCondSet = true; - } else if ((cq[1] + "\u0000" + cq[2]).startsWith(sources[sourceID].termCond.toString())) { - currentTermCond.set(new Text(Integer.toString(sourceID))); - } - } - } - } - - - break; - } - - return advancedCursor; - } - - @Override - public void next() throws IOException { - if (currentRow == null) { - return; - } - - - - if(currentTermCond.getLength() != 0) { - - int id = Integer.parseInt(currentTermCond.toString()); - - sources[id].next(); - currentTermCond.set(emptyByteArray); - termCondSet = false; - if(sources[id].top != null && !hasContext) { - ctxt = getTermCond(sources[id].top).toString().split("\u0000")[0]; - } - advanceToIntersection(); - return; - } - - sources[0].next(); - if(sources[0].top != null && !hasContext) { - ctxt = getTermCond(sources[0].top).toString().split("\u0000")[0]; - } - advanceToIntersection(); - } - - protected void advanceToIntersection() throws IOException { - boolean cursorChanged = true; - while (cursorChanged) { - // seek all of the sources to at least the highest seen column qualifier in the current row - cursorChanged = false; - for (int i = 0; i < sourcesCount; i++) { -// log.info("New sourceID is " + i); - if (currentRow == null) { - topKey = null; - return; - } - if (seekOneSource(i)) { - currentTermCond.set(emptyByteArray); - termCondSet = false; - cursorChanged = true; - break; - } - } - } - String cq = ""; - for(int i = 0; i < sourcesCount; i++) { - cq = cq + sources[i].currentCQ.toString() + DocIndexIteratorUtil.DOC_ID_INDEX_DELIM; - } - - if (currentTermCond.getLength() == 0) { - topKey = buildKey(currentRow, nullText, new Text(cq + -1)); - } else { - topKey = buildKey(currentRow, nullText, new Text(cq + currentTermCond.toString())); - } - } - - public static String stringTopKey(SortedKeyValueIterator<Key,Value> iter) { - if (iter.hasTop()) - return iter.getTopKey().toString(); - return ""; - } - - private static final String columnOptionName = "columns"; - private static final String columnPrefix = "prefixes"; - private static final String context = "context"; - - - - protected static String encodeColumns(TextColumn[] columns) { - StringBuilder sb = new StringBuilder(); - for (int i = 0; i < columns.length; i++) { - sb.append(new String(Base64.encodeBase64(TextUtil.getBytes(columns[i].getColumnFamily())))); - sb.append('\n'); - sb.append(new String(Base64.encodeBase64(TextUtil.getBytes(columns[i].getColumnQualifier())))); - sb.append('\u0001'); - } - return sb.toString(); - } - - - - protected static TextColumn[] decodeColumns(String
columns) { - String[] columnStrings = columns.split("\u0001"); - TextColumn[] columnTexts = new TextColumn[columnStrings.length]; - for (int i = 0; i < columnStrings.length; i++) { - String[] columnComponents = columnStrings[i].split("\n"); - columnTexts[i] = new TextColumn(new Text(Base64.decodeBase64(columnComponents[0].getBytes())), - new Text(Base64.decodeBase64(columnComponents[1].getBytes()))); - } - return columnTexts; - } - - - - - - /** - * @param context - * @return encoded context - */ - protected static String encodeContext(String context) { - - return new String(Base64.encodeBase64(context.getBytes())); - } - - - - /** - * @param context - * @return decoded context - */ - protected static String decodeContext(String context) { - - if (context == null) { - return null; - } else { - return new String(Base64.decodeBase64(context.getBytes())); - } - } - - - - - - protected static String encodeBooleans(boolean[] prefixes) { - byte[] bytes = new byte[prefixes.length]; - for (int i = 0; i < prefixes.length; i++) { - if (prefixes[i]) - bytes[i] = 1; - else - bytes[i] = 0; - } - return new String(Base64.encodeBase64(bytes)); - } - - /** - * @param prefixes - * @return decoded flags - */ - protected static boolean[] decodeBooleans(String prefixes) { - // return null if there were no flags - if (prefixes == null) - return null; - - byte[] bytes = Base64.decodeBase64(prefixes.getBytes()); - boolean[] bFlags = new boolean[bytes.length]; - for (int i = 0; i < bytes.length; i++) { - if (bytes[i] == 1) - bFlags[i] = true; - else - bFlags[i] = false; - } - return bFlags; - } - - - - - - - - - @Override - public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException { - TextColumn[] terms = decodeColumns(options.get(columnOptionName)); - boolean[] prefixes = decodeBooleans(options.get(columnPrefix)); - ctxt = decodeContext(options.get(context)); - - if(ctxt != null) { - hasContext = true; - } - - - - if (terms.length < 2) { - throw new IllegalArgumentException("IntersectionIterator requires two or more column families"); - } - - sources = new TermSource[terms.length]; - sources[0] = new TermSource(source, terms[0]); - for (int i = 1; i < terms.length; i++) { - //log.info("For decoded column " + i + " column family is " + terms[i].getColumnFamily() + " and qualifier is " + terms[i].getColumnQualifier()); - sources[i] = new TermSource(source.deepCopy(env), terms[i]); - sources[i].isPrefix = prefixes[i]; - } - sourcesCount = terms.length; - } - - @Override - public void seek(Range range, Collection<ByteSequence> seekColumnFamilies, boolean inclusive) throws IOException { - overallRange = new Range(range); - currentRow = new Text(); - currentTermCond.set(emptyByteArray); - termCondSet = false; - - - -// log.info("Calling seek with range " + range); - - // seek each of the sources to the right column family within the row - // given by key - - Key sourceKey; - - if (rangeCqValid(range)) { - - String[] cqInfo = cqParser(range.getStartKey().getColumnQualifier()); - int id = Integer.parseInt(cqInfo[1]); - - - - if (id >= 0) { - for (int i = 0; i < sourcesCount; i++) { - - if (i == id) { - sourceKey = buildKey(getRow(range.getStartKey()), sources[i].term, new Text(cqInfo[0])); - sources[i].seek(new Range(sourceKey, true, null, false)); - sources[i].next(); - if(!hasContext && sources[i].hasTop()) { - ctxt = getTermCond(sources[i].top).toString().split("\u0000")[0]; - } - } else { - sourceKey = buildKey(getRow(range.getStartKey()),
sources[i].term); - sources[i].seek(new Range(sourceKey, true, null, false)); - } - } - } else { - - - for (int i = 0; i < sourcesCount; i++) { - sourceKey = buildKey(getRow(range.getStartKey()), sources[i].term, range.getStartKey() - .getColumnQualifier()); - sources[i].seek(new Range(sourceKey, true, null, false)); - } - } - - - } else { - -// log.info("Range is invalid."); - for (int i = 0; i < sourcesCount; i++) { - - if (range.getStartKey() != null) { - - sourceKey = buildKey(getRow(range.getStartKey()), sources[i].term); - - // Seek only to the term for this source as a column family - sources[i].seek(new Range(sourceKey, true, null, false)); - } else { - // Seek only to the term for this source as a column family - - sources[i].seek(range); - } - } - } - - advanceToIntersection(); - - } - - - private String[] cqParser(Text cq) { - - String cQ = cq.toString(); - String[] cqComponents = cQ.split(DocIndexIteratorUtil.DOC_ID_INDEX_DELIM); - int id = -1; - String[] valPos = new String[2]; - - - - - if(cqComponents.length > 1) { - id = Integer.parseInt(cqComponents[cqComponents.length-1]); - if (id >= 0) { - valPos[0] = cqComponents[id].toString(); - valPos[1] = "" + id; - } else { - valPos[0] = cqComponents[0].toString(); - valPos[1] = "" + id; - } - } else { - valPos[0] = cq.toString(); - valPos[1] = "" + -1; - } - - return valPos; - - } - - - private boolean rangeCqValid(Range range) { - return (range.getStartKey() != null) && (range.getStartKey().getColumnQualifier() != null); - } - - - - public void addSource(SortedKeyValueIterator<Key,Value> source, IteratorEnvironment env, TextColumn column) { - // Check if we have space for the added Source - if (sources == null) { - sources = new TermSource[1]; - } else { - // allocate space for node, and copy current tree. - // TODO: Should we change this to an ArrayList so that we can just add() ? - ACCUMULO-1309 - TermSource[] localSources = new TermSource[sources.length + 1]; - int currSource = 0; - for (TermSource myTerm : sources) { - // TODO: Do I need to call new here? or can I just re-use the term? - ACCUMULO-1309 - localSources[currSource] = new TermSource(myTerm); - currSource++; - } - sources = localSources; - } - sources[sourcesCount] = new TermSource(source.deepCopy(env), column); - sourcesCount++; - } - - /** - * Encode the columns to be used when iterating. 
- * - * @param cfg - * @param columns - */ - public static void setColumnFamilies(IteratorSetting cfg, TextColumn[] columns) { - if (columns.length < 2) - throw new IllegalArgumentException("Must supply at least two terms to intersect"); - - boolean[] prefix = new boolean[columns.length]; - - for(int i = 0; i < columns.length; i++) { - prefix[i] = columns[i].isPrefix(); - } - - - - cfg.addOption(DocumentIndexIntersectingIterator.columnPrefix, DocumentIndexIntersectingIterator.encodeBooleans(prefix)); - cfg.addOption(DocumentIndexIntersectingIterator.columnOptionName, DocumentIndexIntersectingIterator.encodeColumns(columns)); - } - - - - - - public static void setContext(IteratorSetting cfg, String context) { - - cfg.addOption(DocumentIndexIntersectingIterator.context, DocumentIndexIntersectingIterator.encodeContext(context)); - - } - - - - - - - - - - - - - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/extras/indexing/src/main/java/mvm/rya/accumulo/documentIndex/TextColumn.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/accumulo/documentIndex/TextColumn.java b/extras/indexing/src/main/java/mvm/rya/accumulo/documentIndex/TextColumn.java deleted file mode 100644 index 661f62b..0000000 --- a/extras/indexing/src/main/java/mvm/rya/accumulo/documentIndex/TextColumn.java +++ /dev/null @@ -1,108 +0,0 @@ -package mvm.rya.accumulo.documentIndex; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - - -import org.apache.hadoop.io.Text; - -public class TextColumn { - - - private Text columnFamily; - private Text columnQualifier; - private boolean isPrefix = false; - - - - public TextColumn(Text columnFamily, Text columnQualifier) { - this.columnFamily = columnFamily; - this.columnQualifier = columnQualifier; - } - - - public TextColumn(TextColumn other) { - - this.columnFamily = new Text(other.columnFamily); - this.columnQualifier = new Text(other.columnQualifier); - this.isPrefix = other.isPrefix; - - } - - - public Text getColumnFamily() { - return columnFamily; - } - - - public boolean isPrefix() { - return isPrefix; - } - - - public void setIsPrefix(boolean isPrefix) { - this.isPrefix = isPrefix; - } - - - public boolean isValid() { - return (columnFamily != null && columnQualifier != null); - } - - - - public Text getColumnQualifier() { - return columnQualifier; - } - - - public void setColumnFamily(Text cf) { - this.columnFamily = cf; - } - - public void setColumnQualifier(Text cq) { - this.columnQualifier = cq; - } - - public String toString() { - - return columnFamily.toString() + ", " + columnQualifier.toString() + ", prefix:" + isPrefix; - } - - @Override - public boolean equals(Object other) { - - if(other == null) { - return false; - } - - if(!(other instanceof TextColumn)) { - return false; - } - - TextColumn tc = (TextColumn) other; - - return this.columnFamily.equals(tc.columnFamily) && this.columnQualifier.equals(tc.columnQualifier) && this.isPrefix == tc.isPrefix; - - - - } - - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/extras/indexing/src/main/java/mvm/rya/accumulo/mr/NullFreeTextIndexer.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/accumulo/mr/NullFreeTextIndexer.java b/extras/indexing/src/main/java/mvm/rya/accumulo/mr/NullFreeTextIndexer.java deleted file mode 100644 index 3d005cf..0000000 --- a/extras/indexing/src/main/java/mvm/rya/accumulo/mr/NullFreeTextIndexer.java +++ /dev/null @@ -1,70 +0,0 @@ -package mvm.rya.accumulo.mr; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - - - -import info.aduna.iteration.CloseableIteration; - -import java.io.IOException; -import java.util.Set; - -import mvm.rya.accumulo.experimental.AbstractAccumuloIndexer; -import mvm.rya.api.domain.RyaStatement; -import mvm.rya.indexing.FreeTextIndexer; -import mvm.rya.indexing.StatementContraints; - -import org.apache.hadoop.conf.Configuration; -import org.openrdf.model.Statement; -import org.openrdf.model.URI; -import org.openrdf.query.QueryEvaluationException; - -public class NullFreeTextIndexer extends AbstractAccumuloIndexer implements FreeTextIndexer { - - @Override - public String getTableName() { - return null; - } - - @Override - public void storeStatement(RyaStatement statement) throws IOException { - } - - @Override - public Configuration getConf() { - return null; - } - - @Override - public void setConf(Configuration arg0) { - } - - @Override - public CloseableIteration<Statement, QueryEvaluationException> queryText(String query, StatementContraints contraints) - throws IOException { - return null; - } - - @Override - public Set<URI> getIndexablePredicates() { - return null; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/extras/indexing/src/main/java/mvm/rya/accumulo/mr/NullGeoIndexer.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/accumulo/mr/NullGeoIndexer.java b/extras/indexing/src/main/java/mvm/rya/accumulo/mr/NullGeoIndexer.java deleted file mode 100644 index b351c13..0000000 --- a/extras/indexing/src/main/java/mvm/rya/accumulo/mr/NullGeoIndexer.java +++ /dev/null @@ -1,121 +0,0 @@ -package mvm.rya.accumulo.mr; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - - - -import info.aduna.iteration.CloseableIteration; - -import java.io.IOException; -import java.util.Set; - -import mvm.rya.accumulo.experimental.AbstractAccumuloIndexer; -import mvm.rya.api.domain.RyaStatement; -import mvm.rya.indexing.GeoIndexer; -import mvm.rya.indexing.StatementContraints; - -import org.apache.hadoop.conf.Configuration; -import org.openrdf.model.Statement; -import org.openrdf.model.URI; -import org.openrdf.query.QueryEvaluationException; - -import com.vividsolutions.jts.geom.Geometry; - -public class NullGeoIndexer extends AbstractAccumuloIndexer implements GeoIndexer { - - @Override - public String getTableName() { - - return null; - } - - @Override - public void storeStatement(RyaStatement statement) throws IOException { - - - } - - @Override - public Configuration getConf() { - - return null; - } - - @Override - public void setConf(Configuration arg0) { - - - } - - @Override - public CloseableIteration<Statement, QueryEvaluationException> queryEquals(Geometry query, StatementContraints contraints) { - - return null; - } - - @Override - public CloseableIteration<Statement, QueryEvaluationException> queryDisjoint(Geometry query, StatementContraints contraints) { - - return null; - } - - @Override - public CloseableIteration<Statement, QueryEvaluationException> queryIntersects(Geometry query, StatementContraints contraints) { - - return null; - } - - @Override - public CloseableIteration<Statement, QueryEvaluationException> queryTouches(Geometry query, StatementContraints contraints) { - - return null; - } - - @Override - public CloseableIteration<Statement, QueryEvaluationException> queryCrosses(Geometry query, StatementContraints contraints) { - - return null; - } - - @Override - public CloseableIteration<Statement, QueryEvaluationException> queryWithin(Geometry query, StatementContraints contraints) { - - return null; - } - - @Override - public CloseableIteration<Statement, QueryEvaluationException> queryContains(Geometry query, StatementContraints contraints) { - - return null; - } - - @Override - public CloseableIteration<Statement, QueryEvaluationException> queryOverlaps(Geometry query, StatementContraints contraints) { - - return null; - } - - @Override - public Set<URI> getIndexablePredicates() { - - return null; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/extras/indexing/src/main/java/mvm/rya/accumulo/mr/NullTemporalIndexer.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/accumulo/mr/NullTemporalIndexer.java b/extras/indexing/src/main/java/mvm/rya/accumulo/mr/NullTemporalIndexer.java deleted file mode 100644 index 153a3c3..0000000 --- a/extras/indexing/src/main/java/mvm/rya/accumulo/mr/NullTemporalIndexer.java +++ /dev/null @@ -1,154 +0,0 @@ -package mvm.rya.accumulo.mr; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - -import info.aduna.iteration.CloseableIteration; - -import java.io.IOException; -import java.util.Collection; -import java.util.Set; - -import mvm.rya.accumulo.experimental.AbstractAccumuloIndexer; -import mvm.rya.api.domain.RyaStatement; -import mvm.rya.api.domain.RyaURI; -import mvm.rya.indexing.StatementContraints; -import mvm.rya.indexing.TemporalIndexer; -import mvm.rya.indexing.TemporalInstant; -import mvm.rya.indexing.TemporalInterval; - -import org.apache.hadoop.conf.Configuration; -import org.openrdf.model.Statement; -import org.openrdf.model.URI; -import org.openrdf.query.QueryEvaluationException; - -/** - * Temporal indexer that does nothing; used when temporal indexing is disabled. - * - */ -public class NullTemporalIndexer extends AbstractAccumuloIndexer implements TemporalIndexer { - - @Override - public String getTableName() { - - return null; - } - - @Override - public void storeStatement(RyaStatement statement) throws IOException { - - - } - - @Override - public Configuration getConf() { - - return null; - } - - @Override - public void setConf(Configuration arg0) { - - - } - - @Override - public CloseableIteration<Statement, QueryEvaluationException> queryInstantEqualsInstant(TemporalInstant queryInstant, - StatementContraints contraints) throws QueryEvaluationException { - - return null; - } - - @Override - public CloseableIteration<Statement, QueryEvaluationException> queryInstantBeforeInstant(TemporalInstant queryInstant, - StatementContraints contraints) throws QueryEvaluationException { - - return null; - } - - @Override - public CloseableIteration<Statement, QueryEvaluationException> queryInstantAfterInstant(TemporalInstant queryInstant, - StatementContraints contraints) throws QueryEvaluationException { - - return null; - } - - @Override - public CloseableIteration<Statement, QueryEvaluationException> queryInstantBeforeInterval(TemporalInterval givenInterval, - StatementContraints contraints) throws QueryEvaluationException { - - return null; - } - - @Override - public CloseableIteration<Statement, QueryEvaluationException> queryInstantAfterInterval(TemporalInterval givenInterval, - StatementContraints contraints) throws QueryEvaluationException { - - return null; - } - - @Override - public CloseableIteration<Statement, QueryEvaluationException> queryInstantInsideInterval(TemporalInterval givenInterval, - StatementContraints contraints) throws QueryEvaluationException { - - return null; - } - - @Override - public CloseableIteration<Statement, QueryEvaluationException> queryInstantHasBeginningInterval(TemporalInterval queryInterval, - StatementContraints contraints) throws QueryEvaluationException { - - return null; - } - - @Override - public CloseableIteration<Statement, QueryEvaluationException> queryInstantHasEndInterval(TemporalInterval queryInterval, - StatementContraints contraints) throws QueryEvaluationException { - - return null; - } - - @Override - public CloseableIteration<Statement, QueryEvaluationException> queryIntervalEquals(TemporalInterval query, - StatementContraints contraints) throws QueryEvaluationException { - - return null; - } - - 
@Override - public CloseableIteration<Statement, QueryEvaluationException> queryIntervalBefore(TemporalInterval query, - StatementContraints contraints) throws QueryEvaluationException { - - return null; - } - - @Override - public CloseableIteration<Statement, QueryEvaluationException> queryIntervalAfter(TemporalInterval query, StatementContraints contraints) - throws QueryEvaluationException { - - return null; - } - - @Override - public Set<URI> getIndexablePredicates() { - - return null; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/extras/indexing/src/main/java/mvm/rya/accumulo/mr/RyaOutputFormat.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/accumulo/mr/RyaOutputFormat.java b/extras/indexing/src/main/java/mvm/rya/accumulo/mr/RyaOutputFormat.java deleted file mode 100644 index 8a0d599..0000000 --- a/extras/indexing/src/main/java/mvm/rya/accumulo/mr/RyaOutputFormat.java +++ /dev/null @@ -1,329 +0,0 @@ -package mvm.rya.accumulo.mr; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - - - -import java.io.Closeable; -import java.io.Flushable; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Iterator; - -import mvm.rya.accumulo.AccumuloRdfConfiguration; -import mvm.rya.accumulo.AccumuloRyaDAO; -import mvm.rya.accumulo.mr.utils.MRUtils; -import mvm.rya.api.domain.RyaStatement; -import mvm.rya.api.persist.RyaDAOException; -import mvm.rya.api.resolver.RdfToRyaConversions; -import mvm.rya.indexing.FreeTextIndexer; -import mvm.rya.indexing.GeoIndexer; -import mvm.rya.indexing.TemporalIndexer; -import mvm.rya.indexing.accumulo.ConfigUtils; -import mvm.rya.indexing.accumulo.StatementSerializer; -import mvm.rya.indexing.accumulo.freetext.AccumuloFreeTextIndexer; -import mvm.rya.indexing.accumulo.geo.GeoMesaGeoIndexer; -import mvm.rya.indexing.accumulo.temporal.AccumuloTemporalIndexer; - -import org.apache.accumulo.core.client.AccumuloException; -import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.TableExistsException; -import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.data.Mutation; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.OutputCommitter; -import org.apache.hadoop.mapreduce.OutputFormat; -import org.apache.hadoop.mapreduce.RecordWriter; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; -import org.apache.log4j.Logger; -import org.geotools.feature.SchemaException; -import org.openrdf.model.Statement; - -/** - * Hadoop Map/Reduce class to use Rya, the {@link GeoIndexer}, the {@link FreeTextIndexer}, and the {@link TemporalIndexer} as the sink of {@link Statement} data, - * wrapped in {@link StatementWritable} objects. This {@link OutputFormat} ignores the Keys and only writes the Values to Rya. - * - * The user must specify connection parameters for Rya, {@link GeoIndexer}, {@link FreeTextIndexer}, and {@link TemporalIndexer}. 
- */ -public class RyaOutputFormat extends OutputFormat<Writable, StatementWritable> { - private static final Logger logger = Logger.getLogger(RyaOutputFormat.class); - - private static final String PREFIX = RyaOutputFormat.class.getSimpleName(); - private static final String MAX_MUTATION_BUFFER_SIZE = PREFIX + ".maxmemory"; - private static final String ENABLE_FREETEXT = PREFIX + ".freetext.enable"; - private static final String ENABLE_GEO = PREFIX + ".geo.enable"; - private static final String ENABLE_TEMPORAL = PREFIX + ".temporal.enable"; - - - @Override - public void checkOutputSpecs(JobContext jobContext) throws IOException, InterruptedException { - Configuration conf = jobContext.getConfiguration(); - - // make sure that all of the indexers can connect - getGeoIndexer(conf); - getFreeTextIndexer(conf); - getTemporalIndexer(conf); - getRyaIndexer(conf); - } - - @Override - public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException { - // copied from AccumuloOutputFormat - return new NullOutputFormat<Text, Mutation>().getOutputCommitter(context); - } - - @Override - public RecordWriter<Writable, StatementWritable> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException { - return new RyaRecordWriter(context); - } - - private static GeoIndexer getGeoIndexer(Configuration conf) throws IOException { - if (!conf.getBoolean(ENABLE_GEO, true)) { - return new NullGeoIndexer(); - } - - GeoMesaGeoIndexer geo = new GeoMesaGeoIndexer(); - geo.setConf(conf); - return geo; - - } - - private static FreeTextIndexer getFreeTextIndexer(Configuration conf) throws IOException { - if (!conf.getBoolean(ENABLE_FREETEXT, true)) { - return new NullFreeTextIndexer(); - } - - AccumuloFreeTextIndexer freeText = new AccumuloFreeTextIndexer(); - freeText.setConf(conf); - return freeText; - - } - - private static TemporalIndexer getTemporalIndexer(Configuration conf) throws IOException { - if (!conf.getBoolean(ENABLE_TEMPORAL, true)) { - return new NullTemporalIndexer(); - } - AccumuloTemporalIndexer temporal = new AccumuloTemporalIndexer(); - temporal.setConf(conf); - return temporal; - } - - private static AccumuloRyaDAO getRyaIndexer(Configuration conf) throws IOException { - try { - AccumuloRyaDAO ryaIndexer = new AccumuloRyaDAO(); - Connector conn = ConfigUtils.getConnector(conf); - ryaIndexer.setConnector(conn); - - AccumuloRdfConfiguration ryaConf = new AccumuloRdfConfiguration(); - - String tablePrefix = conf.get(MRUtils.TABLE_PREFIX_PROPERTY, null); - if (tablePrefix != null) { - ryaConf.setTablePrefix(tablePrefix); - } - ryaConf.setDisplayQueryPlan(false); - ryaIndexer.setConf(ryaConf); - ryaIndexer.init(); - return ryaIndexer; - } catch (AccumuloException e) { - logger.error("Cannot create RyaIndexer", e); - throw new IOException(e); - } catch (AccumuloSecurityException e) { - logger.error("Cannot create RyaIndexer", e); - throw new IOException(e); - } catch (RyaDAOException e) { - logger.error("Cannot create RyaIndexer", e); - throw new IOException(e); - } - } - - public static class RyaRecordWriter extends RecordWriter<Writable, StatementWritable> implements Closeable, Flushable { - private static final Logger logger = Logger.getLogger(RyaRecordWriter.class); - - private FreeTextIndexer freeTextIndexer; - private GeoIndexer geoIndexer; - private TemporalIndexer temporalIndexer; - private AccumuloRyaDAO ryaIndexer; - - private static final long ONE_MEGABYTE = 1024L * 1024L; - private static final long AVE_STATEMENT_SIZE = 
100L; - - private long bufferSizeLimit; - private long bufferCurrentSize = 0; - - private ArrayList<RyaStatement> buffer; - - public RyaRecordWriter(TaskAttemptContext context) throws IOException { - this(context.getConfiguration()); - } - - public RyaRecordWriter(Configuration conf) throws IOException { - // set up the buffer - bufferSizeLimit = conf.getLong(MAX_MUTATION_BUFFER_SIZE, ONE_MEGABYTE); - int bufferCapacity = (int) (bufferSizeLimit / AVE_STATEMENT_SIZE); - buffer = new ArrayList<RyaStatement>(bufferCapacity); - - // set up the indexers - freeTextIndexer = getFreeTextIndexer(conf); - geoIndexer = getGeoIndexer(conf); - temporalIndexer = getTemporalIndexer(conf); - ryaIndexer = getRyaIndexer(conf); - - // update fields used for metrics - startTime = System.currentTimeMillis(); - lastCommitFinishTime = startTime; - } - - @Override - public void flush() throws IOException { - flushBuffer(); - } - - @Override - public void close() throws IOException { - close(null); - } - - @Override - public void close(TaskAttemptContext paramTaskAttemptContext) throws IOException { - // close everything. log errors - try { - flush(); - } catch (IOException e) { - logger.error("Error flushing the buffer on RyaOutputFormat Close", e); - } - try { - if (geoIndexer != null) - geoIndexer.close(); - } catch (IOException e) { - logger.error("Error closing the geoIndexer on RyaOutputFormat Close", e); - } - try { - if (freeTextIndexer != null) - freeTextIndexer.close(); - } catch (IOException e) { - logger.error("Error closing the freetextIndexer on RyaOutputFormat Close", e); - } - try { - if (temporalIndexer != null) - temporalIndexer.close(); - } catch (IOException e) { - logger.error("Error closing the temporalIndexer on RyaOutputFormat Close", e); - } - try { - ryaIndexer.destroy(); - } catch (RyaDAOException e) { - logger.error("Error closing RyaDAO on RyaOutputFormat Close", e); - } - } - - public void write(Statement statement) throws IOException, InterruptedException { - write(null, new StatementWritable(statement)); - } - - @Override - public void write(Writable key, StatementWritable value) throws IOException, InterruptedException { - buffer.add(RdfToRyaConversions.convertStatement(value)); - - bufferCurrentSize += StatementSerializer.writeStatement(value).length(); - - if (bufferCurrentSize >= bufferSizeLimit) { - flushBuffer(); - } - } - - // fields for storing metrics - private long startTime = 0; - private long lastCommitFinishTime = 0; - private long totalCommitRecords = 0; - - private double totalReadDuration = 0; - private double totalWriteDuration = 0; - - private long commitCount = 0; - - private void flushBuffer() throws IOException { - totalCommitRecords += buffer.size(); - commitCount++; - - long startCommitTime = System.currentTimeMillis(); - - logger.info(String.format("(C-%d) Flushing buffer with %,d objects and %,d bytes", commitCount, buffer.size(), - bufferCurrentSize)); - - double readingDuration = (startCommitTime - lastCommitFinishTime) / 1000.; - totalReadDuration += readingDuration; - double currentReadRate = buffer.size() / readingDuration; - double totalReadRate = totalCommitRecords / totalReadDuration; - - // Print "reading" metrics - logger.info(String.format("(C-%d) (Reading) Duration, Current Rate, Total Rate: %.2f %.2f %.2f ", commitCount, readingDuration, - currentReadRate, totalReadRate)); - - // write to geo - geoIndexer.storeStatements(buffer); - geoIndexer.flush(); - - // write to free text - freeTextIndexer.storeStatements(buffer); - freeTextIndexer.flush(); 
- - // write to temporal - temporalIndexer.storeStatements(buffer); - temporalIndexer.flush(); - - // write to rya - try { - ryaIndexer.add(buffer.iterator()); - } catch (RyaDAOException e) { - logger.error("Cannot write statement to Rya", e); - throw new IOException(e); - } - - lastCommitFinishTime = System.currentTimeMillis(); - - double writingDuration = (lastCommitFinishTime - startCommitTime) / 1000.; - totalWriteDuration += writingDuration; - double currentWriteRate = buffer.size() / writingDuration; - double totalWriteRate = totalCommitRecords / totalWriteDuration; - - // Print "writing" stats - logger.info(String.format("(C-%d) (Writing) Duration, Current Rate, Total Rate: %.2f %.2f %.2f ", commitCount, writingDuration, - currentWriteRate, totalWriteRate)); - - double processDuration = writingDuration + readingDuration; - double totalProcessDuration = totalWriteDuration + totalReadDuration; - double currentProcessRate = buffer.size() / processDuration; - double totalProcessRate = totalCommitRecords / (totalProcessDuration); - - // Print "total" stats - logger.info(String.format("(C-%d) (Total) Duration, Current Rate, Total Rate: %.2f %.2f %.2f ", commitCount, processDuration, - currentProcessRate, totalProcessRate)); - - // clear the buffer - buffer.clear(); - bufferCurrentSize = 0L; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/extras/indexing/src/main/java/mvm/rya/accumulo/mr/StatementWritable.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/accumulo/mr/StatementWritable.java b/extras/indexing/src/main/java/mvm/rya/accumulo/mr/StatementWritable.java deleted file mode 100644 index aefdf74..0000000 --- a/extras/indexing/src/main/java/mvm/rya/accumulo/mr/StatementWritable.java +++ /dev/null @@ -1,86 +0,0 @@ -package mvm.rya.accumulo.mr; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; - -import mvm.rya.indexing.accumulo.StatementSerializer; - -import org.apache.hadoop.io.Writable; -import org.openrdf.model.Resource; -import org.openrdf.model.Statement; -import org.openrdf.model.URI; -import org.openrdf.model.Value; - -/** - * A {@link Writable} wrapper for {@link Statement} objects. 
- */
-@SuppressWarnings("serial")
-public class StatementWritable implements Statement, Writable {
-
-    private Statement statement;
-
-    public StatementWritable(Statement statement) {
-        setStatement(statement);
-    }
-
-    public void setStatement(Statement statement) {
-        this.statement = statement;
-    }
-
-    public Statement getStatement() {
-        return statement;
-    }
-
-    @Override
-    public void readFields(DataInput paramDataInput) throws IOException {
-        statement = StatementSerializer.readStatement(paramDataInput.readUTF());
-    }
-
-    @Override
-    public void write(DataOutput paramDataOutput) throws IOException {
-        paramDataOutput.writeUTF(StatementSerializer.writeStatement(statement));
-    }
-
-    @Override
-    public Resource getSubject() {
-        return statement.getSubject();
-    }
-
-    @Override
-    public URI getPredicate() {
-        return statement.getPredicate();
-    }
-
-    @Override
-    public Value getObject() {
-        return statement.getObject();
-    }
-
-    @Override
-    public Resource getContext() {
-        return statement.getContext();
-    }
-
-}
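
To make the Writable contract above concrete, here is a minimal, hypothetical round-trip sketch (the class name, URIs, and literal are placeholders, and it assumes StatementSerializer round-trips these values exactly):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

import mvm.rya.accumulo.mr.StatementWritable;

import org.openrdf.model.Statement;
import org.openrdf.model.ValueFactory;
import org.openrdf.model.impl.ValueFactoryImpl;

public class StatementWritableRoundTrip {
    public static void main(String[] args) throws Exception {
        ValueFactory vf = new ValueFactoryImpl();
        Statement original = vf.createStatement(
                vf.createURI("http://example.org/s"),   // placeholder subject
                vf.createURI("http://example.org/p"),   // placeholder predicate
                vf.createLiteral("object value"));      // placeholder object

        // write() serializes the whole statement as one UTF string
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        new StatementWritable(original).write(new DataOutputStream(bytes));

        // readFields() hands that string back to StatementSerializer
        StatementWritable copy = new StatementWritable(null);
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(copy.getSubject() + " " + copy.getPredicate() + " " + copy.getObject());
    }
}
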
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/5a03ef61/extras/indexing/src/main/java/mvm/rya/accumulo/mr/fileinput/BulkNtripsInputToolIndexing.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/accumulo/mr/fileinput/BulkNtripsInputToolIndexing.java b/extras/indexing/src/main/java/mvm/rya/accumulo/mr/fileinput/BulkNtripsInputToolIndexing.java
deleted file mode 100644
index ecc2354..0000000
--- a/extras/indexing/src/main/java/mvm/rya/accumulo/mr/fileinput/BulkNtripsInputToolIndexing.java
+++ /dev/null
@@ -1,227 +0,0 @@
-package mvm.rya.accumulo.mr.fileinput;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.io.IOException;
-import java.io.StringReader;
-
-import mvm.rya.accumulo.AccumuloRdfConfiguration;
-import mvm.rya.accumulo.mr.utils.MRUtils;
-import mvm.rya.api.resolver.RdfToRyaConversions;
-import mvm.rya.indexing.FreeTextIndexer;
-import mvm.rya.indexing.GeoIndexer;
-import mvm.rya.indexing.accumulo.ConfigUtils;
-import mvm.rya.indexing.accumulo.freetext.AccumuloFreeTextIndexer;
-import mvm.rya.indexing.accumulo.geo.GeoMesaGeoIndexer;
-
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.TableExistsException;
-import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.log4j.Logger;
-import org.geotools.feature.SchemaException;
-import org.openrdf.model.Resource;
-import org.openrdf.model.Statement;
-import org.openrdf.model.ValueFactory;
-import org.openrdf.model.impl.ContextStatementImpl;
-import org.openrdf.model.impl.ValueFactoryImpl;
-import org.openrdf.rio.ParserConfig;
-import org.openrdf.rio.RDFFormat;
-import org.openrdf.rio.RDFHandlerException;
-import org.openrdf.rio.RDFParseException;
-import org.openrdf.rio.RDFParser;
-import org.openrdf.rio.Rio;
-import org.openrdf.rio.helpers.RDFHandlerBase;
-
-import com.google.common.base.Preconditions;
-
-/**
- * Takes large N-Triples files and uses MapReduce to ingest them into the
- * secondary (free text and geo) indexes.
- */
-public class BulkNtripsInputToolIndexing extends Configured implements Tool {
-
-    private String userName = null;
-    private String pwd = null;
-    private String instance = null;
-    private String zk = null;
-
-    private String format = RDFFormat.NTRIPLES.getName();
-
-    @Override
-    public int run(final String[] args) throws Exception {
-        final Configuration conf = getConf();
-        // conf
-        zk = conf.get(MRUtils.AC_ZK_PROP, zk);
-        instance = conf.get(MRUtils.AC_INSTANCE_PROP, instance);
-        userName = conf.get(MRUtils.AC_USERNAME_PROP, userName);
-        pwd = conf.get(MRUtils.AC_PWD_PROP, pwd);
-        format = conf.get(MRUtils.FORMAT_PROP, format);
-
-        String auths = conf.get(MRUtils.AC_CV_PROP, "");
-
-        conf.set(MRUtils.FORMAT_PROP, format);
-        Preconditions.checkNotNull(zk, MRUtils.AC_ZK_PROP + " not set");
-        Preconditions.checkNotNull(instance, MRUtils.AC_INSTANCE_PROP + " not set");
-        Preconditions.checkNotNull(userName, MRUtils.AC_USERNAME_PROP + " not set");
-        Preconditions.checkNotNull(pwd, MRUtils.AC_PWD_PROP + " not set");
-
-        // map the MR config values onto the indexer (ConfigUtils) config values
-        conf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, zk);
-        conf.set(ConfigUtils.CLOUDBASE_INSTANCE, instance);
-        conf.set(ConfigUtils.CLOUDBASE_USER, userName);
-        conf.set(ConfigUtils.CLOUDBASE_PASSWORD, pwd);
-        conf.set(ConfigUtils.CLOUDBASE_AUTHS, auths);
-
-        final String inputDir = args[0];
-
-        String tablePrefix = conf.get(MRUtils.TABLE_PREFIX_PROPERTY, null);
-        Preconditions.checkNotNull(tablePrefix, MRUtils.TABLE_PREFIX_PROPERTY + " not set");
set"); - - String docTextTable = tablePrefix + "text"; - conf.set(ConfigUtils.FREE_TEXT_DOC_TABLENAME, docTextTable); - - String docTermTable = tablePrefix + "terms"; - conf.set(ConfigUtils.FREE_TEXT_TERM_TABLENAME, docTermTable); - - String geoTable = tablePrefix + "geo"; - conf.set(ConfigUtils.GEO_TABLENAME, geoTable); - - System.out.println("Loading data into tables[freetext, geo]"); - System.out.println("Loading data into tables[" + docTermTable + " " + docTextTable + " " + geoTable + "]"); - - Job job = new Job(new Configuration(conf), "Bulk Ingest load data into Indexing Tables"); - job.setJarByClass(this.getClass()); - - // setting long job - Configuration jobConf = job.getConfiguration(); - jobConf.setBoolean("mapred.map.tasks.speculative.execution", false); - jobConf.setBoolean("mapred.reduce.tasks.speculative.execution", false); - jobConf.set("io.sort.mb", jobConf.get("io.sort.mb", "256")); - jobConf.setBoolean("mapred.compress.map.output", true); - - job.setInputFormatClass(TextInputFormat.class); - - job.setMapperClass(ParseNtripsMapper.class); - - // I'm not actually going to write output. - job.setOutputFormatClass(NullOutputFormat.class); - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(Text.class); - job.setMapOutputKeyClass(Text.class); - job.setMapOutputValueClass(Text.class); - - TextInputFormat.setInputPaths(job, new Path(inputDir)); - - job.setNumReduceTasks(0); - - job.waitForCompletion(true); - - return 0; - } - - public static void main(String[] args) throws Exception { - ToolRunner.run(new Configuration(), new BulkNtripsInputToolIndexing(), args); - } - - public static class ParseNtripsMapper extends Mapper<LongWritable, Text, Text, Text> { - private static final Logger logger = Logger.getLogger(ParseNtripsMapper.class); - - public static final String TABLE_PROPERTY = "parsentripsmapper.table"; - - private RDFParser parser; - private FreeTextIndexer freeTextIndexer; - private GeoIndexer geoIndexer; - private String rdfFormat; - - @Override - protected void setup(final Context context) throws IOException, InterruptedException { - super.setup(context); - Configuration conf = context.getConfiguration(); - - freeTextIndexer = new AccumuloFreeTextIndexer(); - freeTextIndexer.setConf(conf); - geoIndexer = new GeoMesaGeoIndexer(); - geoIndexer.setConf(conf); - final ValueFactory vf = new ValueFactoryImpl(); - - rdfFormat = conf.get(MRUtils.FORMAT_PROP); - checkNotNull(rdfFormat, "Rdf format cannot be null"); - - String namedGraphString = conf.get(MRUtils.NAMED_GRAPH_PROP); - checkNotNull(namedGraphString, MRUtils.NAMED_GRAPH_PROP + " cannot be null"); - - final Resource namedGraph = vf.createURI(namedGraphString); - - parser = Rio.createParser(RDFFormat.valueOf(rdfFormat)); - parser.setParserConfig(new ParserConfig(true, true, true, RDFParser.DatatypeHandling.VERIFY)); - parser.setRDFHandler(new RDFHandlerBase() { - - @Override - public void handleStatement(Statement statement) throws RDFHandlerException { - Statement contextStatement = new ContextStatementImpl(statement.getSubject(), statement - .getPredicate(), statement.getObject(), namedGraph); - try { - freeTextIndexer.storeStatement(RdfToRyaConversions.convertStatement(contextStatement)); - geoIndexer.storeStatement(RdfToRyaConversions.convertStatement(contextStatement)); - } catch (IOException e) { - logger.error("Error creating indexers", e); - } - } - }); - } - - @Override - public void map(LongWritable key, Text value, Context output) throws IOException, InterruptedException { - String rdf = 
-            try {
-                parser.parse(new StringReader(rdf), "");
-            } catch (RDFParseException e) {
-                System.out.println("Line[" + rdf + "] cannot be parsed as format[" + rdfFormat + "]. Exception[" + e.getMessage()
-                        + "]");
-            } catch (Exception e) {
-                logger.error("error during map", e);
-                throw new IOException("Exception occurred parsing triple[" + rdf + "]");
-            }
-        }
-
-        @Override
-        public void cleanup(Context context) {
-            IOUtils.closeStream(freeTextIndexer);
-            IOUtils.closeStream(geoIndexer);
-        }
-    }
-
-}
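
The deleted tool reads all of its connection settings from the Hadoop configuration rather than from command-line options, so a driver has to populate the MRUtils properties before handing control to ToolRunner. A minimal, hypothetical driver sketch (the class name, cluster values, table prefix, and named graph are placeholders):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;

import mvm.rya.accumulo.mr.fileinput.BulkNtripsInputToolIndexing;
import mvm.rya.accumulo.mr.utils.MRUtils;

public class BulkIngestDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Accumulo connection settings checked by run()
        conf.set(MRUtils.AC_ZK_PROP, "zoo1:2181");        // placeholder zookeepers
        conf.set(MRUtils.AC_INSTANCE_PROP, "accumulo");   // placeholder instance
        conf.set(MRUtils.AC_USERNAME_PROP, "root");       // placeholder user
        conf.set(MRUtils.AC_PWD_PROP, "secret");          // placeholder password
        // Prefix used to derive the text/terms/geo index table names
        conf.set(MRUtils.TABLE_PREFIX_PROPERTY, "rya_");
        // Named graph that ParseNtripsMapper assigns to every parsed statement;
        // setup() fails fast if this is unset
        conf.set(MRUtils.NAMED_GRAPH_PROP, "http://example.org/graph");
        // args[0] is the HDFS directory of N-Triples input files
        System.exit(ToolRunner.run(conf, new BulkNtripsInputToolIndexing(), args));
    }
}
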
