// Source: incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/logic/QueryLogic.java
// Recovered from the change set of Mon Dec 5 20:05:49 2011:
// http://svn.apache.org/viewvc/incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/logic/QueryLogic.java?rev=1210600&r1=1210599&r2=1210600&view=diff
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package logic;

import iterator.EvaluatingIterator;

// NOTE: the remaining import lines of this file were outside the recovered change set
// and are intentionally not reproduced (or guessed at) here.

/**
 * <pre>
 * QueryTable implementation that works with the JEXL grammar. This QueryTable
 * uses the metadata, global index, and partitioned table to return
 * results based on the query. Example queries:
 *
 *  <b>Single Term Query</b>
 *  'foo' - looks in global index for foo, and if any entries are found, then the query
 *          is rewritten to be field1 == 'foo' or field2 == 'foo', etc. This is then passed
 *          down the optimized query path which uses the intersecting iterators on the shard
 *          table.
 *
 *  <b>Boolean expression</b>
 *  field == 'foo' - For fielded queries, those that contain a field, an operator, and a literal (string or number),
 *                   the query is parsed and the set of eventFields in the query that are indexed is determined by
 *                   querying the metadata table. Depending on the conjunctions in the query (or, and, not) and the
 *                   eventFields that are indexed, the query may be sent down the optimized path or the full scan path.
 *
 *  We are not supporting all of the operators that JEXL supports at this time. We are supporting the following operators:
 *
 *  ==, !=, &gt;, &ge;, &lt;, &le;, =~, and !~
 *
 *  Custom functions can be created and registered with the Jexl engine. The functions can be used in the queries in conjunction
 *  with other supported operators. A sample function has been created, called between, and is bound to the 'f' namespace. An
 *  example using this function is : "f:between(LATITUDE,60.0, 70.0)"
 *
 *  <h2>Constraints on Query Structure</h2>
 *  Queries that are sent to this class need to be formatted such that there is a space on either side of the operator. We are
 *  rewriting the query in some cases and the current implementation is expecting a space on either side of the operator. Users
 *  should also be aware that the literals used in the query need to match the data in the table. If an error occurs in the evaluation
 *  we are skipping the event.
 *
 *  <h2>Notes on Optimization</h2>
 *  Queries that meet any of the following criteria will perform a full scan of the events in the partitioned table:
 *
 *  1. An 'or' conjunction exists in the query but not all of the terms are indexed.
 *  2. No indexed terms exist in the query
 *  3. An unsupported operator exists in the query
 *
 * </pre>
 */
public class QueryLogic extends AbstractQueryLogic {

  protected static Logger log = Logger.getLogger(QueryLogic.class);

  /** Row of the first partition; inclusive lower bound of a full scan. */
  private static final String START_PARTITION = "0";

  public QueryLogic() {
    super();
  }

  /**
   * Computes the index ranges for a parsed, fielded query by delegating to a fresh {@link RangeCalculator}.
   *
   * <p>
   * {@code indexTableName}, {@code reverseIndexTableName} and {@code queryThreads} are accepted for signature compatibility but are not
   * used by this implementation; the calculator receives this object instead.
   *
   * @return the calculator after {@code execute} has run
   * @throws TableNotFoundException
   *           propagated from the calculator
   * @throws org.apache.commons.jexl2.parser.ParseException
   *           propagated from the calculator
   */
  @Override
  protected RangeCalculator getTermIndexInformation(Connector c, Authorizations auths, Multimap<String,Normalizer> indexedTerms,
      Multimap<String,QueryTerm> terms, String indexTableName, String reverseIndexTableName, String queryString, int queryThreads, Set<String> typeFilter)
      throws TableNotFoundException, org.apache.commons.jexl2.parser.ParseException {
    RangeCalculator calc = new RangeCalculator();
    calc.execute(c, auths, indexedTerms, terms, queryString, this, typeFilter);
    return calc;
  }

  /**
   * Builds the single range used for a full scan: from row {@code "0"} (inclusive) up to the decimal string of
   * {@code getNumPartitions()} (exclusive). The {@code begin}, {@code end} and {@code terms} arguments are not consulted.
   *
   * @return an immutable one-element collection holding that range
   */
  protected Collection<Range> getFullScanRange(Date begin, Date end, Multimap<String,QueryTerm> terms) {
    String endKey = Integer.toString(this.getNumPartitions());
    Range fullScan = new Range(START_PARTITION, true, endKey, false);
    return Collections.singletonList(fullScan);
  }

  /**
   * Looks up an un-fielded term in the index table and converts every matching index entry into scan ranges.
   *
   * <p>
   * The value is normalized with {@link LcNoDiacriticsNormalizer} (no field is known, so no field-specific normalizer can be chosen) and
   * one pair of enclosing single quotes is stripped. For each index entry under that row:
   * <ul>
   * <li>the column family is recorded as a field name carrying the value,</li>
   * <li>the column qualifier is split at {@code EvaluatingIterator.NULL_BYTE_STRING} into shard id and datatype; entries whose datatype
   * is not in {@code typeFilter} are skipped,</li>
   * <li>the entry's UID list count is added to the running cardinality stored under the term name {@code "DUMMY"},</li>
   * <li>either one range per UID or one range covering the whole shard is added.</li>
   * </ul>
   * The whole-shard range is used when the UID list says to ignore UIDs, when the value cannot be parsed as a UID list, or when UIDs are
   * present but the qualifier carried no datatype (an event key cannot be built without it).
   *
   * @param value
   *          the raw query literal, optionally wrapped in single quotes
   * @param typeFilter
   *          datatypes to keep, or {@code null} to keep all
   * @return the collected ranges, field names and cardinality
   * @throws TableNotFoundException
   *           if the index table cannot be scanned
   */
  @Override
  protected IndexRanges getTermIndexInformation(Connector c, Authorizations auths, String value, Set<String> typeFilter) throws TableNotFoundException {
    final String dummyTermName = "DUMMY";
    UnionIndexRanges indexRanges = new UnionIndexRanges();

    // The entries in the index are normalized, since we don't have a field, just try using the LcNoDiacriticsNormalizer.
    String normalizedFieldValue = new LcNoDiacriticsNormalizer().normalizeFieldValue("", value);
    // Remove the begin and end ' marks. The length check keeps a lone "'" from being treated as both
    // the opening and the closing quote, which would make substring(1, 0) throw.
    if (normalizedFieldValue.length() >= 2 && normalizedFieldValue.startsWith("'") && normalizedFieldValue.endsWith("'")) {
      normalizedFieldValue = normalizedFieldValue.substring(1, normalizedFieldValue.length() - 1);
    }
    Text fieldValue = new Text(normalizedFieldValue);
    if (log.isDebugEnabled()) {
      log.debug("Querying index table : " + this.getIndexTableName() + " for normalized indexed term: " + fieldValue);
    }
    Scanner scanner = c.createScanner(this.getIndexTableName(), auths);
    Range r = new Range(fieldValue);
    scanner.setRange(r);
    if (log.isDebugEnabled()) {
      log.debug("Range for index query: " + r.toString());
    }
    for (Entry<Key,Value> entry : scanner) {
      if (log.isDebugEnabled()) {
        log.debug("Index entry: " + entry.getKey().toString());
      }
      // Get the shard id and datatype from the colq
      String fieldName = entry.getKey().getColumnFamily().toString();
      String colq = entry.getKey().getColumnQualifier().toString();
      int separator = colq.indexOf(EvaluatingIterator.NULL_BYTE_STRING);
      String shardId;
      String datatype = null;
      if (separator != -1) {
        shardId = colq.substring(0, separator);
        // Skip the separator itself, whatever its length.
        datatype = colq.substring(separator + EvaluatingIterator.NULL_BYTE_STRING.length());
      } else {
        shardId = colq;
      }
      // Skip this entry if the type is not correct
      if (null != datatype && null != typeFilter && !typeFilter.contains(datatype)) {
        continue;
      }
      // Parse the UID.List object from the value
      Uid.List uidList = null;
      try {
        uidList = Uid.List.parseFrom(entry.getValue().get());
      } catch (InvalidProtocolBufferException e) {
        // Don't add UID information, at least we know what shards it is located in.
        // uidList stays null and is handled explicitly below instead of being dereferenced.
        log.warn("Unable to parse UID list for index entry " + entry.getKey() + "; falling back to scanning shard " + shardId, e);
      }

      // Add the count for this shard to the total count for the term. An unparsable entry contributes 0.
      long shardCount = (null == uidList) ? 0 : uidList.getCOUNT();
      Long storedCount = indexRanges.getTermCardinality().get(dummyTermName);
      long count = (null == storedCount) ? shardCount : shardCount + storedCount;
      indexRanges.getTermCardinality().put(dummyTermName, count);
      // Add the field name
      indexRanges.getFieldNamesAndValues().put(fieldName, normalizedFieldValue);

      // Create the keys
      Text shard = new Text(shardId);
      boolean scanWholeShard = null == uidList || uidList.getIGNORE() || (null == datatype && !uidList.getUIDList().isEmpty());
      if (scanWholeShard) {
        // Then we create a scan range that is the entire shard
        indexRanges.add(dummyTermName, new Range(shard));
      } else {
        // We have UUIDs (and a datatype), create event ranges
        for (String uuid : uidList.getUIDList()) {
          Text cf = new Text(datatype);
          TextUtil.textAppend(cf, uuid);
          Key startKey = new Key(shard, cf);
          Key endKey = new Key(shard, new Text(cf.toString() + EvaluatingIterator.NULL_BYTE_STRING));
          Range eventRange = new Range(startKey, true, endKey, false);
          indexRanges.add(dummyTermName, eventRange);
        }
      }
    }
    if (log.isDebugEnabled()) {
      log.debug("Found " + indexRanges.getRanges().size() + " entries in the index for field value: " + normalizedFieldValue);
    }
    return indexRanges;
  }

}
// Source: incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/parser/EventFields.java
// Recovered from the change set of Mon Dec 5 20:05:49 2011:
// http://svn.apache.org/viewvc/incubator/accumulo/trunk/contrib/accumulo_sample/query/src/main/java/parser/EventFields.java?rev=1210600&r1=1210599&r2=1210600&view=diff
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package parser;

import java.nio.ByteBuffer;

// NOTE: the import lines between the two recovered hunks were outside the change set
// and are intentionally not reproduced (or guessed at) here.

import com.google.common.collect.SetMultimap;

/**
 * Object used to hold the fields in an event. This is a multimap because fields can be repeated.
 *
 * <p>
 * Every {@link SetMultimap} operation is forwarded to an internal {@code HashMultimap}. The class also implements the Kryo
 * {@code CustomSerialization} hooks: the wire form is the entry count followed, per entry, by the key, the flattened visibility and
 * the raw value.
 */
public class EventFields implements SetMultimap<String,FieldValue>, CustomSerialization {

  /** Fixed charset for byte/String conversions so sizes and debug output do not depend on the platform default. */
  private static final java.nio.charset.Charset UTF8 = java.nio.charset.Charset.forName("UTF-8");

  // volatile: read without the class lock in readObjectData/writeObjectData. It is written last in
  // initializeKryo, so a thread that observes true also observes the assigned valueSerializer.
  private static volatile boolean kryoInitialized = false;
  private static ArraySerializer valueSerializer = null;

  // Typed as SetMultimap (HashMultimap is one) so the Set-returning methods need no unchecked casts.
  private SetMultimap<String,FieldValue> map = null;

  /**
   * A single field value together with the column visibility it was stored under.
   *
   * <p>
   * This class does not override {@code equals}/{@code hashCode}, so the enclosing set multimap distinguishes values by identity.
   */
  public static class FieldValue {
    ColumnVisibility visibility;
    byte[] value;

    public FieldValue(ColumnVisibility visibility, byte[] value) {
      super();
      this.visibility = visibility;
      this.value = value;
    }

    public ColumnVisibility getVisibility() {
      return visibility;
    }

    public byte[] getValue() {
      return value;
    }

    public void setVisibility(ColumnVisibility visibility) {
      this.visibility = visibility;
    }

    public void setValue(byte[] value) {
      this.value = value;
    }

    /**
     * @return the number of bytes in the flattened visibility plus the number of bytes in the value; a {@code null} component counts
     *         as zero bytes (consistent with {@link #toString()}, which also tolerates {@code null})
     */
    public int size() {
      int total = 0;
      if (null != visibility) {
        total += visibility.flatten().length;
      }
      if (null != value) {
        total += value.length;
      }
      return total;
    }

    @Override
    public String toString() {
      StringBuilder buf = new StringBuilder();
      if (null != visibility) {
        buf.append(" visibility: ").append(new String(visibility.flatten(), UTF8));
      }
      if (null != value) {
        buf.append(" value size: ").append(value.length);
        buf.append(" value: ").append(new String(value, UTF8));
      }
      return buf.toString();
    }

  }

  public EventFields() {
    map = HashMultimap.create();
  }

  public int size() {
    return map.size();
  }

  public boolean isEmpty() {
    return map.isEmpty();
  }

  public boolean containsKey(Object key) {
    return map.containsKey(key);
  }

  public boolean containsValue(Object value) {
    return map.containsValue(value);
  }

  public boolean containsEntry(Object key, Object value) {
    return map.containsEntry(key, value);
  }

  public boolean put(String key, FieldValue value) {
    return map.put(key, value);
  }

  public boolean remove(Object key, Object value) {
    return map.remove(key, value);
  }

  public boolean putAll(String key, Iterable<? extends FieldValue> values) {
    return map.putAll(key, values);
  }

  public boolean putAll(Multimap<? extends String,? extends FieldValue> multimap) {
    return map.putAll(multimap);
  }

  public void clear() {
    map.clear();
  }

  public Set<String> keySet() {
    return map.keySet();
  }

  public Multiset<String> keys() {
    return map.keys();
  }

  public Collection<FieldValue> values() {
    return map.values();
  }

  public Set<FieldValue> get(String key) {
    return map.get(key);
  }

  public Set<FieldValue> removeAll(Object key) {
    return map.removeAll(key);
  }

  public Set<FieldValue> replaceValues(String key, Iterable<? extends FieldValue> values) {
    return map.replaceValues(key, values);
  }

  public Set<Entry<String,FieldValue>> entries() {
    return map.entries();
  }

  public Map<String,Collection<FieldValue>> asMap() {
    return map.asMap();
  }

  /**
   * @return the sum, over all entries, of the UTF-8 byte length of the key and {@link FieldValue#size()} of the value
   */
  public int getByteSize() {
    int count = 0;
    for (Entry<String,FieldValue> e : map.entries()) {
      count += e.getKey().getBytes(UTF8).length + e.getValue().size();
    }
    return count;
  }

  @Override
  public String toString() {
    StringBuilder buf = new StringBuilder();
    for (Entry<String,FieldValue> entry : map.entries()) {
      buf.append("\tkey: ").append(entry.getKey()).append(" -> ").append(entry.getValue().toString()).append("\n");
    }
    return buf.toString();
  }

  /**
   * Creates the shared {@code byte[]} serializer and registers it with {@code kryo}. Only the first call has any effect; later calls
   * return immediately, so a different {@code Kryo} instance passed later is not registered.
   */
  public static synchronized void initializeKryo(Kryo kryo) {
    if (kryoInitialized) {
      return;
    }
    valueSerializer = new ArraySerializer(kryo);
    valueSerializer.setDimensionCount(1);
    valueSerializer.setElementsAreSameType(true);
    valueSerializer.setCanBeNull(false);
    valueSerializer.setElementsCanBeNull(false);
    kryo.register(byte[].class, valueSerializer);
    kryoInitialized = true;
  }

  /**
   * Reads entries written by {@link #writeObjectData(Kryo, ByteBuffer)} and adds them to this object. Existing entries are kept; the
   * map is not cleared first.
   */
  public void readObjectData(Kryo kryo, ByteBuffer buf) {
    if (!kryoInitialized) {
      EventFields.initializeKryo(kryo);
    }
    // Read in the number of map entries
    int entries = IntSerializer.get(buf, true);
    for (int i = 0; i < entries; i++) {
      // Read in the key
      String key = StringSerializer.get(buf);
      // Read in the fields in the value
      ColumnVisibility vis = new ColumnVisibility(valueSerializer.readObjectData(buf, byte[].class));
      byte[] value = valueSerializer.readObjectData(buf, byte[].class);
      map.put(key, new FieldValue(vis, value));
    }
  }

  /**
   * Writes the entry count, then for every entry the key, the flattened visibility and the value. The serializer is configured as
   * non-nullable, and the visibility is dereferenced here, so entries are expected to have a non-null visibility and value.
   */
  public void writeObjectData(Kryo kryo, ByteBuffer buf) {
    if (!kryoInitialized) {
      EventFields.initializeKryo(kryo);
    }
    // Write out the number of entries;
    IntSerializer.put(buf, map.size(), true);
    for (Entry<String,FieldValue> entry : map.entries()) {
      // Write the key
      StringSerializer.put(buf, entry.getKey());
      // Write the fields in the value
      valueSerializer.writeObjectData(buf, entry.getValue().getVisibility().flatten());
      valueSerializer.writeObjectData(buf, entry.getValue().getValue());
    }
  }

}
