http://git-wip-us.apache.org/repos/asf/kylin/blob/545201f6/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregationScanner.java
----------------------------------------------------------------------
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregationScanner.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregationScanner.java
deleted file mode 100644
index a900ea1..0000000
--- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregationScanner.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.regionserver.RegionScanner;
-import org.apache.kylin.gridtable.StorageSideBehavior;
-import org.apache.kylin.measure.MeasureAggregator;
-import org.apache.kylin.storage.hbase.common.coprocessor.AggrKey;
-import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorFilter;
-import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorProjector;
-import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorRowType;
-
-/**
- * @author yangli9
- * 
- */
-public class AggregationScanner implements RegionScanner {
-
-    private RegionScanner outerScanner;
-    private StorageSideBehavior behavior;
-
-    public AggregationScanner(CoprocessorRowType type, CoprocessorFilter 
filter, CoprocessorProjector groupBy, ObserverAggregators aggrs, RegionScanner 
innerScanner, StorageSideBehavior behavior) throws IOException {
-
-        AggregateRegionObserver.LOG.info("Kylin Coprocessor start");
-
-        this.behavior = behavior;
-
-        ObserverAggregationCache aggCache;
-        Stats stats = new Stats();
-
-        aggCache = buildAggrCache(innerScanner, type, groupBy, aggrs, filter, 
stats);
-        stats.countOutputRow(aggCache.getSize());
-        this.outerScanner = aggCache.getScanner(innerScanner);
-
-        AggregateRegionObserver.LOG.info("Kylin Coprocessor aggregation done: 
" + stats);
-    }
-
-    @SuppressWarnings("rawtypes")
-    ObserverAggregationCache buildAggrCache(final RegionScanner innerScanner, 
CoprocessorRowType type, CoprocessorProjector projector, ObserverAggregators 
aggregators, CoprocessorFilter filter, Stats stats) throws IOException {
-
-        ObserverAggregationCache aggCache = new 
ObserverAggregationCache(aggregators);
-
-        ObserverTuple tuple = new ObserverTuple(type);
-        boolean hasMore = true;
-        List<Cell> results = new ArrayList<Cell>();
-        byte meaninglessByte = 0;
-
-        while (hasMore) {
-            results.clear();
-            hasMore = innerScanner.nextRaw(results);
-            if (results.isEmpty())
-                continue;
-
-            if (stats != null)
-                stats.countInputRow(results);
-
-            Cell cell = results.get(0);
-            tuple.setUnderlying(cell.getRowArray(), cell.getRowOffset(), 
cell.getRowLength());
-
-            if (behavior == StorageSideBehavior.SCAN) {
-                //touch every byte of the cell so that the cost of scanning 
will be trully reflected
-                int endIndex = cell.getRowOffset() + cell.getRowLength();
-                for (int i = cell.getRowOffset(); i < endIndex; ++i) {
-                    meaninglessByte += cell.getRowArray()[i];
-                }
-            } else {
-                if (behavior.filterToggledOn()) {
-                    if (filter != null && filter.evaluate(tuple) == false)
-                        continue;
-
-                    if (behavior.aggrToggledOn()) {
-                        AggrKey aggKey = projector.getAggrKey(results);
-                        MeasureAggregator[] bufs = aggCache.getBuffer(aggKey);
-                        aggregators.aggregate(bufs, results);
-
-                        if (behavior.ordinal() >= 
StorageSideBehavior.SCAN_FILTER_AGGR_CHECKMEM.ordinal()) {
-                            aggCache.checkMemoryUsage();
-                        }
-                    }
-                }
-            }
-        }
-
-        if (behavior == StorageSideBehavior.SCAN) {
-            System.out.println("meaningless byte is now " + meaninglessByte);
-        }
-
-        return aggCache;
-    }
-
-    @Override
-    public boolean next(List<Cell> results) throws IOException {
-        return outerScanner.next(results);
-    }
-
-    @Override
-    public boolean next(List<Cell> result, int limit) throws IOException {
-        return outerScanner.next(result, limit);
-    }
-
-    @Override
-    public boolean nextRaw(List<Cell> result) throws IOException {
-        return outerScanner.nextRaw(result);
-    }
-
-    @Override
-    public boolean nextRaw(List<Cell> result, int limit) throws IOException {
-        return outerScanner.nextRaw(result, limit);
-    }
-
-    @Override
-    public void close() throws IOException {
-        outerScanner.close();
-    }
-
-    @Override
-    public HRegionInfo getRegionInfo() {
-        return outerScanner.getRegionInfo();
-    }
-
-    @Override
-    public boolean isFilterDone() throws IOException {
-        return outerScanner.isFilterDone();
-    }
-
-    @Override
-    public boolean reseek(byte[] row) throws IOException {
-        return outerScanner.reseek(row);
-    }
-
-    @Override
-    public long getMaxResultSize() {
-        return outerScanner.getMaxResultSize();
-    }
-
-    @Override
-    public long getMvccReadPoint() {
-        return outerScanner.getMvccReadPoint();
-    }
-
-    private static class Stats {
-        long inputRows = 0;
-        long inputBytes = 0;
-        long outputRows = 0;
-
-        // have no outputBytes because that requires actual serialize all the
-        // aggregator buffers
-
-        public void countInputRow(List<Cell> row) {
-            inputRows++;
-            inputBytes += row.get(0).getRowLength();
-            for (int i = 0, n = row.size(); i < n; i++) {
-                inputBytes += row.get(i).getValueLength();
-            }
-        }
-
-        public void countOutputRow(long rowCount) {
-            outputRows += rowCount;
-        }
-
-        public String toString() {
-            double percent = (double) outputRows / inputRows * 100;
-            return Math.round(percent) + "% = " + outputRows + " (out rows) / 
" + inputRows + " (in rows); in bytes = " + inputBytes + "; est. out bytes = " 
+ Math.round(inputBytes * percent / 100);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/545201f6/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverAggregationCache.java
----------------------------------------------------------------------
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverAggregationCache.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverAggregationCache.java
deleted file mode 100644
index 8404262..0000000
--- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverAggregationCache.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map.Entry;
-
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValue.Type;
-import org.apache.hadoop.hbase.regionserver.RegionScanner;
-import org.apache.kylin.measure.MeasureAggregator;
-import org.apache.kylin.storage.hbase.common.coprocessor.AggrKey;
-import org.apache.kylin.storage.hbase.common.coprocessor.AggregationCache;
-
-/**
- * @author yangli9
- */
-@SuppressWarnings("rawtypes")
-public class ObserverAggregationCache extends AggregationCache {
-
-    private final ObserverAggregators aggregators;
-
-    public ObserverAggregationCache(ObserverAggregators aggregators) {
-        this.aggregators = aggregators;
-    }
-
-    public RegionScanner getScanner(RegionScanner innerScanner) {
-        return new AggregationRegionScanner(innerScanner);
-    }
-
-    @Override
-    public MeasureAggregator[] createBuffer() {
-        return aggregators.createBuffer();
-    }
-
-    private class AggregationRegionScanner implements RegionScanner {
-
-        private final RegionScanner innerScanner;
-        private final Iterator<Entry<AggrKey, MeasureAggregator[]>> iterator;
-
-        public AggregationRegionScanner(RegionScanner innerScanner) {
-            this.innerScanner = innerScanner;
-            this.iterator = aggBufMap.entrySet().iterator();
-        }
-
-        @Override
-        public boolean next(List<Cell> results) throws IOException {
-            try {
-                // AggregateRegionObserver.LOG.info("Kylin Scanner next()");
-                boolean hasMore = false;
-                if (iterator.hasNext()) {
-                    Entry<AggrKey, MeasureAggregator[]> entry = 
iterator.next();
-                    makeCells(entry, results);
-                    hasMore = iterator.hasNext();
-                }
-                // AggregateRegionObserver.LOG.info("Kylin Scanner next() 
done");
-
-                return hasMore;
-            } catch (Exception e) {
-                throw new IOException("Error when calling next", e);
-            }
-        }
-
-        private void makeCells(Entry<AggrKey, MeasureAggregator[]> entry, 
List<Cell> results) {
-            byte[][] families = aggregators.getHColFamilies();
-            byte[][] qualifiers = aggregators.getHColQualifiers();
-            int nHCols = aggregators.getHColsNum();
-
-            AggrKey rowKey = entry.getKey();
-            MeasureAggregator[] aggBuf = entry.getValue();
-            ByteBuffer[] rowValues = aggregators.getHColValues(aggBuf);
-
-            if (nHCols == 0) {
-                Cell keyValue = new KeyValue(rowKey.get(), rowKey.offset(), 
rowKey.length(), //
-                        null, 0, 0, //
-                        null, 0, 0, //
-                        HConstants.LATEST_TIMESTAMP, Type.Put, //
-                        null, 0, 0);
-                results.add(keyValue);
-            } else {
-                for (int i = 0; i < nHCols; i++) {
-                    Cell keyValue = new KeyValue(rowKey.get(), 
rowKey.offset(), rowKey.length(), //
-                            families[i], 0, families[i].length, //
-                            qualifiers[i], 0, qualifiers[i].length, //
-                            HConstants.LATEST_TIMESTAMP, Type.Put, //
-                            rowValues[i].array(), 0, rowValues[i].position());
-                    results.add(keyValue);
-                }
-            }
-        }
-
-        @Override
-        public boolean next(List<Cell> result, int limit) throws IOException {
-            return next(result);
-        }
-
-        @Override
-        public boolean nextRaw(List<Cell> result) throws IOException {
-            return next(result);
-        }
-
-        @Override
-        public boolean nextRaw(List<Cell> result, int limit) throws 
IOException {
-            return next(result);
-        }
-
-        @Override
-        public void close() throws IOException {
-            // AggregateRegionObserver.LOG.info("Kylin Scanner close()");
-            innerScanner.close();
-            // AggregateRegionObserver.LOG.info("Kylin Scanner close() done");
-        }
-
-        @Override
-        public HRegionInfo getRegionInfo() {
-            // AggregateRegionObserver.LOG.info("Kylin Scanner 
getRegionInfo()");
-            return innerScanner.getRegionInfo();
-        }
-
-        @Override
-        public long getMaxResultSize() {
-            // AggregateRegionObserver.LOG.info("Kylin Scanner 
getMaxResultSize()");
-            return Long.MAX_VALUE;
-        }
-
-        @Override
-        public boolean isFilterDone() throws IOException {
-            // AggregateRegionObserver.LOG.info("Kylin Scanner 
isFilterDone()");
-            return false;
-        }
-
-        @Override
-        public boolean reseek(byte[] row) throws IOException {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public long getMvccReadPoint() {
-            // AggregateRegionObserver.LOG.info("Kylin Scanner 
getMvccReadPoint()");
-            return Long.MAX_VALUE;
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/545201f6/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverAggregators.java
----------------------------------------------------------------------
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverAggregators.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverAggregators.java
deleted file mode 100644
index 29a30c1..0000000
--- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverAggregators.java
+++ /dev/null
@@ -1,265 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer;
-
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.List;
-
-import org.apache.hadoop.hbase.Cell;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.BytesSerializer;
-import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.cube.model.HBaseColumnDesc;
-import org.apache.kylin.measure.BufferedMeasureCodec;
-import org.apache.kylin.measure.MeasureAggregator;
-import org.apache.kylin.measure.MeasureType;
-import org.apache.kylin.measure.MeasureTypeFactory;
-import org.apache.kylin.metadata.datatype.DataType;
-import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.storage.hbase.steps.RowValueDecoder;
-
-/**
- * @author yangli9
- */
-@SuppressWarnings({ "rawtypes", "unchecked" })
-public class ObserverAggregators {
-
-    public static ObserverAggregators 
fromValueDecoders(Collection<RowValueDecoder> rowValueDecoders) {
-
-        // each decoder represents one HBase column
-        HCol[] hcols = new HCol[rowValueDecoders.size()];
-        int i = 0;
-        for (RowValueDecoder rowValueDecoder : rowValueDecoders) {
-            hcols[i++] = buildHCol(rowValueDecoder.getHBaseColumn());
-        }
-
-        ObserverAggregators aggrs = new ObserverAggregators(hcols);
-        return aggrs;
-
-    }
-
-    private static HCol buildHCol(HBaseColumnDesc desc) {
-        byte[] family = Bytes.toBytes(desc.getColumnFamilyName());
-        byte[] qualifier = Bytes.toBytes(desc.getQualifier());
-        MeasureDesc[] measures = desc.getMeasures();
-
-        String[] funcNames = new String[measures.length];
-        String[] dataTypes = new String[measures.length];
-
-        for (int i = 0; i < measures.length; i++) {
-            funcNames[i] = measures[i].getFunction().getExpression();
-            dataTypes[i] = measures[i].getFunction().getReturnType();
-        }
-
-        return new HCol(family, qualifier, funcNames, dataTypes);
-    }
-
-    public static byte[] serialize(ObserverAggregators o) {
-        ByteBuffer buf = 
ByteBuffer.allocate(BytesSerializer.SERIALIZE_BUFFER_SIZE);
-        serializer.serialize(o, buf);
-        byte[] result = new byte[buf.position()];
-        System.arraycopy(buf.array(), 0, result, 0, buf.position());
-        return result;
-    }
-
-    public static ObserverAggregators deserialize(byte[] bytes) {
-        return serializer.deserialize(ByteBuffer.wrap(bytes));
-    }
-
-    private static final BytesSerializer<ObserverAggregators> serializer = new 
BytesSerializer<ObserverAggregators>() {
-
-        @Override
-        public void serialize(ObserverAggregators value, ByteBuffer out) {
-            BytesUtil.writeVInt(value.nHCols, out);
-            for (int i = 0; i < value.nHCols; i++) {
-                HCol col = value.hcols[i];
-                BytesUtil.writeByteArray(col.family, out);
-                BytesUtil.writeByteArray(col.qualifier, out);
-                BytesUtil.writeAsciiStringArray(col.funcNames, out);
-                BytesUtil.writeAsciiStringArray(col.dataTypes, out);
-            }
-        }
-
-        @Override
-        public ObserverAggregators deserialize(ByteBuffer in) {
-            int nHCols = BytesUtil.readVInt(in);
-            HCol[] hcols = new HCol[nHCols];
-            for (int i = 0; i < nHCols; i++) {
-                byte[] family = BytesUtil.readByteArray(in);
-                byte[] qualifier = BytesUtil.readByteArray(in);
-                String[] funcNames = BytesUtil.readAsciiStringArray(in);
-                String[] dataTypes = BytesUtil.readAsciiStringArray(in);
-                hcols[i] = new HCol(family, qualifier, funcNames, dataTypes);
-            }
-            return new ObserverAggregators(hcols);
-        }
-
-    };
-
-    // 
============================================================================
-
-    final HCol[] hcols;
-    final int nHCols;
-    final ByteBuffer[] hColValues;
-    final int nTotalMeasures;
-
-    MeasureType[] measureTypes;
-
-    public ObserverAggregators(HCol[] _hcols) {
-        this.hcols = sort(_hcols);
-        this.nHCols = hcols.length;
-        this.hColValues = new ByteBuffer[nHCols];
-
-        int nTotalMeasures = 0;
-        for (HCol col : hcols)
-            nTotalMeasures += col.nMeasures;
-        this.nTotalMeasures = nTotalMeasures;
-    }
-
-    private HCol[] sort(HCol[] hcols) {
-        HCol[] copy = Arrays.copyOf(hcols, hcols.length);
-        Arrays.sort(copy, new Comparator<HCol>() {
-            @Override
-            public int compare(HCol o1, HCol o2) {
-                int comp = Bytes.compareTo(o1.family, o2.family);
-                if (comp != 0)
-                    return comp;
-                comp = Bytes.compareTo(o1.qualifier, o2.qualifier);
-                return comp;
-            }
-        });
-        return copy;
-    }
-
-    public MeasureAggregator[] createBuffer() {
-        if (measureTypes == null) {
-            measureTypes = new MeasureType[nTotalMeasures];
-            int i = 0;
-            for (HCol col : hcols) {
-                for (int j = 0; j < col.nMeasures; j++)
-                    measureTypes[i++] = 
MeasureTypeFactory.create(col.funcNames[j], DataType.getType(col.dataTypes[j]));
-            }
-        }
-
-        MeasureAggregator[] aggrs = new MeasureAggregator[nTotalMeasures];
-        for (int i = 0; i < nTotalMeasures; i++) {
-            aggrs[i] = measureTypes[i].newAggregator();
-        }
-        return aggrs;
-    }
-
-    public void aggregate(MeasureAggregator[] measureAggrs, List<Cell> 
rowCells) {
-        int i = 0;
-        for (int ci = 0; ci < nHCols; ci++) {
-            HCol col = hcols[ci];
-            Cell cell = findCell(col, rowCells);
-
-            if (cell == null) {
-                i += col.nMeasures;
-                continue;
-            }
-
-            ByteBuffer input = ByteBuffer.wrap(cell.getValueArray(), 
cell.getValueOffset(), cell.getValueLength());
-
-            col.measureCodec.decode(input, col.measureValues);
-            for (int j = 0; j < col.nMeasures; j++)
-                measureAggrs[i++].aggregate(col.measureValues[j]);
-        }
-    }
-
-    private Cell findCell(HCol col, List<Cell> cells) {
-        // cells are ordered by timestamp asc, thus search from back, first hit
-        // is the latest version
-        for (int i = cells.size() - 1; i >= 0; i--) {
-            Cell cell = cells.get(i);
-            if (match(col, cell)) {
-                return cell;
-            }
-        }
-        return null;
-    }
-
-    public static boolean match(HCol col, Cell cell) {
-        return Bytes.compareTo(col.family, 0, col.family.length, 
cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()) == 0 && 
Bytes.compareTo(col.qualifier, 0, col.qualifier.length, 
cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()) 
== 0;
-    }
-
-    public int getHColsNum() {
-        return nHCols;
-    }
-
-    public byte[][] getHColFamilies() {
-        byte[][] result = new byte[nHCols][];
-        for (int i = 0; i < nHCols; i++)
-            result[i] = hcols[i].family;
-        return result;
-    }
-
-    public byte[][] getHColQualifiers() {
-        byte[][] result = new byte[nHCols][];
-        for (int i = 0; i < nHCols; i++)
-            result[i] = hcols[i].qualifier;
-        return result;
-    }
-
-    public ByteBuffer[] getHColValues(MeasureAggregator[] aggrs) {
-        int i = 0;
-        for (int ci = 0; ci < nHCols; ci++) {
-            HCol col = hcols[ci];
-            for (int j = 0; j < col.nMeasures; j++)
-                col.measureValues[j] = aggrs[i++].getState();
-
-            hColValues[ci] = col.measureCodec.encode(col.measureValues);
-        }
-        return hColValues;
-    }
-
-    // 
============================================================================
-
-    public static class HCol {
-        final byte[] family;
-        final byte[] qualifier;
-        final String[] funcNames;
-        final String[] dataTypes;
-        final int nMeasures;
-
-        final BufferedMeasureCodec measureCodec;
-        final Object[] measureValues;
-
-        public HCol(byte[] bFamily, byte[] bQualifier, String[] funcNames, 
String[] dataTypes) {
-            this.family = bFamily;
-            this.qualifier = bQualifier;
-            this.funcNames = funcNames;
-            this.dataTypes = dataTypes;
-            this.nMeasures = funcNames.length;
-            assert funcNames.length == dataTypes.length;
-
-            this.measureCodec = new BufferedMeasureCodec(dataTypes);
-            this.measureValues = new Object[nMeasures];
-        }
-
-        @Override
-        public String toString() {
-            return "HCol [bFamily=" + Bytes.toString(family) + ", bQualifier=" 
+ Bytes.toString(qualifier) + ", nMeasures=" + nMeasures + "]";
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/545201f6/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverEnabler.java
----------------------------------------------------------------------
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverEnabler.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverEnabler.java
deleted file mode 100644
index 394b3e2..0000000
--- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverEnabler.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.regionserver.RegionScanner;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.debug.BackdoorToggles;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.metadata.filter.TupleFilter;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.storage.StorageContext;
-import org.apache.kylin.gridtable.StorageSideBehavior;
-import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorFilter;
-import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorProjector;
-import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorRowType;
-import org.apache.kylin.storage.hbase.common.coprocessor.FilterDecorator;
-import org.apache.kylin.storage.hbase.cube.v1.RegionScannerAdapter;
-import org.apache.kylin.storage.hbase.cube.v1.ResultScannerAdapter;
-import org.apache.kylin.storage.hbase.steps.RowValueDecoder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-/**
- * @author yangli9
- */
-public class ObserverEnabler {
-
-    private static final Logger logger = 
LoggerFactory.getLogger(ObserverEnabler.class);
-
-    static final String FORCE_COPROCESSOR = "forceObserver";
-    static final Map<String, Boolean> CUBE_OVERRIDES = Maps.newConcurrentMap();
-
-    public static ResultScanner scanWithCoprocessorIfBeneficial(CubeSegment 
segment, Cuboid cuboid, TupleFilter tupleFiler, //
-            Collection<TblColRef> groupBy, Collection<RowValueDecoder> 
rowValueDecoders, StorageContext context, HTableInterface table, Scan scan) 
throws IOException {
-
-        if (context.isCoprocessorEnabled() == false) {
-            return table.getScanner(scan);
-        }
-
-        CoprocessorRowType type = CoprocessorRowType.fromCuboid(segment, 
cuboid);
-        CoprocessorFilter filter = 
CoprocessorFilter.fromFilter(segment.getDimensionEncodingMap(), tupleFiler, 
FilterDecorator.FilterConstantsTreatment.REPLACE_WITH_GLOBAL_DICT);
-        CoprocessorProjector projector = 
CoprocessorProjector.makeForObserver(segment, cuboid, groupBy);
-        ObserverAggregators aggrs = 
ObserverAggregators.fromValueDecoders(rowValueDecoders);
-
-        boolean localCoprocessor = 
KylinConfig.getInstanceFromEnv().getQueryRunLocalCoprocessor() || 
BackdoorToggles.getRunLocalCoprocessor();
-
-        if (localCoprocessor) {
-            RegionScanner innerScanner = new 
RegionScannerAdapter(table.getScanner(scan));
-            AggregationScanner aggrScanner = new AggregationScanner(type, 
filter, projector, aggrs, innerScanner, 
StorageSideBehavior.SCAN_FILTER_AGGR_CHECKMEM);
-            return new ResultScannerAdapter(aggrScanner);
-        } else {
-
-            // debug/profiling purpose
-            String toggle = BackdoorToggles.getCoprocessorBehavior();
-            if (toggle == null) {
-                toggle = 
StorageSideBehavior.SCAN_FILTER_AGGR_CHECKMEM.toString(); //default behavior
-            } else {
-                logger.info("The execution of this query will use " + toggle + 
" as observer's behavior");
-            }
-
-            scan.setAttribute(AggregateRegionObserver.COPROCESSOR_ENABLE, new 
byte[] { 0x01 });
-            scan.setAttribute(AggregateRegionObserver.BEHAVIOR, 
toggle.getBytes());
-            scan.setAttribute(AggregateRegionObserver.TYPE, 
CoprocessorRowType.serialize(type));
-            scan.setAttribute(AggregateRegionObserver.PROJECTOR, 
CoprocessorProjector.serialize(projector));
-            scan.setAttribute(AggregateRegionObserver.AGGREGATORS, 
ObserverAggregators.serialize(aggrs));
-            scan.setAttribute(AggregateRegionObserver.FILTER, 
CoprocessorFilter.serialize(filter));
-            return table.getScanner(scan);
-        }
-    }
-
-    public static void enableCoprocessorIfBeneficial(CubeInstance cube, 
Collection<TblColRef> groupBy, Collection<RowValueDecoder> rowValueDecoders, 
StorageContext context) {
-        if (isCoprocessorBeneficial(cube, groupBy, rowValueDecoders, context)) 
{
-            context.enableCoprocessor();
-        }
-    }
-
-    private static boolean isCoprocessorBeneficial(CubeInstance cube, 
Collection<TblColRef> groupBy, Collection<RowValueDecoder> rowValueDecoders, 
StorageContext context) {
-
-        String forceFlag = System.getProperty(FORCE_COPROCESSOR);
-        if (forceFlag != null) {
-            boolean r = Boolean.parseBoolean(forceFlag);
-            logger.info("Coprocessor is " + (r ? "enabled" : "disabled") + " 
according to sys prop " + FORCE_COPROCESSOR);
-            return r;
-        }
-
-        Boolean cubeOverride = CUBE_OVERRIDES.get(cube.getName());
-        if (cubeOverride != null) {
-            boolean r = cubeOverride.booleanValue();
-            logger.info("Coprocessor is " + (r ? "enabled" : "disabled") + " 
according to cube overrides");
-            return r;
-        }
-
-        if (RowValueDecoder.hasMemHungryMeasures(rowValueDecoders)) {
-            logger.info("Coprocessor is disabled because there is memory 
hungry count distinct");
-            return false;
-        }
-
-        if (context.isExactAggregation()) {
-            logger.info("Coprocessor is disabled because exactAggregation is 
true");
-            return false;
-        }
-
-        Cuboid cuboid = context.getCuboid();
-        Set<TblColRef> toAggr = 
Sets.newHashSet(cuboid.getAggregationColumns());
-        toAggr.removeAll(groupBy);
-        if (toAggr.isEmpty()) {
-            logger.info("Coprocessor is disabled because no additional columns 
to aggregate");
-            return false;
-        }
-
-        logger.info("Coprocessor is enabled to aggregate " + toAggr + ", 
returning " + groupBy);
-        return true;
-    }
-
-    @SuppressWarnings("unused")
-    private static int getBitsToScan(byte[] startKey, byte[] stopKey) {
-        // find the first bit difference from the beginning
-        int totalBits = startKey.length * 8;
-        int bitsToScan = totalBits;
-        for (int i = 0; i < totalBits; i++) {
-            int byteIdx = i / 8;
-            int bitIdx = 7 - i % 8;
-            byte bitMask = (byte) (1 << bitIdx);
-            if ((startKey[byteIdx] & bitMask) == (stopKey[byteIdx] & bitMask))
-                bitsToScan--;
-            else
-                break;
-        }
-        return bitsToScan;
-    }
-
-    public static void forceCoprocessorOn() {
-        System.setProperty(FORCE_COPROCESSOR, "true");
-    }
-
-    public static void forceCoprocessorOff() {
-        System.setProperty(FORCE_COPROCESSOR, "false");
-    }
-
-    public static String getForceCoprocessor() {
-        return System.getProperty(FORCE_COPROCESSOR);
-    }
-
-    public static void forceCoprocessorUnset() {
-        System.clearProperty(FORCE_COPROCESSOR);
-    }
-
-    public static void updateCubeOverride(String cubeName, String force) {
-        if ("null".equalsIgnoreCase(force) || 
"default".equalsIgnoreCase(force)) {
-            CUBE_OVERRIDES.remove(cubeName);
-        } else if ("true".equalsIgnoreCase(force)) {
-            CUBE_OVERRIDES.put(cubeName, Boolean.TRUE);
-        } else if ("false".equalsIgnoreCase(force)) {
-            CUBE_OVERRIDES.put(cubeName, Boolean.FALSE);
-        }
-    }
-
-    public static Map<String, Boolean> getCubeOverrides() {
-        return CUBE_OVERRIDES;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/545201f6/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverTuple.java
----------------------------------------------------------------------
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverTuple.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverTuple.java
deleted file mode 100644
index e2236d3..0000000
--- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverTuple.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer;
-
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.kylin.common.util.Dictionary;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.tuple.IEvaluatableTuple;
-import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorRowType;
-
-/**
- * A special kind of tuple that exposes column value (dictionary ID) directly 
on
- * top of row key.
- *
- * @author yangli9
- */
-public class ObserverTuple implements IEvaluatableTuple {
-
-    final CoprocessorRowType type;
-
-    ImmutableBytesWritable rowkey;
-    String[] values;
-
-    public ObserverTuple(CoprocessorRowType type) {
-        this.type = type;
-        this.rowkey = new ImmutableBytesWritable();
-        this.values = new String[type.getColumnCount()];
-    }
-
-    public void setUnderlying(byte[] array, int offset, int length) {
-        rowkey.set(array, offset, length);
-        for (int i = 0; i < values.length; i++) {
-            values[i] = null;
-        }
-    }
-
-    private String getValueAt(int i) {
-        int n = type.getColumnCount();
-        if (i < 0 || i >= n)
-            return null;
-
-        if (values[i] == null) {
-            values[i] = Dictionary.dictIdToString(rowkey.get(), 
rowkey.getOffset() + type.columnOffsets[i], type.columnSizes[i]);
-        }
-
-        return values[i];
-    }
-
-    @Override
-    public Object getValue(TblColRef col) {
-        int i = type.getColIndexByTblColRef(col);
-        return getValueAt(i);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/545201f6/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/filter/FuzzyRowFilterV2.java
----------------------------------------------------------------------
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/filter/FuzzyRowFilterV2.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/filter/FuzzyRowFilterV2.java
deleted file mode 100644
index ea33f9a..0000000
--- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/filter/FuzzyRowFilterV2.java
+++ /dev/null
@@ -1,574 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.storage.hbase.cube.v1.filter;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.List;
-import java.util.PriorityQueue;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.exceptions.DeserializationException;
-import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.filter.FilterBase;
-import org.apache.hadoop.hbase.util.ByteStringer;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.kylin.storage.hbase.cube.v1.filter.generated.FilterProtosExt;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.InvalidProtocolBufferException;
-
-/**
- * This is optimized version of a standard FuzzyRowFilter Filters data based 
on fuzzy row key.
- * Performs fast-forwards during scanning. It takes pairs (row key, fuzzy 
info) to match row keys.
- * Where fuzzy info is a byte array with 0 or 1 as its values:
- * <ul>
- * <li>0 - means that this byte in provided row key is fixed, i.e. row key's 
byte at same position
- * must match</li>
- * <li>1 - means that this byte in provided row key is NOT fixed, i.e. row 
key's byte at this
- * position can be different from the one in provided row key</li>
- * </ul>
- * Example: Let's assume row key format is userId_actionId_year_month. Length 
of userId is fixed and
- * is 4, length of actionId is 2 and year and month are 4 and 2 bytes long 
respectively. Let's
- * assume that we need to fetch all users that performed certain action 
(encoded as "99") in Jan of
- * any year. Then the pair (row key, fuzzy info) would be the following: row 
key = "????_99_????_01"
- * (one can use any value instead of "?") fuzzy info =
- * "\x01\x01\x01\x01\x00\x00\x00\x00\x01\x01\x01\x01\x00\x00\x00" I.e. fuzzy 
info tells the matching
- * mask is "????_99_????_01", where at ? can be any value.
- */
[email protected]
[email protected]
-public class FuzzyRowFilterV2 extends FilterBase {
-    private List<Pair<byte[], byte[]>> fuzzyKeysData;
-    private boolean done = false;
-
-    /**
-     * The index of a last successfully found matching fuzzy string (in 
fuzzyKeysData). We will start
-     * matching next KV with this one. If they do not match then we will 
return back to the one-by-one
-     * iteration over fuzzyKeysData.
-     */
-    private int lastFoundIndex = -1;
-
-    /**
-     * Row tracker (keeps all next rows after SEEK_NEXT_USING_HINT was 
returned)
-     */
-    private RowTracker tracker;
-
-    public FuzzyRowFilterV2(List<Pair<byte[], byte[]>> fuzzyKeysData) {
-        Pair<byte[], byte[]> p;
-        for (int i = 0; i < fuzzyKeysData.size(); i++) {
-            p = fuzzyKeysData.get(i);
-            if (p.getFirst().length != p.getSecond().length) {
-                Pair<String, String> readable = 
Pair.newPair(Bytes.toStringBinary(p.getFirst()), 
Bytes.toStringBinary(p.getSecond()));
-                throw new IllegalArgumentException("Fuzzy pair lengths do not 
match: " + readable);
-            }
-            // update mask ( 0 -> -1 (0xff), 1 -> 0)
-            p.setSecond(preprocessMask(p.getSecond()));
-            preprocessSearchKey(p);
-        }
-        this.fuzzyKeysData = fuzzyKeysData;
-        this.tracker = new RowTracker();
-    }
-
-    private void preprocessSearchKey(Pair<byte[], byte[]> p) {
-        if (UnsafeAccess.isAvailable() == false) {
-            // do nothing
-            return;
-        }
-        byte[] key = p.getFirst();
-        byte[] mask = p.getSecond();
-        for (int i = 0; i < mask.length; i++) {
-            // set non-fixed part of a search key to 0.
-            if (mask[i] == 0)
-                key[i] = 0;
-        }
-    }
-
-    /**
-     * We need to preprocess mask array, as since we treat 0's as unfixed 
positions and -1 (0xff) as
-     * fixed positions
-     * @param mask
-     * @return mask array
-     */
-    private byte[] preprocessMask(byte[] mask) {
-        if (UnsafeAccess.isAvailable() == false) {
-            // do nothing
-            return mask;
-        }
-        if (isPreprocessedMask(mask))
-            return mask;
-        for (int i = 0; i < mask.length; i++) {
-            if (mask[i] == 0) {
-                mask[i] = -1; // 0 -> -1
-            } else if (mask[i] == 1) {
-                mask[i] = 0;// 1 -> 0
-            }
-        }
-        return mask;
-    }
-
-    private boolean isPreprocessedMask(byte[] mask) {
-        for (int i = 0; i < mask.length; i++) {
-            if (mask[i] != -1 && mask[i] != 0) {
-                return false;
-            }
-        }
-        return true;
-    }
-
-    @Override
-    public ReturnCode filterKeyValue(Cell c) {
-        final int startIndex = lastFoundIndex >= 0 ? lastFoundIndex : 0;
-        final int size = fuzzyKeysData.size();
-        for (int i = startIndex; i < size + startIndex; i++) {
-            final int index = i % size;
-            Pair<byte[], byte[]> fuzzyData = fuzzyKeysData.get(index);
-            SatisfiesCode satisfiesCode = satisfies(isReversed(), 
c.getRowArray(), c.getRowOffset(), c.getRowLength(), fuzzyData.getFirst(), 
fuzzyData.getSecond());
-            if (satisfiesCode == SatisfiesCode.YES) {
-                lastFoundIndex = index;
-                return ReturnCode.INCLUDE;
-            }
-        }
-        // NOT FOUND -> seek next using hint
-        lastFoundIndex = -1;
-        return ReturnCode.SEEK_NEXT_USING_HINT;
-
-    }
-
-    @Override
-    public Cell getNextCellHint(Cell currentCell) {
-        boolean result = tracker.updateTracker(currentCell);
-        if (result == false) {
-            done = true;
-            return null;
-        }
-        byte[] nextRowKey = tracker.nextRow();
-        return KeyValue.createFirstOnRow(nextRowKey);
-    }
-
-    /**
-     * If we have multiple fuzzy keys, row tracker should improve overall 
performance. It calculates
-     * all next rows (one per every fuzzy key) and put them (the fuzzy key is 
bundled) into a priority
-     * queue so that the smallest row key always appears at queue head, which 
helps to decide the
-     * "Next Cell Hint". As scanning going on, the number of candidate rows in 
the RowTracker will
-     * remain the size of fuzzy keys until some of the fuzzy keys won't 
possibly have matches any
-     * more.
-     */
-    private class RowTracker {
-        private final PriorityQueue<Pair<byte[], Pair<byte[], byte[]>>> 
nextRows;
-        private boolean initialized = false;
-
-        RowTracker() {
-            nextRows = new PriorityQueue<Pair<byte[], Pair<byte[], 
byte[]>>>(fuzzyKeysData.size(), new Comparator<Pair<byte[], Pair<byte[], 
byte[]>>>() {
-                @Override
-                public int compare(Pair<byte[], Pair<byte[], byte[]>> o1, 
Pair<byte[], Pair<byte[], byte[]>> o2) {
-                    int compare = Bytes.compareTo(o1.getFirst(), 
o2.getFirst());
-                    if (!isReversed()) {
-                        return compare;
-                    } else {
-                        return -compare;
-                    }
-                }
-            });
-        }
-
-        byte[] nextRow() {
-            if (nextRows.isEmpty()) {
-                throw new IllegalStateException("NextRows should not be empty, 
make sure to call nextRow() after updateTracker() return true");
-            } else {
-                return nextRows.peek().getFirst();
-            }
-        }
-
-        boolean updateTracker(Cell currentCell) {
-            if (!initialized) {
-                for (Pair<byte[], byte[]> fuzzyData : fuzzyKeysData) {
-                    updateWith(currentCell, fuzzyData);
-                }
-                initialized = true;
-            } else {
-                while (!nextRows.isEmpty() && !lessThan(currentCell, 
nextRows.peek().getFirst())) {
-                    Pair<byte[], Pair<byte[], byte[]>> head = nextRows.poll();
-                    Pair<byte[], byte[]> fuzzyData = head.getSecond();
-                    updateWith(currentCell, fuzzyData);
-                }
-            }
-            return !nextRows.isEmpty();
-        }
-
-        boolean lessThan(Cell currentCell, byte[] nextRowKey) {
-            int compareResult = Bytes.compareTo(currentCell.getRowArray(), 
currentCell.getRowOffset(), currentCell.getRowLength(), nextRowKey, 0, 
nextRowKey.length);
-            return (!isReversed() && compareResult < 0) || (isReversed() && 
compareResult > 0);
-        }
-
-        void updateWith(Cell currentCell, Pair<byte[], byte[]> fuzzyData) {
-            byte[] nextRowKeyCandidate = getNextForFuzzyRule(isReversed(), 
currentCell.getRowArray(), currentCell.getRowOffset(), 
currentCell.getRowLength(), fuzzyData.getFirst(), fuzzyData.getSecond());
-            if (nextRowKeyCandidate != null) {
-                nextRows.add(Pair.newPair(nextRowKeyCandidate, fuzzyData));
-            }
-        }
-
-    }
-
-    @Override
-    public boolean filterAllRemaining() {
-        return done;
-    }
-
-    /**
-     * @return The filter serialized using pb
-     */
-    public byte[] toByteArray() {
-        FilterProtosExt.FuzzyRowFilterV2.Builder builder = 
FilterProtosExt.FuzzyRowFilterV2.newBuilder();
-        for (Pair<byte[], byte[]> fuzzyData : fuzzyKeysData) {
-            FilterProtosExt.BytesBytesPair.Builder bbpBuilder = 
FilterProtosExt.BytesBytesPair.newBuilder();
-            bbpBuilder.setFirst(ByteStringer.wrap(fuzzyData.getFirst()));
-            bbpBuilder.setSecond(ByteStringer.wrap(fuzzyData.getSecond()));
-            builder.addFuzzyKeysData(bbpBuilder);
-        }
-        return builder.build().toByteArray();
-    }
-
-    public static FuzzyRowFilterV2 parseFrom(final byte[] pbBytes) throws 
DeserializationException {
-        FilterProtosExt.FuzzyRowFilterV2 proto;
-        try {
-            proto = FilterProtosExt.FuzzyRowFilterV2.parseFrom(pbBytes);
-        } catch (InvalidProtocolBufferException e) {
-            throw new DeserializationException(e);
-        }
-        int count = proto.getFuzzyKeysDataCount();
-        ArrayList<Pair<byte[], byte[]>> fuzzyKeysData = new 
ArrayList<Pair<byte[], byte[]>>(count);
-        for (int i = 0; i < count; ++i) {
-            FilterProtosExt.BytesBytesPair current = proto.getFuzzyKeysData(i);
-            byte[] keyBytes = current.getFirst().toByteArray();
-            byte[] keyMeta = current.getSecond().toByteArray();
-            fuzzyKeysData.add(Pair.newPair(keyBytes, keyMeta));
-        }
-        return new FuzzyRowFilterV2(fuzzyKeysData);
-    }
-
-    @Override
-    public String toString() {
-        final StringBuilder sb = new StringBuilder();
-        sb.append("FuzzyRowFilter");
-        sb.append("{fuzzyKeysData=");
-        for (Pair<byte[], byte[]> fuzzyData : fuzzyKeysData) {
-            
sb.append('{').append(Bytes.toStringBinary(fuzzyData.getFirst())).append(":");
-            sb.append(Bytes.toStringBinary(fuzzyData.getSecond())).append('}');
-        }
-        sb.append("}, ");
-        return sb.toString();
-    }
-
-    // Utility methods
-
-    static enum SatisfiesCode {
-        /** row satisfies fuzzy rule */
-        YES,
-        /** row doesn't satisfy fuzzy rule, but there's possible greater row 
that does */
-        NEXT_EXISTS,
-        /** row doesn't satisfy fuzzy rule and there's no greater row that 
does */
-        NO_NEXT
-    }
-
-    @VisibleForTesting
-    static SatisfiesCode satisfies(boolean reverse, byte[] row, byte[] 
fuzzyKeyBytes, byte[] fuzzyKeyMeta) {
-        return satisfies(reverse, row, 0, row.length, fuzzyKeyBytes, 
fuzzyKeyMeta);
-    }
-
-    static SatisfiesCode satisfies(boolean reverse, byte[] row, int offset, 
int length, byte[] fuzzyKeyBytes, byte[] fuzzyKeyMeta) {
-
-        if (UnsafeAccess.isAvailable() == false) {
-            return satisfiesNoUnsafe(reverse, row, offset, length, 
fuzzyKeyBytes, fuzzyKeyMeta);
-        }
-
-        if (row == null) {
-            // do nothing, let scan to proceed
-            return SatisfiesCode.YES;
-        }
-        length = Math.min(length, fuzzyKeyBytes.length);
-        int numWords = length / Bytes.SIZEOF_LONG;
-
-        int j = numWords << 3; // numWords * SIZEOF_LONG;
-
-        for (int i = 0; i < j; i += Bytes.SIZEOF_LONG) {
-            long fuzzyBytes = UnsafeAccess.toLong(fuzzyKeyBytes, i);
-            long fuzzyMeta = UnsafeAccess.toLong(fuzzyKeyMeta, i);
-            long rowValue = UnsafeAccess.toLong(row, offset + i);
-            if ((rowValue & fuzzyMeta) != (fuzzyBytes)) {
-                // We always return NEXT_EXISTS
-                return SatisfiesCode.NEXT_EXISTS;
-            }
-        }
-
-        int off = j;
-
-        if (length - off >= Bytes.SIZEOF_INT) {
-            int fuzzyBytes = UnsafeAccess.toInt(fuzzyKeyBytes, off);
-            int fuzzyMeta = UnsafeAccess.toInt(fuzzyKeyMeta, off);
-            int rowValue = UnsafeAccess.toInt(row, offset + off);
-            if ((rowValue & fuzzyMeta) != (fuzzyBytes)) {
-                // We always return NEXT_EXISTS
-                return SatisfiesCode.NEXT_EXISTS;
-            }
-            off += Bytes.SIZEOF_INT;
-        }
-
-        if (length - off >= Bytes.SIZEOF_SHORT) {
-            short fuzzyBytes = UnsafeAccess.toShort(fuzzyKeyBytes, off);
-            short fuzzyMeta = UnsafeAccess.toShort(fuzzyKeyMeta, off);
-            short rowValue = UnsafeAccess.toShort(row, offset + off);
-            if ((rowValue & fuzzyMeta) != (fuzzyBytes)) {
-                // We always return NEXT_EXISTS
-                // even if it does not (in this case getNextForFuzzyRule
-                // will return null)
-                return SatisfiesCode.NEXT_EXISTS;
-            }
-            off += Bytes.SIZEOF_SHORT;
-        }
-
-        if (length - off >= Bytes.SIZEOF_BYTE) {
-            int fuzzyBytes = fuzzyKeyBytes[off] & 0xff;
-            int fuzzyMeta = fuzzyKeyMeta[off] & 0xff;
-            int rowValue = row[offset + off] & 0xff;
-            if ((rowValue & fuzzyMeta) != (fuzzyBytes)) {
-                // We always return NEXT_EXISTS
-                return SatisfiesCode.NEXT_EXISTS;
-            }
-        }
-        return SatisfiesCode.YES;
-    }
-
-    static SatisfiesCode satisfiesNoUnsafe(boolean reverse, byte[] row, int 
offset, int length, byte[] fuzzyKeyBytes, byte[] fuzzyKeyMeta) {
-        if (row == null) {
-            // do nothing, let scan to proceed
-            return SatisfiesCode.YES;
-        }
-
-        Order order = Order.orderFor(reverse);
-        boolean nextRowKeyCandidateExists = false;
-
-        for (int i = 0; i < fuzzyKeyMeta.length && i < length; i++) {
-            // First, checking if this position is fixed and not equals the 
given one
-            boolean byteAtPositionFixed = fuzzyKeyMeta[i] == 0;
-            boolean fixedByteIncorrect = byteAtPositionFixed && 
fuzzyKeyBytes[i] != row[i + offset];
-            if (fixedByteIncorrect) {
-                // in this case there's another row that satisfies fuzzy rule 
and bigger than this row
-                if (nextRowKeyCandidateExists) {
-                    return SatisfiesCode.NEXT_EXISTS;
-                }
-
-                // If this row byte is less than fixed then there's a byte 
array bigger than
-                // this row and which satisfies the fuzzy rule. Otherwise 
there's no such byte array:
-                // this row is simply bigger than any byte array that 
satisfies the fuzzy rule
-                boolean rowByteLessThanFixed = (row[i + offset] & 0xFF) < 
(fuzzyKeyBytes[i] & 0xFF);
-                if (rowByteLessThanFixed && !reverse) {
-                    return SatisfiesCode.NEXT_EXISTS;
-                } else if (!rowByteLessThanFixed && reverse) {
-                    return SatisfiesCode.NEXT_EXISTS;
-                } else {
-                    return SatisfiesCode.NO_NEXT;
-                }
-            }
-
-            // Second, checking if this position is not fixed and byte value 
is not the biggest. In this
-            // case there's a byte array bigger than this row and which 
satisfies the fuzzy rule. To get
-            // bigger byte array that satisfies the rule we need to just 
increase this byte
-            // (see the code of getNextForFuzzyRule below) by one.
-            // Note: if non-fixed byte is already at biggest value, this 
doesn't allow us to say there's
-            // bigger one that satisfies the rule as it can't be increased.
-            if (fuzzyKeyMeta[i] == 1 && !order.isMax(fuzzyKeyBytes[i])) {
-                nextRowKeyCandidateExists = true;
-            }
-        }
-        return SatisfiesCode.YES;
-    }
-
-    @VisibleForTesting
-    static byte[] getNextForFuzzyRule(byte[] row, byte[] fuzzyKeyBytes, byte[] 
fuzzyKeyMeta) {
-        return getNextForFuzzyRule(false, row, 0, row.length, fuzzyKeyBytes, 
fuzzyKeyMeta);
-    }
-
-    @VisibleForTesting
-    static byte[] getNextForFuzzyRule(boolean reverse, byte[] row, byte[] 
fuzzyKeyBytes, byte[] fuzzyKeyMeta) {
-        return getNextForFuzzyRule(reverse, row, 0, row.length, fuzzyKeyBytes, 
fuzzyKeyMeta);
-    }
-
-    /** Abstracts directional comparisons based on scan direction. */
-    private enum Order {
-        ASC {
-            public boolean lt(int lhs, int rhs) {
-                return lhs < rhs;
-            }
-
-            public boolean gt(int lhs, int rhs) {
-                return lhs > rhs;
-            }
-
-            public byte inc(byte val) {
-                // TODO: what about over/underflow?
-                return (byte) (val + 1);
-            }
-
-            public boolean isMax(byte val) {
-                return val == (byte) 0xff;
-            }
-
-            public byte min() {
-                return 0;
-            }
-        },
-        DESC {
-            public boolean lt(int lhs, int rhs) {
-                return lhs > rhs;
-            }
-
-            public boolean gt(int lhs, int rhs) {
-                return lhs < rhs;
-            }
-
-            public byte inc(byte val) {
-                // TODO: what about over/underflow?
-                return (byte) (val - 1);
-            }
-
-            public boolean isMax(byte val) {
-                return val == 0;
-            }
-
-            public byte min() {
-                return (byte) 0xFF;
-            }
-        };
-
-        public static Order orderFor(boolean reverse) {
-            return reverse ? DESC : ASC;
-        }
-
-        /** Returns true when {@code lhs < rhs}. */
-        public abstract boolean lt(int lhs, int rhs);
-
-        /** Returns true when {@code lhs > rhs}. */
-        public abstract boolean gt(int lhs, int rhs);
-
-        /** Returns {@code val} incremented by 1. */
-        public abstract byte inc(byte val);
-
-        /** Return true when {@code val} is the maximum value */
-        public abstract boolean isMax(byte val);
-
-        /** Return the minimum value according to this ordering scheme. */
-        public abstract byte min();
-    }
-
-    /**
-     * @return greater byte array than given (row) which satisfies the fuzzy 
rule if it exists, null
-     *         otherwise
-     */
-    @VisibleForTesting
-    static byte[] getNextForFuzzyRule(boolean reverse, byte[] row, int offset, 
int length, byte[] fuzzyKeyBytes, byte[] fuzzyKeyMeta) {
-        // To find out the next "smallest" byte array that satisfies fuzzy 
rule and "greater" than
-        // the given one we do the following:
-        // 1. setting values on all "fixed" positions to the values from 
fuzzyKeyBytes
-        // 2. if during the first step given row did not increase, then we 
increase the value at
-        // the first "non-fixed" position (where it is not maximum already)
-
-        // It is easier to perform this by using fuzzyKeyBytes copy and 
setting "non-fixed" position
-        // values than otherwise.
-        byte[] result = Arrays.copyOf(fuzzyKeyBytes, length > 
fuzzyKeyBytes.length ? length : fuzzyKeyBytes.length);
-        if (reverse && length > fuzzyKeyBytes.length) {
-            // we need trailing 0xff's instead of trailing 0x00's
-            for (int i = fuzzyKeyBytes.length; i < result.length; i++) {
-                result[i] = (byte) 0xFF;
-            }
-        }
-        int toInc = -1;
-        final Order order = Order.orderFor(reverse);
-
-        boolean increased = false;
-        for (int i = 0; i < result.length; i++) {
-            if (i >= fuzzyKeyMeta.length || fuzzyKeyMeta[i] == 0 /* non-fixed 
*/) {
-                result[i] = row[offset + i];
-                if (!order.isMax(row[offset + i])) {
-                    // this is "non-fixed" position and is not at max value, 
hence we can increase it
-                    toInc = i;
-                }
-            } else if (i < fuzzyKeyMeta.length && fuzzyKeyMeta[i] == -1 /* 
fixed */) {
-                if (order.lt((row[i + offset] & 0xFF), (fuzzyKeyBytes[i] & 
0xFF))) {
-                    // if setting value for any fixed position increased the 
original array,
-                    // we are OK
-                    increased = true;
-                    break;
-                }
-
-                if (order.gt((row[i + offset] & 0xFF), (fuzzyKeyBytes[i] & 
0xFF))) {
-                    // if setting value for any fixed position makes array 
"smaller", then just stop:
-                    // in case we found some non-fixed position to increase we 
will do it, otherwise
-                    // there's no "next" row key that satisfies fuzzy rule and 
"greater" than given row
-                    break;
-                }
-            }
-        }
-
-        if (!increased) {
-            if (toInc < 0) {
-                return null;
-            }
-            result[toInc] = order.inc(result[toInc]);
-
-            // Setting all "non-fixed" positions to zeroes to the right of the 
one we increased so
-            // that found "next" row key is the smallest possible
-            for (int i = toInc + 1; i < result.length; i++) {
-                if (i >= fuzzyKeyMeta.length || fuzzyKeyMeta[i] == 0 /* 
non-fixed */) {
-                    result[i] = order.min();
-                }
-            }
-        }
-
-        return result;
-    }
-
-    /**
-     * @return true if and only if the fields of the filter that are 
serialized are equal to the
-     *         corresponding fields in other. Used for testing.
-     */
-    boolean areSerializedFieldsEqual(Filter o) {
-        if (o == this)
-            return true;
-        if (!(o instanceof FuzzyRowFilterV2))
-            return false;
-
-        FuzzyRowFilterV2 other = (FuzzyRowFilterV2) o;
-        if (this.fuzzyKeysData.size() != other.fuzzyKeysData.size())
-            return false;
-        for (int i = 0; i < fuzzyKeysData.size(); ++i) {
-            Pair<byte[], byte[]> thisData = this.fuzzyKeysData.get(i);
-            Pair<byte[], byte[]> otherData = other.fuzzyKeysData.get(i);
-            if (!(Bytes.equals(thisData.getFirst(), otherData.getFirst()) && 
Bytes.equals(thisData.getSecond(), otherData.getSecond()))) {
-                return false;
-            }
-        }
-        return true;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/545201f6/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/filter/UnsafeAccess.java
----------------------------------------------------------------------
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/filter/UnsafeAccess.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/filter/UnsafeAccess.java
deleted file mode 100644
index 34328ef..0000000
--- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/filter/UnsafeAccess.java
+++ /dev/null
@@ -1,433 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.storage.hbase.cube.v1.filter;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import java.lang.reflect.Field;
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.security.AccessController;
-import java.security.PrivilegedAction;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hbase.util.Bytes;
-
-import sun.misc.Unsafe;
-import sun.nio.ch.DirectBuffer;
-
[email protected]
[email protected]
-public final class UnsafeAccess {
-
-    private static final Log LOG = LogFactory.getLog(UnsafeAccess.class);
-
-    static final Unsafe theUnsafe;
-
-    /** The offset to the first element in a byte array. */
-    static final long BYTE_ARRAY_BASE_OFFSET;
-
-    static final boolean littleEndian = 
ByteOrder.nativeOrder().equals(ByteOrder.LITTLE_ENDIAN);
-
-    static {
-        theUnsafe = (Unsafe) AccessController.doPrivileged(new 
PrivilegedAction<Object>() {
-            @Override
-            public Object run() {
-                try {
-                    Field f = Unsafe.class.getDeclaredField("theUnsafe");
-                    f.setAccessible(true);
-                    return f.get(null);
-                } catch (Throwable e) {
-                    LOG.warn("sun.misc.Unsafe is not accessible", e);
-                }
-                return null;
-            }
-        });
-
-        if (theUnsafe != null) {
-            BYTE_ARRAY_BASE_OFFSET = theUnsafe.arrayBaseOffset(byte[].class);
-        } else {
-            BYTE_ARRAY_BASE_OFFSET = -1;
-        }
-    }
-
-    private UnsafeAccess() {
-    }
-
-    /**
-     * @return true when the running JVM is having sun's Unsafe package 
available in it.
-     */
-    public static boolean isAvailable() {
-        return theUnsafe != null;
-    }
-
-    // APIs to read primitive data from a byte[] using Unsafe way
-    /**
-     * Converts a byte array to a short value considering it was written in 
big-endian format.
-     * @param bytes byte array
-     * @param offset offset into array
-     * @return the short value
-     */
-    public static short toShort(byte[] bytes, int offset) {
-        if (littleEndian) {
-            return Short.reverseBytes(theUnsafe.getShort(bytes, offset + 
BYTE_ARRAY_BASE_OFFSET));
-        } else {
-            return theUnsafe.getShort(bytes, offset + BYTE_ARRAY_BASE_OFFSET);
-        }
-    }
-
-    /**
-     * Converts a byte array to an int value considering it was written in 
big-endian format.
-     * @param bytes byte array
-     * @param offset offset into array
-     * @return the int value
-     */
-    public static int toInt(byte[] bytes, int offset) {
-        if (littleEndian) {
-            return Integer.reverseBytes(theUnsafe.getInt(bytes, offset + 
BYTE_ARRAY_BASE_OFFSET));
-        } else {
-            return theUnsafe.getInt(bytes, offset + BYTE_ARRAY_BASE_OFFSET);
-        }
-    }
-
-    /**
-     * Converts a byte array to a long value considering it was written in 
big-endian format.
-     * @param bytes byte array
-     * @param offset offset into array
-     * @return the long value
-     */
-    public static long toLong(byte[] bytes, int offset) {
-        if (littleEndian) {
-            return Long.reverseBytes(theUnsafe.getLong(bytes, offset + 
BYTE_ARRAY_BASE_OFFSET));
-        } else {
-            return theUnsafe.getLong(bytes, offset + BYTE_ARRAY_BASE_OFFSET);
-        }
-    }
-
-    // APIs to write primitive data to a byte[] using Unsafe way
-    /**
-     * Put a short value out to the specified byte array position in 
big-endian format.
-     * @param bytes the byte array
-     * @param offset position in the array
-     * @param val short to write out
-     * @return incremented offset
-     */
-    public static int putShort(byte[] bytes, int offset, short val) {
-        if (littleEndian) {
-            val = Short.reverseBytes(val);
-        }
-        theUnsafe.putShort(bytes, offset + BYTE_ARRAY_BASE_OFFSET, val);
-        return offset + Bytes.SIZEOF_SHORT;
-    }
-
-    /**
-     * Put an int value out to the specified byte array position in big-endian 
format.
-     * @param bytes the byte array
-     * @param offset position in the array
-     * @param val int to write out
-     * @return incremented offset
-     */
-    public static int putInt(byte[] bytes, int offset, int val) {
-        if (littleEndian) {
-            val = Integer.reverseBytes(val);
-        }
-        theUnsafe.putInt(bytes, offset + BYTE_ARRAY_BASE_OFFSET, val);
-        return offset + Bytes.SIZEOF_INT;
-    }
-
-    /**
-     * Put a long value out to the specified byte array position in big-endian 
format.
-     * @param bytes the byte array
-     * @param offset position in the array
-     * @param val long to write out
-     * @return incremented offset
-     */
-    public static int putLong(byte[] bytes, int offset, long val) {
-        if (littleEndian) {
-            val = Long.reverseBytes(val);
-        }
-        theUnsafe.putLong(bytes, offset + BYTE_ARRAY_BASE_OFFSET, val);
-        return offset + Bytes.SIZEOF_LONG;
-    }
-
-    // APIs to read primitive data from a ByteBuffer using Unsafe way
-    /**
-     * Reads a short value at the given buffer's offset considering it was 
written in big-endian
-     * format.
-     *
-     * @param buf
-     * @param offset
-     * @return short value at offset
-     */
-    public static short toShort(ByteBuffer buf, int offset) {
-        if (littleEndian) {
-            return Short.reverseBytes(getAsShort(buf, offset));
-        }
-        return getAsShort(buf, offset);
-    }
-
-    /**
-     * Reads bytes at the given offset as a short value.
-     * @param buf
-     * @param offset
-     * @return short value at offset
-     */
-    static short getAsShort(ByteBuffer buf, int offset) {
-        if (buf.isDirect()) {
-            return theUnsafe.getShort(((DirectBuffer) buf).address() + offset);
-        }
-        return theUnsafe.getShort(buf.array(), BYTE_ARRAY_BASE_OFFSET + 
buf.arrayOffset() + offset);
-    }
-
-    /**
-     * Reads an int value at the given buffer's offset considering it was 
written in big-endian
-     * format.
-     *
-     * @param buf
-     * @param offset
-     * @return int value at offset
-     */
-    public static int toInt(ByteBuffer buf, int offset) {
-        if (littleEndian) {
-            return Integer.reverseBytes(getAsInt(buf, offset));
-        }
-        return getAsInt(buf, offset);
-    }
-
-    /**
-     * Reads bytes at the given offset as an int value.
-     * @param buf
-     * @param offset
-     * @return int value at offset
-     */
-    static int getAsInt(ByteBuffer buf, int offset) {
-        if (buf.isDirect()) {
-            return theUnsafe.getInt(((DirectBuffer) buf).address() + offset);
-        }
-        return theUnsafe.getInt(buf.array(), BYTE_ARRAY_BASE_OFFSET + 
buf.arrayOffset() + offset);
-    }
-
-    /**
-     * Reads a long value at the given buffer's offset considering it was 
written in big-endian
-     * format.
-     *
-     * @param buf
-     * @param offset
-     * @return long value at offset
-     */
-    public static long toLong(ByteBuffer buf, int offset) {
-        if (littleEndian) {
-            return Long.reverseBytes(getAsLong(buf, offset));
-        }
-        return getAsLong(buf, offset);
-    }
-
-    /**
-     * Reads bytes at the given offset as a long value.
-     * @param buf
-     * @param offset
-     * @return long value at offset
-     */
-    static long getAsLong(ByteBuffer buf, int offset) {
-        if (buf.isDirect()) {
-            return theUnsafe.getLong(((DirectBuffer) buf).address() + offset);
-        }
-        return theUnsafe.getLong(buf.array(), BYTE_ARRAY_BASE_OFFSET + 
buf.arrayOffset() + offset);
-    }
-
-    /**
-     * Put an int value out to the specified ByteBuffer offset in big-endian 
format.
-     * @param buf the ByteBuffer to write to
-     * @param offset offset in the ByteBuffer
-     * @param val int to write out
-     * @return incremented offset
-     */
-    public static int putInt(ByteBuffer buf, int offset, int val) {
-        if (littleEndian) {
-            val = Integer.reverseBytes(val);
-        }
-        if (buf.isDirect()) {
-            theUnsafe.putInt(((DirectBuffer) buf).address() + offset, val);
-        } else {
-            theUnsafe.putInt(buf.array(), offset + buf.arrayOffset() + 
BYTE_ARRAY_BASE_OFFSET, val);
-        }
-        return offset + Bytes.SIZEOF_INT;
-    }
-
-    // APIs to copy data. This will be direct memory location copy and will be 
much faster
-    /**
-     * Copies the bytes from given array's offset to length part into the 
given buffer.
-     * @param src
-     * @param srcOffset
-     * @param dest
-     * @param destOffset
-     * @param length
-     */
-    public static void copy(byte[] src, int srcOffset, ByteBuffer dest, int 
destOffset, int length) {
-        long destAddress = destOffset;
-        Object destBase = null;
-        if (dest.isDirect()) {
-            destAddress = destAddress + ((DirectBuffer) dest).address();
-        } else {
-            destAddress = destAddress + BYTE_ARRAY_BASE_OFFSET + 
dest.arrayOffset();
-            destBase = dest.array();
-        }
-        long srcAddress = srcOffset + BYTE_ARRAY_BASE_OFFSET;
-        theUnsafe.copyMemory(src, srcAddress, destBase, destAddress, length);
-    }
-
-    /**
-     * Copies specified number of bytes from given offset of {@code src} 
ByteBuffer to the
-     * {@code dest} array.
-     *
-     * @param src
-     * @param srcOffset
-     * @param dest
-     * @param destOffset
-     * @param length
-     */
-    public static void copy(ByteBuffer src, int srcOffset, byte[] dest, int 
destOffset, int length) {
-        long srcAddress = srcOffset;
-        Object srcBase = null;
-        if (src.isDirect()) {
-            srcAddress = srcAddress + ((DirectBuffer) src).address();
-        } else {
-            srcAddress = srcAddress + BYTE_ARRAY_BASE_OFFSET + 
src.arrayOffset();
-            srcBase = src.array();
-        }
-        long destAddress = destOffset + BYTE_ARRAY_BASE_OFFSET;
-        theUnsafe.copyMemory(srcBase, srcAddress, dest, destAddress, length);
-    }
-
-    /**
-     * Copies specified number of bytes from given offset of {@code src} 
buffer into the {@code dest}
-     * buffer.
-     *
-     * @param src
-     * @param srcOffset
-     * @param dest
-     * @param destOffset
-     * @param length
-     */
-    public static void copy(ByteBuffer src, int srcOffset, ByteBuffer dest, 
int destOffset, int length) {
-        long srcAddress, destAddress;
-        Object srcBase = null, destBase = null;
-        if (src.isDirect()) {
-            srcAddress = srcOffset + ((DirectBuffer) src).address();
-        } else {
-            srcAddress = srcOffset + src.arrayOffset() + 
BYTE_ARRAY_BASE_OFFSET;
-            srcBase = src.array();
-        }
-        if (dest.isDirect()) {
-            destAddress = destOffset + ((DirectBuffer) dest).address();
-        } else {
-            destAddress = destOffset + BYTE_ARRAY_BASE_OFFSET + 
dest.arrayOffset();
-            destBase = dest.array();
-        }
-        theUnsafe.copyMemory(srcBase, srcAddress, destBase, destAddress, 
length);
-    }
-
-    // APIs to add primitives to BBs
-    /**
-     * Put a short value out to the specified BB position in big-endian format.
-     * @param buf the byte buffer
-     * @param offset position in the buffer
-     * @param val short to write out
-     * @return incremented offset
-     */
-    public static int putShort(ByteBuffer buf, int offset, short val) {
-        if (littleEndian) {
-            val = Short.reverseBytes(val);
-        }
-        if (buf.isDirect()) {
-            theUnsafe.putShort(((DirectBuffer) buf).address() + offset, val);
-        } else {
-            theUnsafe.putShort(buf.array(), BYTE_ARRAY_BASE_OFFSET + 
buf.arrayOffset() + offset, val);
-        }
-        return offset + Bytes.SIZEOF_SHORT;
-    }
-
-    /**
-     * Put a long value out to the specified BB position in big-endian format.
-     * @param buf the byte buffer
-     * @param offset position in the buffer
-     * @param val long to write out
-     * @return incremented offset
-     */
-    public static int putLong(ByteBuffer buf, int offset, long val) {
-        if (littleEndian) {
-            val = Long.reverseBytes(val);
-        }
-        if (buf.isDirect()) {
-            theUnsafe.putLong(((DirectBuffer) buf).address() + offset, val);
-        } else {
-            theUnsafe.putLong(buf.array(), BYTE_ARRAY_BASE_OFFSET + 
buf.arrayOffset() + offset, val);
-        }
-        return offset + Bytes.SIZEOF_LONG;
-    }
-
-    /**
-     * Put a byte value out to the specified BB position in big-endian format.
-     * @param buf the byte buffer
-     * @param offset position in the buffer
-     * @param b byte to write out
-     * @return incremented offset
-     */
-    public static int putByte(ByteBuffer buf, int offset, byte b) {
-        if (buf.isDirect()) {
-            theUnsafe.putByte(((DirectBuffer) buf).address() + offset, b);
-        } else {
-            theUnsafe.putByte(buf.array(), BYTE_ARRAY_BASE_OFFSET + 
buf.arrayOffset() + offset, b);
-        }
-        return offset + 1;
-    }
-
-    /**
-     * Returns the byte at the given offset
-     * @param buf the buffer to read
-     * @param offset the offset at which the byte has to be read
-     * @return the byte at the given offset
-     */
-    public static byte toByte(ByteBuffer buf, int offset) {
-        if (buf.isDirect()) {
-            return theUnsafe.getByte(((DirectBuffer) buf).address() + offset);
-        } else {
-            return theUnsafe.getByte(buf.array(), BYTE_ARRAY_BASE_OFFSET + 
buf.arrayOffset() + offset);
-        }
-    }
-}

Reply via email to