// http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/common-query/src/main/java/ss/cloudbase/core/iterators/ConversionIterator.java
package ss.cloudbase.core.iterators;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import ss.cloudbase.core.iterators.conversion.Operation;
import ss.cloudbase.core.iterators.filter.CBConverter;
import cloudbase.core.data.Key;
import cloudbase.core.data.Value;
import cloudbase.core.iterators.IteratorEnvironment;
import cloudbase.core.iterators.SortedKeyValueIterator;
import cloudbase.core.iterators.WrappingIterator;

/**
 * Wraps a source iterator and applies per-field arithmetic conversions (see
 * {@link Operation}) to values as they are returned. Two record layouts are
 * supported: the serialized single-document layout parsed by
 * {@link CBConverter}, and a "multi doc" layout where the field name is
 * embedded in the column qualifier after a null byte.
 */
public class ConversionIterator extends WrappingIterator {
    /** Option holding the \u0000-separated conversion configs (see {@link #encodeConversions}). */
    public static final String OPTION_CONVERSIONS = "conversions";
    /** Option ("true"/"false") selecting the multi-document column-qualifier layout. */
    public static final String OPTION_MULTI_DOC = "multiDoc";
    /** The character or characters that defines the end of the field in the column qualifier. Defaults to '@'. */
    public static final String OPTION_FIELD_END = "fieldEnd";

    protected CBConverter serializedConverter;
    protected Map<String, Operation> conversions;
    protected boolean multiDoc = false;
    protected String fieldEnd = "@";

    public ConversionIterator() {}

    /**
     * Copy constructor used by {@link #deepCopy(IteratorEnvironment)}.
     *
     * @param other the iterator whose configuration is copied
     */
    public ConversionIterator(ConversionIterator other) {
        // BUG FIX: the previous version called this.conversions.putAll(...) while
        // this.conversions was still null, so every deepCopy() threw a
        // NullPointerException. It also failed to carry over fieldEnd.
        if (other.conversions != null) {
            this.conversions = new HashMap<String, Operation>(other.conversions);
        }
        this.multiDoc = other.multiDoc;
        this.serializedConverter = other.serializedConverter;
        this.fieldEnd = other.fieldEnd;
    }

    @Override
    public SortedKeyValueIterator<Key, Value> deepCopy(IteratorEnvironment env) {
        return new ConversionIterator(this);
    }

    @Override
    public Value getTopValue() {
        // Only rewrite the value when conversions were configured; otherwise
        // pass the source value through untouched.
        if (hasTop() && conversions != null) {
            if (multiDoc) {
                return multiDocConvert(super.getTopValue());
            }
            return convert(super.getTopValue());
        }
        return super.getTopValue();
    }

    /**
     * Extracts the field name from a multi-doc column qualifier of the form
     * {@code docId\0field<fieldEnd>...}.
     *
     * @param key the current top key
     * @return the field name, or null if the qualifier contains no null byte
     */
    protected String getMultiDocField(Key key) {
        String colq = key.getColumnQualifier().toString();
        int start = colq.indexOf("\u0000");
        if (start == -1) {
            return null;
        }

        int end = colq.indexOf(fieldEnd, start + 1);
        if (end == -1) {
            end = colq.length(); // no terminator: field name runs to the end
        }

        return colq.substring(start + 1, end);
    }

    /** Applies the conversion registered for this key's field, if any, to the raw value. */
    protected Value multiDocConvert(Value value) {
        String field = getMultiDocField(getTopKey());
        if (conversions.containsKey(field)) {
            String newValue = conversions.get(field).execute(value.toString());
            return new Value(newValue.getBytes());
        }
        return value;
    }

    /** Converts every configured field inside a serialized record value. */
    protected Value convert(Value value) {
        Map<String, String> record = serializedConverter.toMap(getTopKey(), value);

        for (String field : record.keySet()) {
            if (conversions.containsKey(field)) {
                record.put(field, conversions.get(field).execute(record.get(field)));
            }
        }

        return serializedConverter.toValue(record);
    }

    @Override
    public void init(SortedKeyValueIterator<Key, Value> source, Map<String, String> options, IteratorEnvironment env) throws IOException {
        super.init(source, options, env);

        if (options.containsKey(OPTION_MULTI_DOC)) {
            multiDoc = Boolean.parseBoolean(options.get(OPTION_MULTI_DOC));
        } else {
            multiDoc = false;
        }

        if (!multiDoc) {
            // serialized layout: a CBConverter parses the record out of the value
            serializedConverter = new CBConverter();
            serializedConverter.init(options);
        }

        if (options.containsKey(OPTION_FIELD_END)) {
            fieldEnd = options.get(OPTION_FIELD_END);
        }

        if (options.containsKey(OPTION_CONVERSIONS)) {
            Operation[] ops = decodeConversions(options.get(OPTION_CONVERSIONS));
            conversions = new HashMap<String, Operation>();

            for (Operation o : ops) {
                conversions.put(o.getField(), o);
            }
        }
    }

    /**
     * Encodes a set of conversion strings for use with the OPTION_CONVERSIONS options. Each conversion
     * string should be in the format 'field op value' (whitespace necessary), where op is +, -, *, /, %, or
     * ^ and the value is a number.
     *
     * @param conversions the conversion strings to encode
     * @return The encoded value to use with OPTION_CONVERSIONS
     */
    public static String encodeConversions(String[] conversions) {
        StringBuilder sb = new StringBuilder();
        boolean first = true;
        for (String conversion : conversions) {
            if (first) {
                first = false;
            } else {
                sb.append("\u0000");
            }
            sb.append(conversion);
        }
        return sb.toString();
    }

    /**
     * Splits an encoded conversion option back into its {@link Operation}s.
     *
     * @param conversions the value produced by {@link #encodeConversions}
     * @return one Operation per encoded conversion string
     */
    public static Operation[] decodeConversions(String conversions) {
        String[] configs = conversions.split("\u0000");
        Operation[] ops = new Operation[configs.length];

        for (int i = 0; i < configs.length; i++) {
            ops[i] = new Operation(configs[i]);
        }

        return ops;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/common-query/src/main/java/ss/cloudbase/core/iterators/GMDenIntersectingIterator.java ---------------------------------------------------------------------- diff --git a/partition/common-query/src/main/java/ss/cloudbase/core/iterators/GMDenIntersectingIterator.java b/partition/common-query/src/main/java/ss/cloudbase/core/iterators/GMDenIntersectingIterator.java new file mode 100644 index 0000000..7ec401f --- /dev/null +++ b/partition/common-query/src/main/java/ss/cloudbase/core/iterators/GMDenIntersectingIterator.java @@ -0,0 +1,363 @@ +// Dear Cloudbase, +// Use protected fields/methods as much as possible in APIs. +// Love, +// Will + +// since the IntersectingIterator/FamilyIntersectingIterator classes are stingy with their fields, we have to use +// the exact same package name to get at currentPartition and currentDocID +package ss.cloudbase.core.iterators; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.io.Text; +import org.apache.log4j.Logger; + +import ss.cloudbase.core.iterators.IntersectingIterator.TermSource; + +import cloudbase.core.data.ArrayByteSequence; +import cloudbase.core.data.ByteSequence; +import cloudbase.core.data.Key; +import cloudbase.core.data.Range; +import cloudbase.core.data.Value; +import cloudbase.core.iterators.IteratorEnvironment; +import cloudbase.core.iterators.SortedKeyValueIterator; + +/** + * This class is a copy of FamilyIntersectingIterator with a few minor changes. 
It assumes a table structure like the following: + * <table> + * <tr><th>Row</th><th>Column Family</th><th>Column Qualifier</th><th>Value</th></tr> + * <tr><td>Partition1</td><td>event</td><td>UUID</td><td>The record value</td></tr> + * <tr><td>Partition1</td><td>index</td><td>term\u0000UUID</td><td></td></tr> + * </table> + * + * @author William Wall + * + */ +public class GMDenIntersectingIterator extends IntersectingIterator { + private static final Logger logger = Logger.getLogger(GMDenIntersectingIterator.class); + + public static final Text DEFAULT_INDEX_COLF = new Text("i"); + public static final Text DEFAULT_DOC_COLF = new Text("e"); + + public static final String indexFamilyOptionName = "indexFamily"; + public static final String docFamilyOptionName = "docFamily"; + + protected static Text indexColf = DEFAULT_INDEX_COLF; + protected static Text docColf = DEFAULT_DOC_COLF; + protected static Set<ByteSequence> indexColfSet; + protected static Set<ByteSequence> docColfSet; + + protected static final byte[] nullByte = {0}; + + protected SortedKeyValueIterator<Key,Value> docSource; + + /** + * Use this option to retrieve all the documents that match the UUID rather than just the first. This + * is commonly used in cell-level security models that use the column-qualifier like this: + * UUID \0 field1 [] value + * UUID \0 securedField [ALPHA] secretValue + **/ + public static final String OPTION_MULTI_DOC = "multiDoc"; + + /** + * Use this option to turn off document lookup. 
+ */ + public static final String OPTION_DOC_LOOKUP = "docLookup"; + + protected boolean multiDoc = false; + protected boolean doDocLookup = true; + protected Range docRange = null; + protected boolean nextId = false; + + @Override + public void init(SortedKeyValueIterator<Key, Value> source, Map<String, String> options, IteratorEnvironment env) throws IOException { + if (options.containsKey(indexFamilyOptionName)) + indexColf = new Text(options.get(indexFamilyOptionName)); + if (options.containsKey(docFamilyOptionName)) + docColf = new Text(options.get(docFamilyOptionName)); + docSource = source.deepCopy(env); + indexColfSet = Collections.singleton((ByteSequence)new ArrayByteSequence(indexColf.getBytes(),0,indexColf.getLength())); + + if (options.containsKey(OPTION_MULTI_DOC)) { + multiDoc = Boolean.parseBoolean(options.get(OPTION_MULTI_DOC)); + } + + if (options.containsKey(OPTION_DOC_LOOKUP)) { + doDocLookup = Boolean.parseBoolean(options.get(OPTION_DOC_LOOKUP)); + } + + if (!doDocLookup) { + // it makes no sense to turn on multiDoc if doDocLookup is off + multiDoc = false; + } + + // remove any range terms + Text[] originalTerms = decodeColumns(options.get(columnFamiliesOptionName)); + boolean[] originalBooleans = decodeBooleans(options.get(notFlagOptionName)); + + List<Text> terms = new ArrayList<Text>(); + List<Boolean> termBooleans = new ArrayList<Boolean>(); + List<Text> ranges = new ArrayList<Text>(); + List<Boolean> rangeBooleans = new ArrayList<Boolean>(); + + boolean boolsExist = originalBooleans != null && originalBooleans.length == originalTerms.length; + + for (int i = 0; i < originalTerms.length; i++) { + if (isRangeTerm(originalTerms[i])) { + ranges.add(originalTerms[i]); + if (boolsExist) { + rangeBooleans.add(originalBooleans[i]); + } else { + rangeBooleans.add(false); + } + } else { + terms.add(originalTerms[i]); + + if (boolsExist) { + termBooleans.add(originalBooleans[i]); + } else { + termBooleans.add(false); + } + } + } + + boolean[] bools = 
new boolean[termBooleans.size()]; + for (int i = 0; i < termBooleans.size(); i++) { + bools[i] = termBooleans.get(i).booleanValue(); + } + + boolean[] rangeBools = new boolean[rangeBooleans.size()]; + for (int i = 0; i < rangeBooleans.size(); i++) { + rangeBools[i] = rangeBooleans.get(i).booleanValue(); + } + + // put the modified term/boolean lists back in the options + + if (terms.size() < 2) { + // the intersecting iterator will choke on these, so we'll set it up ourselves + if (terms.size() == 1) { + sources = new TermSource[1]; + sources[0] = new TermSource(source, terms.get(0)); + } + } else { + options.put(columnFamiliesOptionName, encodeColumns(terms.toArray(new Text[terms.size()]))); + if (termBooleans.size() > 0) { + options.put(notFlagOptionName, encodeBooleans(bools)); + } + + super.init(source, options, env); + } + + // add the range terms + if (ranges.size() > 0) { + + TermSource[] localSources; + + int offset = 0; + if (sources != null) { + localSources = new TermSource[sources.length + ranges.size()]; + + // copy array + for (int i = 0; i < sources.length; i++) { + localSources[i] = sources[i]; + } + + offset = sources.length; + } else { + localSources = new TermSource[ranges.size()]; + } + + for (int i = 0; i < ranges.size(); i++) { + IntersectionRange ri = new IntersectionRange(); + ri.init(source.deepCopy(env), getRangeIteratorOptions(ranges.get(i)), env); + localSources[i + offset] = new TermSource(ri, ri.getOutputTerm(), rangeBools[i]); + } + + sources = localSources; + } + + sourcesCount = sources.length; + + if (sourcesCount < 2) { + throw new IOException("GMDenIntersectingIterator requires two or more terms"); + } + + docColfSet = Collections.singleton((ByteSequence)new ArrayByteSequence(docColf.getBytes(),0,docColf.getLength())); + } + + @Override + protected Key buildKey(Text partition, Text term, Text docID) { + Text colq = new Text(term); + colq.append(nullByte, 0, 1); + colq.append(docID.getBytes(), 0, docID.getLength()); + return new 
Key(partition, indexColf, colq); + } + + @Override + protected Key buildKey(Text partition, Text term) { + Text colq = new Text(term); + return new Key(partition, indexColf, colq); + } + + @Override + protected Text getTerm(Key key) { + if (indexColf.compareTo(key.getColumnFamily().getBytes(),0,indexColf.getLength())< 0) { + // We're past the index column family, so return a term that will sort lexicographically last. + // The last unicode character should suffice + return new Text("\uFFFD"); + } + Text colq = key.getColumnQualifier(); + int zeroIndex = colq.find("\0"); + Text term = new Text(); + term.set(colq.getBytes(),0,zeroIndex); + return term; + } + + @Override + public SortedKeyValueIterator<Key, Value> deepCopy(IteratorEnvironment env) { + GMDenIntersectingIterator newItr = new GMDenIntersectingIterator(); + if(sources != null) { + newItr.sourcesCount = sourcesCount; + newItr.sources = new TermSource[sourcesCount]; + for(int i = 0; i < sourcesCount; i++) { + newItr.sources[i] = new TermSource(sources[i].iter.deepCopy(env), sources[i].term); + } + } + newItr.currentDocID = currentDocID; + newItr.currentPartition = currentPartition; + newItr.docRange = docRange; + newItr.docSource = docSource.deepCopy(env); + newItr.inclusive = inclusive; + newItr.multiDoc = multiDoc; + newItr.nextId = nextId; + newItr.overallRange = overallRange; + return newItr; + } + + @Override + public void seek(Range range, Collection<ByteSequence> seekColumnFamilies, boolean inclusive) throws IOException { + super.seek(range, indexColfSet, true); + + } + + @Override + protected Text getDocID(Key key) { + Text colq = key.getColumnQualifier(); + int firstZeroIndex = colq.find("\0"); + if (firstZeroIndex < 0) { + throw new IllegalArgumentException("bad docid: "+key.toString()); + } + Text docID = new Text(); + try { + docID.set(colq.getBytes(),firstZeroIndex+1, colq.getBytes().length - firstZeroIndex - 1); + } catch (ArrayIndexOutOfBoundsException e) { + throw new 
IllegalArgumentException("bad indices for docid: "+key.toString()+" "+firstZeroIndex +" " + (colq.getBytes().length - firstZeroIndex - 1)); + } + return docID; + } + + protected Key buildStartKey() { + return new Key(currentPartition, docColf, currentDocID); + } + + protected Key buildEndKey() { + if (multiDoc) { + return new Key(currentPartition, docColf, new Text(currentDocID.toString() + "\u0000\uFFFD")); + } + return null; + } + + @Override + public void next() throws IOException { + if (multiDoc && nextId) { + docSource.next(); + + // check to make sure that the docSource top is less than our max key + if (docSource.hasTop() && docRange.contains(docSource.getTopKey())) { + topKey = docSource.getTopKey(); + value = docSource.getTopValue(); + return; + } + } + + nextId = false; + super.next(); + } + + @Override + protected void advanceToIntersection() throws IOException { + super.advanceToIntersection(); + + if (topKey==null || !doDocLookup) + return; + + if (logger.isTraceEnabled()) logger.trace("using top key to seek for doc: "+topKey.toString()); + docRange = new Range(buildStartKey(), true, buildEndKey(), false); + docSource.seek(docRange, docColfSet, true); + logger.debug("got doc key: "+docSource.getTopKey().toString()); + if (docSource.hasTop()&& docRange.contains(docSource.getTopKey())) { + value = docSource.getTopValue(); + } + logger.debug("got doc value: "+value.toString()); + + if (docSource.hasTop()) { + if (multiDoc && topKey != null) { + nextId = true; + } + topKey = docSource.getTopKey(); + } + } + + + public boolean isRangeTerm(Text term) { + return term.toString().startsWith("range\u0000"); + } + + protected Map<String, String> getRangeIteratorOptions(Text config) { + // we want the keys from Range Iterators to look like this: + // range|colf|lower|includeLower|upper|includeUpper + // e.g. range|geo|21332|true|21333|false + + // and we'll output a key like this: + // partition index:geo\0UUID ... 
+ + + String[] range = config.toString().split("\u0000"); + Map<String, String> options = new HashMap<String, String>(); + options.put(IntersectionRange.OPTION_COLF, range[1]); + options.put(IntersectionRange.OPTION_OUTPUT_TERM, range[1]); + options.put(IntersectionRange.OPTION_LOWER_BOUND, range[2]); + options.put(IntersectionRange.OPTION_START_INCLUSIVE, range[3]); + options.put(IntersectionRange.OPTION_UPPER_BOUND, range[4]); + options.put(IntersectionRange.OPTION_END_INCLUSIVE, range[5]); + options.put(IntersectionRange.OPTION_OUTPUT_COLF, indexColf.toString()); + return options; + } + + /** + * Builds a range term for use with the IntersectingIterator + * @param colf The column family to search + * @param start The start of the range + * @param includeStart Whether the start of the range is inclusive or not + * @param end The end of the range + * @param includeEnd Whether the end of the range is inclusive or not + * @return A String formatted for use as a term a GMDenIntersectingIterator + */ + public static String getRangeTerm(String colf, String start, boolean includeStart, String end, boolean includeEnd) { + StringBuilder sb = new StringBuilder(); + sb.append("range\u0000"); + sb.append(colf).append("\u0000"); + sb.append(start).append("\u0000"); + sb.append(includeStart ? "true": "false").append("\u0000"); + sb.append(end).append("\u0000"); + sb.append(includeEnd ? 
"true": "false").append("\u0000"); + return sb.toString(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/common-query/src/main/java/ss/cloudbase/core/iterators/IntersectingIterator.java ---------------------------------------------------------------------- diff --git a/partition/common-query/src/main/java/ss/cloudbase/core/iterators/IntersectingIterator.java b/partition/common-query/src/main/java/ss/cloudbase/core/iterators/IntersectingIterator.java new file mode 100644 index 0000000..3b4961f --- /dev/null +++ b/partition/common-query/src/main/java/ss/cloudbase/core/iterators/IntersectingIterator.java @@ -0,0 +1,557 @@ +package ss.cloudbase.core.iterators; + +import java.io.IOException; +import java.util.Collection; +import java.util.Map; + +import org.apache.commons.codec.binary.Base64; +import org.apache.hadoop.io.Text; +import org.apache.log4j.Logger; + +import cloudbase.core.data.ByteSequence; +import cloudbase.core.data.Key; +import cloudbase.core.data.PartialKey; +import cloudbase.core.data.Range; +import cloudbase.core.data.Value; +import cloudbase.core.iterators.IteratorEnvironment; +import cloudbase.core.iterators.SortedKeyValueIterator; +import cloudbase.core.util.TextUtil; + +public class IntersectingIterator implements SortedKeyValueIterator<Key,Value> { + + protected Text nullText = new Text(); + + protected Text getPartition(Key key) { + return key.getRow(); + } + + protected Text getTerm(Key key) { + return key.getColumnFamily(); + } + + protected Text getDocID(Key key) { + return key.getColumnQualifier(); + } + + protected Key buildKey(Text partition, Text term) { + return new Key(partition,(term == null) ? nullText : term); + } + + protected Key buildKey(Text partition, Text term, Text docID) { + return new Key(partition,(term == null) ? 
nullText : term, docID); + } + + protected Key buildFollowingPartitionKey(Key key) { + return key.followingKey(PartialKey.ROW); + } + + protected static final Logger log = Logger.getLogger(IntersectingIterator.class); + + protected static class TermSource { + public SortedKeyValueIterator<Key,Value> iter; + public Text term; + public boolean notFlag; + + public TermSource(TermSource other) { + this.iter = other.iter; + this.term = other.term; + this.notFlag = other.notFlag; + } + + public TermSource(SortedKeyValueIterator<Key,Value> iter, Text term) { + this.iter = iter; + this.term = term; + this.notFlag = false; + } + public TermSource(SortedKeyValueIterator<Key,Value> iter, Text term, boolean notFlag) { + this.iter = iter; + this.term = term; + this.notFlag = notFlag; + } + + public String getTermString() { + return (this.term == null) ? new String("Iterator") : this.term.toString(); + } + } + + protected TermSource[] sources; + protected int sourcesCount = 0; + + protected Range overallRange; + + // query-time settings + protected Text currentPartition = null; + protected Text currentDocID = new Text(emptyByteArray); + protected static final byte [] emptyByteArray = new byte[0]; + + protected Key topKey = null; + protected Value value = new Value(emptyByteArray); + + protected Collection<ByteSequence> seekColumnFamilies; + + protected boolean inclusive; + + + public IntersectingIterator() + {} + + @Override + public SortedKeyValueIterator<Key, Value> deepCopy(IteratorEnvironment env) { + return new IntersectingIterator(this, env); + } + + public IntersectingIterator(IntersectingIterator other, IteratorEnvironment env) + { + if(other.sources != null) + { + sourcesCount = other.sourcesCount; + sources = new TermSource[sourcesCount]; + for(int i = 0; i < sourcesCount; i++) + { + sources[i] = new TermSource(other.sources[i].iter.deepCopy(env), other.sources[i].term); + } + } + } + + @Override + public Key getTopKey() { + return topKey; + } + + @Override + public 
Value getTopValue() { + // we don't really care about values + return value; + } + + @Override + public boolean hasTop() { + return currentPartition != null; + } + + // precondition: currentRow is not null + private boolean seekOneSource(int sourceID) throws IOException + { + // find the next key in the appropriate column family that is at or beyond the cursor (currentRow, currentCQ) + // advance the cursor if this source goes beyond it + // return whether we advanced the cursor + + // within this loop progress must be made in one of the following forms: + // - currentRow or currentCQ must be increased + // - the given source must advance its iterator + // this loop will end when any of the following criteria are met + // - the iterator for the given source is pointing to the key (currentRow, columnFamilies[sourceID], currentCQ) + // - the given source is out of data and currentRow is set to null + // - the given source has advanced beyond the endRow and currentRow is set to null + boolean advancedCursor = false; + + if (sources[sourceID].notFlag) + { + while(true) + { + if(sources[sourceID].iter.hasTop() == false) + { + // an empty column that you are negating is a valid condition + break; + } + // check if we're past the end key + int endCompare = -1; + // we should compare the row to the end of the range + if(overallRange.getEndKey() != null) + { + endCompare = overallRange.getEndKey().getRow().compareTo(sources[sourceID].iter.getTopKey().getRow()); + if((!overallRange.isEndKeyInclusive() && endCompare <= 0) || endCompare < 0) + { + // an empty column that you are negating is a valid condition + break; + } + } + int partitionCompare = currentPartition.compareTo(getPartition(sources[sourceID].iter.getTopKey())); + // check if this source is already at or beyond currentRow + // if not, then seek to at least the current row + + if(partitionCompare > 0) + { + // seek to at least the currentRow + Key seekKey = buildKey(currentPartition,sources[sourceID].term); + 
sources[sourceID].iter.seek(new Range(seekKey,true, null, false), seekColumnFamilies, inclusive); + continue; + } + // check if this source has gone beyond currentRow + // if so, this is a valid condition for negation + if(partitionCompare < 0) + { + break; + } + // we have verified that the current source is positioned in currentRow + // now we must make sure we're in the right columnFamily in the current row + // Note: Iterators are auto-magically set to the correct columnFamily + if(sources[sourceID].term != null) + { + int termCompare = sources[sourceID].term.compareTo(getTerm(sources[sourceID].iter.getTopKey())); + // check if this source is already on the right columnFamily + // if not, then seek forwards to the right columnFamily + if(termCompare > 0) + { + Key seekKey = buildKey(currentPartition,sources[sourceID].term,currentDocID); + sources[sourceID].iter.seek(new Range(seekKey,true,null,false), seekColumnFamilies, inclusive); + continue; + } + // check if this source is beyond the right columnFamily + // if so, then this is a valid condition for negating + if(termCompare < 0) + { + break; + } + } + + // we have verified that we are in currentRow and the correct column family + // make sure we are at or beyond columnQualifier + Text docID = getDocID(sources[sourceID].iter.getTopKey()); + int docIDCompare = currentDocID.compareTo(docID); + // If we are past the target, this is a valid result + if(docIDCompare < 0) + { + break; + } + // if this source is not yet at the currentCQ then advance in this source + if(docIDCompare > 0) + { + // seek forwards + Key seekKey = buildKey(currentPartition, sources[sourceID].term, currentDocID); + sources[sourceID].iter.seek(new Range(seekKey, true, null, false), seekColumnFamilies, inclusive); + continue; + } + // if we are equal to the target, this is an invalid result. + // Force the entire process to go to the next row. + // We are advancing column 0 because we forced that column to not contain a ! 
+ // when we did the init() + if(docIDCompare == 0) + { + sources[0].iter.next(); + advancedCursor = true; + break; + } + } + } + else + { + while(true) + { + if(sources[sourceID].iter.hasTop() == false) + { + currentPartition = null; + // setting currentRow to null counts as advancing the cursor + return true; + } + // check if we're past the end key + int endCompare = -1; + // we should compare the row to the end of the range + + if(overallRange.getEndKey() != null) + { + endCompare = overallRange.getEndKey().getRow().compareTo(sources[sourceID].iter.getTopKey().getRow()); + if((!overallRange.isEndKeyInclusive() && endCompare <= 0) || endCompare < 0) + { + currentPartition = null; + // setting currentRow to null counts as advancing the cursor + return true; + } + } + int partitionCompare = currentPartition.compareTo(getPartition(sources[sourceID].iter.getTopKey())); + // check if this source is already at or beyond currentRow + // if not, then seek to at least the current row + if(partitionCompare > 0) + { + // seek to at least the currentRow + Key seekKey = buildKey(currentPartition,sources[sourceID].term); + sources[sourceID].iter.seek(new Range(seekKey,true, null, false), seekColumnFamilies, inclusive); + continue; + } + // check if this source has gone beyond currentRow + // if so, advance currentRow + if(partitionCompare < 0) + { + currentPartition.set(getPartition(sources[sourceID].iter.getTopKey())); + currentDocID.set(emptyByteArray); + advancedCursor = true; + continue; + } + // we have verified that the current source is positioned in currentRow + // now we must make sure we're in the right columnFamily in the current row + // Note: Iterators are auto-magically set to the correct columnFamily + + if(sources[sourceID].term != null) + { + int termCompare = sources[sourceID].term.compareTo(getTerm(sources[sourceID].iter.getTopKey())); + // check if this source is already on the right columnFamily + // if not, then seek forwards to the right columnFamily + 
if(termCompare > 0) + { + Key seekKey = buildKey(currentPartition,sources[sourceID].term,currentDocID); + sources[sourceID].iter.seek(new Range(seekKey,true,null,false), seekColumnFamilies, inclusive); + continue; + } + // check if this source is beyond the right columnFamily + // if so, then seek to the next row + if(termCompare < 0) + { + // we're out of entries in the current row, so seek to the next one + // byte[] currentRowBytes = currentRow.getBytes(); + // byte[] nextRow = new byte[currentRowBytes.length + 1]; + // System.arraycopy(currentRowBytes, 0, nextRow, 0, currentRowBytes.length); + // nextRow[currentRowBytes.length] = (byte)0; + // // we should reuse text objects here + // sources[sourceID].seek(new Key(new Text(nextRow),columnFamilies[sourceID])); + if(endCompare == 0) + { + // we're done + currentPartition = null; + // setting currentRow to null counts as advancing the cursor + return true; + } + Key seekKey = buildFollowingPartitionKey(sources[sourceID].iter.getTopKey()); + try { + sources[sourceID].iter.seek(new Range(seekKey, true, null, false), seekColumnFamilies, inclusive); + } catch (Exception e) { + // the seek will throw an exception if we have crossed a tablet boundary + // setting the Partition to null will advance to the next tablet + currentPartition = null; + return true; + } + continue; + } + } + // we have verified that we are in currentRow and the correct column family + // make sure we are at or beyond columnQualifier + Text docID = getDocID(sources[sourceID].iter.getTopKey()); + int docIDCompare = currentDocID.compareTo(docID); + // if this source has advanced beyond the current column qualifier then advance currentCQ and return true + if(docIDCompare < 0) + { + currentDocID.set(docID); + advancedCursor = true; + break; + } + // if this source is not yet at the currentCQ then seek in this source + if(docIDCompare > 0) + { + // seek forwards + Key seekKey = buildKey(currentPartition, sources[sourceID].term, currentDocID); + 
sources[sourceID].iter.seek(new Range(seekKey, true, null, false), seekColumnFamilies, inclusive); + continue; + } + // this source is at the current row, in its column family, and at currentCQ + break; + } + } + return advancedCursor; + } + + @Override + public void next() throws IOException { + if(currentPartition == null) + { + return; + } + // precondition: the current row is set up and the sources all have the same column qualifier + // while we don't have a match, seek in the source with the smallest column qualifier + sources[0].iter.next(); + advanceToIntersection(); + } + + protected void advanceToIntersection() throws IOException + { + boolean cursorChanged = true; + int numSeeks = 0; + while(cursorChanged) + { + // seek all of the sources to at least the highest seen column qualifier in the current row + cursorChanged = false; + for(int i = 0; i < sourcesCount; i++) + { + if(currentPartition == null) + { + topKey = null; + return; + } + numSeeks++; + if(seekOneSource(i)) + { + cursorChanged = true; + break; + } + } + } + topKey = buildKey(currentPartition,nullText,currentDocID); + } + + public static String stringTopKey(SortedKeyValueIterator<Key, Value> iter) { + if (iter.hasTop()) + return iter.getTopKey().toString(); + return ""; + } + + public static final String columnFamiliesOptionName = "columnFamilies"; + public static final String notFlagOptionName = "notFlag"; + + public static String encodeColumns(Text[] columns) + { + StringBuilder sb = new StringBuilder(); + for(int i = 0; i < columns.length; i++) + { + sb.append(new String(Base64.encodeBase64(TextUtil.getBytes(columns[i])))); + sb.append('\n'); + } + return sb.toString(); + } + + public static String encodeBooleans(boolean[] flags) + { + byte[] bytes = new byte[flags.length]; + for(int i = 0; i < flags.length; i++) + { + if(flags[i]) + bytes[i] = 1; + else + bytes[i] = 0; + } + return new String(Base64.encodeBase64(bytes)); + } + + public static Text[] decodeColumns(String columns) + { + 
String[] columnStrings = columns.split("\n"); + Text[] columnTexts = new Text[columnStrings.length]; + for(int i = 0; i < columnStrings.length; i++) + { + columnTexts[i] = new Text(Base64.decodeBase64(columnStrings[i].getBytes())); + } + return columnTexts; + } + + public static boolean[] decodeBooleans(String flags) + { + // return null of there were no flags + if(flags == null) + return null; + + byte[] bytes = Base64.decodeBase64(flags.getBytes()); + boolean[] bFlags = new boolean[bytes.length]; + for(int i = 0; i < bytes.length; i++) + { + if(bytes[i] == 1) + bFlags[i] = true; + else + bFlags[i] = false; + } + return bFlags; + } + + @Override + public void init(SortedKeyValueIterator<Key, Value> source, + Map<String, String> options, IteratorEnvironment env) throws IOException { + Text[] terms = decodeColumns(options.get(columnFamiliesOptionName)); + boolean[] notFlag = decodeBooleans(options.get(notFlagOptionName)); + + if(terms.length < 2) + { + throw new IOException("IntersectionIterator requires two or more columns families"); + } + + // Scan the not flags. + // There must be at least one term that isn't negated + // And we are going to re-order such that the first term is not a ! 
term + if(notFlag == null) + { + notFlag = new boolean[terms.length]; + for(int i = 0; i < terms.length; i++) + notFlag[i] = false; + } + if(notFlag[0]) { + for(int i = 1; i < notFlag.length; i++) + { + if(notFlag[i] == false) + { + Text swapFamily = new Text(terms[0]); + terms[0].set(terms[i]); + terms[i].set(swapFamily); + notFlag[0] = false; + notFlag[i] = true; + break; + } + } + if(notFlag[0]) + { + throw new IOException("IntersectionIterator requires at lest one column family without not"); + } + } + + + sources = new TermSource[terms.length]; + sources[0] = new TermSource(source, terms[0]); + for(int i = 1; i < terms.length; i++) + { + sources[i] = new TermSource(source.deepCopy(env), terms[i], notFlag[i]); + } + sourcesCount = terms.length; + } + + @Override + public void seek(Range range, Collection<ByteSequence> seekColumnFamilies, boolean inclusive) throws IOException { + overallRange = new Range(range); + currentPartition = new Text(); + currentDocID.set(emptyByteArray); + + this.seekColumnFamilies = seekColumnFamilies; + this.inclusive = inclusive; + + // seek each of the sources to the right column family within the row given by key + for(int i = 0; i < sourcesCount; i++) + { + Key sourceKey; + if(range.getStartKey() != null) + { + if(range.getStartKey().getColumnQualifier() != null) + { + sourceKey = buildKey(getPartition(range.getStartKey()),sources[i].term,range.getStartKey().getColumnQualifier()); + } + else + { + sourceKey = buildKey(getPartition(range.getStartKey()),sources[i].term); + } + sources[i].iter.seek(new Range(sourceKey, true, null, false), seekColumnFamilies, inclusive); + } + else + { + sources[i].iter.seek(range, seekColumnFamilies, inclusive); + } + } + advanceToIntersection(); + } + + public void addSource(SortedKeyValueIterator<Key, Value> source, IteratorEnvironment env, + Text term, boolean notFlag) { + // Check if we have space for the added Source + if(sources == null) + { + sources = new TermSource[1]; + } + else + { + // 
allocate space for node, and copy current tree. + // TODO: Should we change this to an ArrayList so that we can just add() ? + TermSource[] localSources = new TermSource[sources.length + 1]; + int currSource = 0; + for(TermSource myTerm : sources) + { + // TODO: Do I need to call new here? or can I just re-use the term? + localSources[currSource] = new TermSource(myTerm); + currSource++; + } + sources = localSources; + } + sources[sourcesCount] = new TermSource(source.deepCopy(env), term, notFlag); + sourcesCount++; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/common-query/src/main/java/ss/cloudbase/core/iterators/IntersectionRange.java ---------------------------------------------------------------------- diff --git a/partition/common-query/src/main/java/ss/cloudbase/core/iterators/IntersectionRange.java b/partition/common-query/src/main/java/ss/cloudbase/core/iterators/IntersectionRange.java new file mode 100644 index 0000000..04d5884 --- /dev/null +++ b/partition/common-query/src/main/java/ss/cloudbase/core/iterators/IntersectionRange.java @@ -0,0 +1,330 @@ +package ss.cloudbase.core.iterators; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; + +import org.apache.hadoop.io.Text; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; + +import cloudbase.core.client.CBException; +import cloudbase.core.data.ArrayByteSequence; +import cloudbase.core.data.ByteSequence; +import cloudbase.core.data.Key; +import cloudbase.core.data.PartialKey; +import cloudbase.core.data.Range; +import cloudbase.core.data.Value; +import cloudbase.core.iterators.IteratorEnvironment; +import cloudbase.core.iterators.SortedKeyValueIterator; + +/** + * When attempting to intersect a term which is a range (lowerval <= x <= upperval), the entire 
range + * must first be scanned so that the document keys can be sorted before passing them up to the + * intersecting iterator of choice. + * + * @author William Wall (wawall) + */ +public class IntersectionRange implements SortedKeyValueIterator<Key, Value>{ + private static final Logger logger = Logger.getLogger(IntersectionRange.class); + + public static final String OPTION_OUTPUT_COLF = "outputColf"; + public static final String OPTION_OUTPUT_TERM = "outputTerm"; + public static final String OPTION_COLF = "columnFamily"; + public static final String OPTION_LOWER_BOUND = "lower"; + public static final String OPTION_UPPER_BOUND = "upper"; + public static final String OPTION_DELIMITER = "delimiter"; + public static final String OPTION_START_INCLUSIVE = "startInclusive"; + public static final String OPTION_END_INCLUSIVE = "endInclusive"; + public static final String OPTION_TEST_OUTOFMEM = "testOutOfMemory"; + + protected SortedKeyValueIterator<Key, Value> source; + protected Text colf = null; + protected Text lower = null; + protected Text upper = null; + protected String delimiter = null; + protected String outputTerm = null; + protected Text outputColf = null; + protected Text currentPartition = null; + protected boolean startInclusive = true; + protected boolean endInclusive = false; + protected boolean testOutOfMemory = false; + + protected Key topKey = null; + + protected Iterator<Key> itr; + protected boolean sortComplete = false; + protected Range overallRange; + protected SortedSet<Key> docIds = new TreeSet<Key>(); + protected static Set<ByteSequence> indexColfSet; + + @Override + public SortedKeyValueIterator<Key, Value> deepCopy(IteratorEnvironment env) { + return new IntersectionRange(this, env); + } + + public IntersectionRange() { + logger.setLevel(Level.ALL); + } + + public IntersectionRange(IntersectionRange other, IteratorEnvironment env) { + source = other.source.deepCopy(env); + colf = other.colf; + lower = other.lower; + upper = other.upper; + 
delimiter = other.delimiter; + outputColf = other.outputColf; + outputTerm = other.outputTerm; + currentPartition = other.currentPartition; + startInclusive = other.startInclusive; + endInclusive = other.endInclusive; + topKey = other.topKey; + docIds.addAll(other.docIds); + itr = docIds.iterator(); + sortComplete = other.sortComplete; + overallRange = other.overallRange; + } + + public Text getOutputTerm() { + return new Text(outputTerm); + } + + public Text getOutputColumnFamily() { + return outputColf; + } + + @Override + public Key getTopKey() { + return topKey; + } + + @Override + public Value getTopValue() { + return IteratorConstants.emptyValue; + } + + @Override + public boolean hasTop() { + try { + if (topKey == null) next(); + } catch (IOException e) { + + } + + return topKey != null; + } + + protected String getDocID(Key key) { + try { + String s = key.getColumnQualifier().toString(); + int start = s.indexOf("\u0000") + 1; + int end = s.indexOf("\u0000", start); + if (end == -1) { + end = s.length(); + } + return s.substring(start, end); + } catch (Exception e) { + + } + return null; + } + + protected Text getTerm(Key key) { + try { + Text colq = key.getColumnQualifier(); + Text term = new Text(); + term.set(colq.getBytes(), 0, colq.find("\0")); + return term; + } catch (Exception e) { + } + return null; + } + + protected Text getPartition(Key key) { + return key.getRow(); + } + + protected Text getFollowingPartition(Key key) { + return key.followingKey(PartialKey.ROW).getRow(); + } + + @Override + public void init(SortedKeyValueIterator<Key, Value> source, Map<String, String> options, IteratorEnvironment env) throws IOException { + if (options.containsKey(OPTION_LOWER_BOUND)) { + lower = new Text(options.get(OPTION_LOWER_BOUND)); + } else { + lower = new Text("\u0000"); + } + + if (options.containsKey(OPTION_UPPER_BOUND)) { + upper = new Text(options.get(OPTION_UPPER_BOUND)); + } else { + upper = new Text("\u0000"); + } + + if 
(options.containsKey(OPTION_DELIMITER)) { + delimiter = options.get(OPTION_DELIMITER); + } else { + delimiter = "\u0000"; + } + + if (options.containsKey(OPTION_COLF)) { + colf = new Text(options.get(OPTION_COLF)); + } else { + colf = new Text("index"); + } + + if (options.containsKey(OPTION_OUTPUT_COLF)) { + outputColf = new Text(options.get(OPTION_OUTPUT_COLF)); + } else { + outputColf = colf; + } + + if (options.containsKey(OPTION_START_INCLUSIVE)) { + startInclusive = Boolean.parseBoolean(options.get(OPTION_START_INCLUSIVE)); + } + + if (options.containsKey(OPTION_END_INCLUSIVE)) { + endInclusive = Boolean.parseBoolean(options.get(OPTION_END_INCLUSIVE)); + } + + if (options.containsKey(OPTION_TEST_OUTOFMEM)) { + testOutOfMemory = Boolean.parseBoolean(options.get(OPTION_TEST_OUTOFMEM)); + } + + outputTerm = options.get(OPTION_OUTPUT_TERM); + this.source = source; + + indexColfSet = Collections.singleton((ByteSequence) new ArrayByteSequence(colf.getBytes(),0,colf.getLength())); + } + + /** + * Sets up the document/record IDs in a sorted structure. + * @throws IOException + * @throws CBException + */ + protected void setUpDocIds() throws IOException { + int count = 0; + try { + if (testOutOfMemory) { + throw new OutOfMemoryError(); + } + + long start = System.currentTimeMillis(); + if (source.hasTop()) { + docIds.clear(); + currentPartition = getPartition(source.getTopKey()); + while (currentPartition != null) { + Key lowerKey = new Key(currentPartition, colf, lower); + try { + source.seek(new Range(lowerKey, true, null, false), indexColfSet, true); + } catch (IllegalArgumentException e) { + // the range does not overlap the overall range? 
quit + currentPartition = null; + break; + } + + // if we don't have a value then quit + if (!source.hasTop()) { + currentPartition = null; + break; + } + + Key top; + while(source.hasTop()) { + top = source.getTopKey(); + + if (overallRange != null && overallRange.getEndKey() != null) { + // see if we're past the end of the partition range + int endCompare = overallRange.getEndKey().compareTo(top, PartialKey.ROW); + if ((!overallRange.isEndKeyInclusive() && endCompare <= 0) || endCompare < 0) { + // we're done + currentPartition = null; + break; + } + } + + // make sure we're still in the right partition + if (currentPartition.compareTo(getPartition(top)) < 0) { + currentPartition.set(getPartition(top)); + break; + } + + // make sure we're still in the right column family + if (colf.compareTo(top.getColumnFamily()) < 0) { + // if not, then get the next partition + currentPartition = getFollowingPartition(top); + break; + } + + Text term = getTerm(top); + int lowerCompare = term.compareTo(lower); + int upperCompare = term.compareTo(upper); + + // if we went past the upper bound, jump to the next partition + if ((endInclusive && upperCompare > 0) || (!endInclusive && upperCompare >= 0)) { + currentPartition = getFollowingPartition(top); + break; + } else if ((startInclusive && lowerCompare >= 0) || (!startInclusive && lowerCompare > 0)) { + // if the term is lexicographically between the upper and lower bounds, + // then add the doc ID + docIds.add(buildOutputKey(top)); + count++; + } + source.next(); + + // make sure we check to see if we're at the end before potentially seeking back + if (!source.hasTop()) { + currentPartition = null; + break; + } + } + } + itr = docIds.iterator(); + sortComplete = true; + logger.debug("setUpDocIds completed for " + lower + "<=" + colf + "<=" + upper + " in " + (System.currentTimeMillis() - start) + " ms. 
Count = " + count); + } else { + logger.warn("There appear to be no records on this tablet"); + } + } catch (OutOfMemoryError e) { + logger.warn("OutOfMemory error: Count = " + count); + throw new IOException("OutOfMemory error while sorting keys"); + } + } + + protected Key buildOutputKey(Key key) { + String id = getDocID(key); + return new Key(currentPartition, outputColf, new Text((outputTerm != null ? outputTerm: colf.toString()) + "\u0000" +id)); + } + + @Override + public void next() throws IOException { + if (itr != null && itr.hasNext()) { + topKey = itr.next(); + } else { + topKey = null; + } + } + + @Override + public void seek(Range range, Collection<ByteSequence> colfs, boolean inclusive) throws IOException { + if (!sortComplete) { + overallRange = range; + source.seek(range, colfs, inclusive); + setUpDocIds(); + } + + if (range.getStartKey() != null) { + while (hasTop() && topKey.compareTo(range.getStartKey(), PartialKey.ROW_COLFAM_COLQUAL) < 0) { + next(); + } + } else { + next(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/common-query/src/main/java/ss/cloudbase/core/iterators/IteratorConstants.java ---------------------------------------------------------------------- diff --git a/partition/common-query/src/main/java/ss/cloudbase/core/iterators/IteratorConstants.java b/partition/common-query/src/main/java/ss/cloudbase/core/iterators/IteratorConstants.java new file mode 100644 index 0000000..0db50f6 --- /dev/null +++ b/partition/common-query/src/main/java/ss/cloudbase/core/iterators/IteratorConstants.java @@ -0,0 +1,11 @@ +package ss.cloudbase.core.iterators; + +import org.apache.hadoop.io.Text; + +import cloudbase.core.data.Value; + +public class IteratorConstants { + public static final byte[] emptyByteArray = new byte[0]; + public static final Value emptyValue = new Value(emptyByteArray); + public static final Text emptyText = new Text(emptyByteArray); +} 
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/common-query/src/main/java/ss/cloudbase/core/iterators/SortedMinIterator.java ---------------------------------------------------------------------- diff --git a/partition/common-query/src/main/java/ss/cloudbase/core/iterators/SortedMinIterator.java b/partition/common-query/src/main/java/ss/cloudbase/core/iterators/SortedMinIterator.java new file mode 100644 index 0000000..c25cc72 --- /dev/null +++ b/partition/common-query/src/main/java/ss/cloudbase/core/iterators/SortedMinIterator.java @@ -0,0 +1,173 @@ +package ss.cloudbase.core.iterators; + +import java.io.IOException; +import java.util.Map; +import java.util.SortedSet; +import java.util.TreeSet; + +import org.apache.hadoop.io.Text; +import org.apache.log4j.Logger; + +import cloudbase.core.data.Key; +import cloudbase.core.data.PartialKey; +import cloudbase.core.data.Range; +import cloudbase.core.data.Value; +import cloudbase.core.iterators.IteratorEnvironment; +import cloudbase.core.iterators.SortedKeyValueIterator; + +/** + * Iterates over the minimum value of every term with the given prefix and parts delimiter. If, for example, you + * wanted to find each person's last known position, you would set up the following index: + * + * We want the last date instead of the first, so we'll use reverseDate in our index + * partitionX index:<prefix>_<personID>_<reverseDate>.<recordID> + * + * (where "." is actually "\u0000") + * + * <code>SortedMinIterator</code> initially seeks to index:prefix in the first partition. From there, it grabs the record + * as the "document" and then seeks to index:<whatever-the-term-was-up-to-last-delimiter> + "\uFFFD" (last unicode + * character), which then puts it at the next person ID in our example. + * + * NOTE that this iterator gives a unique result per tablet server. You may have to process the results to determine + * the true minimum value.
+ * + * @author William Wall (wawall) + */ +public class SortedMinIterator extends SortedRangeIterator { + private static final Logger logger = Logger.getLogger(SortedMinIterator.class); + + /** + * The option to supply a prefix to the term combination. Defaults to "min" + */ + public static final String OPTION_PREFIX = "prefix"; + + /** + * The delimiter for the term (note that this is and must be different than the delimiter between the term and record ID). Defaults to "_" + */ + public static final String OPTION_PARTS_DELIMITER = "partsDelimiter"; + + protected String prefix = "min"; + protected String partsDelimiter = "_"; + protected boolean firstKey = true; + protected String lastPart = null; + + @Override + public void init(SortedKeyValueIterator<Key, Value> source, Map<String, String> options, IteratorEnvironment env) throws IOException { + super.init(source, options, env); + + prefix = options.get(OPTION_PREFIX); + String s = options.get(OPTION_PARTS_DELIMITER); + partsDelimiter = s != null ? 
s: "_"; + //TODO: make sure prefix and partsDelimeter is set + lower = new Text(prefix); + } + + protected String getPrefix(Key key) { + String s = key.getColumnQualifier().toString(); + int i = s.indexOf(partsDelimiter); + if (i > 0) { + return s.substring(0, i + partsDelimiter.length()); + } + return null; + } + + protected String getPart(Key key) { + String s = key.getColumnQualifier().toString(); + int i = s.lastIndexOf(partsDelimiter); + if (i > 0) { + return s.substring(0, i + 1); + } + return null; + } + + @Override + protected void setUpDocIds() throws IOException { + int count = 0; + try { + if (testOutOfMemory) { + throw new OutOfMemoryError(); + } + + long start = System.currentTimeMillis(); + if (source.hasTop()) { + SortedSet<Key> docIds = new TreeSet<Key>(); + currentPartition = getPartition(source.getTopKey()); + while (currentPartition != null) { + // seek to the prefix (aka lower) + Key lowerKey = new Key(currentPartition, colf, lower); + source.seek(new Range(lowerKey, true, null, false), indexColfSet, true); + + // if we don't have a value then quit + if (!source.hasTop()) { + currentPartition = null; + } + + Key top; + while(source.hasTop()) { + top = source.getTopKey(); + + if (overallRange != null && overallRange.getEndKey() != null) { + // see if we're past the end of the partition range + int endCompare = overallRange.getEndKey().compareTo(top, PartialKey.ROW); + if ((!overallRange.isEndKeyInclusive() && endCompare <= 0) || endCompare < 0) { + // we're done + currentPartition = null; + break; + } + } + + // make sure we're still in the right partition + if (currentPartition.compareTo(getPartition(top)) < 0) { + currentPartition.set(getPartition(top)); + break; + } + + // make sure we're still in the right column family + if (colf.compareTo(top.getColumnFamily()) < 0) { + // if not, then get the next partition + currentPartition = getFollowingPartition(top); + break; + } + + // make sure we're still in the index prefix + String p = 
getPrefix(top); + String part = getPart(top); + + if (p != null && p.startsWith(prefix)) { + if (part != null) { + if (!part.equals(lastPart)) { + // if the part (e.g. "lastPosition_personId_") is different, then it's valid + lastPart = part; + docIds.add(buildOutputKey(top)); + count++; + } + + // seek to the next part + lowerKey = new Key(currentPartition, colf, new Text(part + "\uFFFD")); + source.seek(new Range(lowerKey, true, null, false), indexColfSet, true); + } + } else { + // we're done in this partition + currentPartition = getFollowingPartition(top); + break; + } + + // make sure we check to see if we're at the end before potentially seeking back + if (!source.hasTop()) { + currentPartition = null; + break; + } + } + } + itr = docIds.iterator(); + sortComplete = true; + logger.debug("setUpDocIds completed in " + (System.currentTimeMillis() - start) + " ms. Count = " + count); + } else { + logger.warn("There appear to be no records on this tablet"); + } + } catch (OutOfMemoryError e) { + logger.warn("OutOfMemory error: Count = " + count); + throw new IOException("OutOfMemory error while sorting keys"); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/common-query/src/main/java/ss/cloudbase/core/iterators/SortedRangeIterator.java ---------------------------------------------------------------------- diff --git a/partition/common-query/src/main/java/ss/cloudbase/core/iterators/SortedRangeIterator.java b/partition/common-query/src/main/java/ss/cloudbase/core/iterators/SortedRangeIterator.java new file mode 100644 index 0000000..4541230 --- /dev/null +++ b/partition/common-query/src/main/java/ss/cloudbase/core/iterators/SortedRangeIterator.java @@ -0,0 +1,136 @@ +package ss.cloudbase.core.iterators; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.io.Text; +import org.apache.log4j.Logger; + +import 
cloudbase.core.data.ArrayByteSequence; +import cloudbase.core.data.ByteSequence; +import cloudbase.core.data.Key; +import cloudbase.core.data.PartialKey; +import cloudbase.core.data.Range; +import cloudbase.core.data.Value; +import cloudbase.core.iterators.IteratorEnvironment; +import cloudbase.core.iterators.SortedKeyValueIterator; + +/** + * <code>SortedRangeIterator</code> uses the insertion sort functionality of <code>IntersectionRange</code> + * to store off document keys rather than term keys. + * + * @author William Wall (wawall) + */ +public class SortedRangeIterator extends IntersectionRange { + private static final Logger logger = Logger.getLogger(SortedRangeIterator.class); + + /** Use this option to set the document column family. Defaults to "event". **/ + public static final String OPTION_DOC_COLF = "docColf"; + + /** + * Use this option to retrieve all the documents that match the UUID rather than just the first. This + * is commonly used in cell-level security models that use the column-qualifier like this: + * UUID \0 field1 [] value + * UUID \0 securedField [ALPHA] secretValue + **/ + public static final String OPTION_MULTI_DOC = "multiDoc"; + + /** The source document iterator **/ + protected SortedKeyValueIterator<Key, Value> docSource; + + /** The document column family. Defaults to "event". 
**/ + protected Text docColf; + protected Value docValue; + + protected boolean nextId = false; + protected Range docRange = null; + protected boolean multiDoc; + + protected Set<ByteSequence> docColfSet; + + @Override + public void next() throws IOException { + if (multiDoc && nextId) { + docSource.next(); + + // check to make sure that the docSource top is less than our max key + if (docSource.hasTop() && docRange.contains(docSource.getTopKey())) { + topKey = docSource.getTopKey(); + docValue = docSource.getTopValue(); + return; + } + } + + super.next(); + + // if we're looking for multiple documents in the doc source, then + // set the max key for our range check + if (topKey != null) { + Text row = topKey.getRow(); + Text colf = topKey.getColumnFamily(); + if (multiDoc) { + docRange = new Range( + new Key (row, colf, new Text(topKey.getColumnQualifier().toString())), + true, + new Key (row, colf, new Text(topKey.getColumnQualifier().toString() + "\u0000\uFFFD")), + true + ); + } else { + docRange = new Range(new Key (row, colf, new Text(topKey.getColumnQualifier().toString())),true, null, false); + } + } + + nextId = false; + getDocument(); + } + + @Override + public Value getTopValue() { + return docValue; + } + + @Override + protected Key buildOutputKey(Key key) { + // we want to build the document key as the output key + return new Key(currentPartition, docColf, new Text(getDocID(key))); + } + + protected void getDocument() throws IOException { + // look up the document value + if (topKey != null) { + docSource.seek(docRange, docColfSet, true); + + if (docSource.hasTop() && docRange.contains(docSource.getTopKey())) { + // found it! 
+ topKey = docSource.getTopKey(); + docValue = docSource.getTopValue(); + nextId = true; + } else { + // does not exist or user had auths that could see the index but not the event + logger.warn("Document: " + topKey + " does not exist or user had auths for " + colf + " but not " + docColf); + docValue = IteratorConstants.emptyValue; + } + } + } + + @Override + public void init(SortedKeyValueIterator<Key, Value> source, Map<String, String> options, IteratorEnvironment env) throws IOException { + super.init(source, options, env); + docSource = source.deepCopy(env); + if (options.containsKey(OPTION_DOC_COLF)) { + docColf = new Text(options.get(OPTION_DOC_COLF)); + } else { + docColf = new Text("event"); + } + + if (options.containsKey(OPTION_MULTI_DOC)) { + multiDoc = Boolean.parseBoolean(options.get(OPTION_MULTI_DOC)); + } else { + multiDoc = false; + } + + docColfSet = Collections.singleton((ByteSequence) new ArrayByteSequence(docColf.getBytes(), 0, docColf.getLength())); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/common-query/src/main/java/ss/cloudbase/core/iterators/UniqueIterator.java ---------------------------------------------------------------------- diff --git a/partition/common-query/src/main/java/ss/cloudbase/core/iterators/UniqueIterator.java b/partition/common-query/src/main/java/ss/cloudbase/core/iterators/UniqueIterator.java new file mode 100644 index 0000000..2111bbd --- /dev/null +++ b/partition/common-query/src/main/java/ss/cloudbase/core/iterators/UniqueIterator.java @@ -0,0 +1,95 @@ +package ss.cloudbase.core.iterators; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; + +import cloudbase.core.data.ByteSequence; +import cloudbase.core.data.Key; +import cloudbase.core.data.PartialKey; +import cloudbase.core.data.Range; +import cloudbase.core.data.Value; +import cloudbase.core.iterators.IteratorEnvironment; +import 
cloudbase.core.iterators.SkippingIterator; +import cloudbase.core.iterators.SortedKeyValueIterator; +import cloudbase.core.iterators.WrappingIterator; + +/** + * This iterator gets unique keys by the given depth. The depth defaults to PartialKey.ROW_COLFAM. + * + * @author William Wall + */ +public class UniqueIterator extends WrappingIterator { + public static final String OPTION_DEPTH = "depth"; + private static final Collection<ByteSequence> EMPTY_SET = Collections.emptySet(); + protected PartialKey depth; + protected Range range; + protected Key lastKey = null; + + public UniqueIterator() {} + + public UniqueIterator(UniqueIterator other) { + this.depth = other.depth; + this.range = other.range; + this.lastKey = other.lastKey; + } + + @Override + public void next() throws IOException { + consume(); + } + + protected void consume() throws IOException { + if (lastKey != null) { + int count = 0; + // next is way faster, so we'll try doing that 10 times before seeking + while (getSource().hasTop() && getSource().getTopKey().compareTo(lastKey, depth) == 0 && count < 10) { + getSource().next(); + count++; + } + if (getSource().hasTop() && getSource().getTopKey().compareTo(lastKey, depth) == 0) { + reseek(getSource().getTopKey().followingKey(depth)); + } + } + + if (getSource().hasTop()) { + lastKey = getSource().getTopKey(); + } + } + + protected void reseek(Key key) throws IOException { + if (range.afterEndKey(key)) { + range = new Range(range.getEndKey(), true, range.getEndKey(), range.isEndKeyInclusive()); + } else { + range = new Range(key, true, range.getEndKey(), range.isEndKeyInclusive()); + } + getSource().seek(range, EMPTY_SET, false); + } + + + @Override + public void init(SortedKeyValueIterator<Key, Value> source, Map<String, String> options, IteratorEnvironment env) throws IOException { + super.init(source, options, env); + + if (options.containsKey(OPTION_DEPTH)) { + depth = PartialKey.getByDepth(Integer.parseInt(options.get(OPTION_DEPTH))); + } else { + 
depth = PartialKey.ROW_COLFAM; + } + } + + @Override + public SortedKeyValueIterator<Key, Value> deepCopy(IteratorEnvironment env) { + UniqueIterator u = new UniqueIterator(this); + u.setSource(getSource().deepCopy(env)); + return u; + } + + @Override + public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException { + this.range = range; + getSource().seek(range, columnFamilies, inclusive); + consume(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/common-query/src/main/java/ss/cloudbase/core/iterators/conversion/Operation.java ---------------------------------------------------------------------- diff --git a/partition/common-query/src/main/java/ss/cloudbase/core/iterators/conversion/Operation.java b/partition/common-query/src/main/java/ss/cloudbase/core/iterators/conversion/Operation.java new file mode 100644 index 0000000..c93c085 --- /dev/null +++ b/partition/common-query/src/main/java/ss/cloudbase/core/iterators/conversion/Operation.java @@ -0,0 +1,109 @@ +package ss.cloudbase.core.iterators.conversion; + +public class Operation { + protected static final char[] ops = new char[] {'+', '-', '*', '/', '%', '^'}; + protected String field; + protected char op; + protected double operand; + + public Operation(String config) { + if (config.startsWith("conversion.")) { + config = config.substring("conversion.".length()); + } + + String[] parts = config.split("\\s"); + if (parts.length == 3) { + field = parts[0]; + op = parts[1].charAt(0); + if (!checkOp(op)) { + throw new IllegalArgumentException("Operator '" + op + "' is not among the supported operators: " + getOps()); + } + try { + operand = Double.parseDouble(parts[2]); + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Operand '" + parts[2] + "' could not be parsed as a number."); + } + } else { + throw new IllegalArgumentException("'" + config + "' was not in the format 'field op value'"); + } + } + + 
public String getField() { + return field; + } + + public char getOp() { + return op; + } + + public double getOperand() { + return operand; + } + + public String execute(String value) { + if (value == null) { + return value; + } + + double v = Double.NaN; + + try { + v = Double.parseDouble(value); + } catch (NumberFormatException e) { + // we'll attempt to convert hex strings + try { + v = Integer.parseInt(value, 16); + } catch (NumberFormatException e1) { + return value; + } + } + + switch (op) { + case '+': + v += operand; + break; + case '-': + v -= operand; + break; + case '*': + v *= operand; + break; + case '/': + v /= operand; + break; + case '%': + v %= operand; + break; + case '^': + v = Math.pow(v, operand); + break; + } + + return "" + v; + } + + protected String getOps() { + StringBuilder sb = new StringBuilder(); + boolean first = true; + + for (char c: ops) { + if (first) { + sb.append(c); + first = false; + } else { + sb.append(','); + sb.append(c); + } + } + return sb.toString(); + } + + protected boolean checkOp(char op) { + for (char c: ops) { + if (op == c) { + return true; + } + } + return false; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/common-query/src/main/java/ss/cloudbase/core/iterators/filter/CBConverter.java ---------------------------------------------------------------------- diff --git a/partition/common-query/src/main/java/ss/cloudbase/core/iterators/filter/CBConverter.java b/partition/common-query/src/main/java/ss/cloudbase/core/iterators/filter/CBConverter.java new file mode 100644 index 0000000..7d6bedd --- /dev/null +++ b/partition/common-query/src/main/java/ss/cloudbase/core/iterators/filter/CBConverter.java @@ -0,0 +1,117 @@ +/* + * To change this template, choose Tools | Templates + * and open the template in the editor. 
+ */ +package ss.cloudbase.core.iterators.filter; + +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.log4j.Logger; + +import cloudbase.core.data.Key; +import cloudbase.core.data.Value; + +/** + * + * @author rashah + */ +public class CBConverter { + + /** The string that separates the key/value pairs in the row value. */ + public static final String OPTION_PAIR_DELIMITER = "pairDelimiter"; + /** + * The string that separates the key and the value(s) within a pair in the + * row value. + */ + public static final String OPTION_VALUE_DELIMITER = "valueDelimiter"; + /** + * Contains the pair delimiter provided through the + * <code>OPTION_PAIR_DELIMITER</code> option. + */ + protected String pairDelimiter = "\u0000"; + /** + * Contains the value delimiter provided through the + * <code>OPTION_VALUE_DELIMITER</code> option. + */ + protected String valueDelimiter = "\uFFFD"; + private static Logger LOG = Logger.getLogger(CBConverter.class); + + public CBConverter() { + } + + public Map<String, String> toMap(Key CBKey, Value CBValue) { + LOG.trace("Convert"); + + Map<String, String> return_value = new HashMap<String, String>(); + + String value = CBValue.toString(); + + // Determine the start/end of the value. + int valueStartIndex = 0; + int valueEndIndex = value.length(); + + int vLen = valueDelimiter.length(); + int fLen = pairDelimiter.length(); + LOG.debug(vLen + ", " + fLen + ", CBValue = " + CBValue.toString()); + // Parse each of the values from the row value. + while (valueStartIndex < valueEndIndex) { + int vIndex = value.indexOf(valueDelimiter, valueStartIndex); + + // If an "equals" sign was found, parse the key and value. 
+ if (vIndex != -1) { + String key = value.substring(valueStartIndex, vIndex).trim(); + int v = value.indexOf(valueDelimiter, vIndex + vLen); + if (v == -1) { + v = valueEndIndex; + } + int f = value.indexOf(pairDelimiter, vIndex + vLen); + if (f == -1) { + f = valueEndIndex; + } + + int fIndex = Math.min(f, v); + String val = value.substring(vIndex + 1, fIndex).trim(); + valueStartIndex = f; + valueStartIndex += fLen; + return_value.put(key, val); + LOG.debug("Key {" + key + "} Value {" + val + "}"); + } + } + + return return_value; + } + + public Value toValue(Map<String, String> record) { + StringBuilder sb = new StringBuilder(); + boolean first = true; + + for (Entry<String, String> e: record.entrySet()) { + if (first) { + first = false; + } else { + sb.append(pairDelimiter); + } + sb.append(e.getKey()); + sb.append(valueDelimiter); + sb.append(e.getValue()); + } + + return new Value(sb.toString().getBytes()); + } + + public void init(Map<String, String> options) { + LOG.trace("Init"); + + pairDelimiter = options.get(OPTION_PAIR_DELIMITER); + if (pairDelimiter == null || pairDelimiter.length() == 0) { + pairDelimiter = "\u0000"; + } + + valueDelimiter = options.get(OPTION_VALUE_DELIMITER); + if (valueDelimiter == null || valueDelimiter.length() == 0) { + valueDelimiter = "\uFFFD"; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/partition/common-query/src/main/java/ss/cloudbase/core/iterators/filter/general/GVDateFilter.java ---------------------------------------------------------------------- diff --git a/partition/common-query/src/main/java/ss/cloudbase/core/iterators/filter/general/GVDateFilter.java b/partition/common-query/src/main/java/ss/cloudbase/core/iterators/filter/general/GVDateFilter.java new file mode 100644 index 0000000..9063f12 --- /dev/null +++ b/partition/common-query/src/main/java/ss/cloudbase/core/iterators/filter/general/GVDateFilter.java @@ -0,0 +1,169 @@ +/* + * To change this template, choose Tools | 
Templates
 * and open the template in the editor.
 */
package ss.cloudbase.core.iterators.filter.general;

import ss.cloudbase.core.iterators.filter.CBConverter;

import java.util.Map;

import org.apache.log4j.Logger;

import cloudbase.core.data.Key;
import cloudbase.core.data.Value;
import cloudbase.core.iterators.filter.Filter;
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * This filter accepts records whose date range ("date-start"/"date-end"
 * fields in the row value) contains a configured timestamp, optionally
 * honoring an "active" flag field ("0" means inactive and is rejected).
 *
 * @author Raju Shah
 */
public class GVDateFilter implements Filter
{

    private static final Logger LOG = Logger.getLogger(GVDateFilter.class);
    /** The string that indicates the key name in the row value. */
    public static final String OPTIONInTimestamp = "InDate";
    protected String TimeStamp_S = "2011-03-03 20:44:28.633";
    protected Timestamp TimeStamp_T = Timestamp.valueOf(TimeStamp_S);
    /** Option/field name holding the start of the record's date range. */
    public static final String OPTIONGVTimeStartField = "date-start";
    protected String DateStartField = "date-start";
    /** Option/field name holding the end of the record's date range. */
    public static final String OPTIONGVTimeEndField = "date-end";
    protected String DateEndField = "date-end";
    /** Option/field name of the "currently active" flag. */
    public static final String OPTIONRBActive = "RBCurrentlyActive";
    protected String RBActive = "version";
    CBConverter cbconvertor = new CBConverter();

    /**
     * Parses a time string in either "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'" or
     * "yyyy-MM-dd" format.
     *
     * <p>NOTE: despite the name, the returned value is MILLISECONDS since the
     * epoch, or 0 when the string cannot be parsed.
     *
     * @param Time_S the time string to parse
     * @return epoch milliseconds, or 0 on parse failure
     */
    public long GetUSecFromString(String Time_S)
    {
        // SimpleDateFormat is not thread-safe, so fresh instances are created
        // per call rather than cached in a static field.
        SimpleDateFormat df_long = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
        SimpleDateFormat df_med = new SimpleDateFormat("yyyy-MM-dd");

        Date d;
        try
        {
            d = df_long.parse(Time_S);
        }
        catch (Exception e)
        {
            try
            {
                d = df_med.parse(Time_S);
            }
            catch (Exception e1)
            {
                // FIX: log through log4j instead of System.out.println.
                LOG.warn("Unparseable date string [" + Time_S + "]");
                return 0;
            }
        }
        // FIX: d.getTime() is equivalent to the original Calendar round-trip.
        return d.getTime();
    }

    /**
     * Whether or not to accept this key/value entry. A map of row keys and
     * values is parsed and then evaluated against the configured timestamp.
     *
     * @param CBKey The cloudbase entry key
     * @param CBValue The cloudbase entry value
     * @return True if the entry should be included in the results, false otherwise
     */
    @Override
    public boolean accept(Key CBKey, Value CBValue)
    {
        LOG.trace("accept");

        boolean return_value = false;

        Map<String, String> CBRecord = cbconvertor.toMap(CBKey, CBValue);

        // Get the date strings.
        String sStart = CBRecord.get(DateStartField);
        Timestamp tStart = new Timestamp(0);
        String sEnd = CBRecord.get(DateEndField);
        Timestamp tEnd = new Timestamp(0);

        // Get active flag string.
        String rbActive = CBRecord.get(RBActive);

        // LOGIC
        // 1) If the signal is NOT ACTIVE (i.e. the active flag is specified
        //    and off) reject it. Remaining signals are either specified
        //    ACTIVE or not indicated.
        if ((rbActive != null) && rbActive.equals("0"))
        {
            return return_value;
        }

        // 2) If both start and end are specified, the timestamp must lie
        //    strictly between them.
        if ((sStart != null) && (sEnd != null))
        {
            tStart.setTime(GetUSecFromString(sStart));
            tEnd.setTime(GetUSecFromString(sEnd));
            if (tStart.before(TimeStamp_T) && TimeStamp_T.before(tEnd))
            {
                return_value = true;
            }
            return return_value;
        }

        // 3) If only the start date is specified, check against it alone.
        if (sStart != null)
        {
            tStart.setTime(GetUSecFromString(sStart));
            if (tStart.before(TimeStamp_T))
            {
                return_value = true;
            }
            return return_value;
        }

        // 4) Reject all others — a start date must be present.
        return return_value;
    }

    /**
     * Reads the field-name and timestamp options, falling back to defaults
     * when an option is absent or empty.
     */
    @Override
    public void init(Map<String, String> options)
    {
        LOG.trace("init");
        cbconvertor.init(options);

        DateStartField = options.get(OPTIONGVTimeStartField);
        if (DateStartField == null || DateStartField.length() == 0)
        {
            DateStartField = "date-start";
        }

        DateEndField = options.get(OPTIONGVTimeEndField);
        if (DateEndField == null || DateEndField.length() == 0)
        {
            DateEndField = "date-end";
        }

        TimeStamp_S = options.get(OPTIONInTimestamp);
        if (TimeStamp_S == null || TimeStamp_S.length() == 0)
        {
            TimeStamp_S = "2011-03-03T20:44:28.633Z";
        }
        TimeStamp_T.setTime(GetUSecFromString(TimeStamp_S));

        LOG.debug("Creating Time Filter, does " + TimeStamp_S + " = " + TimeStamp_T.toString());
    }
}

/*
 * To change this template, choose Tools | Templates
 * and open the template in the editor.
 */
package ss.cloudbase.core.iterators.filter.general;

import ss.cloudbase.core.iterators.filter.CBConverter;

import java.util.Map;

import org.apache.log4j.Logger;

import cloudbase.core.data.Key;
import cloudbase.core.data.Value;
import cloudbase.core.iterators.filter.Filter;

/**
 * This filter accepts records whose frequency band
 * [center - bandwidth/2, center + bandwidth/2] (read from the row value)
 * contains a configured frequency.
 *
 * @author Raju Shah
 */
public class GVFrequencyFilter implements Filter
{

    private static final Logger LOG = Logger.getLogger(GVFrequencyFilter.class);
    /** The string that indicates the key name in the row value. */
    public static final String OPTIONFrequency = "frequency";
    protected String Frequency_S = "0.0";
    protected Double Frequency_D = Double.parseDouble(Frequency_S);
    // Initially the values in Global Vision are just Center Freq and BW.
    // On the second revision we may change that to the actual ranges so the
    // numerical computations below can be optimized out. Then we can just use
    // the normal OGC filters.
    //public static final String OPTIONGVFrequencyStart = "Frequency_Start";
    //public static final String OPTIONGVFrequencyEnd = "Frequency_End";
    public static final String OPTIONGVCenterFrequency = "frequency";
    public static final String OPTIONGVBandwidth = "bandwidth";
    CBConverter cbconvertor = new CBConverter();

    /**
     * Whether or not to accept this key/value entry. A map of row keys and
     * values is parsed and the configured frequency is tested against the
     * record's band.
     *
     * @param CBKey The cloudbase entry key
     * @param CBValue The cloudbase entry value
     * @return True if the entry should be included in the results, false otherwise
     */
    @Override
    public boolean accept(Key CBKey, Value CBValue)
    {
        LOG.trace("Accept");

        boolean return_value = false;
        Map<String, String> CBRecord = cbconvertor.toMap(CBKey, CBValue);

        try
        {
            String s1 = CBRecord.get(OPTIONGVCenterFrequency);
            String s2 = CBRecord.get(OPTIONGVBandwidth);

            Double d1 = Double.parseDouble(s1);
            Double d2 = Double.parseDouble(s2);

            // Accept when the target frequency lies within the band.
            if (((d1 - (0.5 * d2)) <= Frequency_D) && (Frequency_D <= (d1 + (0.5 * d2))))
            {
                return_value = true;
            }
        }
        catch (Exception e)
        {
            // Best effort: a missing or non-numeric field rejects the entry.
            return_value = false;
        }

        return return_value;
    }

    /**
     * Reads the frequency option, falling back to "0.0" when absent.
     */
    @Override
    public void init(Map<String, String> options)
    {
        LOG.trace("Init");

        cbconvertor.init(options);

        Frequency_S = options.get(OPTIONFrequency);
        if (Frequency_S == null || Frequency_S.length() == 0)
        {
            Frequency_S = "0.0";
        }

        Frequency_D = Double.parseDouble(Frequency_S);
    }
}
/*
 * This is a filter for some basic Geo Functionality for data stored in a WKT format
 */
package ss.cloudbase.core.iterators.filter.jts;

import ss.cloudbase.core.iterators.filter.CBConverter;

import java.util.Map;

import org.apache.log4j.Logger;

import cloudbase.core.data.Key;
import cloudbase.core.data.Value;
import cloudbase.core.iterators.filter.Filter;

import com.vividsolutions.jts.io.WKTReader;
import com.vividsolutions.jts.io.ParseException;
import com.vividsolutions.jts.geom.Geometry;
import com.vividsolutions.jts.geom.Point;
import com.vividsolutions.jts.geom.Coordinate;
import com.vividsolutions.jts.geom.CoordinateSequence;
import com.vividsolutions.jts.geom.GeometryFactory;
import com.vividsolutions.jts.geom.impl.CoordinateArraySequenceFactory;

/**
 * Accepts records whose geometry field (WKT polygon, or an Oracle
 * SDO_ORDINATE_ARRAY list converted to one) covers a configured
 * latitude/longitude point.
 *
 * @author Raju Shah
 */
public class JTSFilter implements Filter
{

    private static final Logger logger = Logger.getLogger(JTSFilter.class);
    /** The string that indicates the key name in the row value. */
    public static final String OPTIONGeometryKeyName = "GeometryKeyName";
    protected String GeometryKeyName = "geometry-contour";
    /** The string that is the centerpoint - Latitude. */
    public static final String OPTIONCenterPointLat = "latitude";
    protected String CenterPointLat = "0.0";
    /** The string that is the centerpoint - Longitude. */
    public static final String OPTIONCenterPointLon = "longitude";
    protected String CenterPointLon = "0.0";
    public static final String OPTIONBeamIDName = "BeamID";
    protected String BeamIDKeyName = "beam-globalviewid";
    /** The test point built from the configured lat/lon in init(). */
    protected Point p = null;
    CBConverter cbconvertor = new CBConverter();

    /**
     * Whether or not to accept this key/value entry. A map of row keys and
     * values is parsed and the record's geometry is tested for coverage of
     * the configured point.
     *
     * @param CBKey The cloudbase entry key
     * @param CBValue The cloudbase entry value
     * @return True if the entry should be included in the results, false otherwise
     */
    @Override
    public boolean accept(Key CBKey, Value CBValue)
    {
        boolean return_value = false;
        Map<String, String> CBRecord = cbconvertor.toMap(CBKey, CBValue);

        String s = CBRecord.get(GeometryKeyName);

        // The geometry field is required; reject when it is missing.
        if ((s == null) || (s.length() < 1))
        {
            return return_value;
        }

        // If the object contains the word POLYGON (this also matches
        // MULTIPOLYGON) it should be directly parseable as WKT.
        if (s.contains("POLYGON"))
        {
            // Convert that string into a geometry.
            WKTReader reader = new WKTReader();
            try
            {
                Geometry WKTgeometry = reader.read(s);

                // See if the point is covered by the geometry.
                return_value = p.coveredBy(WKTgeometry);
            }
            catch (Exception e)
            {
                try
                {
                    // Identify the bad record by its beam id for debugging.
                    String beamid = CBRecord.get(BeamIDKeyName);
                    logger.debug("Bad Beam ID [" + beamid + "]");
                }
                catch (Exception ex)
                {
                    // Best effort only — the beam id itself is unreadable.
                }

                return return_value;
            }
        }
        else
        {
            // Try to extract an Oracle SDO_ORDINATE_ARRAY coordinate list and
            // rewrite it as a WKT polygon.
            String start_s = "SDO_ORDINATE_ARRAY(";
            int start_index = s.indexOf(start_s);
            if (start_index != -1)
            {
                start_index += start_s.length();

                int end_index = s.indexOf(")", start_index);

                if (end_index == -1)
                {
                    return false;
                }
                s = s.substring(start_index, end_index);

                // Remove every other ',': turn "-70.838, 39.967," pairs into
                // WKT-style "-70.838 39.967," coordinates.
                start_index = 1;
                end_index = s.length();
                while ((start_index < (end_index - 1)) && (start_index > 0))
                {
                    start_index = s.indexOf(",", start_index);
                    if (start_index == -1)
                    {
                        // FIX: no comma left (odd coordinate count); the
                        // original indexed temp[-1] and threw here.
                        break;
                    }
                    char[] temp = s.toCharArray();
                    temp[start_index] = ' ';
                    s = new String(temp);
                    // Skip the next one.
                    start_index = s.indexOf(",", start_index) + 1;
                }

                // Convert that string into a geometry.
                WKTReader reader = new WKTReader();
                try
                {
                    Geometry WKTgeometry = reader.read("POLYGON((" + s + "))");

                    // See if the point is covered by the geometry.
                    return_value = p.coveredBy(WKTgeometry);
                }
                catch (Exception e)
                {
                    return return_value;
                }
            }
        }
        return return_value;
    }

    /**
     * Reads the field-name and centerpoint options (falling back to defaults)
     * and builds the JTS point used by accept().
     */
    @Override
    public void init(Map<String, String> options)
    {
        cbconvertor.init(options);

        GeometryKeyName = options.get(OPTIONGeometryKeyName);
        if (GeometryKeyName == null || GeometryKeyName.length() == 0)
        {
            GeometryKeyName = "geometry-contour";
        }

        CenterPointLat = options.get(OPTIONCenterPointLat);
        if (CenterPointLat == null || CenterPointLat.length() == 0)
        {
            CenterPointLat = "0.0";
        }

        CenterPointLon = options.get(OPTIONCenterPointLon);
        if (CenterPointLon == null || CenterPointLon.length() == 0)
        {
            CenterPointLon = "0.0";
        }

        BeamIDKeyName = options.get(OPTIONBeamIDName);
        if (BeamIDKeyName == null || BeamIDKeyName.length() == 0)
        {
            BeamIDKeyName = "beam-globalviewid";
        }

        double CenterPointLatD = Double.parseDouble(CenterPointLat);
        double CenterPointLonD = Double.parseDouble(CenterPointLon);

        // WKT/JTS ordering is (x, y) = (longitude, latitude).
        Coordinate[] coordinates =
        {
            new Coordinate(CenterPointLonD, CenterPointLatD)
        };

        CoordinateSequence cs = CoordinateArraySequenceFactory.instance().create(coordinates);
        GeometryFactory gf = new GeometryFactory();

        // FIX: use the factory method instead of the deprecated Point ctor.
        p = gf.createPoint(cs);
    }
}
