http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/common-query-ext/src/test/java/SampleJTSData.java ---------------------------------------------------------------------- diff --git a/partition/common-query-ext/src/test/java/SampleJTSData.java b/partition/common-query-ext/src/test/java/SampleJTSData.java deleted file mode 100644 index 41df658..0000000 --- a/partition/common-query-ext/src/test/java/SampleJTSData.java +++ /dev/null @@ -1,171 +0,0 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.hadoop.io.Text;

import cloudbase.core.client.BatchWriter;
import cloudbase.core.client.CBException;
import cloudbase.core.client.CBSecurityException;
import cloudbase.core.client.Connector;
import cloudbase.core.client.Instance;
import cloudbase.core.client.MultiTableBatchWriter;
import cloudbase.core.client.TableExistsException;
import cloudbase.core.client.TableNotFoundException;
import cloudbase.core.client.mock.MockInstance;
import cloudbase.core.data.Mutation;
import cloudbase.core.security.Authorizations;

/**
 * Test fixture: four sample beam records (two Oracle {@code SDO_GEOMETRY} contours and two
 * WKT footprints) plus helpers that create a mock Cloudbase connector and load the records
 * into its {@code "partition"} table.
 *
 * NOTE(review): the original header comment read "For use in testing the Date Filter and
 * Frequency Filter classes", which does not match the geometry data below and looks copied
 * from a sibling fixture -- confirm the intended consumers.
 */
public class SampleJTSData
{

    /** Number of partitions (row ids) the sample records are spread across. */
    public static int NUM_PARTITIONS = 2;


    /**
     * Creates a connector on a fresh {@link MockInstance}, creates the {@code "partition"}
     * table and grants root the ALPHA, BETA and GAMMA authorizations.
     *
     * @return the connector, or {@code null} if any setup step failed (the stack trace is
     *         printed and the failure is otherwise swallowed, as in the original fixture)
     */
    public static Connector initConnector()
    {
        Instance instance = new MockInstance();

        try
        {
            Connector connector = instance.getConnector("root", "password".getBytes());

            // set up table
            connector.tableOperations().create("partition");

            // set up root's auths
            connector.securityOperations().changeUserAuthorizations("root", new Authorizations("ALPHA,BETA,GAMMA".split(",")));

            return connector;
        }
        catch (CBException e)
        {
            e.printStackTrace();
        }
        catch (CBSecurityException e)
        {
            e.printStackTrace();
        }
        catch (TableExistsException e)
        {
            e.printStackTrace();
        }

        return null;
    }

    /**
     * Builds the sample records. Each record is a field-name to value map holding a
     * {@code "beam-name"} and either a {@code "geometry-contour"} (SDO_GEOMETRY text) or a
     * {@code "beam-footprint"} (WKT text).
     *
     * @return the four sample records, in a fixed order
     */
    public static Collection<Map<String, String>> sampleData()
    {
        List<Map<String, String>> list = new ArrayList<Map<String, String>>();
        Map<String, String> item;

        item = new HashMap<String, String>();
        item.put("geometry-contour", "SDO_GEOMETRY(2007, 8307, NULL, SDO_ELEM_INFO_ARRAY(1, 1003, 1), SDO_ORDINATE_ARRAY(91.985, -12.108, 94.657, -12.059, 98.486, -11.988, 101.385, -12.296, 102.911, -12.569, 103.93, -12.852, 105.005, -12.531, 106.37, -12.204, 108.446, -11.503, 109.585, -10.88, 110.144, -10.207, 108.609, -9.573, 106.05, -8.535, 104.145, -7.606, 102.191, -7.522, 99.522, -7.691, 97.64, -7.606, 95.482, -7.947, 94.546, -8.084, 92.465, -8.605, 90.554, -9.366, 90.197, -10.436, 89.84, -11.729, 90.554, -12.175, 91.985, -12.108))");
        item.put("beam-name", "OPTUS D1 Ku-BAND NATIONAL A & B AUSTRALIA Downlink");
        list.add(item);
        //This is Australia
        //Points like 22S 135E are in the beam

        //This one is like GV
        item = new HashMap<String, String>();
        item.put("beam-name", "AMC 1 Ku-BAND ZONAL NORTH AMERICA Down HV");
        item.put("geometry-contour", "SDO_GEOMETRY(2007, 8307, NULL, SDO_ELEM_INFO_ARRAY(1, 1003, 1), SDO_ORDINATE_ARRAY(-70.838, 39.967, -70.506, 40.331, -70.698, 41.679, -71.179, 42.401, -71.578, 42.38, -72.994, 42.924, -74.353, 43.242, -75.715, 43.26, -77.318, 42.981, -78.684, 42.774, -80.05, 42.491, -82.005, 42.517, -83.608, 42.312, -84.977, 41.805, -86.58, 41.525, -88.127, 41.02, -89.731, 40.741, -90.905, 41.582, -92.264, 41.9, -93.861, 42.147, -95.411, 41.341, -96.257, 40.076, -97.222, 38.737, -98.011, 37.17, -98.031, 35.593, -97.691, 34.312, -96.875, 33.25, -97.307, 31.904, -97.916, 30.561, -98.702, 29.295, -99.134, 27.949, -98.14, 26.884, -97.205, 25.821, -95.842, 25.803, -94.42, 25.784, -92.876, 26.064, -91.277, 26.043, -90.085, 26.553, -88.729, 26.01, -87.38, 24.941, -86.031, 23.797, -84.616, 23.253, -83.256, 23.01, -81.887, 23.517, -80.866, 24.555, -80.254, 26.124, -79.642, 27.693, -78.444, 28.728, -77.486, 29.542, -76.463, 30.805, -76.088, 32.377, -75.656, 33.723, -76.051, 35.305, -75.442, 36.649, -74.426, 37.386, -73.228, 38.422, -72.032, 39.232, -70.838, 39.967))");
        list.add(item);
        //This is North America
        //Points 39°44'21.00"N 104°59'3.00"W (Denver) are in the footprint

        item = new HashMap<String, String>();
        item.put("beam-name", "testa");
        item.put("beam-footprint", "MULTIPOLYGON (((-169.286 40.431, -164.971 39.992, -155.397 38.482, -146.566 36.233, -136.975 32.539, -128.124 27.742, -121.946 24.548, -116.849 21.339, -112.156 17.479, -109.391 14.206, -107.301 11.715, -105.274 9.477, -103.443 8.229, -102.108 7.7, -99.109 7.428, -96.681 7.745, -93.894 8.843, -89.917 11.687, -85.953 15.017, -81.148 17.266, -78.145 17.986, -75.582 17.887, -68.1 17.987, -64.696 18.493, -61.445 19.38, -60.094 20.288, -59.315 21.564, -57.026 26.51, -55.089 30.962, -53.59 33.657, -52.495 34.691, -50.468 36.204, -46.146 38.672, -41.684 40.663, -37.914 42.055, -33.806 43.082, -27.523 44.149, -21.645 44.96, -16.578 45.406, -13.807 45.771, -14.929 50.108, -16.186 53.919, -17.051 56.0, -18.388 58.824, -19.861 61.567, -21.807 64.188, -23.104 65.742, -25.28 67.904, -27.699 69.823, -28.955 70.728, -32.415 72.768, -34.968 73.998, -38.468 75.309, -48.292 73.025, -56.545 71.12, -64.023 70.474, -72.753 70.357, -78.41 70.827, -80.466 71.093, -82.412 71.876, -83.02 72.944, -83.175 74.04, -82.493 74.782, -82.412 75.552, -82.697 76.778, -84.041 78.398, -86.316 81.078, -104.098 80.819, -110.861 80.482, -115.73 80.17, -120.936 79.669, -125.84 79.176, -126.696 79.02, -134.316 77.732, -139.505 76.478, -144.823 74.826, -148.231 73.417, -151.517 71.687, -153.87 70.165, -154.536 69.672, -155.868 68.678, -156.482 68.098, -158.281 66.421, -159.716 64.804, -160.996 63.126, -161.878 61.786, -163.046 59.875, -164.369 57.254, -165.563 54.479, -166.73 51.089, -167.811 47.267, -168.581 44.041, -169.286 40.431)), ((-171.333 23.244, -171.523 18.894, -170.127 18.986, -161.559 18.555, -156.977 18.134, -153.574 18.116, -151.108 18.324, -149.947 18.45, -149.018 18.957, -148.515 19.822, -148.524 20.914, -149.018 21.766, -149.947 22.272, -152.185 23.054, -155.563 23.434, -158.075 23.75, -160.272 24.034, -162.184 24.008, -163.514 23.99, -164.595 23.976, -166.52 23.687, -169.159 23.18, -171.333 23.244)))");
        list.add(item);
        // this point should be in there...
        // -164 40 - somewhere near hawaii

        // Two ordinates in this literal were corrupted by a stray space inside the number
        // ("-94 .27" and "-138. 449801069917"), which breaks the x/y pairing of the WKT
        // coordinate list; they are restored to -94.27 and -138.449801069917 here.
        item = new HashMap<String, String>();
        item.put("beam-name", "testb");
        item.put("beam-footprint", "POLYGON ((-140.153 34.772, -140.341 33.272, -137.024 33.026, -132.723 32.369, -130.947 31.916, -128.664 31.225, -125.293 29.612, -121.813 27.871, -118.699 25.892, -115.589 23.79, -112.593 21.875, -109.136 19.335, -106.939 16.701, -105.006 14.97, -104.195 14.407, -103.049 13.659, -100.363 12.717, -98.063 12.288, -94.299 11.612, -90.825 11.097, -87.997 11.584, -86.815 12.109, -86.163 12.893, -85.014 14.342, -83.804 15.788, -82.104 16.998, -80.413 17.269, -78.005 16.574, -76.181 16.531, -74.65 16.68, -73.552 17.392, -72.957 18.3, -72.917 19.651, -73.526 21.325, -74.913 23.018, -76.036 24.519, -76.159 26.428, -75.741 28.447, -74.257 30.072, -72.771 31.331, -70.517 34.328, -69.638 36.04, -68.624 39.467, -68.015 41.851, -67.607 43.501, -67.548 45.528, -67.586 47.308, -68.601 49.066, -69.868 50.07, -71.621 50.778, -73.285 50.888, -74.9 50.926, -76.994 50.975, -79.332 50.846, -81.066 50.887, -83.842 51.136, -86.569 51.016, -87.95 50.864, -90.831 50.563, -94.27 50.644, -98.068 50.733, -102.937 51.032, -106.455 51.484, -109.973 51.936, -114.119 52.402, -117.363 53.031, -119.899 53.276, -123.243 53.539, -127.017 54.427, -130.519 55.431, -133.643 56.058, -134.826 56.279, -135.354 55.029, -135.792 53.864, -136.168965072136 52.8279962761917, -136.169 52.828, -136.169497186166 52.8264970826432, -136.192 52.763, -136.556548517884 51.6453176911637, -136.703232746756 51.2152965828266, -136.781220290925 50.9919311116929, -136.793 50.959, -136.80274055379 50.9259886895048, -136.992 50.295, -137.200898649547 49.5808675274021, -137.202 49.581, -137.200962495599 49.5806459535167, -137.360714473458 49.0197683891632, -137.459 48.677, -137.462166719028 48.6649126473121, -137.471 48.634, -137.515105536699 48.4619710228524, -137.74710368039 47.5528216167105, -137.793718522461 47.3758260237407, -137.854 47.152, -137.977773277882 46.6610808974241, -138.044 46.403, -138.330834102374 45.1674736036557, -138.365 45.019, -138.38180854655 44.9421315900087, -138.449801069917 44.6389849661384, -138.485 44.484, -138.497077239724 44.4262941289417, -138.536 44.25, -138.622787032392 43.8206200438395, -138.743816168807 43.232032787661, -138.981390224617 42.0843314825185, -138.989 42.048, -138.990605533614 42.0389442888447, -138.991 42.037, -138.997785044232 41.9994454595406, -139.004 41.969, -139.035645873997 41.7890661698517, -139.061212567475 41.6462082823816, -139.428 39.584, -139.673 38.073, -139.713116752585 37.8001474769807, -139.766 37.457, -139.764942047737 37.4567768906428, -139.898 36.573, -139.897723683259 36.5729429963606, -139.986 35.994, -140.04777653037 35.5462970502163, -140.094 35.232, -140.090797568766 35.2315144621917, -140.153 34.772))");
        list.add(item);



        //London is in neither - 51°30'0.00"N 0° 7'0.00"W
        return list;
    }


    /**
     * Writes the given records to the {@code "partition"} table.
     *
     * For the n-th record (1-based, zero-padded to two digits as its id) in row
     * {@code n % NUM_PARTITIONS} this writes:
     * <ul>
     * <li>one {@code index} cell per field with qualifier {@code value \0 id};</li>
     * <li>one {@code index} cell per field with qualifier {@code key//value \0 id};</li>
     * <li>one {@code event} cell with qualifier {@code id} whose value is the record
     *     serialized as {@code key \uFFFD value} pairs joined by {@code \0}.</li>
     * </ul>
     * Failures are printed and otherwise swallowed, as in the original fixture.
     *
     * @param connector connector to write through
     * @param data      records to write, each a field-name to value map
     */
    public static void writeDenSerialized(Connector connector, Collection<Map<String, String>> data)
    {
        // write sample data
        MultiTableBatchWriter mtbw = connector.createMultiTableBatchWriter(200000, 10000, 1);
        try
        {
            BatchWriter writer;
            if (mtbw != null)
            {
                writer = mtbw.getBatchWriter("partition");
            }
            else
            {
                writer = connector.createBatchWriter("partition", 200000, 10000, 1);
            }
            int count = 0;
            Mutation m;
            for (Map<String, String> object : data)
            {
                count++;
                String id = (count < 10 ? "0" + count : "" + count);
                Text partition = new Text("" + (count % NUM_PARTITIONS));

                StringBuilder value = new StringBuilder();
                boolean first = true;
                for (Entry<String, String> entry : object.entrySet())
                {
                    if (!first)
                    {
                        value.append("\u0000");
                    }
                    else
                    {
                        first = false;
                    }
                    value.append(entry.getKey());
                    value.append("\uFFFD");
                    value.append(entry.getValue());

                    // write the general index mutation
                    m = new Mutation(partition);
                    m.put("index", entry.getValue() + "\u0000" + id, "");
                    writer.addMutation(m);

                    // write the specific index mutation
                    m = new Mutation(partition);
                    m.put("index", entry.getKey() + "//" + entry.getValue() + "\u0000" + id, "");
                    writer.addMutation(m);
                }

                // write the event mutation
                m = new Mutation(partition);
                m.put("event", id, value.toString());
                writer.addMutation(m);
            }
            writer.close();
        }
        catch (CBException e)
        {
            e.printStackTrace();
        }
        catch (CBSecurityException e)
        {
            e.printStackTrace();
        }
        catch (TableNotFoundException e)
        {
            e.printStackTrace();
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/common-query/pom.xml ---------------------------------------------------------------------- diff --git a/partition/common-query/pom.xml b/partition/common-query/pom.xml deleted file mode 100644 index 6db84bf..0000000 --- a/partition/common-query/pom.xml +++ /dev/null @@ -1,103 +0,0 @@ -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <!--<parent>--> - <!--<groupId>sitestore</groupId>--> - <!--<artifactId>sitestore</artifactId>--> - <!--<version>2.0.0-SNAPSHOT</version>--> - <!--</parent>--> - - <parent> - <groupId>mvm.rya</groupId> - <artifactId>parent</artifactId> - <version>2.0.0-SNAPSHOT</version> - </parent> - - <groupId>sitestore.common</groupId> - <artifactId>common-query</artifactId> - <name>common-query (${project.version})</name> - <version>2.0.0-SNAPSHOT</version> - <description>A set of filters and iterators for cloudbase queries</description> - - <properties> - <skipTests>true</skipTests> - </properties> - - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-source-plugin</artifactId> - <version>2.1.2</version> - <executions> - <execution> - <id>attach-sources</id> - <phase>install</phase> - <goals> - <goal>jar</goal> - </goals> - </execution> - </executions> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-surefire-plugin</artifactId> - <version>2.7.2</version> - <configuration> - <skipTests>${skipTests}</skipTests> - </configuration> - </plugin> - </plugins> - </build> - <!--<scm>--> - <!--<connection>${scmLocation}/tto/ss/common/trunk/common-query</connection>--> - <!--</scm>--> - <dependencies> - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - </dependency> - 
<dependency> - <groupId>log4j</groupId> - <artifactId>log4j</artifactId> - <version>1.2.14</version> - </dependency> - <dependency> - <groupId>cloudbase</groupId> - <artifactId>cloudbase-core</artifactId> - </dependency> - <dependency> - <groupId>commons-logging</groupId> - <artifactId>commons-logging</artifactId> - <version>1.0.4</version> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-common</artifactId> - </dependency> - <dependency> - <groupId>cloudbase</groupId> - <artifactId>cloudbase-start</artifactId> - </dependency> - <dependency> - <groupId>commons-codec</groupId> - <artifactId>commons-codec</artifactId> - <version>1.3</version> - </dependency> - <dependency> - <groupId>org.apache.thrift</groupId> - <artifactId>thrift</artifactId> - </dependency> - <dependency> - <groupId>com.vividsolutions</groupId> - <artifactId>jts</artifactId> - <version>1.11</version> - </dependency> - <dependency> - <groupId>xerces</groupId> - <artifactId>xercesImpl</artifactId> - <version>2.8.1</version> - </dependency> - </dependencies> -</project> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/common-query/src/main/java/ss/cloudbase/core/iterators/CellLevelFilteringIterator.java ---------------------------------------------------------------------- diff --git a/partition/common-query/src/main/java/ss/cloudbase/core/iterators/CellLevelFilteringIterator.java b/partition/common-query/src/main/java/ss/cloudbase/core/iterators/CellLevelFilteringIterator.java deleted file mode 100644 index e0126fa..0000000 --- a/partition/common-query/src/main/java/ss/cloudbase/core/iterators/CellLevelFilteringIterator.java +++ /dev/null @@ -1,163 +0,0 @@ -package ss.cloudbase.core.iterators; - -import java.io.IOException; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; - -import org.apache.hadoop.io.Text; - -import ss.cloudbase.core.iterators.filter.ogc.OGCFilter; 
-import cloudbase.core.data.ByteSequence; -import cloudbase.core.data.Key; -import cloudbase.core.data.Range; -import cloudbase.core.data.Value; -import cloudbase.core.iterators.IteratorEnvironment; -import cloudbase.core.iterators.SortedKeyValueIterator; -import cloudbase.core.iterators.WrappingIterator; -
/**
 * Passes through only those cells whose whole record satisfies an OGC filter.
 *
 * The column qualifier is read as {@code docId \0 field fieldEnd ...}. For each new docId the
 * iterator scans every cell of that record from a separate "check" source into a field/value
 * map, evaluates the filter once, and caches the verdict per docId so the remaining cells of
 * the same record are not re-evaluated.
 */
public class CellLevelFilteringIterator extends WrappingIterator {
    private static final Collection<ByteSequence> EMPTY_SET = Collections.emptySet();

    /** The OGC Filter string **/
    public static final String OPTION_FILTER = "filter";

    /** The character or characters that defines the end of the field in the column qualifier. Defaults to '@' **/
    public static final String OPTION_FIELD_END = "fieldEnd";

    /** Independent source used to read a whole record without moving the main source. */
    protected SortedKeyValueIterator<Key, Value> checkSource;

    /** Filter verdict per docId. */
    protected Map<String, Boolean> cache = new HashMap<String, Boolean>();

    protected OGCFilter filter;

    protected String fieldEnd = "@";

    public CellLevelFilteringIterator() {}

    /**
     * Copy constructor used by {@link #deepCopy(IteratorEnvironment)}.
     *
     * @param other the iterator to copy; must already be initialised
     * @param env   environment handed to the sources' own deepCopy
     */
    public CellLevelFilteringIterator(CellLevelFilteringIterator other, IteratorEnvironment env) {
        setSource(other.getSource().deepCopy(env));
        checkSource = other.checkSource.deepCopy(env);
        // Own map seeded with the verdicts known so far, so the copy does not mutate a
        // HashMap shared with the original.
        cache = new HashMap<String, Boolean>(other.cache);
        // The filter was never copied before, so any deep copy failed with a
        // NullPointerException on its first findTop().
        filter = other.filter;
        fieldEnd = other.fieldEnd;
    }

    @Override
    public CellLevelFilteringIterator deepCopy(IteratorEnvironment env) {
        return new CellLevelFilteringIterator(this, env);
    }

    /**
     * Initialises the filter from {@code options} and picks the check source: the document
     * source of a wrapped {@code GMDenIntersectingIterator} / {@code SortedRangeIterator}
     * when the source is one of those, otherwise a deep copy of the source itself.
     */
    @Override
    public void init(SortedKeyValueIterator<Key, Value> source, Map<String, String> options, IteratorEnvironment env) throws IOException {
        super.init(source, options, env);
        if (source instanceof GMDenIntersectingIterator) {
            checkSource = ((GMDenIntersectingIterator) source).docSource.deepCopy(env);
        } else if (source instanceof SortedRangeIterator) {
            checkSource = ((SortedRangeIterator) source).docSource.deepCopy(env);
        } else {
            checkSource = source.deepCopy(env);
        }
        filter = new OGCFilter();
        filter.init(options);

        if (options.containsKey(OPTION_FIELD_END)) {
            fieldEnd = options.get(OPTION_FIELD_END);
        }
    }

    @Override
    public void next() throws IOException {
        getSource().next();
        findTop();
    }

    /** Returns the part of the column qualifier before the first {@code \0} (or all of it). */
    protected String getDocId(Key key) {
        String colq = key.getColumnQualifier().toString();
        int i = colq.indexOf("\u0000");
        if (i == -1) {
            i = colq.length();
        }
        return colq.substring(0, i);
    }

    /** First key of the record: same row and family, qualifier {@code docId \0}. */
    protected Key getRecordStartKey(Key key, String docId) {
        return new Key(key.getRow(), key.getColumnFamily(), new Text(docId + "\u0000"));
    }

    /** Last key of the record: same row and family, qualifier {@code docId \0 \uFFFD}. */
    protected Key getRecordEndKey(Key key, String docId) {
        return new Key(key.getRow(), key.getColumnFamily(), new Text(docId + "\u0000\uFFFD"));
    }

    /**
     * Returns the field name: the qualifier text between the first {@code \0} and the next
     * occurrence of {@code fieldEnd} (or the end of the qualifier), or {@code null} when the
     * qualifier has no {@code \0}.
     */
    protected String getField(Key key, Value value) {
        String colq = key.getColumnQualifier().toString();
        int i = colq.indexOf("\u0000");
        if (i == -1) {
            return null;
        }

        int j = colq.indexOf(fieldEnd, i + 1);
        if (j == -1) {
            j = colq.length();
        }

        return colq.substring(i + 1, j);
    }

    protected String getValue(Key key, Value value) {
        return value.toString();
    }

    /** Advances the source until its top cell belongs to a record the filter accepts. */
    protected void findTop() throws IOException {
        while (getSource().hasTop()) {
            Key top = getSource().getTopKey();
            String docId = getDocId(top);

            // if the document is in the cache, then we have already scanned it
            Boolean verdict = cache.get(docId);
            if (verdict == null) {
                verdict = Boolean.valueOf(acceptsRecord(top, docId));
                // cache the result so that we don't do this for every cell
                cache.put(docId, verdict);
            }

            if (verdict.booleanValue()) {
                return;
            }
            getSource().next();
        }
    }

    /** Reads the whole record for {@code docId} from the check source and evaluates the filter. */
    private boolean acceptsRecord(Key top, String docId) throws IOException {
        // seek the check source to the beginning of the record
        Range range = new Range(getRecordStartKey(top, docId), true, getRecordEndKey(top, docId), true);
        checkSource.seek(range, EMPTY_SET, false);

        // read in the record to the map
        Map<String, String> record = new HashMap<String, String>();
        while (checkSource.hasTop()) {
            String field = getField(checkSource.getTopKey(), checkSource.getTopValue());
            if (field != null) {
                record.put(field, getValue(checkSource.getTopKey(), checkSource.getTopValue()));
            }
            checkSource.next();
        }

        return filter.accept(record);
    }

    @Override
    public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
        getSource().seek(range, columnFamilies, inclusive);
        findTop();
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/common-query/src/main/java/ss/cloudbase/core/iterators/CellLevelRecordIterator.java ---------------------------------------------------------------------- diff --git a/partition/common-query/src/main/java/ss/cloudbase/core/iterators/CellLevelRecordIterator.java b/partition/common-query/src/main/java/ss/cloudbase/core/iterators/CellLevelRecordIterator.java deleted file mode 100644 index 1f59882..0000000 --- a/partition/common-query/src/main/java/ss/cloudbase/core/iterators/CellLevelRecordIterator.java +++ /dev/null @@ -1,144 +0,0 @@
package ss.cloudbase.core.iterators;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.io.Text;

import ss.cloudbase.core.iterators.filter.CBConverter;

import cloudbase.core.data.Key;
import cloudbase.core.data.Range;
import cloudbase.core.data.Value;
import cloudbase.core.iterators.IteratorEnvironment;
import cloudbase.core.iterators.SkippingIterator;
import cloudbase.core.iterators.SortedKeyValueIterator;

/**
 * Collapses the cells of one record into a single key/value pair.
 *
 * The column qualifier is read as {@code docId \0 field fieldEnd ...}. All consecutive cells
 * that share a docId are folded into a field/value map (repeated fields are joined with
 * {@code multipleDelimiter}) which the {@link CBConverter} serializes into the top value;
 * the top key carries the bare docId as its qualifier.
 */
public class CellLevelRecordIterator extends SkippingIterator {
    public static final String OPTION_FIELD_END = "fieldEnd";
    public static final String OPTION_MULTIPLE_DELIMITER = "multipleDelimiter";

    protected String multipleDelimiter = ",";

    protected Key topKey;
    protected Value topValue;
    protected String fieldEnd = "@";
    protected String docId = null;
    protected CBConverter converter = new CBConverter();

    @Override
    public SortedKeyValueIterator<Key, Value> deepCopy(IteratorEnvironment env) {
        CellLevelRecordIterator itr = new CellLevelRecordIterator();
        itr.setSource(this.getSource().deepCopy(env));
        itr.fieldEnd = this.fieldEnd;
        // Previously the copy silently fell back to the default delimiter and to a fresh
        // converter that init(options) had never touched. The converter is shared, as
        // ConversionIterator does with its own converter.
        itr.multipleDelimiter = this.multipleDelimiter;
        itr.converter = this.converter;
        return itr;
    }

    @Override
    public void init(SortedKeyValueIterator<Key, Value> source, Map<String, String> options, IteratorEnvironment env) throws IOException {
        super.init(source, options, env);
        converter.init(options);
        if (options.containsKey(OPTION_FIELD_END)) {
            fieldEnd = options.get(OPTION_FIELD_END);
        }

        if (options.containsKey(OPTION_MULTIPLE_DELIMITER)) {
            multipleDelimiter = options.get(OPTION_MULTIPLE_DELIMITER);
        }
    }

    /** Builds the next record; consume() already advances the source past the current one. */
    @Override
    public void next() throws IOException {
        consume();
    }

    @Override
    public boolean hasTop() {
        return getSource().hasTop() || topKey != null || topValue != null;
    }

    @Override
    public Key getTopKey() {
        return topKey;
    }

    @Override
    public Value getTopValue() {
        return topValue;
    }

    /** Returns the part of the column qualifier before the first {@code \0} (or all of it). */
    protected String getDocId(Key key) {
        String colq = key.getColumnQualifier().toString();
        int i = colq.indexOf("\u0000");
        if (i == -1) {
            i = colq.length();
        }
        return colq.substring(0, i);
    }

    /** Key emitted for a record: the cell's key with the qualifier reduced to the docId. */
    protected Key buildTopKey(Key key, String docId) {
        return new Key(key.getRow(), key.getColumnFamily(), new Text(docId), key.getColumnVisibility(), key.getTimestamp());
    }

    /**
     * Returns the field name: the qualifier text between the first {@code \0} and the next
     * occurrence of {@code fieldEnd} (or the end of the qualifier), or {@code null} when the
     * qualifier has no {@code \0}.
     */
    protected String getField(Key key, Value value) {
        String colq = key.getColumnQualifier().toString();
        int i = colq.indexOf("\u0000");
        if (i == -1) {
            return null;
        }

        int j = colq.indexOf(fieldEnd, i + 1);
        if (j == -1) {
            j = colq.length();
        }

        return colq.substring(i + 1, j);
    }

    protected String getValue(Key key, Value value) {
        return value.toString();
    }

    protected Key getRecordStartKey(Key key, String docId) {
        return new Key(key.getRow(), key.getColumnFamily(), new Text(docId));
    }

    protected Key getRecordEndKey(Key key, String docId) {
        return new Key(key.getRow(), key.getColumnFamily(), new Text(docId + "\u0000\uFFFD"));
    }

    /**
     * Reads every cell of the record at the source's current position into one map and
     * publishes it as the top key/value; clears the top when the source is exhausted.
     */
    @Override
    protected void consume() throws IOException {
        if (!getSource().hasTop()) {
            topKey = null;
            topValue = null;
            return;
        }

        // build the top key
        Key first = getSource().getTopKey();
        docId = getDocId(first);
        topKey = buildTopKey(first, docId);

        Range range = new Range(getRecordStartKey(first, docId), true, getRecordEndKey(first, docId), true);

        Map<String, String> record = new HashMap<String, String>();
        while (getSource().hasTop() && range.contains(getSource().getTopKey())) {
            String field = getField(getSource().getTopKey(), getSource().getTopValue());
            if (field != null) {
                String cell = getValue(getSource().getTopKey(), getSource().getTopValue());
                String existing = record.get(field);
                // a repeated field accumulates its values, separated by the delimiter
                record.put(field, existing == null ? cell : existing + multipleDelimiter + cell);
            }
            getSource().next();
        }

        topValue = converter.toValue(record);
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/common-query/src/main/java/ss/cloudbase/core/iterators/ConversionIterator.java ---------------------------------------------------------------------- diff --git a/partition/common-query/src/main/java/ss/cloudbase/core/iterators/ConversionIterator.java b/partition/common-query/src/main/java/ss/cloudbase/core/iterators/ConversionIterator.java deleted file mode 100644 index 5e75334..0000000 --- a/partition/common-query/src/main/java/ss/cloudbase/core/iterators/ConversionIterator.java +++ /dev/null @@ -1,151 +0,0 @@ -package ss.cloudbase.core.iterators; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; - -import ss.cloudbase.core.iterators.conversion.Operation; -import ss.cloudbase.core.iterators.filter.CBConverter; -import cloudbase.core.data.Key; -import cloudbase.core.data.Value; -import 
cloudbase.core.iterators.IteratorEnvironment; -import cloudbase.core.iterators.SortedKeyValueIterator; -import cloudbase.core.iterators.WrappingIterator; -
/**
 * Applies per-field {@link Operation}s to the values flowing through the iterator.
 *
 * In the default mode each value is a whole serialized record: it is decoded with a
 * {@link CBConverter}, every field that has a conversion is rewritten, and the record is
 * re-encoded. In multi-doc mode each cell holds a single field (named in the column
 * qualifier as {@code docId \0 field fieldEnd ...}) and only that cell's value is converted.
 */
public class ConversionIterator extends WrappingIterator {
    public static final String OPTION_CONVERSIONS = "conversions";
    public static final String OPTION_MULTI_DOC = "multiDoc";
    /** The character or characters that defines the end of the field in the column qualifier. Defaults to '@' **/
    public static final String OPTION_FIELD_END = "fieldEnd";

    protected CBConverter serializedConverter;
    /** Conversion to apply per field name; {@code null} until conversions are configured. */
    protected Map<String, Operation> conversions;
    protected boolean multiDoc = false;
    protected String fieldEnd = "@";

    public ConversionIterator() {}

    /**
     * Copies the configuration of {@code other}. The source is not copied here because no
     * environment is available; {@link #deepCopy(IteratorEnvironment)} attaches it.
     *
     * @param other the iterator whose settings are copied
     */
    public ConversionIterator(ConversionIterator other) {
        // The original called putAll on this.conversions, which is still null in a new
        // instance, so every copy failed with a NullPointerException.
        if (other.conversions != null) {
            this.conversions = new HashMap<String, Operation>(other.conversions);
        }
        this.multiDoc = other.multiDoc;
        this.fieldEnd = other.fieldEnd;
        this.serializedConverter = other.serializedConverter;
    }

    @Override
    public SortedKeyValueIterator<Key, Value> deepCopy(IteratorEnvironment env) {
        ConversionIterator copy = new ConversionIterator(this);
        // Without its own source the copy could not be seeked or read.
        copy.setSource(getSource().deepCopy(env));
        return copy;
    }

    /** Returns the converted top value, or the untouched one when no conversions are set. */
    @Override
    public Value getTopValue() {
        if (hasTop() && conversions != null) {
            return multiDoc ? multiDocConvert(super.getTopValue()) : convert(super.getTopValue());
        }
        return super.getTopValue();
    }

    /**
     * Returns the field name: the qualifier text between the first {@code \0} and the next
     * occurrence of {@code fieldEnd} (or the end of the qualifier), or {@code null} when the
     * qualifier has no {@code \0}.
     */
    protected String getMultiDocField(Key key) {
        String colq = key.getColumnQualifier().toString();
        int start = colq.indexOf("\u0000");
        if (start == -1) {
            return null;
        }

        int end = colq.indexOf(fieldEnd, start + 1);
        if (end == -1) {
            end = colq.length();
        }

        return colq.substring(start + 1, end);
    }

    /** Converts a single-field cell value if its field has a conversion. */
    protected Value multiDocConvert(Value value) {
        Operation op = conversions.get(getMultiDocField(getTopKey()));
        if (op == null) {
            return value;
        }
        return new Value(op.execute(value.toString()).getBytes());
    }

    /** Decodes a serialized record, converts every configured field and re-encodes it. */
    protected Value convert(Value value) {
        Map<String, String> record = serializedConverter.toMap(getTopKey(), value);

        for (Map.Entry<String, String> entry : record.entrySet()) {
            Operation op = conversions.get(entry.getKey());
            if (op != null) {
                entry.setValue(op.execute(entry.getValue()));
            }
        }

        return serializedConverter.toValue(record);
    }

    @Override
    public void init(SortedKeyValueIterator<Key, Value> source, Map<String, String> options, IteratorEnvironment env) throws IOException {
        super.init(source, options, env);

        multiDoc = options.containsKey(OPTION_MULTI_DOC) && Boolean.parseBoolean(options.get(OPTION_MULTI_DOC));

        // the record converter is only needed when values are whole serialized records
        if (!multiDoc) {
            serializedConverter = new CBConverter();
            serializedConverter.init(options);
        }

        if (options.containsKey(OPTION_FIELD_END)) {
            fieldEnd = options.get(OPTION_FIELD_END);
        }

        if (options.containsKey(OPTION_CONVERSIONS)) {
            Operation[] ops = decodeConversions(options.get(OPTION_CONVERSIONS));
            conversions = new HashMap<String, Operation>();

            for (Operation o : ops) {
                conversions.put(o.getField(), o);
            }
        }
    }

    /**
     * Encodes a set of conversion strings for use with the OPTION_CONVERSIONS options. Each conversion
     * string should be in the format 'field op value' (whitespace necessary), where op is +, -, *, /, %, or
     * ^ and the value is a number.
     *
     * @param conversions the conversion strings to join; they are separated by {@code \0}
     * @return The encoded value to use with OPTION_CONVERSIONS
     */
    public static String encodeConversions(String[] conversions) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < conversions.length; i++) {
            if (i > 0) {
                sb.append("\u0000");
            }
            sb.append(conversions[i]);
        }
        return sb.toString();
    }

    /**
     * Splits a value produced by {@link #encodeConversions(String[])} on {@code \0} and
     * builds one {@link Operation} per piece.
     *
     * @param conversions the encoded conversions
     * @return the parsed operations, in encoded order
     */
    public static Operation[] decodeConversions(String conversions) {
        String[] configs = conversions.split("\u0000");
        Operation[] ops = new Operation[configs.length];

        for (int i = 0; i < configs.length; i++) {
            ops[i] = new Operation(configs[i]);
        }

        return ops;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/common-query/src/main/java/ss/cloudbase/core/iterators/GMDenIntersectingIterator.java ---------------------------------------------------------------------- diff --git a/partition/common-query/src/main/java/ss/cloudbase/core/iterators/GMDenIntersectingIterator.java b/partition/common-query/src/main/java/ss/cloudbase/core/iterators/GMDenIntersectingIterator.java deleted file mode 100644 index 7ec401f..0000000 --- a/partition/common-query/src/main/java/ss/cloudbase/core/iterators/GMDenIntersectingIterator.java +++ /dev/null @@ -1,363 +0,0 @@ -// Dear Cloudbase, -// Use protected fields/methods as much as possible in APIs. 
-// Love, -// Will - -// since the IntersectingIterator/FamilyIntersectingIterator classes are stingy with their fields, we have to use -// the exact same package name to get at currentPartition and currentDocID -package ss.cloudbase.core.iterators; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.hadoop.io.Text; -import org.apache.log4j.Logger; - -import ss.cloudbase.core.iterators.IntersectingIterator.TermSource; - -import cloudbase.core.data.ArrayByteSequence; -import cloudbase.core.data.ByteSequence; -import cloudbase.core.data.Key; -import cloudbase.core.data.Range; -import cloudbase.core.data.Value; -import cloudbase.core.iterators.IteratorEnvironment; -import cloudbase.core.iterators.SortedKeyValueIterator; - -/** - * This class is a copy of FamilyIntersectingIterator with a few minor changes. 
It assumes a table structure like the following: - * <table> - * <tr><th>Row</th><th>Column Family</th><th>Column Qualifier</th><th>Value</th></tr> - * <tr><td>Partition1</td><td>event</td><td>UUID</td><td>The record value</td></tr> - * <tr><td>Partition1</td><td>index</td><td>term\u0000UUID</td><td></td></tr> - * </table> - * - * @author William Wall - * - */ -public class GMDenIntersectingIterator extends IntersectingIterator { - private static final Logger logger = Logger.getLogger(GMDenIntersectingIterator.class); - - public static final Text DEFAULT_INDEX_COLF = new Text("i"); - public static final Text DEFAULT_DOC_COLF = new Text("e"); - - public static final String indexFamilyOptionName = "indexFamily"; - public static final String docFamilyOptionName = "docFamily"; - - protected static Text indexColf = DEFAULT_INDEX_COLF; - protected static Text docColf = DEFAULT_DOC_COLF; - protected static Set<ByteSequence> indexColfSet; - protected static Set<ByteSequence> docColfSet; - - protected static final byte[] nullByte = {0}; - - protected SortedKeyValueIterator<Key,Value> docSource; - - /** - * Use this option to retrieve all the documents that match the UUID rather than just the first. This - * is commonly used in cell-level security models that use the column-qualifier like this: - * UUID \0 field1 [] value - * UUID \0 securedField [ALPHA] secretValue - **/ - public static final String OPTION_MULTI_DOC = "multiDoc"; - - /** - * Use this option to turn off document lookup. 
- */ - public static final String OPTION_DOC_LOOKUP = "docLookup"; - - protected boolean multiDoc = false; - protected boolean doDocLookup = true; - protected Range docRange = null; - protected boolean nextId = false; - - @Override - public void init(SortedKeyValueIterator<Key, Value> source, Map<String, String> options, IteratorEnvironment env) throws IOException { - if (options.containsKey(indexFamilyOptionName)) - indexColf = new Text(options.get(indexFamilyOptionName)); - if (options.containsKey(docFamilyOptionName)) - docColf = new Text(options.get(docFamilyOptionName)); - docSource = source.deepCopy(env); - indexColfSet = Collections.singleton((ByteSequence)new ArrayByteSequence(indexColf.getBytes(),0,indexColf.getLength())); - - if (options.containsKey(OPTION_MULTI_DOC)) { - multiDoc = Boolean.parseBoolean(options.get(OPTION_MULTI_DOC)); - } - - if (options.containsKey(OPTION_DOC_LOOKUP)) { - doDocLookup = Boolean.parseBoolean(options.get(OPTION_DOC_LOOKUP)); - } - - if (!doDocLookup) { - // it makes no sense to turn on multiDoc if doDocLookup is off - multiDoc = false; - } - - // remove any range terms - Text[] originalTerms = decodeColumns(options.get(columnFamiliesOptionName)); - boolean[] originalBooleans = decodeBooleans(options.get(notFlagOptionName)); - - List<Text> terms = new ArrayList<Text>(); - List<Boolean> termBooleans = new ArrayList<Boolean>(); - List<Text> ranges = new ArrayList<Text>(); - List<Boolean> rangeBooleans = new ArrayList<Boolean>(); - - boolean boolsExist = originalBooleans != null && originalBooleans.length == originalTerms.length; - - for (int i = 0; i < originalTerms.length; i++) { - if (isRangeTerm(originalTerms[i])) { - ranges.add(originalTerms[i]); - if (boolsExist) { - rangeBooleans.add(originalBooleans[i]); - } else { - rangeBooleans.add(false); - } - } else { - terms.add(originalTerms[i]); - - if (boolsExist) { - termBooleans.add(originalBooleans[i]); - } else { - termBooleans.add(false); - } - } - } - - boolean[] bools = 
new boolean[termBooleans.size()]; - for (int i = 0; i < termBooleans.size(); i++) { - bools[i] = termBooleans.get(i).booleanValue(); - } - - boolean[] rangeBools = new boolean[rangeBooleans.size()]; - for (int i = 0; i < rangeBooleans.size(); i++) { - rangeBools[i] = rangeBooleans.get(i).booleanValue(); - } - - // put the modified term/boolean lists back in the options - - if (terms.size() < 2) { - // the intersecting iterator will choke on these, so we'll set it up ourselves - if (terms.size() == 1) { - sources = new TermSource[1]; - sources[0] = new TermSource(source, terms.get(0)); - } - } else { - options.put(columnFamiliesOptionName, encodeColumns(terms.toArray(new Text[terms.size()]))); - if (termBooleans.size() > 0) { - options.put(notFlagOptionName, encodeBooleans(bools)); - } - - super.init(source, options, env); - } - - // add the range terms - if (ranges.size() > 0) { - - TermSource[] localSources; - - int offset = 0; - if (sources != null) { - localSources = new TermSource[sources.length + ranges.size()]; - - // copy array - for (int i = 0; i < sources.length; i++) { - localSources[i] = sources[i]; - } - - offset = sources.length; - } else { - localSources = new TermSource[ranges.size()]; - } - - for (int i = 0; i < ranges.size(); i++) { - IntersectionRange ri = new IntersectionRange(); - ri.init(source.deepCopy(env), getRangeIteratorOptions(ranges.get(i)), env); - localSources[i + offset] = new TermSource(ri, ri.getOutputTerm(), rangeBools[i]); - } - - sources = localSources; - } - - sourcesCount = sources.length; - - if (sourcesCount < 2) { - throw new IOException("GMDenIntersectingIterator requires two or more terms"); - } - - docColfSet = Collections.singleton((ByteSequence)new ArrayByteSequence(docColf.getBytes(),0,docColf.getLength())); - } - - @Override - protected Key buildKey(Text partition, Text term, Text docID) { - Text colq = new Text(term); - colq.append(nullByte, 0, 1); - colq.append(docID.getBytes(), 0, docID.getLength()); - return new 
Key(partition, indexColf, colq); - } - - @Override - protected Key buildKey(Text partition, Text term) { - Text colq = new Text(term); - return new Key(partition, indexColf, colq); - } - - @Override - protected Text getTerm(Key key) { - if (indexColf.compareTo(key.getColumnFamily().getBytes(),0,indexColf.getLength())< 0) { - // We're past the index column family, so return a term that will sort lexicographically last. - // The last unicode character should suffice - return new Text("\uFFFD"); - } - Text colq = key.getColumnQualifier(); - int zeroIndex = colq.find("\0"); - Text term = new Text(); - term.set(colq.getBytes(),0,zeroIndex); - return term; - } - - @Override - public SortedKeyValueIterator<Key, Value> deepCopy(IteratorEnvironment env) { - GMDenIntersectingIterator newItr = new GMDenIntersectingIterator(); - if(sources != null) { - newItr.sourcesCount = sourcesCount; - newItr.sources = new TermSource[sourcesCount]; - for(int i = 0; i < sourcesCount; i++) { - newItr.sources[i] = new TermSource(sources[i].iter.deepCopy(env), sources[i].term); - } - } - newItr.currentDocID = currentDocID; - newItr.currentPartition = currentPartition; - newItr.docRange = docRange; - newItr.docSource = docSource.deepCopy(env); - newItr.inclusive = inclusive; - newItr.multiDoc = multiDoc; - newItr.nextId = nextId; - newItr.overallRange = overallRange; - return newItr; - } - - @Override - public void seek(Range range, Collection<ByteSequence> seekColumnFamilies, boolean inclusive) throws IOException { - super.seek(range, indexColfSet, true); - - } - - @Override - protected Text getDocID(Key key) { - Text colq = key.getColumnQualifier(); - int firstZeroIndex = colq.find("\0"); - if (firstZeroIndex < 0) { - throw new IllegalArgumentException("bad docid: "+key.toString()); - } - Text docID = new Text(); - try { - docID.set(colq.getBytes(),firstZeroIndex+1, colq.getBytes().length - firstZeroIndex - 1); - } catch (ArrayIndexOutOfBoundsException e) { - throw new 
IllegalArgumentException("bad indices for docid: "+key.toString()+" "+firstZeroIndex +" " + (colq.getBytes().length - firstZeroIndex - 1)); - } - return docID; - } - - protected Key buildStartKey() { - return new Key(currentPartition, docColf, currentDocID); - } - - protected Key buildEndKey() { - if (multiDoc) { - return new Key(currentPartition, docColf, new Text(currentDocID.toString() + "\u0000\uFFFD")); - } - return null; - } - - @Override - public void next() throws IOException { - if (multiDoc && nextId) { - docSource.next(); - - // check to make sure that the docSource top is less than our max key - if (docSource.hasTop() && docRange.contains(docSource.getTopKey())) { - topKey = docSource.getTopKey(); - value = docSource.getTopValue(); - return; - } - } - - nextId = false; - super.next(); - } - - @Override - protected void advanceToIntersection() throws IOException { - super.advanceToIntersection(); - - if (topKey==null || !doDocLookup) - return; - - if (logger.isTraceEnabled()) logger.trace("using top key to seek for doc: "+topKey.toString()); - docRange = new Range(buildStartKey(), true, buildEndKey(), false); - docSource.seek(docRange, docColfSet, true); - logger.debug("got doc key: "+docSource.getTopKey().toString()); - if (docSource.hasTop()&& docRange.contains(docSource.getTopKey())) { - value = docSource.getTopValue(); - } - logger.debug("got doc value: "+value.toString()); - - if (docSource.hasTop()) { - if (multiDoc && topKey != null) { - nextId = true; - } - topKey = docSource.getTopKey(); - } - } - - - public boolean isRangeTerm(Text term) { - return term.toString().startsWith("range\u0000"); - } - - protected Map<String, String> getRangeIteratorOptions(Text config) { - // we want the keys from Range Iterators to look like this: - // range|colf|lower|includeLower|upper|includeUpper - // e.g. range|geo|21332|true|21333|false - - // and we'll output a key like this: - // partition index:geo\0UUID ... 
- - - String[] range = config.toString().split("\u0000"); - Map<String, String> options = new HashMap<String, String>(); - options.put(IntersectionRange.OPTION_COLF, range[1]); - options.put(IntersectionRange.OPTION_OUTPUT_TERM, range[1]); - options.put(IntersectionRange.OPTION_LOWER_BOUND, range[2]); - options.put(IntersectionRange.OPTION_START_INCLUSIVE, range[3]); - options.put(IntersectionRange.OPTION_UPPER_BOUND, range[4]); - options.put(IntersectionRange.OPTION_END_INCLUSIVE, range[5]); - options.put(IntersectionRange.OPTION_OUTPUT_COLF, indexColf.toString()); - return options; - } - - /** - * Builds a range term for use with the IntersectingIterator - * @param colf The column family to search - * @param start The start of the range - * @param includeStart Whether the start of the range is inclusive or not - * @param end The end of the range - * @param includeEnd Whether the end of the range is inclusive or not - * @return A String formatted for use as a term a GMDenIntersectingIterator - */ - public static String getRangeTerm(String colf, String start, boolean includeStart, String end, boolean includeEnd) { - StringBuilder sb = new StringBuilder(); - sb.append("range\u0000"); - sb.append(colf).append("\u0000"); - sb.append(start).append("\u0000"); - sb.append(includeStart ? "true": "false").append("\u0000"); - sb.append(end).append("\u0000"); - sb.append(includeEnd ? 
"true": "false").append("\u0000"); - return sb.toString(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/common-query/src/main/java/ss/cloudbase/core/iterators/IntersectingIterator.java ---------------------------------------------------------------------- diff --git a/partition/common-query/src/main/java/ss/cloudbase/core/iterators/IntersectingIterator.java b/partition/common-query/src/main/java/ss/cloudbase/core/iterators/IntersectingIterator.java deleted file mode 100644 index 3b4961f..0000000 --- a/partition/common-query/src/main/java/ss/cloudbase/core/iterators/IntersectingIterator.java +++ /dev/null @@ -1,557 +0,0 @@ -package ss.cloudbase.core.iterators; - -import java.io.IOException; -import java.util.Collection; -import java.util.Map; - -import org.apache.commons.codec.binary.Base64; -import org.apache.hadoop.io.Text; -import org.apache.log4j.Logger; - -import cloudbase.core.data.ByteSequence; -import cloudbase.core.data.Key; -import cloudbase.core.data.PartialKey; -import cloudbase.core.data.Range; -import cloudbase.core.data.Value; -import cloudbase.core.iterators.IteratorEnvironment; -import cloudbase.core.iterators.SortedKeyValueIterator; -import cloudbase.core.util.TextUtil; - -public class IntersectingIterator implements SortedKeyValueIterator<Key,Value> { - - protected Text nullText = new Text(); - - protected Text getPartition(Key key) { - return key.getRow(); - } - - protected Text getTerm(Key key) { - return key.getColumnFamily(); - } - - protected Text getDocID(Key key) { - return key.getColumnQualifier(); - } - - protected Key buildKey(Text partition, Text term) { - return new Key(partition,(term == null) ? nullText : term); - } - - protected Key buildKey(Text partition, Text term, Text docID) { - return new Key(partition,(term == null) ? 
nullText : term, docID); - } - - protected Key buildFollowingPartitionKey(Key key) { - return key.followingKey(PartialKey.ROW); - } - - protected static final Logger log = Logger.getLogger(IntersectingIterator.class); - - protected static class TermSource { - public SortedKeyValueIterator<Key,Value> iter; - public Text term; - public boolean notFlag; - - public TermSource(TermSource other) { - this.iter = other.iter; - this.term = other.term; - this.notFlag = other.notFlag; - } - - public TermSource(SortedKeyValueIterator<Key,Value> iter, Text term) { - this.iter = iter; - this.term = term; - this.notFlag = false; - } - public TermSource(SortedKeyValueIterator<Key,Value> iter, Text term, boolean notFlag) { - this.iter = iter; - this.term = term; - this.notFlag = notFlag; - } - - public String getTermString() { - return (this.term == null) ? new String("Iterator") : this.term.toString(); - } - } - - protected TermSource[] sources; - protected int sourcesCount = 0; - - protected Range overallRange; - - // query-time settings - protected Text currentPartition = null; - protected Text currentDocID = new Text(emptyByteArray); - protected static final byte [] emptyByteArray = new byte[0]; - - protected Key topKey = null; - protected Value value = new Value(emptyByteArray); - - protected Collection<ByteSequence> seekColumnFamilies; - - protected boolean inclusive; - - - public IntersectingIterator() - {} - - @Override - public SortedKeyValueIterator<Key, Value> deepCopy(IteratorEnvironment env) { - return new IntersectingIterator(this, env); - } - - public IntersectingIterator(IntersectingIterator other, IteratorEnvironment env) - { - if(other.sources != null) - { - sourcesCount = other.sourcesCount; - sources = new TermSource[sourcesCount]; - for(int i = 0; i < sourcesCount; i++) - { - sources[i] = new TermSource(other.sources[i].iter.deepCopy(env), other.sources[i].term); - } - } - } - - @Override - public Key getTopKey() { - return topKey; - } - - @Override - public 
Value getTopValue() { - // we don't really care about values - return value; - } - - @Override - public boolean hasTop() { - return currentPartition != null; - } - - // precondition: currentRow is not null - private boolean seekOneSource(int sourceID) throws IOException - { - // find the next key in the appropriate column family that is at or beyond the cursor (currentRow, currentCQ) - // advance the cursor if this source goes beyond it - // return whether we advanced the cursor - - // within this loop progress must be made in one of the following forms: - // - currentRow or currentCQ must be increased - // - the given source must advance its iterator - // this loop will end when any of the following criteria are met - // - the iterator for the given source is pointing to the key (currentRow, columnFamilies[sourceID], currentCQ) - // - the given source is out of data and currentRow is set to null - // - the given source has advanced beyond the endRow and currentRow is set to null - boolean advancedCursor = false; - - if (sources[sourceID].notFlag) - { - while(true) - { - if(sources[sourceID].iter.hasTop() == false) - { - // an empty column that you are negating is a valid condition - break; - } - // check if we're past the end key - int endCompare = -1; - // we should compare the row to the end of the range - if(overallRange.getEndKey() != null) - { - endCompare = overallRange.getEndKey().getRow().compareTo(sources[sourceID].iter.getTopKey().getRow()); - if((!overallRange.isEndKeyInclusive() && endCompare <= 0) || endCompare < 0) - { - // an empty column that you are negating is a valid condition - break; - } - } - int partitionCompare = currentPartition.compareTo(getPartition(sources[sourceID].iter.getTopKey())); - // check if this source is already at or beyond currentRow - // if not, then seek to at least the current row - - if(partitionCompare > 0) - { - // seek to at least the currentRow - Key seekKey = buildKey(currentPartition,sources[sourceID].term); - 
                    sources[sourceID].iter.seek(new Range(seekKey,true, null, false), seekColumnFamilies, inclusive);
                    continue;
                }
                // check if this source has gone beyond currentRow
                // if so, this is a valid condition for negation
                if(partitionCompare < 0)
                {
                    break;
                }
                // we have verified that the current source is positioned in currentRow
                // now we must make sure we're in the right columnFamily in the current row
                // Note: Iterators are auto-magically set to the correct columnFamily
                if(sources[sourceID].term != null)
                {
                    int termCompare = sources[sourceID].term.compareTo(getTerm(sources[sourceID].iter.getTopKey()));
                    // check if this source is already on the right columnFamily
                    // if not, then seek forwards to the right columnFamily
                    if(termCompare > 0)
                    {
                        Key seekKey = buildKey(currentPartition,sources[sourceID].term,currentDocID);
                        sources[sourceID].iter.seek(new Range(seekKey,true,null,false), seekColumnFamilies, inclusive);
                        continue;
                    }
                    // check if this source is beyond the right columnFamily
                    // if so, then this is a valid condition for negating
                    if(termCompare < 0)
                    {
                        break;
                    }
                }

                // we have verified that we are in currentRow and the correct column family
                // make sure we are at or beyond columnQualifier
                Text docID = getDocID(sources[sourceID].iter.getTopKey());
                int docIDCompare = currentDocID.compareTo(docID);
                // If we are past the target, this is a valid result
                // (the negated term does not contain the current document)
                if(docIDCompare < 0)
                {
                    break;
                }
                // if this source is not yet at the currentCQ then advance in this source
                if(docIDCompare > 0)
                {
                    // seek forwards
                    Key seekKey = buildKey(currentPartition, sources[sourceID].term, currentDocID);
                    sources[sourceID].iter.seek(new Range(seekKey, true, null, false), seekColumnFamilies, inclusive);
                    continue;
                }
                // if we are equal to the target, this is an invalid result:
                // the negated term matches the current document, so the document is rejected.
                // Force the entire process to go to the next row.
                // We are advancing column 0 because we forced that column to not contain a !
                // when we did the init()
                if(docIDCompare == 0)
                {
                    sources[0].iter.next();
                    advancedCursor = true;
                    break;
                }
            }
        }
        else
        {
            // non-negated term: this source must land exactly on the cursor, or else move the cursor forward
            while(true)
            {
                if(sources[sourceID].iter.hasTop() == false)
                {
                    currentPartition = null;
                    // setting currentRow to null counts as advancing the cursor
                    return true;
                }
                // check if we're past the end key
                // (endCompare stays -1 when the overall range has no end key)
                int endCompare = -1;
                // we should compare the row to the end of the range

                if(overallRange.getEndKey() != null)
                {
                    endCompare = overallRange.getEndKey().getRow().compareTo(sources[sourceID].iter.getTopKey().getRow());
                    if((!overallRange.isEndKeyInclusive() && endCompare <= 0) || endCompare < 0)
                    {
                        currentPartition = null;
                        // setting currentRow to null counts as advancing the cursor
                        return true;
                    }
                }
                int partitionCompare = currentPartition.compareTo(getPartition(sources[sourceID].iter.getTopKey()));
                // check if this source is already at or beyond currentRow
                // if not, then seek to at least the current row
                if(partitionCompare > 0)
                {
                    // seek to at least the currentRow
                    Key seekKey = buildKey(currentPartition,sources[sourceID].term);
                    sources[sourceID].iter.seek(new Range(seekKey,true, null, false), seekColumnFamilies, inclusive);
                    continue;
                }
                // check if this source has gone beyond currentRow
                // if so, advance currentRow
                if(partitionCompare < 0)
                {
                    // move the cursor to this source's partition and restart at the smallest doc ID
                    currentPartition.set(getPartition(sources[sourceID].iter.getTopKey()));
                    currentDocID.set(emptyByteArray);
                    advancedCursor = true;
                    continue;
                }
                // we have verified that the current source is positioned in currentRow
                // now we must make sure we're in the right columnFamily in the current row
                // Note: Iterators are auto-magically set to the correct columnFamily

                if(sources[sourceID].term != null)
                {
                    int termCompare = sources[sourceID].term.compareTo(getTerm(sources[sourceID].iter.getTopKey()));
                    // check if this source is already on the right columnFamily
                    // if not, then seek forwards to the right columnFamily
                    if(termCompare > 0)
                    {
                        Key seekKey = buildKey(currentPartition,sources[sourceID].term,currentDocID);
                        sources[sourceID].iter.seek(new Range(seekKey,true,null,false), seekColumnFamilies, inclusive);
                        continue;
                    }
                    // check if this source is beyond the right columnFamily
                    // if so, then seek to the next row
                    if(termCompare < 0)
                    {
                        // we're out of entries in the current row, so seek to the next one
                        // byte[] currentRowBytes = currentRow.getBytes();
                        // byte[] nextRow = new byte[currentRowBytes.length + 1];
                        // System.arraycopy(currentRowBytes, 0, nextRow, 0, currentRowBytes.length);
                        // nextRow[currentRowBytes.length] = (byte)0;
                        // // we should reuse text objects here
                        // sources[sourceID].seek(new Key(new Text(nextRow),columnFamilies[sourceID]));
                        if(endCompare == 0)
                        {
                            // we're done: this source is already on the last row of the overall range
                            currentPartition = null;
                            // setting currentRow to null counts as advancing the cursor
                            return true;
                        }
                        Key seekKey = buildFollowingPartitionKey(sources[sourceID].iter.getTopKey());
                        try {
                            sources[sourceID].iter.seek(new Range(seekKey, true, null, false), seekColumnFamilies, inclusive);
                        } catch (Exception e) {
                            // the seek will throw an exception if we have crossed a tablet boundary
                            // setting the Partition to null will advance to the next tablet
                            // NOTE(review): every exception type is treated as a tablet-boundary crossing here
                            currentPartition = null;
                            return true;
                        }
                        continue;
                    }
                }
                // we have verified that we are in currentRow and the correct column family
                // make sure we are at or beyond columnQualifier
                Text docID = getDocID(sources[sourceID].iter.getTopKey());
                int docIDCompare = currentDocID.compareTo(docID);
                // if this source has advanced beyond the current column qualifier then advance currentCQ
                // and leave the loop, reporting that the cursor moved
                if(docIDCompare < 0)
                {
                    currentDocID.set(docID);
                    advancedCursor = true;
                    break;
                }
                // if this source is not yet at the currentCQ then seek in this source
                if(docIDCompare > 0)
                {
                    // seek forwards
                    Key seekKey = buildKey(currentPartition, sources[sourceID].term, currentDocID);
sources[sourceID].iter.seek(new Range(seekKey, true, null, false), seekColumnFamilies, inclusive); - continue; - } - // this source is at the current row, in its column family, and at currentCQ - break; - } - } - return advancedCursor; - } - - @Override - public void next() throws IOException { - if(currentPartition == null) - { - return; - } - // precondition: the current row is set up and the sources all have the same column qualifier - // while we don't have a match, seek in the source with the smallest column qualifier - sources[0].iter.next(); - advanceToIntersection(); - } - - protected void advanceToIntersection() throws IOException - { - boolean cursorChanged = true; - int numSeeks = 0; - while(cursorChanged) - { - // seek all of the sources to at least the highest seen column qualifier in the current row - cursorChanged = false; - for(int i = 0; i < sourcesCount; i++) - { - if(currentPartition == null) - { - topKey = null; - return; - } - numSeeks++; - if(seekOneSource(i)) - { - cursorChanged = true; - break; - } - } - } - topKey = buildKey(currentPartition,nullText,currentDocID); - } - - public static String stringTopKey(SortedKeyValueIterator<Key, Value> iter) { - if (iter.hasTop()) - return iter.getTopKey().toString(); - return ""; - } - - public static final String columnFamiliesOptionName = "columnFamilies"; - public static final String notFlagOptionName = "notFlag"; - - public static String encodeColumns(Text[] columns) - { - StringBuilder sb = new StringBuilder(); - for(int i = 0; i < columns.length; i++) - { - sb.append(new String(Base64.encodeBase64(TextUtil.getBytes(columns[i])))); - sb.append('\n'); - } - return sb.toString(); - } - - public static String encodeBooleans(boolean[] flags) - { - byte[] bytes = new byte[flags.length]; - for(int i = 0; i < flags.length; i++) - { - if(flags[i]) - bytes[i] = 1; - else - bytes[i] = 0; - } - return new String(Base64.encodeBase64(bytes)); - } - - public static Text[] decodeColumns(String columns) - { - 
String[] columnStrings = columns.split("\n"); - Text[] columnTexts = new Text[columnStrings.length]; - for(int i = 0; i < columnStrings.length; i++) - { - columnTexts[i] = new Text(Base64.decodeBase64(columnStrings[i].getBytes())); - } - return columnTexts; - } - - public static boolean[] decodeBooleans(String flags) - { - // return null of there were no flags - if(flags == null) - return null; - - byte[] bytes = Base64.decodeBase64(flags.getBytes()); - boolean[] bFlags = new boolean[bytes.length]; - for(int i = 0; i < bytes.length; i++) - { - if(bytes[i] == 1) - bFlags[i] = true; - else - bFlags[i] = false; - } - return bFlags; - } - - @Override - public void init(SortedKeyValueIterator<Key, Value> source, - Map<String, String> options, IteratorEnvironment env) throws IOException { - Text[] terms = decodeColumns(options.get(columnFamiliesOptionName)); - boolean[] notFlag = decodeBooleans(options.get(notFlagOptionName)); - - if(terms.length < 2) - { - throw new IOException("IntersectionIterator requires two or more columns families"); - } - - // Scan the not flags. - // There must be at least one term that isn't negated - // And we are going to re-order such that the first term is not a ! 
term - if(notFlag == null) - { - notFlag = new boolean[terms.length]; - for(int i = 0; i < terms.length; i++) - notFlag[i] = false; - } - if(notFlag[0]) { - for(int i = 1; i < notFlag.length; i++) - { - if(notFlag[i] == false) - { - Text swapFamily = new Text(terms[0]); - terms[0].set(terms[i]); - terms[i].set(swapFamily); - notFlag[0] = false; - notFlag[i] = true; - break; - } - } - if(notFlag[0]) - { - throw new IOException("IntersectionIterator requires at lest one column family without not"); - } - } - - - sources = new TermSource[terms.length]; - sources[0] = new TermSource(source, terms[0]); - for(int i = 1; i < terms.length; i++) - { - sources[i] = new TermSource(source.deepCopy(env), terms[i], notFlag[i]); - } - sourcesCount = terms.length; - } - - @Override - public void seek(Range range, Collection<ByteSequence> seekColumnFamilies, boolean inclusive) throws IOException { - overallRange = new Range(range); - currentPartition = new Text(); - currentDocID.set(emptyByteArray); - - this.seekColumnFamilies = seekColumnFamilies; - this.inclusive = inclusive; - - // seek each of the sources to the right column family within the row given by key - for(int i = 0; i < sourcesCount; i++) - { - Key sourceKey; - if(range.getStartKey() != null) - { - if(range.getStartKey().getColumnQualifier() != null) - { - sourceKey = buildKey(getPartition(range.getStartKey()),sources[i].term,range.getStartKey().getColumnQualifier()); - } - else - { - sourceKey = buildKey(getPartition(range.getStartKey()),sources[i].term); - } - sources[i].iter.seek(new Range(sourceKey, true, null, false), seekColumnFamilies, inclusive); - } - else - { - sources[i].iter.seek(range, seekColumnFamilies, inclusive); - } - } - advanceToIntersection(); - } - - public void addSource(SortedKeyValueIterator<Key, Value> source, IteratorEnvironment env, - Text term, boolean notFlag) { - // Check if we have space for the added Source - if(sources == null) - { - sources = new TermSource[1]; - } - else - { - // 
allocate space for node, and copy current tree. - // TODO: Should we change this to an ArrayList so that we can just add() ? - TermSource[] localSources = new TermSource[sources.length + 1]; - int currSource = 0; - for(TermSource myTerm : sources) - { - // TODO: Do I need to call new here? or can I just re-use the term? - localSources[currSource] = new TermSource(myTerm); - currSource++; - } - sources = localSources; - } - sources[sourcesCount] = new TermSource(source.deepCopy(env), term, notFlag); - sourcesCount++; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/common-query/src/main/java/ss/cloudbase/core/iterators/IntersectionRange.java ---------------------------------------------------------------------- diff --git a/partition/common-query/src/main/java/ss/cloudbase/core/iterators/IntersectionRange.java b/partition/common-query/src/main/java/ss/cloudbase/core/iterators/IntersectionRange.java deleted file mode 100644 index 04d5884..0000000 --- a/partition/common-query/src/main/java/ss/cloudbase/core/iterators/IntersectionRange.java +++ /dev/null @@ -1,330 +0,0 @@ -package ss.cloudbase.core.iterators; - -import java.io.IOException; -import java.util.Collection; -import java.util.Collections; -import java.util.Iterator; -import java.util.Map; -import java.util.Set; -import java.util.SortedSet; -import java.util.TreeSet; - -import org.apache.hadoop.io.Text; -import org.apache.log4j.Level; -import org.apache.log4j.Logger; - -import cloudbase.core.client.CBException; -import cloudbase.core.data.ArrayByteSequence; -import cloudbase.core.data.ByteSequence; -import cloudbase.core.data.Key; -import cloudbase.core.data.PartialKey; -import cloudbase.core.data.Range; -import cloudbase.core.data.Value; -import cloudbase.core.iterators.IteratorEnvironment; -import cloudbase.core.iterators.SortedKeyValueIterator; - -/** - * When attempting to intersect a term which is a range (lowerval <= x <= upperval), the 
entire range - * must first be scanned so that the document keys can be sorted before passing them up to the - * intersecting iterator of choice. - * - * @author William Wall (wawall) - */ -public class IntersectionRange implements SortedKeyValueIterator<Key, Value>{ - private static final Logger logger = Logger.getLogger(IntersectionRange.class); - - public static final String OPTION_OUTPUT_COLF = "outputColf"; - public static final String OPTION_OUTPUT_TERM = "outputTerm"; - public static final String OPTION_COLF = "columnFamily"; - public static final String OPTION_LOWER_BOUND = "lower"; - public static final String OPTION_UPPER_BOUND = "upper"; - public static final String OPTION_DELIMITER = "delimiter"; - public static final String OPTION_START_INCLUSIVE = "startInclusive"; - public static final String OPTION_END_INCLUSIVE = "endInclusive"; - public static final String OPTION_TEST_OUTOFMEM = "testOutOfMemory"; - - protected SortedKeyValueIterator<Key, Value> source; - protected Text colf = null; - protected Text lower = null; - protected Text upper = null; - protected String delimiter = null; - protected String outputTerm = null; - protected Text outputColf = null; - protected Text currentPartition = null; - protected boolean startInclusive = true; - protected boolean endInclusive = false; - protected boolean testOutOfMemory = false; - - protected Key topKey = null; - - protected Iterator<Key> itr; - protected boolean sortComplete = false; - protected Range overallRange; - protected SortedSet<Key> docIds = new TreeSet<Key>(); - protected static Set<ByteSequence> indexColfSet; - - @Override - public SortedKeyValueIterator<Key, Value> deepCopy(IteratorEnvironment env) { - return new IntersectionRange(this, env); - } - - public IntersectionRange() { - logger.setLevel(Level.ALL); - } - - public IntersectionRange(IntersectionRange other, IteratorEnvironment env) { - source = other.source.deepCopy(env); - colf = other.colf; - lower = other.lower; - upper = 
other.upper; - delimiter = other.delimiter; - outputColf = other.outputColf; - outputTerm = other.outputTerm; - currentPartition = other.currentPartition; - startInclusive = other.startInclusive; - endInclusive = other.endInclusive; - topKey = other.topKey; - docIds.addAll(other.docIds); - itr = docIds.iterator(); - sortComplete = other.sortComplete; - overallRange = other.overallRange; - } - - public Text getOutputTerm() { - return new Text(outputTerm); - } - - public Text getOutputColumnFamily() { - return outputColf; - } - - @Override - public Key getTopKey() { - return topKey; - } - - @Override - public Value getTopValue() { - return IteratorConstants.emptyValue; - } - - @Override - public boolean hasTop() { - try { - if (topKey == null) next(); - } catch (IOException e) { - - } - - return topKey != null; - } - - protected String getDocID(Key key) { - try { - String s = key.getColumnQualifier().toString(); - int start = s.indexOf("\u0000") + 1; - int end = s.indexOf("\u0000", start); - if (end == -1) { - end = s.length(); - } - return s.substring(start, end); - } catch (Exception e) { - - } - return null; - } - - protected Text getTerm(Key key) { - try { - Text colq = key.getColumnQualifier(); - Text term = new Text(); - term.set(colq.getBytes(), 0, colq.find("\0")); - return term; - } catch (Exception e) { - } - return null; - } - - protected Text getPartition(Key key) { - return key.getRow(); - } - - protected Text getFollowingPartition(Key key) { - return key.followingKey(PartialKey.ROW).getRow(); - } - - @Override - public void init(SortedKeyValueIterator<Key, Value> source, Map<String, String> options, IteratorEnvironment env) throws IOException { - if (options.containsKey(OPTION_LOWER_BOUND)) { - lower = new Text(options.get(OPTION_LOWER_BOUND)); - } else { - lower = new Text("\u0000"); - } - - if (options.containsKey(OPTION_UPPER_BOUND)) { - upper = new Text(options.get(OPTION_UPPER_BOUND)); - } else { - upper = new Text("\u0000"); - } - - if 
(options.containsKey(OPTION_DELIMITER)) { - delimiter = options.get(OPTION_DELIMITER); - } else { - delimiter = "\u0000"; - } - - if (options.containsKey(OPTION_COLF)) { - colf = new Text(options.get(OPTION_COLF)); - } else { - colf = new Text("index"); - } - - if (options.containsKey(OPTION_OUTPUT_COLF)) { - outputColf = new Text(options.get(OPTION_OUTPUT_COLF)); - } else { - outputColf = colf; - } - - if (options.containsKey(OPTION_START_INCLUSIVE)) { - startInclusive = Boolean.parseBoolean(options.get(OPTION_START_INCLUSIVE)); - } - - if (options.containsKey(OPTION_END_INCLUSIVE)) { - endInclusive = Boolean.parseBoolean(options.get(OPTION_END_INCLUSIVE)); - } - - if (options.containsKey(OPTION_TEST_OUTOFMEM)) { - testOutOfMemory = Boolean.parseBoolean(options.get(OPTION_TEST_OUTOFMEM)); - } - - outputTerm = options.get(OPTION_OUTPUT_TERM); - this.source = source; - - indexColfSet = Collections.singleton((ByteSequence) new ArrayByteSequence(colf.getBytes(),0,colf.getLength())); - } - - /** - * Sets up the document/record IDs in a sorted structure. - * @throws IOException - * @throws CBException - */ - protected void setUpDocIds() throws IOException { - int count = 0; - try { - if (testOutOfMemory) { - throw new OutOfMemoryError(); - } - - long start = System.currentTimeMillis(); - if (source.hasTop()) { - docIds.clear(); - currentPartition = getPartition(source.getTopKey()); - while (currentPartition != null) { - Key lowerKey = new Key(currentPartition, colf, lower); - try { - source.seek(new Range(lowerKey, true, null, false), indexColfSet, true); - } catch (IllegalArgumentException e) { - // the range does not overlap the overall range? 
quit - currentPartition = null; - break; - } - - // if we don't have a value then quit - if (!source.hasTop()) { - currentPartition = null; - break; - } - - Key top; - while(source.hasTop()) { - top = source.getTopKey(); - - if (overallRange != null && overallRange.getEndKey() != null) { - // see if we're past the end of the partition range - int endCompare = overallRange.getEndKey().compareTo(top, PartialKey.ROW); - if ((!overallRange.isEndKeyInclusive() && endCompare <= 0) || endCompare < 0) { - // we're done - currentPartition = null; - break; - } - } - - // make sure we're still in the right partition - if (currentPartition.compareTo(getPartition(top)) < 0) { - currentPartition.set(getPartition(top)); - break; - } - - // make sure we're still in the right column family - if (colf.compareTo(top.getColumnFamily()) < 0) { - // if not, then get the next partition - currentPartition = getFollowingPartition(top); - break; - } - - Text term = getTerm(top); - int lowerCompare = term.compareTo(lower); - int upperCompare = term.compareTo(upper); - - // if we went past the upper bound, jump to the next partition - if ((endInclusive && upperCompare > 0) || (!endInclusive && upperCompare >= 0)) { - currentPartition = getFollowingPartition(top); - break; - } else if ((startInclusive && lowerCompare >= 0) || (!startInclusive && lowerCompare > 0)) { - // if the term is lexicographically between the upper and lower bounds, - // then add the doc ID - docIds.add(buildOutputKey(top)); - count++; - } - source.next(); - - // make sure we check to see if we're at the end before potentially seeking back - if (!source.hasTop()) { - currentPartition = null; - break; - } - } - } - itr = docIds.iterator(); - sortComplete = true; - logger.debug("setUpDocIds completed for " + lower + "<=" + colf + "<=" + upper + " in " + (System.currentTimeMillis() - start) + " ms. 
Count = " + count); - } else { - logger.warn("There appear to be no records on this tablet"); - } - } catch (OutOfMemoryError e) { - logger.warn("OutOfMemory error: Count = " + count); - throw new IOException("OutOfMemory error while sorting keys"); - } - } - - protected Key buildOutputKey(Key key) { - String id = getDocID(key); - return new Key(currentPartition, outputColf, new Text((outputTerm != null ? outputTerm: colf.toString()) + "\u0000" +id)); - } - - @Override - public void next() throws IOException { - if (itr != null && itr.hasNext()) { - topKey = itr.next(); - } else { - topKey = null; - } - } - - @Override - public void seek(Range range, Collection<ByteSequence> colfs, boolean inclusive) throws IOException { - if (!sortComplete) { - overallRange = range; - source.seek(range, colfs, inclusive); - setUpDocIds(); - } - - if (range.getStartKey() != null) { - while (hasTop() && topKey.compareTo(range.getStartKey(), PartialKey.ROW_COLFAM_COLQUAL) < 0) { - next(); - } - } else { - next(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/common-query/src/main/java/ss/cloudbase/core/iterators/IteratorConstants.java ---------------------------------------------------------------------- diff --git a/partition/common-query/src/main/java/ss/cloudbase/core/iterators/IteratorConstants.java b/partition/common-query/src/main/java/ss/cloudbase/core/iterators/IteratorConstants.java deleted file mode 100644 index 0db50f6..0000000 --- a/partition/common-query/src/main/java/ss/cloudbase/core/iterators/IteratorConstants.java +++ /dev/null @@ -1,11 +0,0 @@ -package ss.cloudbase.core.iterators; - -import org.apache.hadoop.io.Text; - -import cloudbase.core.data.Value; - -public class IteratorConstants { - public static final byte[] emptyByteArray = new byte[0]; - public static final Value emptyValue = new Value(emptyByteArray); - public static final Text emptyText = new Text(emptyByteArray); -} 
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/common-query/src/main/java/ss/cloudbase/core/iterators/SortedMinIterator.java ---------------------------------------------------------------------- diff --git a/partition/common-query/src/main/java/ss/cloudbase/core/iterators/SortedMinIterator.java b/partition/common-query/src/main/java/ss/cloudbase/core/iterators/SortedMinIterator.java deleted file mode 100644 index c25cc72..0000000 --- a/partition/common-query/src/main/java/ss/cloudbase/core/iterators/SortedMinIterator.java +++ /dev/null @@ -1,173 +0,0 @@ -package ss.cloudbase.core.iterators; - -import java.io.IOException; -import java.util.Map; -import java.util.SortedSet; -import java.util.TreeSet; - -import org.apache.hadoop.io.Text; -import org.apache.log4j.Logger; - -import cloudbase.core.data.Key; -import cloudbase.core.data.PartialKey; -import cloudbase.core.data.Range; -import cloudbase.core.data.Value; -import cloudbase.core.iterators.IteratorEnvironment; -import cloudbase.core.iterators.SortedKeyValueIterator; - -/** - * Iterates over the minimum value of every term with the given prefix and parts delimeter. If, for example, you - * wanted to find each person's last known position, you would set up the following index: - * - * We want the last date instead of the first, so we'll use reverseDate in our index - * partitionX index:<prefix>_<personID>_<reverseDate>.<recordID> - * - * (where "." is actually "\u0000") - * - * <code>SortedMinIterator</code> initially seeks to index:prefix in the first partition. From there, it grabs the record - * as the "document" and then seeks to index:<whatever-the-term-was-up-to-last-delimiter> + "\uFFFD" (last unicode - * character), which then puts it at the next persion ID in our example. - * - * NOTE that this iterator gives a unique result per tablet server. You may have to process the results to determine - * the true minimum value. 
- * - * @author William Wall (wawall) - */ -public class SortedMinIterator extends SortedRangeIterator { - private static final Logger logger = Logger.getLogger(SortedMinIterator.class); - - /** - * The option to supply a prefix to the term combination. Defaults to "min" - */ - public static final String OPTION_PREFIX = "prefix"; - - /** - * The delimiter for the term (note that this is and must be different than the delimiter between the term and record ID). Defaults to "_" - */ - public static final String OPTION_PARTS_DELIMITER = "partsDelimiter"; - - protected String prefix = "min"; - protected String partsDelimiter = "_"; - protected boolean firstKey = true; - protected String lastPart = null; - - @Override - public void init(SortedKeyValueIterator<Key, Value> source, Map<String, String> options, IteratorEnvironment env) throws IOException { - super.init(source, options, env); - - prefix = options.get(OPTION_PREFIX); - String s = options.get(OPTION_PARTS_DELIMITER); - partsDelimiter = s != null ? 
s: "_"; - //TODO: make sure prefix and partsDelimeter is set - lower = new Text(prefix); - } - - protected String getPrefix(Key key) { - String s = key.getColumnQualifier().toString(); - int i = s.indexOf(partsDelimiter); - if (i > 0) { - return s.substring(0, i + partsDelimiter.length()); - } - return null; - } - - protected String getPart(Key key) { - String s = key.getColumnQualifier().toString(); - int i = s.lastIndexOf(partsDelimiter); - if (i > 0) { - return s.substring(0, i + 1); - } - return null; - } - - @Override - protected void setUpDocIds() throws IOException { - int count = 0; - try { - if (testOutOfMemory) { - throw new OutOfMemoryError(); - } - - long start = System.currentTimeMillis(); - if (source.hasTop()) { - SortedSet<Key> docIds = new TreeSet<Key>(); - currentPartition = getPartition(source.getTopKey()); - while (currentPartition != null) { - // seek to the prefix (aka lower) - Key lowerKey = new Key(currentPartition, colf, lower); - source.seek(new Range(lowerKey, true, null, false), indexColfSet, true); - - // if we don't have a value then quit - if (!source.hasTop()) { - currentPartition = null; - } - - Key top; - while(source.hasTop()) { - top = source.getTopKey(); - - if (overallRange != null && overallRange.getEndKey() != null) { - // see if we're past the end of the partition range - int endCompare = overallRange.getEndKey().compareTo(top, PartialKey.ROW); - if ((!overallRange.isEndKeyInclusive() && endCompare <= 0) || endCompare < 0) { - // we're done - currentPartition = null; - break; - } - } - - // make sure we're still in the right partition - if (currentPartition.compareTo(getPartition(top)) < 0) { - currentPartition.set(getPartition(top)); - break; - } - - // make sure we're still in the right column family - if (colf.compareTo(top.getColumnFamily()) < 0) { - // if not, then get the next partition - currentPartition = getFollowingPartition(top); - break; - } - - // make sure we're still in the index prefix - String p = 
getPrefix(top); - String part = getPart(top); - - if (p != null && p.startsWith(prefix)) { - if (part != null) { - if (!part.equals(lastPart)) { - // if the part (e.g. "lastPosition_personId_") is different, then it's valid - lastPart = part; - docIds.add(buildOutputKey(top)); - count++; - } - - // seek to the next part - lowerKey = new Key(currentPartition, colf, new Text(part + "\uFFFD")); - source.seek(new Range(lowerKey, true, null, false), indexColfSet, true); - } - } else { - // we're done in this partition - currentPartition = getFollowingPartition(top); - break; - } - - // make sure we check to see if we're at the end before potentially seeking back - if (!source.hasTop()) { - currentPartition = null; - break; - } - } - } - itr = docIds.iterator(); - sortComplete = true; - logger.debug("setUpDocIds completed in " + (System.currentTimeMillis() - start) + " ms. Count = " + count); - } else { - logger.warn("There appear to be no records on this tablet"); - } - } catch (OutOfMemoryError e) { - logger.warn("OutOfMemory error: Count = " + count); - throw new IOException("OutOfMemory error while sorting keys"); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/common-query/src/main/java/ss/cloudbase/core/iterators/SortedRangeIterator.java ---------------------------------------------------------------------- diff --git a/partition/common-query/src/main/java/ss/cloudbase/core/iterators/SortedRangeIterator.java b/partition/common-query/src/main/java/ss/cloudbase/core/iterators/SortedRangeIterator.java deleted file mode 100644 index 4541230..0000000 --- a/partition/common-query/src/main/java/ss/cloudbase/core/iterators/SortedRangeIterator.java +++ /dev/null @@ -1,136 +0,0 @@ -package ss.cloudbase.core.iterators; - -import java.io.IOException; -import java.util.Collections; -import java.util.Map; -import java.util.Set; - -import org.apache.hadoop.io.Text; -import org.apache.log4j.Logger; - -import 
cloudbase.core.data.ArrayByteSequence; -import cloudbase.core.data.ByteSequence; -import cloudbase.core.data.Key; -import cloudbase.core.data.PartialKey; -import cloudbase.core.data.Range; -import cloudbase.core.data.Value; -import cloudbase.core.iterators.IteratorEnvironment; -import cloudbase.core.iterators.SortedKeyValueIterator; - -/** - * <code>SortedRangeIterator</code> uses the insertion sort functionality of <code>IntersectionRange</code> - * to store off document keys rather than term keys. - * - * @author William Wall (wawall) - */ -public class SortedRangeIterator extends IntersectionRange { - private static final Logger logger = Logger.getLogger(SortedRangeIterator.class); - - /** Use this option to set the document column family. Defaults to "event". **/ - public static final String OPTION_DOC_COLF = "docColf"; - - /** - * Use this option to retrieve all the documents that match the UUID rather than just the first. This - * is commonly used in cell-level security models that use the column-qualifier like this: - * UUID \0 field1 [] value - * UUID \0 securedField [ALPHA] secretValue - **/ - public static final String OPTION_MULTI_DOC = "multiDoc"; - - /** The source document iterator **/ - protected SortedKeyValueIterator<Key, Value> docSource; - - /** The document column family. Defaults to "event". 
**/ - protected Text docColf; - protected Value docValue; - - protected boolean nextId = false; - protected Range docRange = null; - protected boolean multiDoc; - - protected Set<ByteSequence> docColfSet; - - @Override - public void next() throws IOException { - if (multiDoc && nextId) { - docSource.next(); - - // check to make sure that the docSource top is less than our max key - if (docSource.hasTop() && docRange.contains(docSource.getTopKey())) { - topKey = docSource.getTopKey(); - docValue = docSource.getTopValue(); - return; - } - } - - super.next(); - - // if we're looking for multiple documents in the doc source, then - // set the max key for our range check - if (topKey != null) { - Text row = topKey.getRow(); - Text colf = topKey.getColumnFamily(); - if (multiDoc) { - docRange = new Range( - new Key (row, colf, new Text(topKey.getColumnQualifier().toString())), - true, - new Key (row, colf, new Text(topKey.getColumnQualifier().toString() + "\u0000\uFFFD")), - true - ); - } else { - docRange = new Range(new Key (row, colf, new Text(topKey.getColumnQualifier().toString())),true, null, false); - } - } - - nextId = false; - getDocument(); - } - - @Override - public Value getTopValue() { - return docValue; - } - - @Override - protected Key buildOutputKey(Key key) { - // we want to build the document key as the output key - return new Key(currentPartition, docColf, new Text(getDocID(key))); - } - - protected void getDocument() throws IOException { - // look up the document value - if (topKey != null) { - docSource.seek(docRange, docColfSet, true); - - if (docSource.hasTop() && docRange.contains(docSource.getTopKey())) { - // found it! 
- topKey = docSource.getTopKey(); - docValue = docSource.getTopValue(); - nextId = true; - } else { - // does not exist or user had auths that could see the index but not the event - logger.warn("Document: " + topKey + " does not exist or user had auths for " + colf + " but not " + docColf); - docValue = IteratorConstants.emptyValue; - } - } - } - - @Override - public void init(SortedKeyValueIterator<Key, Value> source, Map<String, String> options, IteratorEnvironment env) throws IOException { - super.init(source, options, env); - docSource = source.deepCopy(env); - if (options.containsKey(OPTION_DOC_COLF)) { - docColf = new Text(options.get(OPTION_DOC_COLF)); - } else { - docColf = new Text("event"); - } - - if (options.containsKey(OPTION_MULTI_DOC)) { - multiDoc = Boolean.parseBoolean(options.get(OPTION_MULTI_DOC)); - } else { - multiDoc = false; - } - - docColfSet = Collections.singleton((ByteSequence) new ArrayByteSequence(docColf.getBytes(), 0, docColf.getLength())); - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/80faf06d/partition/common-query/src/main/java/ss/cloudbase/core/iterators/UniqueIterator.java ---------------------------------------------------------------------- diff --git a/partition/common-query/src/main/java/ss/cloudbase/core/iterators/UniqueIterator.java b/partition/common-query/src/main/java/ss/cloudbase/core/iterators/UniqueIterator.java deleted file mode 100644 index 2111bbd..0000000 --- a/partition/common-query/src/main/java/ss/cloudbase/core/iterators/UniqueIterator.java +++ /dev/null @@ -1,95 +0,0 @@ -package ss.cloudbase.core.iterators; - -import java.io.IOException; -import java.util.Collection; -import java.util.Collections; -import java.util.Map; - -import cloudbase.core.data.ByteSequence; -import cloudbase.core.data.Key; -import cloudbase.core.data.PartialKey; -import cloudbase.core.data.Range; -import cloudbase.core.data.Value; -import cloudbase.core.iterators.IteratorEnvironment; -import 
cloudbase.core.iterators.SkippingIterator; -import cloudbase.core.iterators.SortedKeyValueIterator; -import cloudbase.core.iterators.WrappingIterator; - -/** - * This iterator gets unique keys by the given depth. The depth defaults to PartialKey.ROW_COLFAM. - * - * @author William Wall - */ -public class UniqueIterator extends WrappingIterator { - public static final String OPTION_DEPTH = "depth"; - private static final Collection<ByteSequence> EMPTY_SET = Collections.emptySet(); - protected PartialKey depth; - protected Range range; - protected Key lastKey = null; - - public UniqueIterator() {} - - public UniqueIterator(UniqueIterator other) { - this.depth = other.depth; - this.range = other.range; - this.lastKey = other.lastKey; - } - - @Override - public void next() throws IOException { - consume(); - } - - protected void consume() throws IOException { - if (lastKey != null) { - int count = 0; - // next is way faster, so we'll try doing that 10 times before seeking - while (getSource().hasTop() && getSource().getTopKey().compareTo(lastKey, depth) == 0 && count < 10) { - getSource().next(); - count++; - } - if (getSource().hasTop() && getSource().getTopKey().compareTo(lastKey, depth) == 0) { - reseek(getSource().getTopKey().followingKey(depth)); - } - } - - if (getSource().hasTop()) { - lastKey = getSource().getTopKey(); - } - } - - protected void reseek(Key key) throws IOException { - if (range.afterEndKey(key)) { - range = new Range(range.getEndKey(), true, range.getEndKey(), range.isEndKeyInclusive()); - } else { - range = new Range(key, true, range.getEndKey(), range.isEndKeyInclusive()); - } - getSource().seek(range, EMPTY_SET, false); - } - - - @Override - public void init(SortedKeyValueIterator<Key, Value> source, Map<String, String> options, IteratorEnvironment env) throws IOException { - super.init(source, options, env); - - if (options.containsKey(OPTION_DEPTH)) { - depth = PartialKey.getByDepth(Integer.parseInt(options.get(OPTION_DEPTH))); - } else { - 
depth = PartialKey.ROW_COLFAM; - } - } - - @Override - public SortedKeyValueIterator<Key, Value> deepCopy(IteratorEnvironment env) { - UniqueIterator u = new UniqueIterator(this); - u.setSource(getSource().deepCopy(env)); - return u; - } - - @Override - public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException { - this.range = range; - getSource().seek(range, columnFamilies, inclusive); - consume(); - } -}
