http://git-wip-us.apache.org/repos/asf/kylin/blob/545201f6/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregationScanner.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregationScanner.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregationScanner.java deleted file mode 100644 index a900ea1..0000000 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregationScanner.java +++ /dev/null @@ -1,188 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.regionserver.RegionScanner; -import org.apache.kylin.gridtable.StorageSideBehavior; -import org.apache.kylin.measure.MeasureAggregator; -import org.apache.kylin.storage.hbase.common.coprocessor.AggrKey; -import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorFilter; -import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorProjector; -import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorRowType; - -/** - * @author yangli9 - * - */ -public class AggregationScanner implements RegionScanner { - - private RegionScanner outerScanner; - private StorageSideBehavior behavior; - - public AggregationScanner(CoprocessorRowType type, CoprocessorFilter filter, CoprocessorProjector groupBy, ObserverAggregators aggrs, RegionScanner innerScanner, StorageSideBehavior behavior) throws IOException { - - AggregateRegionObserver.LOG.info("Kylin Coprocessor start"); - - this.behavior = behavior; - - ObserverAggregationCache aggCache; - Stats stats = new Stats(); - - aggCache = buildAggrCache(innerScanner, type, groupBy, aggrs, filter, stats); - stats.countOutputRow(aggCache.getSize()); - this.outerScanner = aggCache.getScanner(innerScanner); - - AggregateRegionObserver.LOG.info("Kylin Coprocessor aggregation done: " + stats); - } - - @SuppressWarnings("rawtypes") - ObserverAggregationCache buildAggrCache(final RegionScanner innerScanner, CoprocessorRowType type, CoprocessorProjector projector, ObserverAggregators aggregators, CoprocessorFilter filter, Stats stats) throws IOException { - - ObserverAggregationCache aggCache = new ObserverAggregationCache(aggregators); - - ObserverTuple tuple = new ObserverTuple(type); - boolean hasMore = true; - List<Cell> results = new ArrayList<Cell>(); - byte meaninglessByte = 0; - - while (hasMore) { - results.clear(); - hasMore = innerScanner.nextRaw(results); - if (results.isEmpty()) - continue; - - if (stats != null) - stats.countInputRow(results); - - Cell cell = results.get(0); - tuple.setUnderlying(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()); - - if (behavior == StorageSideBehavior.SCAN) { - //touch every byte of the cell so that the cost of scanning will be trully reflected - int endIndex = cell.getRowOffset() + cell.getRowLength(); - for (int i = cell.getRowOffset(); i < endIndex; ++i) { - meaninglessByte += cell.getRowArray()[i]; - } - } else { - if (behavior.filterToggledOn()) { - if (filter != null && filter.evaluate(tuple) == false) - continue; - - if (behavior.aggrToggledOn()) { - AggrKey aggKey = projector.getAggrKey(results); - MeasureAggregator[] bufs = aggCache.getBuffer(aggKey); - aggregators.aggregate(bufs, results); - - if (behavior.ordinal() >= StorageSideBehavior.SCAN_FILTER_AGGR_CHECKMEM.ordinal()) { - aggCache.checkMemoryUsage(); - } - } - } - } - } - - if (behavior == StorageSideBehavior.SCAN) { - System.out.println("meaningless byte is now " + meaninglessByte); - } - - return aggCache; - } - - @Override - public boolean next(List<Cell> results) throws IOException { - return outerScanner.next(results); - } - - @Override - public boolean next(List<Cell> result, int limit) throws IOException { - return outerScanner.next(result, limit); - } - - @Override - public boolean nextRaw(List<Cell> result) throws IOException { - return outerScanner.nextRaw(result); - } - - @Override - public boolean nextRaw(List<Cell> result, int limit) throws IOException { - return outerScanner.nextRaw(result, limit); - } - - @Override - public void close() throws IOException { - outerScanner.close(); - } - - @Override - public HRegionInfo getRegionInfo() { - return outerScanner.getRegionInfo(); - } - - @Override - public boolean isFilterDone() throws IOException { - return outerScanner.isFilterDone(); - } - - @Override - public boolean reseek(byte[] row) throws IOException { - return outerScanner.reseek(row); - } - - @Override - public long getMaxResultSize() { - return outerScanner.getMaxResultSize(); - } - - @Override - public long getMvccReadPoint() { - return outerScanner.getMvccReadPoint(); - } - - private static class Stats { - long inputRows = 0; - long inputBytes = 0; - long outputRows = 0; - - // have no outputBytes because that requires actual serialize all the - // aggregator buffers - - public void countInputRow(List<Cell> row) { - inputRows++; - inputBytes += row.get(0).getRowLength(); - for (int i = 0, n = row.size(); i < n; i++) { - inputBytes += row.get(i).getValueLength(); - } - } - - public void countOutputRow(long rowCount) { - outputRows += rowCount; - } - - public String toString() { - double percent = (double) outputRows / inputRows * 100; - return Math.round(percent) + "% = " + outputRows + " (out rows) / " + inputRows + " (in rows); in bytes = " + inputBytes + "; est. out bytes = " + Math.round(inputBytes * percent / 100); - } - } -}
http://git-wip-us.apache.org/repos/asf/kylin/blob/545201f6/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverAggregationCache.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverAggregationCache.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverAggregationCache.java deleted file mode 100644 index 8404262..0000000 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverAggregationCache.java +++ /dev/null @@ -1,166 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Iterator; -import java.util.List; -import java.util.Map.Entry; - -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.KeyValue.Type; -import org.apache.hadoop.hbase.regionserver.RegionScanner; -import org.apache.kylin.measure.MeasureAggregator; -import org.apache.kylin.storage.hbase.common.coprocessor.AggrKey; -import org.apache.kylin.storage.hbase.common.coprocessor.AggregationCache; - -/** - * @author yangli9 - */ -@SuppressWarnings("rawtypes") -public class ObserverAggregationCache extends AggregationCache { - - private final ObserverAggregators aggregators; - - public ObserverAggregationCache(ObserverAggregators aggregators) { - this.aggregators = aggregators; - } - - public RegionScanner getScanner(RegionScanner innerScanner) { - return new AggregationRegionScanner(innerScanner); - } - - @Override - public MeasureAggregator[] createBuffer() { - return aggregators.createBuffer(); - } - - private class AggregationRegionScanner implements RegionScanner { - - private final RegionScanner innerScanner; - private final Iterator<Entry<AggrKey, MeasureAggregator[]>> iterator; - - public AggregationRegionScanner(RegionScanner innerScanner) { - this.innerScanner = innerScanner; - this.iterator = aggBufMap.entrySet().iterator(); - } - - @Override - public boolean next(List<Cell> results) throws IOException { - try { - // AggregateRegionObserver.LOG.info("Kylin Scanner next()"); - boolean hasMore = false; - if (iterator.hasNext()) { - Entry<AggrKey, MeasureAggregator[]> entry = iterator.next(); - makeCells(entry, results); - hasMore = iterator.hasNext(); - } - // AggregateRegionObserver.LOG.info("Kylin Scanner next() done"); - - return hasMore; - } catch (Exception e) { - throw new IOException("Error when calling next", e); - } - } - - private void makeCells(Entry<AggrKey, MeasureAggregator[]> entry, List<Cell> results) { - byte[][] families = aggregators.getHColFamilies(); - byte[][] qualifiers = aggregators.getHColQualifiers(); - int nHCols = aggregators.getHColsNum(); - - AggrKey rowKey = entry.getKey(); - MeasureAggregator[] aggBuf = entry.getValue(); - ByteBuffer[] rowValues = aggregators.getHColValues(aggBuf); - - if (nHCols == 0) { - Cell keyValue = new KeyValue(rowKey.get(), rowKey.offset(), rowKey.length(), // - null, 0, 0, // - null, 0, 0, // - HConstants.LATEST_TIMESTAMP, Type.Put, // - null, 0, 0); - results.add(keyValue); - } else { - for (int i = 0; i < nHCols; i++) { - Cell keyValue = new KeyValue(rowKey.get(), rowKey.offset(), rowKey.length(), // - families[i], 0, families[i].length, // - qualifiers[i], 0, qualifiers[i].length, // - HConstants.LATEST_TIMESTAMP, Type.Put, // - rowValues[i].array(), 0, rowValues[i].position()); - results.add(keyValue); - } - } - } - - @Override - public boolean next(List<Cell> result, int limit) throws IOException { - return next(result); - } - - @Override - public boolean nextRaw(List<Cell> result) throws IOException { - return next(result); - } - - @Override - public boolean nextRaw(List<Cell> result, int limit) throws IOException { - return next(result); - } - - @Override - public void close() throws IOException { - // AggregateRegionObserver.LOG.info("Kylin Scanner close()"); - innerScanner.close(); - // AggregateRegionObserver.LOG.info("Kylin Scanner close() done"); - } - - @Override - public HRegionInfo getRegionInfo() { - // AggregateRegionObserver.LOG.info("Kylin Scanner getRegionInfo()"); - return innerScanner.getRegionInfo(); - } - - @Override - public long getMaxResultSize() { - // AggregateRegionObserver.LOG.info("Kylin Scanner getMaxResultSize()"); - return Long.MAX_VALUE; - } - - @Override - public boolean isFilterDone() throws IOException { - // AggregateRegionObserver.LOG.info("Kylin Scanner isFilterDone()"); - return false; - } - - @Override - public boolean reseek(byte[] row) throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public long getMvccReadPoint() { - // AggregateRegionObserver.LOG.info("Kylin Scanner getMvccReadPoint()"); - return Long.MAX_VALUE; - } - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/545201f6/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverAggregators.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverAggregators.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverAggregators.java deleted file mode 100644 index 29a30c1..0000000 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverAggregators.java +++ /dev/null @@ -1,265 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer; - -import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.Collection; -import java.util.Comparator; -import java.util.List; - -import org.apache.hadoop.hbase.Cell; -import org.apache.kylin.common.util.Bytes; -import org.apache.kylin.common.util.BytesSerializer; -import org.apache.kylin.common.util.BytesUtil; -import org.apache.kylin.cube.model.HBaseColumnDesc; -import org.apache.kylin.measure.BufferedMeasureCodec; -import org.apache.kylin.measure.MeasureAggregator; -import org.apache.kylin.measure.MeasureType; -import org.apache.kylin.measure.MeasureTypeFactory; -import org.apache.kylin.metadata.datatype.DataType; -import org.apache.kylin.metadata.model.MeasureDesc; -import org.apache.kylin.storage.hbase.steps.RowValueDecoder; - -/** - * @author yangli9 - */ -@SuppressWarnings({ "rawtypes", "unchecked" }) -public class ObserverAggregators { - - public static ObserverAggregators fromValueDecoders(Collection<RowValueDecoder> rowValueDecoders) { - - // each decoder represents one HBase column - HCol[] hcols = new HCol[rowValueDecoders.size()]; - int i = 0; - for (RowValueDecoder rowValueDecoder : rowValueDecoders) { - hcols[i++] = buildHCol(rowValueDecoder.getHBaseColumn()); - } - - ObserverAggregators aggrs = new ObserverAggregators(hcols); - return aggrs; - - } - - private static HCol buildHCol(HBaseColumnDesc desc) { - byte[] family = Bytes.toBytes(desc.getColumnFamilyName()); - byte[] qualifier = Bytes.toBytes(desc.getQualifier()); - MeasureDesc[] measures = desc.getMeasures(); - - String[] funcNames = new String[measures.length]; - String[] dataTypes = new String[measures.length]; - - for (int i = 0; i < measures.length; i++) { - funcNames[i] = measures[i].getFunction().getExpression(); - dataTypes[i] = measures[i].getFunction().getReturnType(); - } - - return new HCol(family, qualifier, funcNames, dataTypes); - } - - public static byte[] serialize(ObserverAggregators o) { - ByteBuffer buf = ByteBuffer.allocate(BytesSerializer.SERIALIZE_BUFFER_SIZE); - serializer.serialize(o, buf); - byte[] result = new byte[buf.position()]; - System.arraycopy(buf.array(), 0, result, 0, buf.position()); - return result; - } - - public static ObserverAggregators deserialize(byte[] bytes) { - return serializer.deserialize(ByteBuffer.wrap(bytes)); - } - - private static final BytesSerializer<ObserverAggregators> serializer = new BytesSerializer<ObserverAggregators>() { - - @Override - public void serialize(ObserverAggregators value, ByteBuffer out) { - BytesUtil.writeVInt(value.nHCols, out); - for (int i = 0; i < value.nHCols; i++) { - HCol col = value.hcols[i]; - BytesUtil.writeByteArray(col.family, out); - BytesUtil.writeByteArray(col.qualifier, out); - BytesUtil.writeAsciiStringArray(col.funcNames, out); - BytesUtil.writeAsciiStringArray(col.dataTypes, out); - } - } - - @Override - public ObserverAggregators deserialize(ByteBuffer in) { - int nHCols = BytesUtil.readVInt(in); - HCol[] hcols = new HCol[nHCols]; - for (int i = 0; i < nHCols; i++) { - byte[] family = BytesUtil.readByteArray(in); - byte[] qualifier = BytesUtil.readByteArray(in); - String[] funcNames = BytesUtil.readAsciiStringArray(in); - String[] dataTypes = BytesUtil.readAsciiStringArray(in); - hcols[i] = new HCol(family, qualifier, funcNames, dataTypes); - } - return new ObserverAggregators(hcols); - } - - }; - - // ============================================================================ - - final HCol[] hcols; - final int nHCols; - final ByteBuffer[] hColValues; - final int nTotalMeasures; - - MeasureType[] measureTypes; - - public ObserverAggregators(HCol[] _hcols) { - this.hcols = sort(_hcols); - this.nHCols = hcols.length; - this.hColValues = new ByteBuffer[nHCols]; - - int nTotalMeasures = 0; - for (HCol col : hcols) - nTotalMeasures += col.nMeasures; - this.nTotalMeasures = nTotalMeasures; - } - - private HCol[] sort(HCol[] hcols) { - HCol[] copy = Arrays.copyOf(hcols, hcols.length); - Arrays.sort(copy, new Comparator<HCol>() { - @Override - public int compare(HCol o1, HCol o2) { - int comp = Bytes.compareTo(o1.family, o2.family); - if (comp != 0) - return comp; - comp = Bytes.compareTo(o1.qualifier, o2.qualifier); - return comp; - } - }); - return copy; - } - - public MeasureAggregator[] createBuffer() { - if (measureTypes == null) { - measureTypes = new MeasureType[nTotalMeasures]; - int i = 0; - for (HCol col : hcols) { - for (int j = 0; j < col.nMeasures; j++) - measureTypes[i++] = MeasureTypeFactory.create(col.funcNames[j], DataType.getType(col.dataTypes[j])); - } - } - - MeasureAggregator[] aggrs = new MeasureAggregator[nTotalMeasures]; - for (int i = 0; i < nTotalMeasures; i++) { - aggrs[i] = measureTypes[i].newAggregator(); - } - return aggrs; - } - - public void aggregate(MeasureAggregator[] measureAggrs, List<Cell> rowCells) { - int i = 0; - for (int ci = 0; ci < nHCols; ci++) { - HCol col = hcols[ci]; - Cell cell = findCell(col, rowCells); - - if (cell == null) { - i += col.nMeasures; - continue; - } - - ByteBuffer input = ByteBuffer.wrap(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); - - col.measureCodec.decode(input, col.measureValues); - for (int j = 0; j < col.nMeasures; j++) - measureAggrs[i++].aggregate(col.measureValues[j]); - } - } - - private Cell findCell(HCol col, List<Cell> cells) { - // cells are ordered by timestamp asc, thus search from back, first hit - // is the latest version - for (int i = cells.size() - 1; i >= 0; i--) { - Cell cell = cells.get(i); - if (match(col, cell)) { - return cell; - } - } - return null; - } - - public static boolean match(HCol col, Cell cell) { - return Bytes.compareTo(col.family, 0, col.family.length, cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()) == 0 && Bytes.compareTo(col.qualifier, 0, col.qualifier.length, cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()) == 0; - } - - public int getHColsNum() { - return nHCols; - } - - public byte[][] getHColFamilies() { - byte[][] result = new byte[nHCols][]; - for (int i = 0; i < nHCols; i++) - result[i] = hcols[i].family; - return result; - } - - public byte[][] getHColQualifiers() { - byte[][] result = new byte[nHCols][]; - for (int i = 0; i < nHCols; i++) - result[i] = hcols[i].qualifier; - return result; - } - - public ByteBuffer[] getHColValues(MeasureAggregator[] aggrs) { - int i = 0; - for (int ci = 0; ci < nHCols; ci++) { - HCol col = hcols[ci]; - for (int j = 0; j < col.nMeasures; j++) - col.measureValues[j] = aggrs[i++].getState(); - - hColValues[ci] = col.measureCodec.encode(col.measureValues); - } - return hColValues; - } - - // ============================================================================ - - public static class HCol { - final byte[] family; - final byte[] qualifier; - final String[] funcNames; - final String[] dataTypes; - final int nMeasures; - - final BufferedMeasureCodec measureCodec; - final Object[] measureValues; - - public HCol(byte[] bFamily, byte[] bQualifier, String[] funcNames, String[] dataTypes) { - this.family = bFamily; - this.qualifier = bQualifier; - this.funcNames = funcNames; - this.dataTypes = dataTypes; - this.nMeasures = funcNames.length; - assert funcNames.length == dataTypes.length; - - this.measureCodec = new BufferedMeasureCodec(dataTypes); - this.measureValues = new Object[nMeasures]; - } - - @Override - public String toString() { - return "HCol [bFamily=" + Bytes.toString(family) + ", bQualifier=" + Bytes.toString(qualifier) + ", nMeasures=" + nMeasures + "]"; - } - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/545201f6/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverEnabler.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverEnabler.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverEnabler.java deleted file mode 100644 index 394b3e2..0000000 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverEnabler.java +++ /dev/null @@ -1,191 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer; - -import java.io.IOException; -import java.util.Collection; -import java.util.Map; -import java.util.Set; - -import org.apache.hadoop.hbase.client.HTableInterface; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.regionserver.RegionScanner; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.debug.BackdoorToggles; -import org.apache.kylin.cube.CubeInstance; -import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.cube.cuboid.Cuboid; -import org.apache.kylin.metadata.filter.TupleFilter; -import org.apache.kylin.metadata.model.TblColRef; -import org.apache.kylin.storage.StorageContext; -import org.apache.kylin.gridtable.StorageSideBehavior; -import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorFilter; -import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorProjector; -import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorRowType; -import org.apache.kylin.storage.hbase.common.coprocessor.FilterDecorator; -import org.apache.kylin.storage.hbase.cube.v1.RegionScannerAdapter; -import org.apache.kylin.storage.hbase.cube.v1.ResultScannerAdapter; -import org.apache.kylin.storage.hbase.steps.RowValueDecoder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; - -/** - * @author yangli9 - */ -public class ObserverEnabler { - - private static final Logger logger = LoggerFactory.getLogger(ObserverEnabler.class); - - static final String FORCE_COPROCESSOR = "forceObserver"; - static final Map<String, Boolean> CUBE_OVERRIDES = Maps.newConcurrentMap(); - - public static ResultScanner scanWithCoprocessorIfBeneficial(CubeSegment segment, Cuboid cuboid, TupleFilter tupleFiler, // - Collection<TblColRef> groupBy, Collection<RowValueDecoder> rowValueDecoders, StorageContext context, HTableInterface table, Scan scan) throws IOException { - - if (context.isCoprocessorEnabled() == false) { - return table.getScanner(scan); - } - - CoprocessorRowType type = CoprocessorRowType.fromCuboid(segment, cuboid); - CoprocessorFilter filter = CoprocessorFilter.fromFilter(segment.getDimensionEncodingMap(), tupleFiler, FilterDecorator.FilterConstantsTreatment.REPLACE_WITH_GLOBAL_DICT); - CoprocessorProjector projector = CoprocessorProjector.makeForObserver(segment, cuboid, groupBy); - ObserverAggregators aggrs = ObserverAggregators.fromValueDecoders(rowValueDecoders); - - boolean localCoprocessor = KylinConfig.getInstanceFromEnv().getQueryRunLocalCoprocessor() || BackdoorToggles.getRunLocalCoprocessor(); - - if (localCoprocessor) { - RegionScanner innerScanner = new RegionScannerAdapter(table.getScanner(scan)); - AggregationScanner aggrScanner = new AggregationScanner(type, filter, projector, aggrs, innerScanner, StorageSideBehavior.SCAN_FILTER_AGGR_CHECKMEM); - return new ResultScannerAdapter(aggrScanner); - } else { - - // debug/profiling purpose - String toggle = BackdoorToggles.getCoprocessorBehavior(); - if (toggle == null) { - toggle = StorageSideBehavior.SCAN_FILTER_AGGR_CHECKMEM.toString(); //default behavior - } else { - logger.info("The execution of this query will use " + toggle + " as observer's behavior"); - } - - scan.setAttribute(AggregateRegionObserver.COPROCESSOR_ENABLE, new byte[] { 0x01 }); - scan.setAttribute(AggregateRegionObserver.BEHAVIOR, toggle.getBytes()); - scan.setAttribute(AggregateRegionObserver.TYPE, CoprocessorRowType.serialize(type)); - scan.setAttribute(AggregateRegionObserver.PROJECTOR, CoprocessorProjector.serialize(projector)); - scan.setAttribute(AggregateRegionObserver.AGGREGATORS, ObserverAggregators.serialize(aggrs)); - scan.setAttribute(AggregateRegionObserver.FILTER, CoprocessorFilter.serialize(filter)); - return table.getScanner(scan); - } - } - - public static void enableCoprocessorIfBeneficial(CubeInstance cube, Collection<TblColRef> groupBy, Collection<RowValueDecoder> rowValueDecoders, StorageContext context) { - if (isCoprocessorBeneficial(cube, groupBy, rowValueDecoders, context)) { - context.enableCoprocessor(); - } - } - - private static boolean isCoprocessorBeneficial(CubeInstance cube, Collection<TblColRef> groupBy, Collection<RowValueDecoder> rowValueDecoders, StorageContext context) { - - String forceFlag = System.getProperty(FORCE_COPROCESSOR); - if (forceFlag != null) { - boolean r = Boolean.parseBoolean(forceFlag); - logger.info("Coprocessor is " + (r ? "enabled" : "disabled") + " according to sys prop " + FORCE_COPROCESSOR); - return r; - } - - Boolean cubeOverride = CUBE_OVERRIDES.get(cube.getName()); - if (cubeOverride != null) { - boolean r = cubeOverride.booleanValue(); - logger.info("Coprocessor is " + (r ? "enabled" : "disabled") + " according to cube overrides"); - return r; - } - - if (RowValueDecoder.hasMemHungryMeasures(rowValueDecoders)) { - logger.info("Coprocessor is disabled because there is memory hungry count distinct"); - return false; - } - - if (context.isExactAggregation()) { - logger.info("Coprocessor is disabled because exactAggregation is true"); - return false; - } - - Cuboid cuboid = context.getCuboid(); - Set<TblColRef> toAggr = Sets.newHashSet(cuboid.getAggregationColumns()); - toAggr.removeAll(groupBy); - if (toAggr.isEmpty()) { - logger.info("Coprocessor is disabled because no additional columns to aggregate"); - return false; - } - - logger.info("Coprocessor is enabled to aggregate " + toAggr + ", returning " + groupBy); - return true; - } - - @SuppressWarnings("unused") - private static int getBitsToScan(byte[] startKey, byte[] stopKey) { - // find the first bit difference from the beginning - int totalBits = startKey.length * 8; - int bitsToScan = totalBits; - for (int i = 0; i < totalBits; i++) { - int byteIdx = i / 8; - int bitIdx = 7 - i % 8; - byte bitMask = (byte) (1 << bitIdx); - if ((startKey[byteIdx] & bitMask) == (stopKey[byteIdx] & bitMask)) - bitsToScan--; - else - break; - } - return bitsToScan; - } - - public static void forceCoprocessorOn() { - System.setProperty(FORCE_COPROCESSOR, "true"); - } - - public static void forceCoprocessorOff() { - System.setProperty(FORCE_COPROCESSOR, "false"); - } - - public static String getForceCoprocessor() { - return System.getProperty(FORCE_COPROCESSOR); - } - - public static void forceCoprocessorUnset() { - System.clearProperty(FORCE_COPROCESSOR); - } - - public static void updateCubeOverride(String cubeName, String force) { - if ("null".equalsIgnoreCase(force) || "default".equalsIgnoreCase(force)) { - CUBE_OVERRIDES.remove(cubeName); - } else if ("true".equalsIgnoreCase(force)) { - CUBE_OVERRIDES.put(cubeName, Boolean.TRUE); - } else if ("false".equalsIgnoreCase(force)) { - CUBE_OVERRIDES.put(cubeName, Boolean.FALSE); - } - } - - public static Map<String, Boolean> getCubeOverrides() { - return CUBE_OVERRIDES; - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/545201f6/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverTuple.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverTuple.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverTuple.java deleted file mode 100644 index e2236d3..0000000 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverTuple.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer; - -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.kylin.common.util.Dictionary; -import org.apache.kylin.metadata.model.TblColRef; -import org.apache.kylin.metadata.tuple.IEvaluatableTuple; -import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorRowType; - -/** - * A special kind of tuple that exposes column value (dictionary ID) directly on - * top of row key. - * - * @author yangli9 - */ -public class ObserverTuple implements IEvaluatableTuple { - - final CoprocessorRowType type; - - ImmutableBytesWritable rowkey; - String[] values; - - public ObserverTuple(CoprocessorRowType type) { - this.type = type; - this.rowkey = new ImmutableBytesWritable(); - this.values = new String[type.getColumnCount()]; - } - - public void setUnderlying(byte[] array, int offset, int length) { - rowkey.set(array, offset, length); - for (int i = 0; i < values.length; i++) { - values[i] = null; - } - } - - private String getValueAt(int i) { - int n = type.getColumnCount(); - if (i < 0 || i >= n) - return null; - - if (values[i] == null) { - values[i] = Dictionary.dictIdToString(rowkey.get(), rowkey.getOffset() + type.columnOffsets[i], type.columnSizes[i]); - } - - return values[i]; - } - - @Override - public Object getValue(TblColRef col) { - int i = type.getColIndexByTblColRef(col); - return getValueAt(i); - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/545201f6/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/filter/FuzzyRowFilterV2.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/filter/FuzzyRowFilterV2.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/filter/FuzzyRowFilterV2.java deleted file mode 100644 index ea33f9a..0000000 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/filter/FuzzyRowFilterV2.java +++ /dev/null @@ -1,574 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.storage.hbase.cube.v1.filter; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Comparator; -import java.util.List; -import java.util.PriorityQueue; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.exceptions.DeserializationException; -import org.apache.hadoop.hbase.filter.Filter; -import org.apache.hadoop.hbase.filter.FilterBase; -import org.apache.hadoop.hbase.util.ByteStringer; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Pair; -import org.apache.kylin.storage.hbase.cube.v1.filter.generated.FilterProtosExt; - -import com.google.common.annotations.VisibleForTesting; -import com.google.protobuf.InvalidProtocolBufferException; - -/** - * This is optimized version of a standard FuzzyRowFilter Filters data based on fuzzy row key. - * Performs fast-forwards during scanning. It takes pairs (row key, fuzzy info) to match row keys. - * Where fuzzy info is a byte array with 0 or 1 as its values: - * <ul> - * <li>0 - means that this byte in provided row key is fixed, i.e. row key's byte at same position - * must match</li> - * <li>1 - means that this byte in provided row key is NOT fixed, i.e. row key's byte at this - * position can be different from the one in provided row key</li> - * </ul> - * Example: Let's assume row key format is userId_actionId_year_month. Length of userId is fixed and - * is 4, length of actionId is 2 and year and month are 4 and 2 bytes long respectively. Let's - * assume that we need to fetch all users that performed certain action (encoded as "99") in Jan of - * any year. Then the pair (row key, fuzzy info) would be the following: row key = "????_99_????_01" - * (one can use any value instead of "?") fuzzy info = - * "\x01\x01\x01\x01\x00\x00\x00\x00\x01\x01\x01\x01\x00\x00\x00" I.e. fuzzy info tells the matching - * mask is "????_99_????_01", where at ? can be any value. - */ [email protected] [email protected] -public class FuzzyRowFilterV2 extends FilterBase { - private List<Pair<byte[], byte[]>> fuzzyKeysData; - private boolean done = false; - - /** - * The index of a last successfully found matching fuzzy string (in fuzzyKeysData). We will start - * matching next KV with this one. If they do not match then we will return back to the one-by-one - * iteration over fuzzyKeysData. - */ - private int lastFoundIndex = -1; - - /** - * Row tracker (keeps all next rows after SEEK_NEXT_USING_HINT was returned) - */ - private RowTracker tracker; - - public FuzzyRowFilterV2(List<Pair<byte[], byte[]>> fuzzyKeysData) { - Pair<byte[], byte[]> p; - for (int i = 0; i < fuzzyKeysData.size(); i++) { - p = fuzzyKeysData.get(i); - if (p.getFirst().length != p.getSecond().length) { - Pair<String, String> readable = Pair.newPair(Bytes.toStringBinary(p.getFirst()), Bytes.toStringBinary(p.getSecond())); - throw new IllegalArgumentException("Fuzzy pair lengths do not match: " + readable); - } - // update mask ( 0 -> -1 (0xff), 1 -> 0) - p.setSecond(preprocessMask(p.getSecond())); - preprocessSearchKey(p); - } - this.fuzzyKeysData = fuzzyKeysData; - this.tracker = new RowTracker(); - } - - private void preprocessSearchKey(Pair<byte[], byte[]> p) { - if (UnsafeAccess.isAvailable() == false) { - // do nothing - return; - } - byte[] key = p.getFirst(); - byte[] mask = p.getSecond(); - for (int i = 0; i < mask.length; i++) { - // set non-fixed part of a search key to 0. - if (mask[i] == 0) - key[i] = 0; - } - } - - /** - * We need to preprocess mask array, as since we treat 0's as unfixed positions and -1 (0xff) as - * fixed positions - * @param mask - * @return mask array - */ - private byte[] preprocessMask(byte[] mask) { - if (UnsafeAccess.isAvailable() == false) { - // do nothing - return mask; - } - if (isPreprocessedMask(mask)) - return mask; - for (int i = 0; i < mask.length; i++) { - if (mask[i] == 0) { - mask[i] = -1; // 0 -> -1 - } else if (mask[i] == 1) { - mask[i] = 0;// 1 -> 0 - } - } - return mask; - } - - private boolean isPreprocessedMask(byte[] mask) { - for (int i = 0; i < mask.length; i++) { - if (mask[i] != -1 && mask[i] != 0) { - return false; - } - } - return true; - } - - @Override - public ReturnCode filterKeyValue(Cell c) { - final int startIndex = lastFoundIndex >= 0 ? lastFoundIndex : 0; - final int size = fuzzyKeysData.size(); - for (int i = startIndex; i < size + startIndex; i++) { - final int index = i % size; - Pair<byte[], byte[]> fuzzyData = fuzzyKeysData.get(index); - SatisfiesCode satisfiesCode = satisfies(isReversed(), c.getRowArray(), c.getRowOffset(), c.getRowLength(), fuzzyData.getFirst(), fuzzyData.getSecond()); - if (satisfiesCode == SatisfiesCode.YES) { - lastFoundIndex = index; - return ReturnCode.INCLUDE; - } - } - // NOT FOUND -> seek next using hint - lastFoundIndex = -1; - return ReturnCode.SEEK_NEXT_USING_HINT; - - } - - @Override - public Cell getNextCellHint(Cell currentCell) { - boolean result = tracker.updateTracker(currentCell); - if (result == false) { - done = true; - return null; - } - byte[] nextRowKey = tracker.nextRow(); - return KeyValue.createFirstOnRow(nextRowKey); - } - - /** - * If we have multiple fuzzy keys, row tracker should improve overall performance. It calculates - * all next rows (one per every fuzzy key) and put them (the fuzzy key is bundled) into a priority - * queue so that the smallest row key always appears at queue head, which helps to decide the - * "Next Cell Hint". As scanning going on, the number of candidate rows in the RowTracker will - * remain the size of fuzzy keys until some of the fuzzy keys won't possibly have matches any - * more. - */ - private class RowTracker { - private final PriorityQueue<Pair<byte[], Pair<byte[], byte[]>>> nextRows; - private boolean initialized = false; - - RowTracker() { - nextRows = new PriorityQueue<Pair<byte[], Pair<byte[], byte[]>>>(fuzzyKeysData.size(), new Comparator<Pair<byte[], Pair<byte[], byte[]>>>() { - @Override - public int compare(Pair<byte[], Pair<byte[], byte[]>> o1, Pair<byte[], Pair<byte[], byte[]>> o2) { - int compare = Bytes.compareTo(o1.getFirst(), o2.getFirst()); - if (!isReversed()) { - return compare; - } else { - return -compare; - } - } - }); - } - - byte[] nextRow() { - if (nextRows.isEmpty()) { - throw new IllegalStateException("NextRows should not be empty, make sure to call nextRow() after updateTracker() return true"); - } else { - return nextRows.peek().getFirst(); - } - } - - boolean updateTracker(Cell currentCell) { - if (!initialized) { - for (Pair<byte[], byte[]> fuzzyData : fuzzyKeysData) { - updateWith(currentCell, fuzzyData); - } - initialized = true; - } else { - while (!nextRows.isEmpty() && !lessThan(currentCell, nextRows.peek().getFirst())) { - Pair<byte[], Pair<byte[], byte[]>> head = nextRows.poll(); - Pair<byte[], byte[]> fuzzyData = head.getSecond(); - updateWith(currentCell, fuzzyData); - } - } - return !nextRows.isEmpty(); - } - - boolean lessThan(Cell currentCell, byte[] nextRowKey) { - int compareResult = Bytes.compareTo(currentCell.getRowArray(), currentCell.getRowOffset(), currentCell.getRowLength(), nextRowKey, 0, nextRowKey.length); - return (!isReversed() && compareResult < 0) || (isReversed() && compareResult > 0); - } - - void updateWith(Cell currentCell, Pair<byte[], byte[]> fuzzyData) { - byte[] nextRowKeyCandidate = getNextForFuzzyRule(isReversed(), currentCell.getRowArray(), currentCell.getRowOffset(), currentCell.getRowLength(), fuzzyData.getFirst(), fuzzyData.getSecond()); - if (nextRowKeyCandidate != null) { - nextRows.add(Pair.newPair(nextRowKeyCandidate, fuzzyData)); - } - } - - } - - @Override - public boolean filterAllRemaining() { - return done; - } - - /** - * @return The filter serialized using pb - */ - public byte[] toByteArray() { - FilterProtosExt.FuzzyRowFilterV2.Builder builder = FilterProtosExt.FuzzyRowFilterV2.newBuilder(); - for (Pair<byte[], byte[]> fuzzyData : fuzzyKeysData) { - FilterProtosExt.BytesBytesPair.Builder bbpBuilder = FilterProtosExt.BytesBytesPair.newBuilder(); - bbpBuilder.setFirst(ByteStringer.wrap(fuzzyData.getFirst())); - bbpBuilder.setSecond(ByteStringer.wrap(fuzzyData.getSecond())); - builder.addFuzzyKeysData(bbpBuilder); - } - return builder.build().toByteArray(); - } - - public static FuzzyRowFilterV2 parseFrom(final byte[] pbBytes) throws DeserializationException { - FilterProtosExt.FuzzyRowFilterV2 proto; - try { - proto = FilterProtosExt.FuzzyRowFilterV2.parseFrom(pbBytes); - } catch (InvalidProtocolBufferException e) { - throw new DeserializationException(e); - } - int count = proto.getFuzzyKeysDataCount(); - ArrayList<Pair<byte[], byte[]>> fuzzyKeysData = new ArrayList<Pair<byte[], byte[]>>(count); - for (int i = 0; i < count; ++i) { - FilterProtosExt.BytesBytesPair current = proto.getFuzzyKeysData(i); - byte[] keyBytes = current.getFirst().toByteArray(); - byte[] keyMeta = current.getSecond().toByteArray(); - fuzzyKeysData.add(Pair.newPair(keyBytes, keyMeta)); - } - return new FuzzyRowFilterV2(fuzzyKeysData); - } - - @Override - public String toString() { - final StringBuilder sb = new StringBuilder(); - sb.append("FuzzyRowFilter"); - sb.append("{fuzzyKeysData="); - for (Pair<byte[], byte[]> fuzzyData : fuzzyKeysData) { - sb.append('{').append(Bytes.toStringBinary(fuzzyData.getFirst())).append(":"); - sb.append(Bytes.toStringBinary(fuzzyData.getSecond())).append('}'); - } - sb.append("}, "); - return sb.toString(); - } - - // Utility methods - - static enum SatisfiesCode { - /** row satisfies fuzzy rule */ - YES, - /** row doesn't satisfy fuzzy rule, but there's possible greater row that does */ - NEXT_EXISTS, - /** row doesn't satisfy fuzzy rule and there's no greater row that does */ - NO_NEXT - } - - @VisibleForTesting - static SatisfiesCode satisfies(boolean reverse, byte[] row, byte[] fuzzyKeyBytes, byte[] fuzzyKeyMeta) { - return satisfies(reverse, row, 0, row.length, fuzzyKeyBytes, fuzzyKeyMeta); - } - - static SatisfiesCode satisfies(boolean reverse, byte[] row, int offset, int length, byte[] fuzzyKeyBytes, byte[] fuzzyKeyMeta) { - - if (UnsafeAccess.isAvailable() == false) { - return satisfiesNoUnsafe(reverse, row, offset, length, fuzzyKeyBytes, fuzzyKeyMeta); - } - - if (row == null) { - // do nothing, let scan to proceed - return SatisfiesCode.YES; - } - length = Math.min(length, fuzzyKeyBytes.length); - int numWords = length / Bytes.SIZEOF_LONG; - - int j = numWords << 3; // numWords * SIZEOF_LONG; - - for (int i = 0; i < j; i += Bytes.SIZEOF_LONG) { - long fuzzyBytes = UnsafeAccess.toLong(fuzzyKeyBytes, i); - long fuzzyMeta = UnsafeAccess.toLong(fuzzyKeyMeta, i); - long rowValue = UnsafeAccess.toLong(row, offset + i); - if ((rowValue & fuzzyMeta) != (fuzzyBytes)) { - // We always return NEXT_EXISTS - return SatisfiesCode.NEXT_EXISTS; - } - } - - int off = j; - - if (length - off >= Bytes.SIZEOF_INT) { - int fuzzyBytes = UnsafeAccess.toInt(fuzzyKeyBytes, off); - int fuzzyMeta = UnsafeAccess.toInt(fuzzyKeyMeta, off); - int rowValue = UnsafeAccess.toInt(row, offset + off); - if ((rowValue & fuzzyMeta) != (fuzzyBytes)) { - // We always return NEXT_EXISTS - return SatisfiesCode.NEXT_EXISTS; - } - off += Bytes.SIZEOF_INT; - } - - if (length - off >= Bytes.SIZEOF_SHORT) { - short fuzzyBytes = UnsafeAccess.toShort(fuzzyKeyBytes, off); - short fuzzyMeta = UnsafeAccess.toShort(fuzzyKeyMeta, off); - short rowValue = UnsafeAccess.toShort(row, offset + off); - if ((rowValue & fuzzyMeta) != (fuzzyBytes)) { - // We always return NEXT_EXISTS - // even if it does not (in this case getNextForFuzzyRule - // will return null) - return SatisfiesCode.NEXT_EXISTS; - } - off += Bytes.SIZEOF_SHORT; - } - - if (length - off >= Bytes.SIZEOF_BYTE) { - int fuzzyBytes = fuzzyKeyBytes[off] & 0xff; - int fuzzyMeta = fuzzyKeyMeta[off] & 0xff; - int rowValue = row[offset + off] & 0xff; - if ((rowValue & fuzzyMeta) != (fuzzyBytes)) { - // We always return NEXT_EXISTS - return SatisfiesCode.NEXT_EXISTS; - } - } - return SatisfiesCode.YES; - } - - static SatisfiesCode satisfiesNoUnsafe(boolean reverse, byte[] row, int offset, int length, byte[] fuzzyKeyBytes, byte[] fuzzyKeyMeta) { - if (row == null) { - // do nothing, let scan to proceed - return SatisfiesCode.YES; - } - - Order order = Order.orderFor(reverse); - boolean nextRowKeyCandidateExists = false; - - for (int i = 0; i < fuzzyKeyMeta.length && i < length; i++) { - // First, checking if this position is fixed and not equals the given one - boolean byteAtPositionFixed = fuzzyKeyMeta[i] == 0; - boolean fixedByteIncorrect = byteAtPositionFixed && fuzzyKeyBytes[i] != row[i + offset]; - if (fixedByteIncorrect) { - // in this case there's another row that satisfies fuzzy rule and bigger than this row - if (nextRowKeyCandidateExists) { - return SatisfiesCode.NEXT_EXISTS; - } - - // If this row byte is less than fixed then there's a byte array bigger than - // this row and which satisfies the fuzzy rule. Otherwise there's no such byte array: - // this row is simply bigger than any byte array that satisfies the fuzzy rule - boolean rowByteLessThanFixed = (row[i + offset] & 0xFF) < (fuzzyKeyBytes[i] & 0xFF); - if (rowByteLessThanFixed && !reverse) { - return SatisfiesCode.NEXT_EXISTS; - } else if (!rowByteLessThanFixed && reverse) { - return SatisfiesCode.NEXT_EXISTS; - } else { - return SatisfiesCode.NO_NEXT; - } - } - - // Second, checking if this position is not fixed and byte value is not the biggest. In this - // case there's a byte array bigger than this row and which satisfies the fuzzy rule. To get - // bigger byte array that satisfies the rule we need to just increase this byte - // (see the code of getNextForFuzzyRule below) by one. - // Note: if non-fixed byte is already at biggest value, this doesn't allow us to say there's - // bigger one that satisfies the rule as it can't be increased. - if (fuzzyKeyMeta[i] == 1 && !order.isMax(fuzzyKeyBytes[i])) { - nextRowKeyCandidateExists = true; - } - } - return SatisfiesCode.YES; - } - - @VisibleForTesting - static byte[] getNextForFuzzyRule(byte[] row, byte[] fuzzyKeyBytes, byte[] fuzzyKeyMeta) { - return getNextForFuzzyRule(false, row, 0, row.length, fuzzyKeyBytes, fuzzyKeyMeta); - } - - @VisibleForTesting - static byte[] getNextForFuzzyRule(boolean reverse, byte[] row, byte[] fuzzyKeyBytes, byte[] fuzzyKeyMeta) { - return getNextForFuzzyRule(reverse, row, 0, row.length, fuzzyKeyBytes, fuzzyKeyMeta); - } - - /** Abstracts directional comparisons based on scan direction. */ - private enum Order { - ASC { - public boolean lt(int lhs, int rhs) { - return lhs < rhs; - } - - public boolean gt(int lhs, int rhs) { - return lhs > rhs; - } - - public byte inc(byte val) { - // TODO: what about over/underflow? - return (byte) (val + 1); - } - - public boolean isMax(byte val) { - return val == (byte) 0xff; - } - - public byte min() { - return 0; - } - }, - DESC { - public boolean lt(int lhs, int rhs) { - return lhs > rhs; - } - - public boolean gt(int lhs, int rhs) { - return lhs < rhs; - } - - public byte inc(byte val) { - // TODO: what about over/underflow? - return (byte) (val - 1); - } - - public boolean isMax(byte val) { - return val == 0; - } - - public byte min() { - return (byte) 0xFF; - } - }; - - public static Order orderFor(boolean reverse) { - return reverse ? DESC : ASC; - } - - /** Returns true when {@code lhs < rhs}. */ - public abstract boolean lt(int lhs, int rhs); - - /** Returns true when {@code lhs > rhs}. */ - public abstract boolean gt(int lhs, int rhs); - - /** Returns {@code val} incremented by 1. */ - public abstract byte inc(byte val); - - /** Return true when {@code val} is the maximum value */ - public abstract boolean isMax(byte val); - - /** Return the minimum value according to this ordering scheme. */ - public abstract byte min(); - } - - /** - * @return greater byte array than given (row) which satisfies the fuzzy rule if it exists, null - * otherwise - */ - @VisibleForTesting - static byte[] getNextForFuzzyRule(boolean reverse, byte[] row, int offset, int length, byte[] fuzzyKeyBytes, byte[] fuzzyKeyMeta) { - // To find out the next "smallest" byte array that satisfies fuzzy rule and "greater" than - // the given one we do the following: - // 1. setting values on all "fixed" positions to the values from fuzzyKeyBytes - // 2. if during the first step given row did not increase, then we increase the value at - // the first "non-fixed" position (where it is not maximum already) - - // It is easier to perform this by using fuzzyKeyBytes copy and setting "non-fixed" position - // values than otherwise. - byte[] result = Arrays.copyOf(fuzzyKeyBytes, length > fuzzyKeyBytes.length ? length : fuzzyKeyBytes.length); - if (reverse && length > fuzzyKeyBytes.length) { - // we need trailing 0xff's instead of trailing 0x00's - for (int i = fuzzyKeyBytes.length; i < result.length; i++) { - result[i] = (byte) 0xFF; - } - } - int toInc = -1; - final Order order = Order.orderFor(reverse); - - boolean increased = false; - for (int i = 0; i < result.length; i++) { - if (i >= fuzzyKeyMeta.length || fuzzyKeyMeta[i] == 0 /* non-fixed */) { - result[i] = row[offset + i]; - if (!order.isMax(row[offset + i])) { - // this is "non-fixed" position and is not at max value, hence we can increase it - toInc = i; - } - } else if (i < fuzzyKeyMeta.length && fuzzyKeyMeta[i] == -1 /* fixed */) { - if (order.lt((row[i + offset] & 0xFF), (fuzzyKeyBytes[i] & 0xFF))) { - // if setting value for any fixed position increased the original array, - // we are OK - increased = true; - break; - } - - if (order.gt((row[i + offset] & 0xFF), (fuzzyKeyBytes[i] & 0xFF))) { - // if setting value for any fixed position makes array "smaller", then just stop: - // in case we found some non-fixed position to increase we will do it, otherwise - // there's no "next" row key that satisfies fuzzy rule and "greater" than given row - break; - } - } - } - - if (!increased) { - if (toInc < 0) { - return null; - } - result[toInc] = order.inc(result[toInc]); - - // Setting all "non-fixed" positions to zeroes to the right of the one we increased so - // that found "next" row key is the smallest possible - for (int i = toInc + 1; i < result.length; i++) { - if (i >= fuzzyKeyMeta.length || fuzzyKeyMeta[i] == 0 /* non-fixed */) { - result[i] = order.min(); - } - } - } - - return result; - } - - /** - * @return true if and only if the fields of the filter that are serialized are equal to the - * corresponding fields in other. Used for testing. - */ - boolean areSerializedFieldsEqual(Filter o) { - if (o == this) - return true; - if (!(o instanceof FuzzyRowFilterV2)) - return false; - - FuzzyRowFilterV2 other = (FuzzyRowFilterV2) o; - if (this.fuzzyKeysData.size() != other.fuzzyKeysData.size()) - return false; - for (int i = 0; i < fuzzyKeysData.size(); ++i) { - Pair<byte[], byte[]> thisData = this.fuzzyKeysData.get(i); - Pair<byte[], byte[]> otherData = other.fuzzyKeysData.get(i); - if (!(Bytes.equals(thisData.getFirst(), otherData.getFirst()) && Bytes.equals(thisData.getSecond(), otherData.getSecond()))) { - return false; - } - } - return true; - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/545201f6/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/filter/UnsafeAccess.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/filter/UnsafeAccess.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/filter/UnsafeAccess.java deleted file mode 100644 index 34328ef..0000000 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/filter/UnsafeAccess.java +++ /dev/null @@ -1,433 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.storage.hbase.cube.v1.filter; - -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import java.lang.reflect.Field; -import java.nio.ByteBuffer; -import java.nio.ByteOrder; -import java.security.AccessController; -import java.security.PrivilegedAction; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.hbase.util.Bytes; - -import sun.misc.Unsafe; -import sun.nio.ch.DirectBuffer; - [email protected] [email protected] -public final class UnsafeAccess { - - private static final Log LOG = LogFactory.getLog(UnsafeAccess.class); - - static final Unsafe theUnsafe; - - /** The offset to the first element in a byte array. */ - static final long BYTE_ARRAY_BASE_OFFSET; - - static final boolean littleEndian = ByteOrder.nativeOrder().equals(ByteOrder.LITTLE_ENDIAN); - - static { - theUnsafe = (Unsafe) AccessController.doPrivileged(new PrivilegedAction<Object>() { - @Override - public Object run() { - try { - Field f = Unsafe.class.getDeclaredField("theUnsafe"); - f.setAccessible(true); - return f.get(null); - } catch (Throwable e) { - LOG.warn("sun.misc.Unsafe is not accessible", e); - } - return null; - } - }); - - if (theUnsafe != null) { - BYTE_ARRAY_BASE_OFFSET = theUnsafe.arrayBaseOffset(byte[].class); - } else { - BYTE_ARRAY_BASE_OFFSET = -1; - } - } - - private UnsafeAccess() { - } - - /** - * @return true when the running JVM is having sun's Unsafe package available in it. - */ - public static boolean isAvailable() { - return theUnsafe != null; - } - - // APIs to read primitive data from a byte[] using Unsafe way - /** - * Converts a byte array to a short value considering it was written in big-endian format. - * @param bytes byte array - * @param offset offset into array - * @return the short value - */ - public static short toShort(byte[] bytes, int offset) { - if (littleEndian) { - return Short.reverseBytes(theUnsafe.getShort(bytes, offset + BYTE_ARRAY_BASE_OFFSET)); - } else { - return theUnsafe.getShort(bytes, offset + BYTE_ARRAY_BASE_OFFSET); - } - } - - /** - * Converts a byte array to an int value considering it was written in big-endian format. - * @param bytes byte array - * @param offset offset into array - * @return the int value - */ - public static int toInt(byte[] bytes, int offset) { - if (littleEndian) { - return Integer.reverseBytes(theUnsafe.getInt(bytes, offset + BYTE_ARRAY_BASE_OFFSET)); - } else { - return theUnsafe.getInt(bytes, offset + BYTE_ARRAY_BASE_OFFSET); - } - } - - /** - * Converts a byte array to a long value considering it was written in big-endian format. - * @param bytes byte array - * @param offset offset into array - * @return the long value - */ - public static long toLong(byte[] bytes, int offset) { - if (littleEndian) { - return Long.reverseBytes(theUnsafe.getLong(bytes, offset + BYTE_ARRAY_BASE_OFFSET)); - } else { - return theUnsafe.getLong(bytes, offset + BYTE_ARRAY_BASE_OFFSET); - } - } - - // APIs to write primitive data to a byte[] using Unsafe way - /** - * Put a short value out to the specified byte array position in big-endian format. - * @param bytes the byte array - * @param offset position in the array - * @param val short to write out - * @return incremented offset - */ - public static int putShort(byte[] bytes, int offset, short val) { - if (littleEndian) { - val = Short.reverseBytes(val); - } - theUnsafe.putShort(bytes, offset + BYTE_ARRAY_BASE_OFFSET, val); - return offset + Bytes.SIZEOF_SHORT; - } - - /** - * Put an int value out to the specified byte array position in big-endian format. - * @param bytes the byte array - * @param offset position in the array - * @param val int to write out - * @return incremented offset - */ - public static int putInt(byte[] bytes, int offset, int val) { - if (littleEndian) { - val = Integer.reverseBytes(val); - } - theUnsafe.putInt(bytes, offset + BYTE_ARRAY_BASE_OFFSET, val); - return offset + Bytes.SIZEOF_INT; - } - - /** - * Put a long value out to the specified byte array position in big-endian format. - * @param bytes the byte array - * @param offset position in the array - * @param val long to write out - * @return incremented offset - */ - public static int putLong(byte[] bytes, int offset, long val) { - if (littleEndian) { - val = Long.reverseBytes(val); - } - theUnsafe.putLong(bytes, offset + BYTE_ARRAY_BASE_OFFSET, val); - return offset + Bytes.SIZEOF_LONG; - } - - // APIs to read primitive data from a ByteBuffer using Unsafe way - /** - * Reads a short value at the given buffer's offset considering it was written in big-endian - * format. - * - * @param buf - * @param offset - * @return short value at offset - */ - public static short toShort(ByteBuffer buf, int offset) { - if (littleEndian) { - return Short.reverseBytes(getAsShort(buf, offset)); - } - return getAsShort(buf, offset); - } - - /** - * Reads bytes at the given offset as a short value. - * @param buf - * @param offset - * @return short value at offset - */ - static short getAsShort(ByteBuffer buf, int offset) { - if (buf.isDirect()) { - return theUnsafe.getShort(((DirectBuffer) buf).address() + offset); - } - return theUnsafe.getShort(buf.array(), BYTE_ARRAY_BASE_OFFSET + buf.arrayOffset() + offset); - } - - /** - * Reads an int value at the given buffer's offset considering it was written in big-endian - * format. - * - * @param buf - * @param offset - * @return int value at offset - */ - public static int toInt(ByteBuffer buf, int offset) { - if (littleEndian) { - return Integer.reverseBytes(getAsInt(buf, offset)); - } - return getAsInt(buf, offset); - } - - /** - * Reads bytes at the given offset as an int value. - * @param buf - * @param offset - * @return int value at offset - */ - static int getAsInt(ByteBuffer buf, int offset) { - if (buf.isDirect()) { - return theUnsafe.getInt(((DirectBuffer) buf).address() + offset); - } - return theUnsafe.getInt(buf.array(), BYTE_ARRAY_BASE_OFFSET + buf.arrayOffset() + offset); - } - - /** - * Reads a long value at the given buffer's offset considering it was written in big-endian - * format. - * - * @param buf - * @param offset - * @return long value at offset - */ - public static long toLong(ByteBuffer buf, int offset) { - if (littleEndian) { - return Long.reverseBytes(getAsLong(buf, offset)); - } - return getAsLong(buf, offset); - } - - /** - * Reads bytes at the given offset as a long value. - * @param buf - * @param offset - * @return long value at offset - */ - static long getAsLong(ByteBuffer buf, int offset) { - if (buf.isDirect()) { - return theUnsafe.getLong(((DirectBuffer) buf).address() + offset); - } - return theUnsafe.getLong(buf.array(), BYTE_ARRAY_BASE_OFFSET + buf.arrayOffset() + offset); - } - - /** - * Put an int value out to the specified ByteBuffer offset in big-endian format. - * @param buf the ByteBuffer to write to - * @param offset offset in the ByteBuffer - * @param val int to write out - * @return incremented offset - */ - public static int putInt(ByteBuffer buf, int offset, int val) { - if (littleEndian) { - val = Integer.reverseBytes(val); - } - if (buf.isDirect()) { - theUnsafe.putInt(((DirectBuffer) buf).address() + offset, val); - } else { - theUnsafe.putInt(buf.array(), offset + buf.arrayOffset() + BYTE_ARRAY_BASE_OFFSET, val); - } - return offset + Bytes.SIZEOF_INT; - } - - // APIs to copy data. This will be direct memory location copy and will be much faster - /** - * Copies the bytes from given array's offset to length part into the given buffer. - * @param src - * @param srcOffset - * @param dest - * @param destOffset - * @param length - */ - public static void copy(byte[] src, int srcOffset, ByteBuffer dest, int destOffset, int length) { - long destAddress = destOffset; - Object destBase = null; - if (dest.isDirect()) { - destAddress = destAddress + ((DirectBuffer) dest).address(); - } else { - destAddress = destAddress + BYTE_ARRAY_BASE_OFFSET + dest.arrayOffset(); - destBase = dest.array(); - } - long srcAddress = srcOffset + BYTE_ARRAY_BASE_OFFSET; - theUnsafe.copyMemory(src, srcAddress, destBase, destAddress, length); - } - - /** - * Copies specified number of bytes from given offset of {@code src} ByteBuffer to the - * {@code dest} array. - * - * @param src - * @param srcOffset - * @param dest - * @param destOffset - * @param length - */ - public static void copy(ByteBuffer src, int srcOffset, byte[] dest, int destOffset, int length) { - long srcAddress = srcOffset; - Object srcBase = null; - if (src.isDirect()) { - srcAddress = srcAddress + ((DirectBuffer) src).address(); - } else { - srcAddress = srcAddress + BYTE_ARRAY_BASE_OFFSET + src.arrayOffset(); - srcBase = src.array(); - } - long destAddress = destOffset + BYTE_ARRAY_BASE_OFFSET; - theUnsafe.copyMemory(srcBase, srcAddress, dest, destAddress, length); - } - - /** - * Copies specified number of bytes from given offset of {@code src} buffer into the {@code dest} - * buffer. - * - * @param src - * @param srcOffset - * @param dest - * @param destOffset - * @param length - */ - public static void copy(ByteBuffer src, int srcOffset, ByteBuffer dest, int destOffset, int length) { - long srcAddress, destAddress; - Object srcBase = null, destBase = null; - if (src.isDirect()) { - srcAddress = srcOffset + ((DirectBuffer) src).address(); - } else { - srcAddress = srcOffset + src.arrayOffset() + BYTE_ARRAY_BASE_OFFSET; - srcBase = src.array(); - } - if (dest.isDirect()) { - destAddress = destOffset + ((DirectBuffer) dest).address(); - } else { - destAddress = destOffset + BYTE_ARRAY_BASE_OFFSET + dest.arrayOffset(); - destBase = dest.array(); - } - theUnsafe.copyMemory(srcBase, srcAddress, destBase, destAddress, length); - } - - // APIs to add primitives to BBs - /** - * Put a short value out to the specified BB position in big-endian format. - * @param buf the byte buffer - * @param offset position in the buffer - * @param val short to write out - * @return incremented offset - */ - public static int putShort(ByteBuffer buf, int offset, short val) { - if (littleEndian) { - val = Short.reverseBytes(val); - } - if (buf.isDirect()) { - theUnsafe.putShort(((DirectBuffer) buf).address() + offset, val); - } else { - theUnsafe.putShort(buf.array(), BYTE_ARRAY_BASE_OFFSET + buf.arrayOffset() + offset, val); - } - return offset + Bytes.SIZEOF_SHORT; - } - - /** - * Put a long value out to the specified BB position in big-endian format. - * @param buf the byte buffer - * @param offset position in the buffer - * @param val long to write out - * @return incremented offset - */ - public static int putLong(ByteBuffer buf, int offset, long val) { - if (littleEndian) { - val = Long.reverseBytes(val); - } - if (buf.isDirect()) { - theUnsafe.putLong(((DirectBuffer) buf).address() + offset, val); - } else { - theUnsafe.putLong(buf.array(), BYTE_ARRAY_BASE_OFFSET + buf.arrayOffset() + offset, val); - } - return offset + Bytes.SIZEOF_LONG; - } - - /** - * Put a byte value out to the specified BB position in big-endian format. - * @param buf the byte buffer - * @param offset position in the buffer - * @param b byte to write out - * @return incremented offset - */ - public static int putByte(ByteBuffer buf, int offset, byte b) { - if (buf.isDirect()) { - theUnsafe.putByte(((DirectBuffer) buf).address() + offset, b); - } else { - theUnsafe.putByte(buf.array(), BYTE_ARRAY_BASE_OFFSET + buf.arrayOffset() + offset, b); - } - return offset + 1; - } - - /** - * Returns the byte at the given offset - * @param buf the buffer to read - * @param offset the offset at which the byte has to be read - * @return the byte at the given offset - */ - public static byte toByte(ByteBuffer buf, int offset) { - if (buf.isDirect()) { - return theUnsafe.getByte(((DirectBuffer) buf).address() + offset); - } else { - return theUnsafe.getByte(buf.array(), BYTE_ARRAY_BASE_OFFSET + buf.arrayOffset() + offset); - } - } -}
