http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/HBaseFilterBuilder.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/HBaseFilterBuilder.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/HBaseFilterBuilder.java new file mode 100755 index 0000000..8209445 --- /dev/null +++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/HBaseFilterBuilder.java @@ -0,0 +1,585 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.log.entity.filter; + +import org.apache.eagle.common.config.EagleConfigFactory; +import org.apache.eagle.log.entity.EntityQualifierUtils; +import org.apache.eagle.log.entity.meta.EntityDefinition; +import org.apache.eagle.log.entity.meta.Qualifier; +import org.apache.eagle.common.ByteUtil; +import org.apache.eagle.query.parser.*; +import org.apache.hadoop.hbase.filter.*; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; +import org.apache.hadoop.hbase.filter.FilterList.Operator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.charset.Charset; +import java.util.*; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * the steps of building hbase filters + * 1. receive ORExpression from eagle-antlr + * 2. iterate all ANDExpression in ORExpression + * 2.1 put each ANDExpression to a new filter list with MUST_PASS_ONE option + * 2.2 iterate all AtomicExpression in ANDExpression + * 2.2.1 group AtomicExpression into 2 groups by looking up metadata, one is for tag filters, the other is for column filters + * 2.2.2 put the above 2 filters to a filter list with MUST_PASS_ALL option + */ +public class HBaseFilterBuilder { + private static final Logger LOG = LoggerFactory.getLogger(HBaseFilterBuilder.class); + + /** + * syntax is @<fieldname> + */ +// private static final String fnRegex = "^@(.*)$"; + private static final Pattern _fnPattern = TokenConstant.ID_PATTERN;// Pattern.compile(fnRegex); + private static final Charset _defaultCharset = Charset.forName("ISO-8859-1"); + + private ORExpression _orExpr; + private EntityDefinition _ed; + private boolean _filterIfMissing; + private Charset _charset = _defaultCharset; + + /** + * TODO: Verify performance impact + * + * @return + */ + public Set<String> getFilterFields() { + return _filterFields; + } + + /** + * Just add filter fields for expression filter + */ + private Set<String> _filterFields; + + public HBaseFilterBuilder(EntityDefinition ed, ORExpression orExpr) { + this(ed, orExpr, false); + } + + public HBaseFilterBuilder(EntityDefinition ed, ORExpression orExpr, boolean filterIfMissing) { + this._ed = ed; + this._orExpr = orExpr; + this._filterIfMissing = filterIfMissing; + } + + public void setCharset(String charsetName){ + _charset = Charset.forName(charsetName); + } + + public Charset getCharset(){ + return _charset; + } + + /** + * Because we don't have metadata for tag, we regard non-qualifer field as tag. So one field possibly is not a real tag when this function return true. This happens + * when a user input an wrong field name which is neither tag or qualifier + * + * @param field + */ + private boolean isTag(String field){ + return _ed.isTag(field); + } + + /** + * check whether this field is one entity attribute or not + * @param fieldName + * @return + */ + private String parseEntityAttribute(String fieldName){ + Matcher m = _fnPattern.matcher(fieldName); + if(m.find()){ + return m.group(1); + } + return null; + } + + /** + * Return the partition values for each or expression. The size of the returned list should be equal to + * the size of FilterList that {@link #buildFilters()} returns. + * + * TODO: For now we don't support one query to query multiple partitions. In future if partition is defined, + * for the entity, internally We need to spawn multiple queries and send one query for each partition. + * + * @return Return the partition values for each or expression. Return null if the entity doesn't support + * partition + */ + public List<String[]> getPartitionValues() { + final String[] partitions = _ed.getPartitions(); + if (partitions == null || partitions.length == 0) { + return null; + } + final List<String[]> result = new ArrayList<String[]>(); + final Map<String, String> partitionKeyValueMap = new HashMap<String, String>(); + for(ANDExpression andExpr : _orExpr.getANDExprList()) { + partitionKeyValueMap.clear(); + for(AtomicExpression ae : andExpr.getAtomicExprList()) { + // TODO temporarily ignore those fields which are not for attributes + if(ae.getKeyType() == TokenType.ID) { + final String fieldName = parseEntityAttribute(ae.getKey()); + if (fieldName == null) { + LOG.warn(fieldName + " field does not have format @<FieldName>, ignored"); + continue; + } + if (_ed.isPartitionTag(fieldName) && ComparisonOperator.EQUAL.equals(ae.getOp())) { + final String value = ae.getValue(); + partitionKeyValueMap.put(fieldName, value); + } + } + } + final String[] values = new String[partitions.length]; + result.add(values); + for (int i = 0; i < partitions.length; ++i) { + final String partition = partitions[i]; + final String value = partitionKeyValueMap.get(partition); + values[i] = value; + } + } + return result; + } + + /** + * @see org.apache.eagle.query.parser.TokenType + * + * @return + */ + public FilterList buildFilters(){ + // TODO: Optimize to select between row filter or column filter for better performance + // Use row key filter priority by default + boolean rowFilterPriority = true; + + FilterList fltList = new FilterList(Operator.MUST_PASS_ONE); + for(ANDExpression andExpr : _orExpr.getANDExprList()){ + + FilterList list = new FilterList(Operator.MUST_PASS_ALL); + Map<String, List<String>> tagFilters = new HashMap<String, List<String>>(); + List<QualifierFilterEntity> qualifierFilters = new ArrayList<QualifierFilterEntity>(); +// List<QualifierFilterEntry> tagLikeQualifierFilters = new ArrayList<QualifierFilterEntry>(); + + // TODO refactor not to use too much if/else + for(AtomicExpression ae : andExpr.getAtomicExprList()){ + // TODO temporarily ignore those fields which are not for attributes + + String fieldName = ae.getKey(); + if(ae.getKeyType() == TokenType.ID){ + fieldName = parseEntityAttribute(fieldName); + if(fieldName == null){ + LOG.warn(fieldName + " field does not have format @<FieldName>, ignored"); + continue; + } + } + + String value = ae.getValue(); + ComparisonOperator op = ae.getOp(); + TokenType keyType = ae.getKeyType(); + TokenType valueType = ae.getValueType(); + QualifierFilterEntity entry = new QualifierFilterEntity(fieldName,value,op,keyType,valueType); + + // TODO Exact match, need to add escape for those special characters here, including: + // "-", "[", "]", "/", "{", "}", "(", ")", "*", "+", "?", ".", "\\", "^", "$", "|" + + if(keyType == TokenType.ID && isTag(fieldName)){ + if ((ComparisonOperator.EQUAL.equals(op) || ComparisonOperator.IS.equals(op)) + && !TokenType.NULL.equals(valueType)) + { + // Use RowFilter for equal TAG + if(tagFilters.get(fieldName) == null) tagFilters.put(fieldName, new ArrayList<String>()); + tagFilters.get(fieldName).add(value); + } else if (rowFilterPriority && ComparisonOperator.IN.equals(op)) + { + // Use RowFilter here by default + if(tagFilters.get(fieldName) == null) tagFilters.put(fieldName, new ArrayList<String>()); + tagFilters.get(fieldName).addAll(EntityQualifierUtils.parseList(value)); + } else if (ComparisonOperator.LIKE.equals(op) + || ComparisonOperator.NOT_LIKE.equals(op) + || ComparisonOperator.CONTAINS.equals(op) + || ComparisonOperator.NOT_CONTAINS.equals(op) + || ComparisonOperator.IN.equals(op) + || ComparisonOperator.IS.equals(op) + || ComparisonOperator.IS_NOT.equals(op) + || ComparisonOperator.NOT_EQUAL.equals(op) + || ComparisonOperator.EQUAL.equals(op) + || ComparisonOperator.NOT_IN.equals(op)) + { + qualifierFilters.add(entry); + } else + { + LOG.warn("Don't support operation: \"" + op + "\" on tag field: " + fieldName + " yet, going to ignore"); + throw new IllegalArgumentException("Don't support operation: "+op+" on tag field: "+fieldName+", avaliable options: =, =!, =~, !=~, in, not in, contains, not contains"); + } + }else{ + qualifierFilters.add(entry); + } + } + + // Build RowFilter for equal tags + list.addFilter(buildTagFilter(tagFilters)); + + // Build SingleColumnValueFilter + FilterList qualifierFilterList = buildQualifierFilter(qualifierFilters); + if(qualifierFilterList != null && qualifierFilterList.getFilters().size()>0){ + list.addFilter(qualifierFilterList); + }else { + if(LOG.isDebugEnabled()) LOG.debug("Ignore empty qualifier filter from "+qualifierFilters.toString()); + } + fltList.addFilter(list); + } + LOG.info("Query: " + _orExpr.toString() + " => Filter: " + fltList.toString()); + return fltList; + } + + /** + * _charset is used to decode the byte array, in hbase server, RegexStringComparator uses the same + * charset to decode the byte array stored in qualifier + * for tag filter regex, it's always ISO-8859-1 as it only comes from String's hashcode (Integer) + * Note: regex comparasion is to compare String + */ + protected Filter buildTagFilter(Map<String, List<String>> tagFilters){ + RegexStringComparator regexStringComparator = new RegexStringComparator(buildTagFilterRegex(tagFilters)); + regexStringComparator.setCharset(_charset); + RowFilter filter = new RowFilter(CompareOp.EQUAL, regexStringComparator); + return filter; + } + + /** + * all qualifiers' condition must be satisfied. + * + * <H1>Use RegexStringComparator for:</H1> + * IN + * LIKE + * NOT_LIKE + * + * <H1>Use SubstringComparator for:</H1> + * CONTAINS + * + * <H1>Use EntityQualifierHelper for:</H1> + * EQUALS + * NOT_EUQALS + * LESS + * LESS_OR_EQUAL + * GREATER + * GREATER_OR_EQUAL + * + * <H2> + * TODO: Compare performance of RegexStringComparator ,SubstringComparator ,EntityQualifierHelper + * </H2> + * + * @param qualifierFilters + * @return + */ + protected FilterList buildQualifierFilter(List<QualifierFilterEntity> qualifierFilters){ + FilterList list = new FilterList(Operator.MUST_PASS_ALL); + // iterate all the qualifiers + for(QualifierFilterEntity entry : qualifierFilters){ + // if contains expression based filter + if(entry.getKeyType() == TokenType.EXP + || entry.getValueType() == TokenType.EXP + || entry.getKeyType() != TokenType.ID){ + if(!EagleConfigFactory.load().isCoprocessorEnabled()) { + LOG.warn("Expression in filter may not support, because custom filter and coprocessor is disabled: " + entry.toString()); + } + list.addFilter(buildExpressionBasedFilter(entry)); + continue; + } + + // else using SingleColumnValueFilter + String qualifierName = entry.getKey(); + if(!isTag(entry.getKey())){ + Qualifier qualifier = _ed.getDisplayNameMap().get(entry.getKey()); + qualifierName = qualifier.getQualifierName(); + } + + // Comparator to be used for building HBase Filter + // WritableByteArrayComparable comparator; + ByteArrayComparable comparable; + if(ComparisonOperator.IN.equals(entry.getOp()) + || ComparisonOperator.NOT_IN.equals(entry.getOp())){ + Filter setFilter = buildListQualifierFilter(entry); + if(setFilter!=null){ + list.addFilter(setFilter); + } + }else{ + // If [=,!=,is,is not] NULL, use NullComparator else throw exception + if(TokenType.NULL.equals(entry.getValueType())){ + if(ComparisonOperator.EQUAL.equals(entry.getOp()) + ||ComparisonOperator.NOT_EQUAL.equals(entry.getOp()) + ||ComparisonOperator.IS.equals(entry.getOp()) + ||ComparisonOperator.IS_NOT.equals(entry.getOp())) + comparable = new NullComparator(); + else + throw new IllegalArgumentException("Operation: "+entry.getOp()+" with NULL is not supported yet: "+entry.toString()+", avaliable options: [=, !=, is, is not] null|NULL"); + } + // If [contains, not contains],use SubstringComparator + else if (ComparisonOperator.CONTAINS.equals(entry.getOp()) + || ComparisonOperator.NOT_CONTAINS.equals(entry.getOp())) { + comparable = new SubstringComparator(entry.getValue()); + } + // If [like, not like], use RegexStringComparator + else if (ComparisonOperator.LIKE.equals(entry.getOp()) + || ComparisonOperator.NOT_LIKE.equals(entry.getOp())){ + // Use RegexStringComparator for LIKE / NOT_LIKE + RegexStringComparator _comparator = new RegexStringComparator(buildQualifierRegex(entry.getValue())); + _comparator.setCharset(_charset); + comparable = _comparator; + } else{ + Class type = EntityQualifierUtils.getType(_ed, entry.getKey()); + // if type is null (is Tag or not found) or not defined for TypedByteArrayComparator + if(!EagleConfigFactory.load().isCoprocessorEnabled() || type == null || TypedByteArrayComparator.get(type) == null){ + comparable = new BinaryComparator(EntityQualifierUtils.toBytes(_ed, entry.getKey(), entry.getValue())); + }else { + comparable = new TypedByteArrayComparator(EntityQualifierUtils.toBytes(_ed, entry.getKey(), entry.getValue()),type); + } + } + + SingleColumnValueFilter filter = + new SingleColumnValueFilter(_ed.getColumnFamily().getBytes(), qualifierName.getBytes(), convertToHBaseCompareOp(entry.getOp()), comparable); + filter.setFilterIfMissing(_filterIfMissing); + list.addFilter(filter); + } + } + + return list; + } + + private Filter buildExpressionBasedFilter(QualifierFilterEntity entry) { + BooleanExpressionComparator expressionComparator = new BooleanExpressionComparator(entry,_ed); + _filterFields = expressionComparator.getRequiredFields(); + RowValueFilter filter = new RowValueFilter(expressionComparator); + return filter; + } + + /** + * Currently use BinaryComparator only + * <h2>TODO: </h2> + * Possibility to tune performance by using: OR[BinaryComparator,...] instead of RegexStringComparator? + * + *<br/> <br/> + * + * ! Check op must be IN or NOTIN in caller + * + * @param entry + * @return + */ + private Filter buildListQualifierFilter(QualifierFilterEntity entry){ + List<String> valueSet = EntityQualifierUtils.parseList(entry.getValue()); + Iterator<String> it = valueSet.iterator(); + String fieldName = entry.getKey(); + String qualifierName = fieldName; + if(!_ed.isTag(entry.getKey())){ + qualifierName = _ed.getDisplayNameMap().get(entry.getKey()).getQualifierName(); + } + +// TODO: Try to use RegExp just work if possible +// Because single SingleColumnValueFilter is much faster than multi SingleColumnValueFilters in OR list. +// Class qualifierType = EntityQualifierHelper.getType(_ed,fieldName); +// if( qualifierType == null || qualifierType == String.class){ +// boolean first = true; +// StringBuilder filterRegex = new StringBuilder(); +// filterRegex.append("^("); +// while(it.hasNext()) { +// String value = it.next(); +// if(value == null) { +// logger.warn("ignore empty value in set qualifier filter: "+entry.toString()); +// continue; +// } +// if(!first) filterRegex.append("|"); +// filterRegex.append(value); +// first = false; +// } +// filterRegex.append(")$"); +// RegexStringComparator regexStringComparator = new RegexStringComparator(filterRegex.toString()); +// return new SingleColumnValueFilter(_ed.getColumnFamily().getBytes(), qualifierName.getBytes(), +// convertToHBaseCompareOp(entry.getOp()), regexStringComparator); +// }else{ + FilterList setFilterList; + if(ComparisonOperator.IN.equals(entry.getOp())){ + setFilterList = new FilterList(Operator.MUST_PASS_ONE); + }else if(ComparisonOperator.NOT_IN.equals(entry.getOp())) { + setFilterList = new FilterList(Operator.MUST_PASS_ALL); + }else{ + throw new IllegalArgumentException(String.format("Don't support operation: %s on LIST type of value yet: %s, valid options: IN/NOT IN [LIST]",entry.getOp(),entry.toString())); + } + + while(it.hasNext()) { + String value = it.next(); + BinaryComparator comparator = new BinaryComparator(EntityQualifierUtils.toBytes(_ed, fieldName, value)); + SingleColumnValueFilter filter = + new SingleColumnValueFilter(_ed.getColumnFamily().getBytes(), qualifierName.getBytes(), convertToHBaseCompareOp(entry.getOp()), comparator); + filter.setFilterIfMissing(_filterIfMissing); + setFilterList.addFilter(filter); + } + + return setFilterList; +// } + } + + /** + * Just used for LIKE and NOT_LIKE + * + * @param qualifierValue + * @return + */ + protected String buildQualifierRegex(String qualifierValue){ + StringBuilder sb = new StringBuilder(); +// sb.append("(?s)"); + sb.append("^"); + sb.append(qualifierValue); + sb.append("$"); + return sb.toString(); + } + + /** + * Appends the given ID to the given buffer, followed by "\\E". + * [steal it from opentsdb, thanks opentsdb :) https://github.com/OpenTSDB/opentsdb/blob/master/src/core/TsdbQuery.java] + */ + private static void addId(final StringBuilder buf, final byte[] id) { + buf.append("\\Q"); + boolean backslash = false; + for (final byte b : id) { + buf.append((char) (b & 0xFF)); + if (b == 'E' && backslash) { // If we saw a `\' and now we have a `E'. + // So we just terminated the quoted section because we just added \E + // to `buf'. So let's put a litteral \E now and start quoting again. + buf.append("\\\\E\\Q"); + } else { + backslash = b == '\\'; + } + } + buf.append("\\E"); + } + + @SuppressWarnings("unused") + private static void addId(final StringBuilder buf, final String id) { + buf.append("\\Q"); + int len = id.length()-1; + boolean backslash = false; + for (int i =0; i < len; i++) { + char c = id.charAt(i); + buf.append(c); + if (c == 'E' && backslash) { // If we saw a `\' and now we have a `E'. + // So we just terminated the quoted section because we just added \E + // to `buf'. So let's put a litteral \E now and start quoting again. + buf.append("\\\\E\\Q"); + } else { + backslash = c == '\\'; + } + } + buf.append("\\E"); + } + + /** + * one search tag may have multiple values which have OR relationship, and relationship between + * different search tags is AND + * the query is like "(TAG1=value11 OR TAG1=value12) AND TAG2=value2" + * @param tags + * @return + */ + protected String buildTagFilterRegex(Map<String, List<String>> tags){ + // TODO need consider that \E could be part of tag, refer to https://github.com/OpenTSDB/opentsdb/blob/master/src/core/TsdbQuery.java + final SortedMap<Integer, List<Integer>> tagHash = new TreeMap<Integer, List<Integer>>(); + final int numOfPartitionFields = (_ed.getPartitions() == null) ? 0 : _ed.getPartitions().length; + for(Map.Entry<String, List<String>> entry : tags.entrySet()){ + String tagName = entry.getKey(); + // Ignore tag if the tag is one of partition fields + if (_ed.isPartitionTag(tagName)) { + continue; + } + List<String> stringValues = entry.getValue(); + List<Integer> hashValues = new ArrayList<Integer>(stringValues.size()); + for(String value : stringValues){ + hashValues.add(value.hashCode()); + } + tagHash.put(tagName.hashCode(), hashValues); + } + + // header = prefix(4 bytes) + partition_hashes(4*N bytes) + timestamp (8 bytes) + final int headerLength = 4 + numOfPartitionFields * 4 + 8; + + // <tag1:4><value1:4> ... <tagn:4><valuen:4> + StringBuilder sb = new StringBuilder(); + sb.append("(?s)"); + sb.append("^(?:.{").append(headerLength).append("})"); + sb.append("(?:.{").append(8).append("})*"); // for any number of tags + for (Map.Entry<Integer, List<Integer>> entry : tagHash.entrySet()) { + try { + addId(sb, ByteUtil.intToBytes(entry.getKey())); + List<Integer> hashValues = entry.getValue(); + sb.append("(?:"); + boolean first = true; + for(Integer value : hashValues){ + if(!first){ + sb.append('|'); + } + addId(sb, ByteUtil.intToBytes(value)); + first = false; + } + sb.append(")"); + sb.append("(?:.{").append(8).append("})*"); // for any number of tags + } catch (Exception ex) { + LOG.error("constructing regex error", ex); + } + } + sb.append("$"); + if(LOG.isDebugEnabled()) LOG.debug("Tag filter pattern is " + sb.toString()); + return sb.toString(); + } + + /** + * Convert ComparisonOperator to native HBase CompareOp + * + * Support: + * =, =~,CONTAINS,<,<=,>,>=,!=,!=~ + * + * @param comp + * @return + */ + protected static CompareOp convertToHBaseCompareOp(ComparisonOperator comp) { + if(comp == ComparisonOperator.EQUAL || comp == ComparisonOperator.LIKE + || comp == ComparisonOperator.CONTAINS + || comp == ComparisonOperator.IN + || comp == ComparisonOperator.IS + ) { + return CompareOp.EQUAL; + }else if(comp == ComparisonOperator.LESS) { + return CompareOp.LESS; + } else if(comp == ComparisonOperator.LESS_OR_EQUAL){ + return CompareOp.LESS_OR_EQUAL; + }else if(comp == ComparisonOperator.GREATER) { + return CompareOp.GREATER; + } else if(comp == ComparisonOperator.GREATER_OR_EQUAL){ + return CompareOp.GREATER_OR_EQUAL; + } else if(comp == ComparisonOperator.NOT_EQUAL + || comp == ComparisonOperator.NOT_LIKE + || comp == ComparisonOperator.NOT_CONTAINS + || comp == ComparisonOperator.IS_NOT + || comp == ComparisonOperator.NOT_IN) + { + return CompareOp.NOT_EQUAL; + } else { + LOG.error("{} operation is not supported now\n", comp); + throw new IllegalArgumentException("Illegal operation: "+comp+ ", avaliable options: "+ Arrays.toString(ComparisonOperator.values())); + } + } + + protected static CompareOp getHBaseCompareOp(String comp) { + return convertToHBaseCompareOp(ComparisonOperator.locateOperator(comp)); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/QualifierFilterEntity.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/QualifierFilterEntity.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/QualifierFilterEntity.java new file mode 100755 index 0000000..6cdc77b --- /dev/null +++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/QualifierFilterEntity.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.log.entity.filter; + +import org.apache.eagle.query.parser.ComparisonOperator; +import org.apache.eagle.query.parser.TokenType; +import org.apache.hadoop.io.Writable; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +public class QualifierFilterEntity implements Writable{ + public String key; + public String value; + public ComparisonOperator op; + public TokenType valueType; + public TokenType keyType; + + public QualifierFilterEntity(){} + public QualifierFilterEntity(String key, String value, ComparisonOperator comp, TokenType keyType, TokenType valueType) { + super(); + this.key = key; + this.value = value; + this.op = comp; + this.keyType = keyType; + this.valueType = valueType; + } + + public String getKey() { + return key; + } + + public void setKey(String key) { + this.key = key; + } + + public String getValue() { + return value; + } + + public void setValue(String value) { + this.value = value; + } + + public ComparisonOperator getOp() { + return op; + } + + public void setOp(ComparisonOperator op) { + this.op = op; + } + + public TokenType getValueType() { + return valueType; + } + + public void setValueType(TokenType valueType) { + this.valueType = valueType; + } + + public void setKeyType(TokenType keyType){ + this.keyType = keyType; + } + public TokenType getKeyType(){ + return this.keyType; + } + + @Override + public String toString() { + return String.format("%s %s %s",this.key,this.op,this.value); + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeUTF(this.key); + out.writeUTF(this.getValue()); + out.writeUTF(this.op.name()); + out.writeUTF(this.keyType.name()); + out.writeUTF(this.valueType.name()); + } + + @Override + public void readFields(DataInput in) throws IOException { + this.key = in.readUTF(); + this.value = in.readUTF(); + this.op = ComparisonOperator.valueOf(in.readUTF()); + this.keyType = TokenType.valueOf(in.readUTF()); + this.valueType = TokenType.valueOf(in.readUTF()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/RowValueFilter.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/RowValueFilter.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/RowValueFilter.java new file mode 100755 index 0000000..a4b97ea --- /dev/null +++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/RowValueFilter.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.log.entity.filter; + +import com.google.common.io.ByteArrayDataInput; +import com.google.common.io.ByteArrayDataOutput; +import com.google.common.io.ByteStreams; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.exceptions.DeserializationException; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.FilterBase; +import org.apache.hadoop.io.WritableComparable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.List; + +/** + * TODO: Critical performance problem!!! + * TODO: Refactor to specified multi-column filter so that avoid return all qualifier columns from region server to client side + * + * @since 2014/11/17 + */ +public class RowValueFilter extends FilterBase { + private final static Logger LOG = LoggerFactory.getLogger(RowValueFilter.class); + private boolean filterOutRow = false; + private WritableComparable<List<KeyValue>> comparator; + + // TODO: Use qualifiers to reduce network tranfer +// private List<byte[]> qualifiers; + public RowValueFilter(){} + + /** + * Filter out row if WritableComparable.compareTo return 0 + * @param comparator <code>WritableComparable[List[KeyValue]]</code> + */ + public RowValueFilter(WritableComparable<List<KeyValue>> comparator){ + this.comparator = comparator; + } + +// public RowValueFilter(List<byte[]> qualifiers,WritableComparable<List<KeyValue>> comparator){ +// this.qualifiers = qualifiers; +// this.comparator = comparator; +// } + + /** + * Old interface in hbase-0.94 + * + * @param out + * @throws IOException + */ + @Deprecated + public void write(DataOutput out) throws IOException { + this.comparator.write(out); + } + + /** + * Old interface in hbase-0.94 + * + * @param in + * @throws IOException + */ +// @Override + @Deprecated + public void readFields(DataInput in) throws IOException { + this.comparator = new BooleanExpressionComparator(); + this.comparator.readFields(in); + } + + /** + * TODO: Currently still use older serialization method from hbase-0.94, need to migrate into ProtoBuff based + * + * @return + * @throws IOException + */ + @Override + public byte[] toByteArray() throws IOException { + ByteArrayDataOutput byteArrayDataOutput = ByteStreams.newDataOutput(); + this.comparator.write(byteArrayDataOutput); + return byteArrayDataOutput.toByteArray(); + } + + /** + * TODO: Currently still use older serialization method from hbase-0.94, need to migrate into ProtoBuff based + */ + // Override static method + public static Filter parseFrom(final byte [] pbBytes) throws DeserializationException { + ByteArrayDataInput byteArrayDataInput = ByteStreams.newDataInput(pbBytes); + RowValueFilter filter = new RowValueFilter(); + try { + filter.readFields(byteArrayDataInput); + } catch (IOException e) { + LOG.error("Got error to deserialize RowValueFilter from PB bytes",e); + throw new DeserializationException(e); + } + return filter; + } + + @Override + public boolean hasFilterRow(){ + return true; + } + + @Override + public void filterRow(List<KeyValue> row) { + filterOutRow = (this.comparator.compareTo(row) == 0); + } + + @Override + public void reset() { + this.filterOutRow = false; + } + + @Override + public boolean filterRow(){ + return filterOutRow; + } + + @Override + public String toString() { + return super.toString()+" ( "+this.comparator.toString()+" )"; + } + +// public List<byte[]> getQualifiers() { +// return qualifiers; +// } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/TypedByteArrayComparator.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/TypedByteArrayComparator.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/TypedByteArrayComparator.java new file mode 100755 index 0000000..ecaf8cc --- /dev/null +++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/TypedByteArrayComparator.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.log.entity.filter; + +import com.google.common.io.ByteArrayDataInput; +import com.google.common.io.ByteArrayDataOutput; +import com.google.common.io.ByteStreams; +import org.apache.hadoop.hbase.exceptions.DeserializationException; +import org.apache.hadoop.hbase.filter.ByteArrayComparable; +import org.apache.hadoop.io.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +/** + * <h1>TypedByteArrayComparator</h1> + * + * Compare byte array: <code>byte[] value</code> with class type: <code>Class type</code> + * + * <br/> + * <br/> + * Built-in support: + * + * <pre> + * Double + * double + * Integer + * int + * Long + * long + * Short + * short + * Boolean + * boolean + * </pre> + * + * And can be extend by defining new {@link RawComparator} and register with {@link #define(Class type, RawComparator comparator)} + * <br/> + * <br/> + */ +public class TypedByteArrayComparator extends ByteArrayComparable { + private final static Logger LOG = LoggerFactory.getLogger(TypedByteArrayComparator.class); + + private Class type; + + // Not need to be writable + private RawComparator comparator; + + /** + * Default constructor for writable + */ + @SuppressWarnings("unused") + public TypedByteArrayComparator(){ + super(null); + } + + public TypedByteArrayComparator(byte[] value, Class type){ + super(value); + this.type = type; + this.comparator = get(this.type); + if(this.comparator == null) throw new IllegalArgumentException("No comparator found for class: "+type); + } + + /** + * @param in hbase-0.94 interface + * @throws IOException + */ +// @Override + public void readFields(DataInput in) throws IOException { +// super.readFields(in); + try { + String _type = in.readUTF(); + type = _primitiveTypeClassMap.get(_type); + if(type == null) { + type = Class.forName(_type); + } + comparator = get(type); + if(comparator == null) throw new IllegalArgumentException("No comparator found for class: "+type); + } catch (ClassNotFoundException e) { + throw new IOException(e.getMessage(),e); + } + } + + /** + * @param out hbase-0.94 interface + * @throws IOException + */ +// @Override + public void write(DataOutput out) throws IOException { +// super.write(out); + String typeName = type.getName(); + out.writeUTF(typeName); + } + + /** + * For hbase 0.98 + * + * @return serialized byte array + */ + @Override + public byte[] toByteArray() { + ByteArrayDataOutput byteArrayDataOutput = ByteStreams.newDataOutput(); + try { + this.write(byteArrayDataOutput); + return byteArrayDataOutput.toByteArray(); + } catch (IOException e) { + LOG.error("Failed to serialize due to: "+e.getMessage(),e); + throw new RuntimeException(e); + } + } + + /** + * For hbase 0.98 + * + * @param bytes raw byte array + * @return Comparator instance + * @throws DeserializationException + */ + public static TypedByteArrayComparator parseFrom(final byte [] bytes) + throws DeserializationException { + TypedByteArrayComparator comparator = new TypedByteArrayComparator(); + ByteArrayDataInput byteArrayDataInput = ByteStreams.newDataInput(bytes); + try { + comparator.readFields(byteArrayDataInput); + } catch (IOException e) { + LOG.error("Got error to deserialize TypedByteArrayComparator from PB bytes",e); + throw new DeserializationException(e); + } + return comparator; + } + + @Override + public int compareTo(byte[] value, int offset, int length) { + return this.comparator.compare(this.getValue(), 0, this.getValue().length, value, offset, length); + } + + /** + * <ol> + * <li>Try registered comparator</li> + * <li>If not found, try all possible WritableComparator</li> + * </ol> + * + * If not found finally, throw new IllegalArgumentException("unable to get comparator for class: "+type); + * + * @param type value type class + * @return RawComparator + */ + public static RawComparator get(Class type){ + RawComparator comparator = null; + try { + comparator = _typedClassComparator.get(type); + }catch (ClassCastException ex){ + // ignore + } + try { + if (comparator == null) comparator = WritableComparator.get(type); + }catch (ClassCastException ex){ + // ignore + } + return comparator; + } + + private final static Map<Class,RawComparator> _typedClassComparator = new HashMap<Class, RawComparator>(); + public static void define(Class type, RawComparator comparator){ + _typedClassComparator.put(type,comparator); + } + + static{ + define(Double.class, WritableComparator.get(DoubleWritable.class)); + define(double.class, WritableComparator.get(DoubleWritable.class)); + define(Integer.class, WritableComparator.get(IntWritable.class)); + define(int.class, WritableComparator.get(IntWritable.class)); + define(Long.class, WritableComparator.get(LongWritable.class)); + define(long.class, WritableComparator.get(LongWritable.class)); + define(Short.class, WritableComparator.get(ShortWritable.class)); + define(short.class, WritableComparator.get(ShortWritable.class)); + define(Boolean.class, WritableComparator.get(BooleanWritable.class)); + define(boolean.class, WritableComparator.get(BooleanWritable.class)); + } + + /** + * Because {@link Class#forName } can't find class for primitive type + */ + private final static Map<String,Class> _primitiveTypeClassMap = new HashMap<String, Class>(); + static { + _primitiveTypeClassMap.put(int.class.getName(),int.class); + _primitiveTypeClassMap.put(double.class.getName(),double.class); + _primitiveTypeClassMap.put(long.class.getName(),long.class); + _primitiveTypeClassMap.put(short.class.getName(),short.class); + _primitiveTypeClassMap.put(boolean.class.getName(),boolean.class); + _primitiveTypeClassMap.put(char.class.getName(),char.class); + _primitiveTypeClassMap.put(byte.class.getName(),byte.class); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/IndexLogReader.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/IndexLogReader.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/IndexLogReader.java new file mode 100755 index 0000000..418ab33 --- /dev/null +++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/IndexLogReader.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.log.entity.index; + +import org.apache.eagle.log.entity.LogReader; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.FilterList; +import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; + +public abstract class IndexLogReader implements LogReader { + + // TODO: Work around https://issues.apache.org/jira/browse/HBASE-2198. More graceful implementation should use SingleColumnValueExcludeFilter, + // but it's complicated in current implementation. + protected static void workaroundHBASE2198(Get get, Filter filter,byte[][] qualifiers) { + if (filter instanceof SingleColumnValueFilter) { + if(qualifiers == null) { + get.addFamily(((SingleColumnValueFilter) filter).getFamily()); + }else{ + get.addColumn(((SingleColumnValueFilter) filter).getFamily(), ((SingleColumnValueFilter) filter).getQualifier()); + } + return; + } + if (filter instanceof FilterList) { + for (Filter f : ((FilterList)filter).getFilters()) { + workaroundHBASE2198(get, f,qualifiers); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/IndexStreamReader.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/IndexStreamReader.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/IndexStreamReader.java new file mode 100755 index 0000000..9e059f2 --- /dev/null +++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/IndexStreamReader.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.log.entity.index; + +import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; +import org.apache.eagle.log.entity.*; +import org.apache.eagle.log.entity.meta.EntityDefinition; +import org.apache.eagle.log.entity.meta.IndexDefinition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; + +public abstract class IndexStreamReader extends StreamReader { + protected final IndexDefinition indexDef; + protected final SearchCondition condition; + protected final List<byte[]> indexRowkeys; + protected LogReader<InternalLog> reader; + protected long lastTimestamp = 0; + protected long firstTimestamp = 0; + + protected static final Logger LOG = LoggerFactory.getLogger(IndexStreamReader.class); + + public IndexStreamReader(IndexDefinition indexDef, SearchCondition condition, List<byte[]> indexRowkeys) { + this.indexDef = indexDef; + this.condition = condition; + this.indexRowkeys = indexRowkeys; + this.reader = null; + } + + @Override + public long getLastTimestamp() { + return lastTimestamp; + } + + @Override + public long getFirstTimestamp() { + return this.firstTimestamp; + } + + @Override + public void readAsStream() throws Exception { + if (reader == null) { + reader = createIndexReader(); + } + final EntityDefinition entityDef = indexDef.getEntityDefinition(); + try{ + reader.open(); + InternalLog log; + int count = 0; + while ((log = reader.read()) != null) { + TaggedLogAPIEntity entity = HBaseInternalLogHelper.buildEntity(log, entityDef); + entity.setSerializeAlias(condition.getOutputAlias()); + entity.setSerializeVerbose(condition.isOutputVerbose()); + + if (lastTimestamp == 0 || lastTimestamp < entity.getTimestamp()) { + lastTimestamp = entity.getTimestamp(); + } + if(firstTimestamp == 0 || firstTimestamp > entity.getTimestamp()){ + firstTimestamp = entity.getTimestamp(); + } + for(EntityCreationListener l : _listeners){ + l.entityCreated(entity); + } + if(++count == condition.getPageSize()) { + break; + } + } + }catch(IOException ioe){ + LOG.error("Fail reading log", ioe); + throw ioe; + }finally{ + reader.close(); + } + } + + protected abstract LogReader createIndexReader(); + +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/NonClusteredIndexLogReader.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/NonClusteredIndexLogReader.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/NonClusteredIndexLogReader.java new file mode 100755 index 0000000..e6a5c96 --- /dev/null +++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/NonClusteredIndexLogReader.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.log.entity.index; + +import org.apache.eagle.common.config.EagleConfigFactory; +import org.apache.eagle.log.entity.HBaseInternalLogHelper; +import org.apache.eagle.log.entity.InternalLog; +import org.apache.eagle.log.entity.meta.IndexDefinition; +import org.apache.eagle.common.ByteUtil; +import org.apache.hadoop.hbase.client.*; +import org.apache.hadoop.hbase.filter.Filter; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + + +public class NonClusteredIndexLogReader extends IndexLogReader { + private final IndexDefinition indexDef; + private final List<byte[]> indexRowkeys; + private final byte[][] qualifiers; + private final Filter filter; + private HTableInterface tbl; + private boolean isOpen = false; + private Result[] results; + private int index = -1; + private final List<Scan> scans; + private int currentScanIndex = 0; + private ResultScanner currentResultScanner; + + // Max tag key/value. + private static final byte[] MAX_TAG_VALUE_BYTES = {(byte) 0XFF,(byte) 0XFF,(byte) 0XFF,(byte) 0XFF,(byte) 0XFF,(byte) 0XFF,(byte) 0XFF,(byte) 0XFF,(byte) 0XFF}; + private static final int BATCH_MULTIGET_SIZE = 1000; + + public NonClusteredIndexLogReader(IndexDefinition indexDef, List<byte[]> indexRowkeys, byte[][] qualifiers, Filter filter) { + this.indexDef = indexDef; + this.indexRowkeys = indexRowkeys; + this.qualifiers = qualifiers; + this.filter = filter; + this.scans = buildScans(); + } + + + private List<Scan> buildScans() { + final ArrayList<Scan> result = new ArrayList<Scan>(indexRowkeys.size()); + for (byte[] rowkey : indexRowkeys) { + Scan s = new Scan(); + s.setStartRow(rowkey); + // In rowkey the tag key/value is sorted by the hash code of the key, so MAX_TAG_VALUE_BYTES is enough as the end key + final byte[] stopRowkey = ByteUtil.concat(rowkey, MAX_TAG_VALUE_BYTES); + s.setStopRow(stopRowkey); + // TODO the # of cached rows should be minimum of (pagesize and 100) + int cs = EagleConfigFactory.load().getHBaseClientScanCacheSize(); + s.setCaching(cs); + // TODO not optimized for all applications + s.setCacheBlocks(true); + // scan specified columnfamily for all qualifiers + s.addFamily(indexDef.getEntityDefinition().getColumnFamily().getBytes()); + result.add(s); + } + return result; + } + + @Override + public void open() throws IOException { + if (isOpen) + return; // silently return + try { + tbl = EagleConfigFactory.load().getHTable(indexDef.getEntityDefinition().getTable()); + } catch (RuntimeException ex) { + throw new IOException(ex); + } + currentScanIndex = 0; + openNewScan(); + fillResults(); + } + + private boolean openNewScan() throws IOException { + closeCurrentScanResult(); + if (currentScanIndex >= scans.size()) { + return false; + } + final Scan scan = scans.get(currentScanIndex++); + currentResultScanner = tbl.getScanner(scan); + return true; + } + + private void fillResults() throws IOException { + if (currentResultScanner == null) { + return; + } + index = 0; + int count = 0; + Result r = null; + final List<Get> gets = new ArrayList<Get>(BATCH_MULTIGET_SIZE); + final byte[] family = indexDef.getEntityDefinition().getColumnFamily().getBytes(); + while (count < BATCH_MULTIGET_SIZE) { + r = currentResultScanner.next(); + if (r == null) { + if (openNewScan()) { + continue; + } else { + break; + } + } + for (byte[] rowkey : r.getFamilyMap(family).keySet()) { + if (rowkey.length == 0) { // invalid rowkey + continue; + } + final Get get = new Get(rowkey); + if (filter != null) { + get.setFilter(filter); + } + if(qualifiers != null) { + for (int j = 0; j < qualifiers.length; ++j) { + // Return the specified qualifiers + get.addColumn(family, qualifiers[j]); + } + }else { + get.addFamily(family); + } + workaroundHBASE2198(get, filter,qualifiers); + gets.add(get); + ++count; + } + } + if (count == 0) { + results = null; + return; + } + results = tbl.get(gets); + if (results == null || results.length == 0) { + fillResults(); + } + } + + + private void closeCurrentScanResult() { + if (currentResultScanner != null) { + currentResultScanner.close(); + currentResultScanner = null; + } + } + + + @Override + public void close() throws IOException { + if(tbl != null){ + new HTableFactory().releaseHTableInterface(tbl); + } + closeCurrentScanResult(); + } + + @Override + public InternalLog read() throws IOException { + if (tbl == null) { + throw new IllegalArgumentException("Haven't open before reading"); + } + + Result r = null; + InternalLog t = null; + while ((r = getNextResult()) != null) { + if (r.getRow() == null) { + continue; + } + t = HBaseInternalLogHelper.parse(indexDef.getEntityDefinition(), r, qualifiers); + break; + } + return t; + } + + + private Result getNextResult() throws IOException { + if (results == null || results.length == 0 || index >= results.length) { + fillResults(); + } + if (results == null || results.length == 0 || index >= results.length) { + return null; + } + return results[index++]; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/NonClusteredIndexStreamReader.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/NonClusteredIndexStreamReader.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/NonClusteredIndexStreamReader.java new file mode 100755 index 0000000..ec5631a --- /dev/null +++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/NonClusteredIndexStreamReader.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.log.entity.index; + +import org.apache.eagle.log.entity.HBaseInternalLogHelper; +import org.apache.eagle.log.entity.LogReader; +import org.apache.eagle.log.entity.SearchCondition; +import org.apache.eagle.log.entity.meta.EntityDefinition; +import org.apache.eagle.log.entity.meta.IndexDefinition; +import org.apache.eagle.log.entity.meta.IndexDefinition.IndexType; + +import java.util.ArrayList; +import java.util.List; + +public class NonClusteredIndexStreamReader extends IndexStreamReader { + public NonClusteredIndexStreamReader(IndexDefinition indexDef, SearchCondition condition) { + super(indexDef, condition, new ArrayList<byte[]>()); + final IndexType type = indexDef.canGoThroughIndex(condition.getQueryExpression(), indexRowkeys); + if (!IndexType.NON_CLUSTER_INDEX.equals(type)) { + throw new IllegalArgumentException("This query can't go through index: " + condition.getQueryExpression()); + } + } + + public NonClusteredIndexStreamReader(IndexDefinition indexDef, SearchCondition condition, List<byte[]> indexRowkeys) { + super(indexDef, condition, indexRowkeys); + } + + @Override + protected LogReader createIndexReader() { + final EntityDefinition entityDef = indexDef.getEntityDefinition(); + byte[][] outputQualifiers = null; + if(!condition.isOutputAll()) { + outputQualifiers = HBaseInternalLogHelper.getOutputQualifiers(entityDef, condition.getOutputFields()); + } + return new NonClusteredIndexLogReader(indexDef, indexRowkeys, outputQualifiers, condition.getFilter()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/RowKeyLogReader.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/RowKeyLogReader.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/RowKeyLogReader.java new file mode 100755 index 0000000..1c16dc8 --- /dev/null +++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/RowKeyLogReader.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.log.entity.index; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.eagle.common.config.EagleConfigFactory; +import org.apache.eagle.log.entity.HBaseInternalLogHelper; +import org.apache.eagle.log.entity.InternalLog; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HTableFactory; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Result; + +import org.apache.eagle.log.entity.meta.EntityDefinition; + +public class RowKeyLogReader extends IndexLogReader { + private final EntityDefinition ed; + private final List<byte[]> rowkeys; + private final byte[][] qualifiers; + private HTableInterface tbl; + private boolean isOpen = false; + private Result[] entityResult; + private int getIndex = -1; + + public RowKeyLogReader(EntityDefinition ed, byte[] rowkey) { + this.ed = ed; + this.rowkeys = new ArrayList<>(); + this.rowkeys.add(rowkey); + this.qualifiers = null; + } + + public RowKeyLogReader(EntityDefinition ed, byte[] rowkey,byte[][] qualifiers) { + this.ed = ed; + this.rowkeys = new ArrayList<>(); + this.rowkeys.add(rowkey); + this.qualifiers = qualifiers; + } + + public RowKeyLogReader(EntityDefinition ed, List<byte[]> rowkeys,byte[][] qualifiers) { + this.ed = ed; + this.rowkeys = rowkeys; + this.qualifiers = qualifiers; + } + + @Override + public void open() throws IOException { + if (isOpen) + return; // silently return + try { + tbl = EagleConfigFactory.load().getHTable(ed.getTable()); + } catch (RuntimeException ex) { + throw new IOException(ex); + } + final byte[] family = ed.getColumnFamily().getBytes(); + List<Get> gets = new ArrayList<>(this.rowkeys.size()); + + for(byte[] rowkey:rowkeys) { + Get get = new Get(rowkey); + get.addFamily(family); + + if(qualifiers != null) { + for(byte[] qualifier: qualifiers){ + get.addColumn(family,qualifier); + } + } + + gets.add(get); + } + + entityResult = tbl.get(gets); + isOpen = true; + } + + @Override + public void close() throws IOException { + if(tbl != null){ + new HTableFactory().releaseHTableInterface(tbl); + } + } + + @Override + public InternalLog read() throws IOException { + if(entityResult == null || entityResult.length == 0 || this.getIndex >= entityResult.length - 1){ + return null; + } + getIndex ++; + InternalLog t = HBaseInternalLogHelper.parse(ed, entityResult[getIndex], this.qualifiers); + return t; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/UniqueIndexLogReader.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/UniqueIndexLogReader.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/UniqueIndexLogReader.java new file mode 100755 index 0000000..8ff3448 --- /dev/null +++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/UniqueIndexLogReader.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.log.entity.index; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.NavigableMap; + +import org.apache.eagle.common.config.EagleConfigFactory; +import org.apache.eagle.log.entity.HBaseInternalLogHelper; +import org.apache.eagle.log.entity.InternalLog; +import org.apache.eagle.log.entity.meta.IndexDefinition; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HTableFactory; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.filter.Filter; + +public class UniqueIndexLogReader extends IndexLogReader { + + private final IndexDefinition indexDef; + private final List<byte[]> indexRowkeys; + private final byte[][] qualifiers; + private final Filter filter; + private HTableInterface tbl; + private boolean isOpen = false; + private Result[] entityResults; + private int index = -1; + + public UniqueIndexLogReader(IndexDefinition indexDef, List<byte[]> indexRowkeys, byte[][] qualifiers, Filter filter) { + this.indexDef = indexDef; + this.indexRowkeys = indexRowkeys; + this.qualifiers = qualifiers; + this.filter = filter; + } + + @Override + public void open() throws IOException { + if (isOpen) + return; // silently return + try { + tbl = EagleConfigFactory.load().getHTable(indexDef.getEntityDefinition().getTable()); + } catch (RuntimeException ex) { + throw new IOException(ex); + } + final byte[] family = indexDef.getEntityDefinition().getColumnFamily().getBytes(); + final List<Get> indexGets = new ArrayList<>(); + for (byte[] rowkey : indexRowkeys) { + Get get = new Get(rowkey); + // Return all index qualifiers + get.addFamily(family); + indexGets.add(get); + } + final Result[] indexResults = tbl.get(indexGets); + indexGets.clear(); + for (Result indexResult : indexResults) { + final NavigableMap<byte[], byte[]> map = indexResult.getFamilyMap(family); + if (map == null) { + continue; + } + for (byte[] entityRowkey : map.keySet()) { + Get get = new Get(entityRowkey); + if (filter != null) { + get.setFilter(filter); + } + if(qualifiers == null) { + // filter all qualifiers if output qualifiers are null + get.addFamily(family); + }else { + for (int i = 0; i < qualifiers.length; ++i) { + // Return the specified qualifiers + get.addColumn(family, qualifiers[i]); + } + } + workaroundHBASE2198(get, filter,qualifiers); + indexGets.add(get); + } + } + entityResults = tbl.get(indexGets); + isOpen = true; + } + + @Override + public void close() throws IOException { + if(tbl != null){ + new HTableFactory().releaseHTableInterface(tbl); + } + } + + @Override + public InternalLog read() throws IOException { + if (entityResults == null) { + throw new IllegalArgumentException("entityResults haven't been initialized before reading"); + } + InternalLog t = null; + while (entityResults.length > ++index) { + Result r = entityResults[index]; + if (r != null) { + if (r.getRow() == null) { + continue; + } + t = HBaseInternalLogHelper.parse(indexDef.getEntityDefinition(), r, qualifiers); + break; + } + } + return t; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/UniqueIndexStreamReader.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/UniqueIndexStreamReader.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/UniqueIndexStreamReader.java new file mode 100755 index 0000000..0391d57 --- /dev/null +++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/UniqueIndexStreamReader.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.log.entity.index; + +import org.apache.eagle.log.entity.HBaseInternalLogHelper; +import org.apache.eagle.log.entity.LogReader; +import org.apache.eagle.log.entity.SearchCondition; +import org.apache.eagle.log.entity.meta.EntityDefinition; +import org.apache.eagle.log.entity.meta.IndexDefinition; +import org.apache.eagle.log.entity.meta.IndexDefinition.IndexType; + +import java.util.ArrayList; +import java.util.List; + +public class UniqueIndexStreamReader extends IndexStreamReader { + public UniqueIndexStreamReader(IndexDefinition indexDef, SearchCondition condition) { + super(indexDef, condition, new ArrayList<byte[]>()); + final IndexType type = indexDef.canGoThroughIndex(condition.getQueryExpression(), indexRowkeys); + if (!IndexType.UNIQUE_INDEX.equals(type)) { + throw new IllegalArgumentException("This query can't go through index: " + condition.getQueryExpression()); + } + } + + public UniqueIndexStreamReader(IndexDefinition indexDef, SearchCondition condition, List<byte[]> indexRowkeys) { + super(indexDef, condition, indexRowkeys); + } + + @Override + protected LogReader createIndexReader() { + final EntityDefinition entityDef = indexDef.getEntityDefinition(); +// final + byte[][] outputQualifiers = null; + if(!condition.isOutputAll()) { + outputQualifiers = HBaseInternalLogHelper.getOutputQualifiers(entityDef, condition.getOutputFields()); + } + return new UniqueIndexLogReader(indexDef, indexRowkeys, outputQualifiers, condition.getFilter()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/BooleanSerDeser.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/BooleanSerDeser.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/BooleanSerDeser.java new file mode 100755 index 0000000..cf40e31 --- /dev/null +++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/BooleanSerDeser.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.log.entity.meta; + +/** + * @since : 7/3/14,2014 + */ +public class BooleanSerDeser implements EntitySerDeser<Boolean> { + + public BooleanSerDeser(){} + + @Override + public Boolean deserialize(byte[] bytes){ + if(bytes != null && bytes.length > 0){ + if(bytes[0] == 0){ + return false; + }else if(bytes[0] == 1){ + return true; + } + } + return null; + } + + @Override + public byte[] serialize(Boolean obj){ + if(obj != null){ + if(obj){ + return new byte[]{1}; + }else{ + return new byte[]{0}; + } + } + return null; + } + + @Override + public Class<Boolean> type() { + return Boolean.class; + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Column.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Column.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Column.java new file mode 100644 index 0000000..b64e528 --- /dev/null +++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Column.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.log.entity.meta; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Target({ElementType.FIELD}) +@Retention(RetentionPolicy.RUNTIME) +public @interface Column { + String value() default ""; +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/ColumnFamily.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/ColumnFamily.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/ColumnFamily.java new file mode 100644 index 0000000..6e3e9c6 --- /dev/null +++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/ColumnFamily.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.log.entity.meta; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Target({ElementType.TYPE}) +@Retention(RetentionPolicy.RUNTIME) +public @interface ColumnFamily { + String value() default "f"; +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Double2DArraySerDeser.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Double2DArraySerDeser.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Double2DArraySerDeser.java new file mode 100644 index 0000000..27b011c --- /dev/null +++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/Double2DArraySerDeser.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.log.entity.meta; + +import org.apache.eagle.common.ByteUtil; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; + +/** + * @since 7/22/15 + */ +public class Double2DArraySerDeser implements EntitySerDeser<double[][]> { + private final int SIZE = 8; + @Override + public double[][] deserialize(byte[] bytes){ +// if((bytes.length-4) % SIZE != 0) +// return null; + int offset = 0; + // get size of int array + int rowSize = ByteUtil.bytesToInt(bytes, offset); + offset += 4; + + double[][] data = new double[rowSize][]; + for(int i=0; i<rowSize; i++) { + int colSize = ByteUtil.bytesToInt(bytes, offset); + offset += 4; + double[] values = null; + if (colSize >= 0){ + values = new double[colSize]; + for (int j = 0; j < colSize; j++) { + values[j] = ByteUtil.bytesToDouble(bytes, offset); + offset += SIZE; + } + } + data[i] = values; + } + + return data; + } + + /** + * + * @param obj + * @return + */ + @Override + public byte[] serialize(double[][] obj){ + if(obj == null) return null; + ByteArrayOutputStream data = new ByteArrayOutputStream(); + int size = obj.length; + byte[] sizeBytes = ByteUtil.intToBytes(size); + data.write(sizeBytes,0,sizeBytes.length); + + try{ + for(double[] o:obj){ + if(o!=null){ + data.write(ByteUtil.intToBytes(o.length)); + for(double d:o){ + data.write(ByteUtil.doubleToBytes(d),0,SIZE); + } + }else{ + data.write(ByteUtil.intToBytes(-1),0,4); + } + } + } catch (IOException e) { + e.printStackTrace(); + } + + byte[] bytes = data.toByteArray(); + try { + data.close(); + } catch (IOException e) { + throw new IllegalStateException(e); + } + return bytes; + } + + @Override + public Class<double[][]> type() { + return double[][].class; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/DoubleArraySerDeser.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/DoubleArraySerDeser.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/DoubleArraySerDeser.java new file mode 100755 index 0000000..d87e31c --- /dev/null +++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/DoubleArraySerDeser.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.log.entity.meta; + +import org.apache.eagle.common.ByteUtil; + +public class DoubleArraySerDeser implements EntitySerDeser<double[]>{ + + public DoubleArraySerDeser(){} + + private final int SIZE = 8; + @Override + public double[] deserialize(byte[] bytes){ + if((bytes.length-4) % SIZE != 0) + return null; + int offset = 0; + // get size of int array + int size = ByteUtil.bytesToInt(bytes, offset); + offset += 4; + double[] values = new double[size]; + for(int i=0; i<size; i++){ + values[i] = ByteUtil.bytesToDouble(bytes, offset); + offset += SIZE; + } + return values; + } + + /** + * + * @param obj + * @return + */ + @Override + public byte[] serialize(double[] obj){ + if(obj == null) + return null; + int size = obj.length; + byte[] array = new byte[4 + SIZE*size]; + byte[] first = ByteUtil.intToBytes(size); + int offset = 0; + System.arraycopy(first, 0, array, offset, first.length); + offset += first.length; + for(int i=0; i<size; i++){ + System.arraycopy(ByteUtil.doubleToBytes(obj[i]), 0, array, offset, SIZE); + offset += SIZE; + } + return array; + } + + @Override + public Class<double[]> type() { + return double[].class; + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/DoubleSerDeser.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/DoubleSerDeser.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/DoubleSerDeser.java new file mode 100755 index 0000000..330a99d --- /dev/null +++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/DoubleSerDeser.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.log.entity.meta; + +import org.apache.eagle.common.ByteUtil; + +public class DoubleSerDeser implements EntitySerDeser<Double>{ + + @Override + public Double deserialize(byte[] bytes){ + if(bytes.length < 8) + return null; + return ByteUtil.bytesToDouble(bytes); + } + + @Override + public byte[] serialize(Double obj){ + if(obj == null) + return null; + return ByteUtil.doubleToBytes(obj); + } + + @Override + public Class<Double> type(){ + return Double.class; + } +} http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntityConstants.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntityConstants.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntityConstants.java new file mode 100644 index 0000000..930743e --- /dev/null +++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/meta/EntityConstants.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.log.entity.meta; + +import org.apache.eagle.common.DateTimeUtil; + +public class EntityConstants { + + public static final String FIXED_WRITE_HUMANTIME = "1970-01-02 00:00:00"; + public static final String FIXED_READ_START_HUMANTIME = "1970-01-01 00:00:00"; + public static final String FIXED_READ_END_HUMANTIME = "1970-01-03 00:00:00"; + + public static final long FIXED_WRITE_TIMESTAMP = + DateTimeUtil.humanDateToSecondsWithoutException(FIXED_WRITE_HUMANTIME) * 1000; + +}