KYLIN-2255 drop v1 coprocessor impl
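
For context on what is being dropped: the v1 storage engine pushed aggregation down to HBase region servers through the RegionObserver coprocessor hook (the deleted AggregateRegionObserver/ObserverEnabler classes below). Here is a minimal sketch of that pattern, assuming the HBase 1.x coprocessor API; the class name and scan-attribute key are illustrative, not the deleted Kylin code:

    import java.io.IOException;

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
    import org.apache.hadoop.hbase.coprocessor.ObserverContext;
    import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
    import org.apache.hadoop.hbase.regionserver.RegionScanner;

    // Sketch of the observer pattern being removed: HBase calls postScannerOpen
    // on the region server whenever a scanner is created, so the coprocessor can
    // swap in a scanner that aggregates rows before they cross the network.
    public class ObserverSketch extends BaseRegionObserver {

        // Illustrative attribute key; the real observer read its serialized
        // scan request (cuboid, filter, aggregators) from a Scan attribute.
        private static final String REQUEST_ATTR = "kylin.scan.request";

        @Override
        public RegionScanner postScannerOpen(ObserverContext<RegionCoprocessorEnvironment> ctx, Scan scan, RegionScanner innerScanner) throws IOException {
            byte[] request = scan.getAttribute(REQUEST_ATTR);
            if (request == null)
                return innerScanner; // ordinary scan, no server-side aggregation
            // The deleted AggregationScanner wrapped innerScanner here, decoding
            // each row and merging measures per group before returning results.
            return aggregate(innerScanner, request);
        }

        private RegionScanner aggregate(RegionScanner inner, byte[] request) {
            return inner; // placeholder for the removed aggregation logic
        }
    }

The v2 query engine keeps server-side aggregation but drives it through an endpoint-style coprocessor instead, which is why only the v1 observer classes and their REST toggle (the /coprocessor endpoint removed from CubeController below) go away.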
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/545201f6
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/545201f6
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/545201f6

Branch: refs/heads/master-hbase1.x
Commit: 545201f6c0c2e4da9b3dcabac0f606e2ef880d19
Parents: 21bcd2f
Author: Li Yang <[email protected]>
Authored: Thu Dec 8 10:41:18 2016 +0800
Committer: Li Yang <[email protected]>
Committed: Thu Dec 8 10:54:58 2016 +0800

----------------------------------------------------------------------
 .../apache/kylin/query/ITCombinationTest.java   |   15 -
 .../org/apache/kylin/query/KylinTestBase.java   |    2 -
 .../kylin/rest/controller/CubeController.java   |   14 -
 .../hbase/cube/v1/CubeSegmentTupleIterator.java |  344 -----
 .../storage/hbase/cube/v1/CubeStorageQuery.java |  796 -----
 .../hbase/cube/v1/CubeTupleConverter.java       |  271 ----
 .../hbase/cube/v1/RegionScannerAdapter.java     |   97 --
 .../hbase/cube/v1/ResultScannerAdapter.java     |  100 --
 .../cube/v1/SerializedHBaseTupleIterator.java   |  156 ---
 .../observer/AggregateRegionObserver.java       |  112 --
 .../observer/AggregationScanner.java            |  188 ---
 .../observer/ObserverAggregationCache.java      |  166 ---
 .../observer/ObserverAggregators.java           |  265 ----
 .../coprocessor/observer/ObserverEnabler.java   |  191 ---
 .../v1/coprocessor/observer/ObserverTuple.java  |   71 -
 .../hbase/cube/v1/filter/FuzzyRowFilterV2.java  |  574 --------
 .../hbase/cube/v1/filter/UnsafeAccess.java      |  433 ------
 .../v1/filter/generated/FilterProtosExt.java    | 1298 ------------------
 .../cube/v1/filter/protobuf/FilterExt.proto     |   39 -
 .../cube/MeasureTypeOnlyAggrInBaseTest.java     |   21 -
 .../observer/AggregateRegionObserverTest.java   |  339 -----
 .../observer/RowAggregatorsTest.java            |   62 -
 .../cube/v1/filter/TestFuzzyRowFilterV2.java    |  249 ----
 .../v1/filter/TestFuzzyRowFilterV2EndToEnd.java |  346 -----
 24 files changed, 6149 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/545201f6/kylin-it/src/test/java/org/apache/kylin/query/ITCombinationTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/query/ITCombinationTest.java b/kylin-it/src/test/java/org/apache/kylin/query/ITCombinationTest.java
index 496ac4e..84573b5 100644
--- a/kylin-it/src/test/java/org/apache/kylin/query/ITCombinationTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/query/ITCombinationTest.java
@@ -27,7 +27,6 @@ import org.apache.kylin.metadata.realization.RealizationType;
 import org.apache.kylin.query.routing.Candidate;
 import org.apache.kylin.query.routing.rules.RemoveBlackoutRealizationsRule;
 import org.apache.kylin.storage.hbase.HBaseStorage;
-import org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer.ObserverEnabler;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.runner.RunWith;
@@ -67,7 +66,6 @@ public class ITCombinationTest extends ITKylinQueryTest {
     public static Collection<Object[]> configs() {
         return Arrays.asList(new Object[][] { //
                 { "inner", "on", "v2", true }, //
-                { "left", "on", "v1", true }, //
                 { "left", "on", "v2", true }, //
                 //{ "inner", "on", "v2", false }, // exclude view to simplify model/cube selection
                 //{ "left", "on", "v1", false }, // exclude view to simplify model/cube selection
@@ -84,23 +82,10 @@ public class ITCombinationTest extends ITKylinQueryTest {
         ITKylinQueryTest.joinType = joinType;
         ITKylinQueryTest.setupAll();
 
-        if (coprocessorToggle.equals("on")) {
-            ObserverEnabler.forceCoprocessorOn();
-        } else if (coprocessorToggle.equals("off")) {
-            ObserverEnabler.forceCoprocessorOff();
-        } else if (coprocessorToggle.equals("unset")) {
-            // unset
-        }
-
         RemoveBlackoutRealizationsRule.blackList.clear();
         if (excludeViewCubes) {
             RemoveBlackoutRealizationsRule.blackList.add("CUBE[name=test_kylin_cube_with_view_left_join_empty]");
             RemoveBlackoutRealizationsRule.blackList.add("CUBE[name=test_kylin_cube_with_view_inner_join_empty]");
         }
-
-        if ("v1".equalsIgnoreCase(queryEngine))
-            HBaseStorage.overwriteStorageQuery = HBaseStorage.v1CubeStorageQuery;
-        else
-            HBaseStorage.overwriteStorageQuery = null;
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/545201f6/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java b/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
index 52461c4..bcf55e5 100644
--- a/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
+++ b/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
@@ -52,7 +52,6 @@ import org.apache.kylin.query.enumerator.OLAPQuery;
 import org.apache.kylin.query.relnode.OLAPContext;
 import org.apache.kylin.query.routing.rules.RemoveBlackoutRealizationsRule;
 import org.apache.kylin.query.schema.OLAPSchemaFactory;
-import org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer.ObserverEnabler;
 import org.dbunit.Assertion;
 import org.dbunit.database.DatabaseConfig;
 import org.dbunit.database.DatabaseConnection;
@@ -628,7 +627,6 @@ public class KylinTestBase {
         if (h2Connection != null)
             closeConnection(h2Connection);
 
-        ObserverEnabler.forceCoprocessorUnset();
         HBaseMetadataTestCase.staticCleanupTestMetadata();
         RemoveBlackoutRealizationsRule.blackList.clear();

http://git-wip-us.apache.org/repos/asf/kylin/blob/545201f6/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
index ab32551..aef1ffc 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
@@ -55,7 +55,6 @@ import org.apache.kylin.rest.response.HBaseResponse;
 import org.apache.kylin.rest.service.CubeService;
 import org.apache.kylin.rest.service.JobService;
 import org.apache.kylin.source.kafka.util.KafkaClient;
-import org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer.ObserverEnabler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -198,19 +197,6 @@ public class CubeController extends BasicController {
         }
     }
 
-    @RequestMapping(value = "/{cubeName}/coprocessor", method = { RequestMethod.PUT })
-    @ResponseBody
-    public Map<String, Boolean> updateCubeCoprocessor(@PathVariable String cubeName, @RequestParam(value = "force") String force) {
-        try {
-            ObserverEnabler.updateCubeOverride(cubeName, force);
-            return ObserverEnabler.getCubeOverrides();
-        } catch (Exception e) {
-            String message = "Failed to update cube coprocessor: " + cubeName + " : " + force;
-            logger.error(message, e);
-            throw new InternalErrorException(message + " Caused by: " + e.getMessage(), e);
- } - } - /** * Force rebuild a cube's lookup table snapshot * http://git-wip-us.apache.org/repos/asf/kylin/blob/545201f6/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java deleted file mode 100644 index 8ac3832..0000000 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java +++ /dev/null @@ -1,344 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.storage.hbase.cube.v1; - -import java.text.MessageFormat; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; -import java.util.NoSuchElementException; -import java.util.Set; - -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HTableInterface; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.metrics.ScanMetrics; -import org.apache.hadoop.hbase.filter.Filter; -import org.apache.hadoop.hbase.filter.FuzzyRowFilter; -import org.apache.hadoop.hbase.protobuf.ProtobufUtil; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.persistence.StorageException; -import org.apache.kylin.common.util.Bytes; -import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.cube.cuboid.Cuboid; -import org.apache.kylin.cube.model.HBaseColumnDesc; -import org.apache.kylin.measure.MeasureType; -import org.apache.kylin.metadata.filter.TupleFilter; -import org.apache.kylin.metadata.model.TblColRef; -import org.apache.kylin.metadata.tuple.ITupleIterator; -import org.apache.kylin.metadata.tuple.Tuple; -import org.apache.kylin.metadata.tuple.TupleInfo; -import org.apache.kylin.storage.StorageContext; -import org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer.ObserverEnabler; -import org.apache.kylin.storage.hbase.cube.v1.filter.FuzzyRowFilterV2; -import org.apache.kylin.storage.hbase.steps.RowValueDecoder; -import org.apache.kylin.storage.translate.HBaseKeyRange; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.collect.Lists; - -/** - * @author xjiang - * - */ -public class CubeSegmentTupleIterator implements ITupleIterator { - - public static final Logger logger = LoggerFactory.getLogger(CubeSegmentTupleIterator.class); - - protected final CubeSegment cubeSeg; - private final TupleFilter filter; - private final Collection<TblColRef> groupBy; - 
protected final List<RowValueDecoder> rowValueDecoders; - private final StorageContext context; - private final String tableName; - private final HTableInterface table; - - protected CubeTupleConverter tupleConverter; - protected final Iterator<HBaseKeyRange> rangeIterator; - protected final Tuple oneTuple; // avoid new instance - - private Scan scan; - private ResultScanner scanner; - protected Iterator<Result> resultIterator; - protected int scanCount; - protected int scanCountDelta; - protected Tuple next; - protected final Cuboid cuboid; - - private List<MeasureType.IAdvMeasureFiller> advMeasureFillers; - private int advMeasureRowsRemaining; - private int advMeasureRowIndex; - - public CubeSegmentTupleIterator(CubeSegment cubeSeg, List<HBaseKeyRange> keyRanges, HConnection conn, // - Set<TblColRef> dimensions, TupleFilter filter, Set<TblColRef> groupBy, // - List<RowValueDecoder> rowValueDecoders, StorageContext context, TupleInfo returnTupleInfo) { - this.cubeSeg = cubeSeg; - this.filter = filter; - this.groupBy = groupBy; - this.rowValueDecoders = rowValueDecoders; - this.context = context; - this.tableName = cubeSeg.getStorageLocationIdentifier(); - - cuboid = keyRanges.get(0).getCuboid(); - for (HBaseKeyRange range : keyRanges) { - assert cuboid.equals(range.getCuboid()); - } - - this.tupleConverter = new CubeTupleConverter(cubeSeg, cuboid, rowValueDecoders, returnTupleInfo); - this.oneTuple = new Tuple(returnTupleInfo); - this.rangeIterator = keyRanges.iterator(); - - try { - this.table = conn.getTable(tableName); - } catch (Throwable t) { - throw new StorageException("Error when open connection to table " + tableName, t); - } - } - - @Override - public boolean hasNext() { - - if (next != null) - return true; - - // consume any left rows from advanced measure filler - if (advMeasureRowsRemaining > 0) { - for (MeasureType.IAdvMeasureFiller filler : advMeasureFillers) { - filler.fillTuple(oneTuple, advMeasureRowIndex); - } - advMeasureRowIndex++; - advMeasureRowsRemaining--; - next = oneTuple; - return true; - } - - if (resultIterator == null) { - if (rangeIterator.hasNext() == false) - return false; - - resultIterator = doScan(rangeIterator.next()); - } - - if (resultIterator.hasNext() == false) { - closeScanner(); - resultIterator = null; - return hasNext(); - } - - Result result = resultIterator.next(); - scanCount++; - if (++scanCountDelta >= 1000) - flushScanCountDelta(); - - // translate into tuple - advMeasureFillers = tupleConverter.translateResult(result, oneTuple); - - // the simple case - if (advMeasureFillers == null) { - next = oneTuple; - return true; - } - - // advanced measure filling, like TopN, will produce multiple tuples out of one record - advMeasureRowsRemaining = -1; - for (MeasureType.IAdvMeasureFiller filler : advMeasureFillers) { - if (advMeasureRowsRemaining < 0) - advMeasureRowsRemaining = filler.getNumOfRows(); - if (advMeasureRowsRemaining != filler.getNumOfRows()) - throw new IllegalStateException(); - } - if (advMeasureRowsRemaining < 0) - throw new IllegalStateException(); - - advMeasureRowIndex = 0; - return hasNext(); - - } - - @Override - public Tuple next() { - if (next == null) { - hasNext(); - if (next == null) - throw new NoSuchElementException(); - } - Tuple r = next; - next = null; - return r; - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - - protected final Iterator<Result> doScan(HBaseKeyRange keyRange) { - Iterator<Result> iter = null; - try { - scan = buildScan(keyRange); - 
applyFuzzyFilter(scan, keyRange);
-            logScan(keyRange);
-
-            scanner = ObserverEnabler.scanWithCoprocessorIfBeneficial(cubeSeg, keyRange.getCuboid(), filter, groupBy, rowValueDecoders, context, table, scan);
-
-            iter = scanner.iterator();
-        } catch (Throwable t) {
-            String msg = MessageFormat.format("Error when scan from lower key {1} to upper key {2} on table {0}.", tableName, Bytes.toString(keyRange.getStartKey()), Bytes.toString(keyRange.getStopKey()));
-            throw new StorageException(msg, t);
-        }
-        return iter;
-    }
-
-    private void logScan(HBaseKeyRange keyRange) {
-        StringBuilder info = new StringBuilder();
-        info.append("Scan hbase table ").append(tableName).append(": ");
-        if (keyRange.getCuboid().requirePostAggregation()) {
-            info.append(" cuboid require post aggregation, from ");
-        } else {
-            info.append(" cuboid exact match, from ");
-        }
-        info.append(keyRange.getCuboid().getInputID());
-        info.append(" to ");
-        info.append(keyRange.getCuboid().getId());
-        info.append(" Start: ");
-        info.append(keyRange.getStartKeyAsString());
-        info.append(" - ");
-        info.append(Bytes.toStringBinary(keyRange.getStartKey()));
-        info.append(" Stop: ");
-        info.append(keyRange.getStopKeyAsString());
-        info.append(" - ");
-        info.append(Bytes.toStringBinary(keyRange.getStopKey()));
-        if (this.scan.getFilter() != null) {
-            info.append(" Fuzzy key counts: " + keyRange.getFuzzyKeys().size());
-            info.append(" Fuzzy: ");
-            info.append(keyRange.getFuzzyKeyAsString());
-        }
-        logger.info(info.toString());
-    }
-
-    private Scan buildScan(HBaseKeyRange keyRange) {
-        Scan scan = new Scan();
-        tuneScanParameters(scan);
-        scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE, Bytes.toBytes(Boolean.TRUE));
-        for (RowValueDecoder valueDecoder : this.rowValueDecoders) {
-            HBaseColumnDesc hbaseColumn = valueDecoder.getHBaseColumn();
-            byte[] byteFamily = Bytes.toBytes(hbaseColumn.getColumnFamilyName());
-            byte[] byteQualifier = Bytes.toBytes(hbaseColumn.getQualifier());
-            scan.addColumn(byteFamily, byteQualifier);
-        }
-        scan.setStartRow(keyRange.getStartKey());
-        scan.setStopRow(keyRange.getStopKey());
-        return scan;
-    }
-
-    private void tuneScanParameters(Scan scan) {
-        KylinConfig config = cubeSeg.getCubeDesc().getConfig();
-
-        scan.setCaching(config.getHBaseScanCacheRows());
-        scan.setMaxResultSize(config.getHBaseScanMaxResultSize());
-        scan.setCacheBlocks(true);
-
-        // cache less when there are memory hungry measures
-        // if (RowValueDecoder.hasMemHungryMeasures(rowValueDecoders)) {
-        //     scan.setCaching(scan.getCaching() / 10);
-        // }
-    }
-
-    private void applyFuzzyFilter(Scan scan, HBaseKeyRange keyRange) {
-        List<org.apache.kylin.common.util.Pair<byte[], byte[]>> fuzzyKeys = keyRange.getFuzzyKeys();
-        if (fuzzyKeys != null && fuzzyKeys.size() > 0) {
-
-            // FuzzyRowFilterV2 was back-ported from https://issues.apache.org/jira/browse/HBASE-13761
-            // However we found a bug in it and fixed it in https://issues.apache.org/jira/browse/HBASE-14269
-            // After the fix the performance is not much faster than the original one, so the default one is used.
- boolean useFuzzyRowFilterV2 = false; - Filter fuzzyFilter = null; - if (useFuzzyRowFilterV2) { - fuzzyFilter = new FuzzyRowFilterV2(convertToHBasePair(fuzzyKeys)); - } else { - fuzzyFilter = new FuzzyRowFilter(convertToHBasePair(fuzzyKeys)); - } - - Filter filter = scan.getFilter(); - if (filter != null) { - throw new RuntimeException("Scan filter not empty : " + filter); - } else { - scan.setFilter(fuzzyFilter); - } - } - } - - private List<org.apache.hadoop.hbase.util.Pair<byte[], byte[]>> convertToHBasePair(List<org.apache.kylin.common.util.Pair<byte[], byte[]>> pairList) { - List<org.apache.hadoop.hbase.util.Pair<byte[], byte[]>> result = Lists.newArrayList(); - for (org.apache.kylin.common.util.Pair<byte[], byte[]> pair : pairList) { - org.apache.hadoop.hbase.util.Pair<byte[], byte[]> element = new org.apache.hadoop.hbase.util.Pair<byte[], byte[]>(pair.getFirst(), pair.getSecond()); - result.add(element); - } - - return result; - } - - protected void closeScanner() { - flushScanCountDelta(); - - if (logger.isDebugEnabled() && scan != null) { - byte[] metricsBytes = scan.getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA); - if (metricsBytes != null) { - ScanMetrics scanMetrics = ProtobufUtil.toScanMetrics(metricsBytes); - logger.debug("HBase Metrics when scanning " + this.tableName + " count={}, ms={}, bytes={}, remote_bytes={}, regions={}, not_serving_region={}, rpc={}, rpc_retries={}, remote_rpc={}, remote_rpc_retries={}", // - new Object[] { scanCount, scanMetrics.sumOfMillisSecBetweenNexts, scanMetrics.countOfBytesInResults, scanMetrics.countOfBytesInRemoteResults, scanMetrics.countOfRegions, scanMetrics.countOfNSRE, scanMetrics.countOfRPCcalls, scanMetrics.countOfRPCRetries, scanMetrics.countOfRemoteRPCcalls, scanMetrics.countOfRemoteRPCRetries }); - } - scan = null; - } - try { - if (scanner != null) { - scanner.close(); - scanner = null; - } - } catch (Throwable t) { - throw new StorageException("Error when close scanner for table " + tableName, t); - } - } - - private void closeTable() { - try { - if (table != null) { - table.close(); - } - } catch (Throwable t) { - throw new StorageException("Error when close table " + tableName, t); - } - } - - @Override - public void close() { - logger.info("Closing CubeSegmentTupleIterator"); - closeScanner(); - closeTable(); - } - - protected void flushScanCountDelta() { - context.increaseTotalScanCount(scanCountDelta); - scanCountDelta = 0; - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/545201f6/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java deleted file mode 100644 index 75c3fd7..0000000 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java +++ /dev/null @@ -1,796 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.storage.hbase.cube.v1; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.BitSet; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeSet; - -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.kylin.common.util.Bytes; -import org.apache.kylin.common.util.BytesUtil; -import org.apache.kylin.common.util.Dictionary; -import org.apache.kylin.common.util.Pair; -import org.apache.kylin.common.util.ShardingHash; -import org.apache.kylin.cube.CubeInstance; -import org.apache.kylin.cube.CubeManager; -import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.cube.RawQueryLastHacker; -import org.apache.kylin.cube.cuboid.Cuboid; -import org.apache.kylin.cube.kv.RowConstants; -import org.apache.kylin.cube.model.CubeDesc; -import org.apache.kylin.cube.model.CubeDesc.DeriveInfo; -import org.apache.kylin.cube.model.HBaseColumnDesc; -import org.apache.kylin.cube.model.HBaseMappingDesc; -import org.apache.kylin.cube.model.RowKeyDesc; -import org.apache.kylin.dict.lookup.LookupStringTable; -import org.apache.kylin.measure.MeasureType; -import org.apache.kylin.metadata.filter.ColumnTupleFilter; -import org.apache.kylin.metadata.filter.CompareTupleFilter; -import org.apache.kylin.metadata.filter.LogicalTupleFilter; -import org.apache.kylin.metadata.filter.TupleFilter; -import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum; -import org.apache.kylin.metadata.model.FunctionDesc; -import org.apache.kylin.metadata.model.MeasureDesc; -import org.apache.kylin.metadata.model.SegmentStatusEnum; -import org.apache.kylin.metadata.model.TblColRef; -import org.apache.kylin.metadata.realization.SQLDigest; -import org.apache.kylin.metadata.tuple.ITupleIterator; -import org.apache.kylin.metadata.tuple.TupleInfo; -import org.apache.kylin.storage.IStorageQuery; -import org.apache.kylin.storage.StorageContext; -import org.apache.kylin.storage.hbase.HBaseConnection; -import org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer.ObserverEnabler; -import org.apache.kylin.storage.hbase.steps.RowValueDecoder; -import org.apache.kylin.storage.translate.ColumnValueRange; -import org.apache.kylin.storage.translate.DerivedFilterTranslator; -import org.apache.kylin.storage.translate.HBaseKeyRange; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; - -@SuppressWarnings("unused") -public class CubeStorageQuery implements IStorageQuery { - - private static final Logger logger = LoggerFactory.getLogger(CubeStorageQuery.class); - - private static final int MERGE_KEYRANGE_THRESHOLD = 100; - - private final CubeInstance cubeInstance; - private final CubeDesc cubeDesc; - private final String uuid; - - public CubeStorageQuery(CubeInstance cube) { - 
this.cubeInstance = cube;
-        this.cubeDesc = cube.getDescriptor();
-        this.uuid = cube.getUuid();
-    }
-
-    @Override
-    public ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) {
-
-        // deal with participant columns in subquery join
-        sqlDigest.includeSubqueryJoinParticipants();
-
-        // cope with queries with no aggregations
-        RawQueryLastHacker.hackNoAggregations(sqlDigest, cubeDesc, returnTupleInfo);
-
-        // Customized measure taking effect: e.g. allow custom measures to help raw queries
-        notifyBeforeStorageQuery(sqlDigest);
-
-        Collection<TblColRef> groups = sqlDigest.groupbyColumns;
-        TupleFilter filter = sqlDigest.filter;
-
-        // build dimension & metrics
-        Collection<TblColRef> dimensions = new HashSet<TblColRef>();
-        Collection<FunctionDesc> metrics = new HashSet<FunctionDesc>();
-        buildDimensionsAndMetrics(dimensions, metrics, sqlDigest);
-
-        // all dimensions = groups + others
-        Set<TblColRef> others = Sets.newHashSet(dimensions);
-        others.removeAll(groups);
-
-        // expand derived
-        Set<TblColRef> derivedPostAggregation = Sets.newHashSet();
-        Set<TblColRef> groupsD = expandDerived(groups, derivedPostAggregation);
-        Set<TblColRef> othersD = expandDerived(others, derivedPostAggregation);
-        othersD.removeAll(groupsD);
-
-        // identify cuboid
-        Set<TblColRef> dimensionsD = Sets.newHashSet();
-        dimensionsD.addAll(groupsD);
-        dimensionsD.addAll(othersD);
-        Cuboid cuboid = identifyCuboid(dimensionsD, metrics);
-        context.setCuboid(cuboid);
-
-        // isExactAggregation? meaning: tuples returned from storage require no further aggregation in query engine
-        Set<TblColRef> singleValuesD = findSingleValueColumns(filter);
-        boolean isExactAggregation = isExactAggregation(cuboid, groupsD, othersD, singleValuesD, derivedPostAggregation);
-        context.setExactAggregation(isExactAggregation);
-
-        // translate filter for scan range and compose returning groups for coprocessor, note:
-        // - columns on non-evaluatable filter have to return
-        // - columns on loosened filter (due to derived translation) have to return
-        Set<TblColRef> groupsCopD = Sets.newHashSet(groupsD);
-        collectNonEvaluable(filter, groupsCopD);
-        TupleFilter filterD = translateDerived(filter, groupsCopD);
-
-        // translate filter into segment scan ranges
-        List<HBaseKeyRange> scans = buildScanRanges(flattenToOrAndFilter(filterD), dimensionsD);
-
-        // check involved measures, build value decoder for each family:column
-        List<RowValueDecoder> valueDecoders = translateAggregation(cubeDesc.getHbaseMapping(), metrics, context);
-
-        // memory hungry distinct count are pushed down to coprocessor, no need to set threshold any more
-        setThreshold(dimensionsD, valueDecoders, context); // set cautious threshold to prevent out of memory
-        setCoprocessor(groupsCopD, valueDecoders, context); // enable coprocessor if beneficial
-        setLimit(filter, context);
-
-        HConnection conn = HBaseConnection.get(context.getConnUrl());
-
-        // notice we're passing filterD down to storage instead of flatFilter
-        return new SerializedHBaseTupleIterator(conn, scans, cubeInstance, dimensionsD, filterD, groupsCopD, valueDecoders, context, returnTupleInfo);
-    }
-
-    private void buildDimensionsAndMetrics(Collection<TblColRef> dimensions, Collection<FunctionDesc> metrics, SQLDigest sqlDigest) {
-
-        for (FunctionDesc func : sqlDigest.aggregations) {
-            if (!func.isDimensionAsMetric()) {
-                metrics.add(func);
-            }
-        }
-
-        for (TblColRef column : sqlDigest.allColumns) {
-            // skip measure columns
-            if (sqlDigest.metricColumns.contains(column)) {
- continue; - } - - dimensions.add(column); - } - } - - private Cuboid identifyCuboid(Set<TblColRef> dimensions, Collection<FunctionDesc> metrics) { - for (FunctionDesc metric : metrics) { - if (metric.getMeasureType().onlyAggrInBaseCuboid()) - return Cuboid.getBaseCuboid(cubeDesc); - } - - long cuboidID = 0; - for (TblColRef column : dimensions) { - int index = cubeDesc.getRowkey().getColumnBitIndex(column); - cuboidID |= 1L << index; - } - return Cuboid.findById(cubeDesc, cuboidID); - } - - private boolean isExactAggregation(Cuboid cuboid, Collection<TblColRef> groups, Set<TblColRef> othersD, Set<TblColRef> singleValuesD, Set<TblColRef> derivedPostAggregation) { - boolean exact = true; - - if (cuboid.requirePostAggregation()) { - exact = false; - logger.info("exactAggregation is false because cuboid " + cuboid.getInputID() + "=> " + cuboid.getId()); - } - - // derived aggregation is bad, unless expanded columns are already in group by - if (groups.containsAll(derivedPostAggregation) == false) { - exact = false; - logger.info("exactAggregation is false because derived column require post aggregation: " + derivedPostAggregation); - } - - // other columns (from filter) is bad, unless they are ensured to have single value - if (singleValuesD.containsAll(othersD) == false) { - exact = false; - logger.info("exactAggregation is false because some column not on group by: " + othersD // - + " (single value column: " + singleValuesD + ")"); - } - - if (exact) { - logger.info("exactAggregation is true"); - } - return exact; - } - - private Set<TblColRef> expandDerived(Collection<TblColRef> cols, Set<TblColRef> derivedPostAggregation) { - Set<TblColRef> expanded = Sets.newHashSet(); - for (TblColRef col : cols) { - if (cubeDesc.hasHostColumn(col)) { - DeriveInfo hostInfo = cubeDesc.getHostInfo(col); - for (TblColRef hostCol : hostInfo.columns) { - expanded.add(hostCol); - if (hostInfo.isOneToOne == false) - derivedPostAggregation.add(hostCol); - } - } else { - expanded.add(col); - } - } - return expanded; - } - - @SuppressWarnings("unchecked") - private Set<TblColRef> findSingleValueColumns(TupleFilter filter) { - Collection<? extends TupleFilter> toCheck; - if (filter instanceof CompareTupleFilter) { - toCheck = Collections.singleton(filter); - } else if (filter instanceof LogicalTupleFilter && filter.getOperator() == FilterOperatorEnum.AND) { - toCheck = filter.getChildren(); - } else { - return (Set<TblColRef>) Collections.EMPTY_SET; - } - - Set<TblColRef> result = Sets.newHashSet(); - for (TupleFilter f : toCheck) { - if (f instanceof CompareTupleFilter) { - CompareTupleFilter compFilter = (CompareTupleFilter) f; - // is COL=const ? 
- if (compFilter.getOperator() == FilterOperatorEnum.EQ && compFilter.getValues().size() == 1 && compFilter.getColumn() != null) { - result.add(compFilter.getColumn()); - } - } - } - - // expand derived - Set<TblColRef> resultD = Sets.newHashSet(); - for (TblColRef col : result) { - if (cubeDesc.isExtendedColumn(col)) { - throw new CubeDesc.CannotFilterExtendedColumnException(col); - } - - if (cubeDesc.isDerived(col)) { - DeriveInfo hostInfo = cubeDesc.getHostInfo(col); - if (hostInfo.isOneToOne) { - for (TblColRef hostCol : hostInfo.columns) { - resultD.add(hostCol); - } - } - //if not one2one, it will be pruned - } else { - resultD.add(col); - } - } - return resultD; - } - - private void collectNonEvaluable(TupleFilter filter, Set<TblColRef> collector) { - if (filter == null) - return; - - if (filter.isEvaluable()) { - for (TupleFilter child : filter.getChildren()) - collectNonEvaluable(child, collector); - } else { - collectColumnsRecursively(filter, collector); - } - } - - private void collectColumnsRecursively(TupleFilter filter, Set<TblColRef> collector) { - if (filter == null) - return; - - if (filter instanceof ColumnTupleFilter) { - collectColumns(((ColumnTupleFilter) filter).getColumn(), collector); - } - for (TupleFilter child : filter.getChildren()) { - collectColumnsRecursively(child, collector); - } - } - - private void collectColumns(TblColRef col, Set<TblColRef> collector) { - if (cubeDesc.isExtendedColumn(col)) { - throw new CubeDesc.CannotFilterExtendedColumnException(col); - } - - if (cubeDesc.isDerived(col)) { - DeriveInfo hostInfo = cubeDesc.getHostInfo(col); - for (TblColRef h : hostInfo.columns) - collector.add(h); - } else { - collector.add(col); - } - } - - @SuppressWarnings("unchecked") - private TupleFilter translateDerived(TupleFilter filter, Set<TblColRef> collector) { - if (filter == null) - return filter; - - if (filter instanceof CompareTupleFilter) { - return translateDerivedInCompare((CompareTupleFilter) filter, collector); - } - - List<TupleFilter> children = (List<TupleFilter>) filter.getChildren(); - List<TupleFilter> newChildren = Lists.newArrayListWithCapacity(children.size()); - boolean modified = false; - for (TupleFilter child : children) { - TupleFilter translated = translateDerived(child, collector); - newChildren.add(translated); - if (child != translated) - modified = true; - } - if (modified) { - filter = replaceChildren(filter, newChildren); - } - return filter; - } - - private TupleFilter replaceChildren(TupleFilter filter, List<TupleFilter> newChildren) { - if (filter instanceof LogicalTupleFilter) { - LogicalTupleFilter r = new LogicalTupleFilter(filter.getOperator()); - r.addChildren(newChildren); - return r; - } else - throw new IllegalStateException("Cannot replaceChildren on " + filter); - } - - private TupleFilter translateDerivedInCompare(CompareTupleFilter compf, Set<TblColRef> collector) { - if (compf.getColumn() == null || compf.getValues().isEmpty()) - return compf; - - TblColRef derived = compf.getColumn(); - if (cubeDesc.isExtendedColumn(derived)) { - throw new CubeDesc.CannotFilterExtendedColumnException(derived); - } - if (cubeDesc.isDerived(derived) == false) { - return compf; - } - - DeriveInfo hostInfo = cubeDesc.getHostInfo(derived); - CubeManager cubeMgr = CubeManager.getInstance(this.cubeInstance.getConfig()); - CubeSegment seg = cubeInstance.getLatestReadySegment(); - LookupStringTable lookup = cubeMgr.getLookupTable(seg, hostInfo.join); - Pair<TupleFilter, Boolean> translated = 
DerivedFilterTranslator.translate(lookup, hostInfo, compf); - TupleFilter translatedFilter = translated.getFirst(); - boolean loosened = translated.getSecond(); - if (loosened) { - collectColumnsRecursively(translatedFilter, collector); - } - return translatedFilter; - } - - private List<RowValueDecoder> translateAggregation(HBaseMappingDesc hbaseMapping, Collection<FunctionDesc> metrics, // - StorageContext context) { - Map<HBaseColumnDesc, RowValueDecoder> codecMap = Maps.newHashMap(); - for (FunctionDesc aggrFunc : metrics) { - Collection<HBaseColumnDesc> hbCols = hbaseMapping.findHBaseColumnByFunction(aggrFunc); - if (hbCols.isEmpty()) { - throw new IllegalStateException("can't find HBaseColumnDesc for function " + aggrFunc.getFullExpression()); - } - HBaseColumnDesc bestHBCol = null; - int bestIndex = -1; - for (HBaseColumnDesc hbCol : hbCols) { - bestHBCol = hbCol; - bestIndex = hbCol.findMeasure(aggrFunc); - // we used to prefer specific measure over another (holistic distinct count), now it's gone - break; - } - - RowValueDecoder codec = codecMap.get(bestHBCol); - if (codec == null) { - codec = new RowValueDecoder(bestHBCol); - codecMap.put(bestHBCol, codec); - } - codec.setProjectIndex(bestIndex); - } - return new ArrayList<RowValueDecoder>(codecMap.values()); - } - - //check TupleFilter.flatFilter's comment - private TupleFilter flattenToOrAndFilter(TupleFilter filter) { - if (filter == null) - return null; - - // core - TupleFilter flatFilter = filter.flatFilter(); - - // normalize to OR-AND filter - if (flatFilter.getOperator() == FilterOperatorEnum.AND) { - LogicalTupleFilter f = new LogicalTupleFilter(FilterOperatorEnum.OR); - f.addChild(flatFilter); - flatFilter = f; - } - - if (flatFilter.getOperator() != FilterOperatorEnum.OR) - throw new IllegalStateException(); - - return flatFilter; - } - - private List<HBaseKeyRange> buildScanRanges(TupleFilter flatFilter, Collection<TblColRef> dimensionColumns) { - - List<HBaseKeyRange> result = Lists.newArrayList(); - - logger.info("Current cubeInstance is " + cubeInstance + " with " + cubeInstance.getSegments().size() + " segs in all"); - List<CubeSegment> segs = cubeInstance.getSegments(SegmentStatusEnum.READY); - logger.info("READY segs count: " + segs.size()); - - // build row key range for each cube segment - StringBuilder sb = new StringBuilder("hbasekeyrange trace: "); - for (CubeSegment cubeSeg : segs) { - CubeDesc cubeDesc = cubeSeg.getCubeDesc(); - if (cubeDesc.getConfig().isSkippingEmptySegments() && cubeSeg.getInputRecords() == 0) { - logger.info("Skip cube segment {} because its input record is 0", cubeSeg); - continue; - } - // consider derived (lookup snapshot), filter on dimension may - // differ per segment - List<Collection<ColumnValueRange>> orAndDimRanges = translateToOrAndDimRanges(flatFilter, cubeSeg); - if (orAndDimRanges == null) { // has conflict - continue; - } - - List<HBaseKeyRange> scanRanges = Lists.newArrayListWithCapacity(orAndDimRanges.size()); - for (Collection<ColumnValueRange> andDimRanges : orAndDimRanges) { - HBaseKeyRange rowKeyRange = new HBaseKeyRange(dimensionColumns, andDimRanges, cubeSeg, cubeDesc); - scanRanges.add(rowKeyRange); - } - - //log - sb.append(scanRanges.size() + "=(mergeoverlap)>"); - - List<HBaseKeyRange> mergedRanges = mergeOverlapRanges(scanRanges); - - //log - sb.append(mergedRanges.size() + "=(mergetoomany)>"); - - mergedRanges = mergeTooManyRanges(mergedRanges); - - //log - sb.append(mergedRanges.size() + ","); - - result.addAll(mergedRanges); - } - 
logger.info(sb.toString()); - - logger.info("hbasekeyrange count: " + result.size()); - - dropUnhitSegments(result); - logger.info("hbasekeyrange count after dropping unhit :" + result.size()); - - //TODO: should use LazyRowKeyEncoder.getRowKeysDifferentShards like CubeHBaseRPC, not do so because v1 query engine is retiring. not worth changing it - if (cubeDesc.isEnableSharding()) { - result = duplicateRangeByShard(result); - } - logger.info("hbasekeyrange count after dropping duplicatebyshard :" + result.size()); - - return result; - } - - private List<Collection<ColumnValueRange>> translateToOrAndDimRanges(TupleFilter flatFilter, CubeSegment cubeSegment) { - List<Collection<ColumnValueRange>> result = Lists.newArrayList(); - - if (flatFilter == null) { - result.add(Collections.<ColumnValueRange> emptyList()); - return result; - } - - for (TupleFilter andFilter : flatFilter.getChildren()) { - if (andFilter.getOperator() != FilterOperatorEnum.AND) { - throw new IllegalStateException("Filter should be AND instead of " + andFilter); - } - - Collection<ColumnValueRange> andRanges = translateToAndDimRanges(andFilter.getChildren(), cubeSegment); - - if (andRanges != null) { - result.add(andRanges); - } - } - - return preprocessConstantConditions(result); - } - - private List<Collection<ColumnValueRange>> preprocessConstantConditions(List<Collection<ColumnValueRange>> orAndRanges) { - boolean globalAlwaysTrue = false; - Iterator<Collection<ColumnValueRange>> iterator = orAndRanges.iterator(); - while (iterator.hasNext()) { - Collection<ColumnValueRange> andRanges = iterator.next(); - Iterator<ColumnValueRange> iterator2 = andRanges.iterator(); - boolean hasAlwaysFalse = false; - while (iterator2.hasNext()) { - ColumnValueRange range = iterator2.next(); - if (range.satisfyAll()) - iterator2.remove(); - else if (range.satisfyNone()) - hasAlwaysFalse = true; - } - if (hasAlwaysFalse) { - iterator.remove(); - } else if (andRanges.isEmpty()) { - globalAlwaysTrue = true; - break; - } - } - if (globalAlwaysTrue) { - orAndRanges.clear(); - orAndRanges.add(Collections.<ColumnValueRange> emptyList()); - } - return orAndRanges; - } - - // return empty collection to mean true; return null to mean false - @SuppressWarnings("unchecked") - private Collection<ColumnValueRange> translateToAndDimRanges(List<? 
extends TupleFilter> andFilters, CubeSegment cubeSegment) { - Map<TblColRef, ColumnValueRange> rangeMap = new HashMap<TblColRef, ColumnValueRange>(); - for (TupleFilter filter : andFilters) { - if ((filter instanceof CompareTupleFilter) == false) { - continue; - } - - CompareTupleFilter comp = (CompareTupleFilter) filter; - if (comp.getColumn() == null) { - continue; - } - - ColumnValueRange range = new ColumnValueRange(comp.getColumn(), (Collection<String>) comp.getValues(), comp.getOperator()); - andMerge(range, rangeMap); - } - - // a little pre-evaluation to remove invalid EQ/IN values and round start/end according to dictionary - RowKeyDesc rowkey = cubeSegment.getCubeDesc().getRowkey(); - Iterator<ColumnValueRange> it = rangeMap.values().iterator(); - while (it.hasNext()) { - ColumnValueRange range = it.next(); - if (rowkey.isUseDictionary(range.getColumn())) { - range.preEvaluateWithDict((Dictionary<String>) cubeSegment.getDictionary(range.getColumn())); - } - if (range.satisfyAll()) - it.remove(); - else if (range.satisfyNone()) - return null; - } - - return rangeMap.values(); - } - - private void andMerge(ColumnValueRange range, Map<TblColRef, ColumnValueRange> rangeMap) { - ColumnValueRange columnRange = rangeMap.get(range.getColumn()); - if (columnRange == null) { - rangeMap.put(range.getColumn(), range); - } else { - columnRange.andMerge(range); - } - } - - private List<HBaseKeyRange> mergeOverlapRanges(List<HBaseKeyRange> keyRanges) { - if (keyRanges.size() <= 1) { - return keyRanges; - } - - if (logger.isDebugEnabled()) { - logger.debug("Merging key range from " + keyRanges.size()); - } - - // sort ranges by start key - Collections.sort(keyRanges); - - // merge the overlap range - List<HBaseKeyRange> mergedRanges = new LinkedList<HBaseKeyRange>(); - int beginIndex = 0; - byte[] maxStopKey = keyRanges.get(0).getStopKey(); - for (int index = 0; index < keyRanges.size(); index++) { - HBaseKeyRange keyRange = keyRanges.get(index); - if (Bytes.compareTo(maxStopKey, keyRange.getStartKey()) < 0) { - // merge the current key ranges - HBaseKeyRange mergedRange = mergeKeyRange(keyRanges, beginIndex, index - 1); - mergedRanges.add(mergedRange); - // start new merge - beginIndex = index; - } - if (Bytes.compareTo(maxStopKey, keyRange.getStopKey()) < 0) { - // update the stop key - maxStopKey = keyRange.getStopKey(); - } - } - // merge last range - HBaseKeyRange mergedRange = mergeKeyRange(keyRanges, beginIndex, keyRanges.size() - 1); - mergedRanges.add(mergedRange); - if (logger.isDebugEnabled()) { - logger.debug("Merging key range to " + mergedRanges.size()); - } - return mergedRanges; - } - - private HBaseKeyRange mergeKeyRange(List<HBaseKeyRange> keyRanges, int from, int to) { - HBaseKeyRange keyRange = keyRanges.get(from); - int mergeSize = to - from + 1; - if (mergeSize > 1) { - // merge range from mergeHeader to i - 1 - CubeSegment cubeSegment = keyRange.getCubeSegment(); - Cuboid cuboid = keyRange.getCuboid(); - byte[] startKey = keyRange.getStartKey(); - byte[] stopKey = keyRange.getStopKey(); - long partitionColumnStartDate = Long.MAX_VALUE; - long partitionColumnEndDate = 0; - - TreeSet<Pair<byte[], byte[]>> newFuzzyKeys = new TreeSet<>(new Comparator<Pair<byte[], byte[]>>() { - @Override - public int compare(Pair<byte[], byte[]> o1, Pair<byte[], byte[]> o2) { - int partialResult = Bytes.compareTo(o1.getFirst(), o2.getFirst()); - if (partialResult != 0) { - return partialResult; - } else { - return Bytes.compareTo(o1.getSecond(), o2.getSecond()); - } - } - }); - 
List<Collection<ColumnValueRange>> newFlatOrAndFilter = Lists.newLinkedList(); - - boolean hasNonFuzzyRange = false; - for (int k = from; k <= to; k++) { - HBaseKeyRange nextRange = keyRanges.get(k); - hasNonFuzzyRange = hasNonFuzzyRange || nextRange.getFuzzyKeys().isEmpty(); - newFuzzyKeys.addAll(nextRange.getFuzzyKeys()); - newFlatOrAndFilter.addAll(nextRange.getFlatOrAndFilter()); - if (Bytes.compareTo(stopKey, nextRange.getStopKey()) < 0) { - stopKey = nextRange.getStopKey(); - } - if (nextRange.getPartitionColumnStartDate() > 0 && nextRange.getPartitionColumnStartDate() < partitionColumnStartDate) { - partitionColumnStartDate = nextRange.getPartitionColumnStartDate(); - } - if (nextRange.getPartitionColumnEndDate() < Long.MAX_VALUE && nextRange.getPartitionColumnEndDate() > partitionColumnEndDate) { - partitionColumnEndDate = nextRange.getPartitionColumnEndDate(); - } - } - - // if any range is non-fuzzy, then all fuzzy keys must be cleared - if (hasNonFuzzyRange) { - newFuzzyKeys.clear(); - } - - partitionColumnStartDate = (partitionColumnStartDate == Long.MAX_VALUE) ? 0 : partitionColumnStartDate; - partitionColumnEndDate = (partitionColumnEndDate == 0) ? Long.MAX_VALUE : partitionColumnEndDate; - keyRange = new HBaseKeyRange(cubeSegment, cuboid, startKey, stopKey, Lists.newArrayList(newFuzzyKeys), newFlatOrAndFilter, partitionColumnStartDate, partitionColumnEndDate); - } - return keyRange; - } - - private List<HBaseKeyRange> mergeTooManyRanges(List<HBaseKeyRange> keyRanges) { - if (keyRanges.size() < MERGE_KEYRANGE_THRESHOLD) { - return keyRanges; - } - // TODO: check the distance between range. and merge the large distance range - List<HBaseKeyRange> mergedRanges = new LinkedList<HBaseKeyRange>(); - HBaseKeyRange mergedRange = mergeKeyRange(keyRanges, 0, keyRanges.size() - 1); - mergedRanges.add(mergedRange); - return mergedRanges; - } - - private void dropUnhitSegments(List<HBaseKeyRange> scans) { - if (cubeDesc.getModel().getPartitionDesc().isPartitioned()) { - Iterator<HBaseKeyRange> iterator = scans.iterator(); - while (iterator.hasNext()) { - HBaseKeyRange scan = iterator.next(); - if (scan.hitSegment() == false) { - iterator.remove(); - } - } - } - } - - private List<HBaseKeyRange> duplicateRangeByShard(List<HBaseKeyRange> scans) { - List<HBaseKeyRange> ret = Lists.newArrayList(); - - for (HBaseKeyRange scan : scans) { - CubeSegment segment = scan.getCubeSegment(); - - byte[] startKey = scan.getStartKey(); - byte[] stopKey = scan.getStopKey(); - - short cuboidShardNum = segment.getCuboidShardNum(scan.getCuboid().getId()); - short cuboidShardBase = segment.getCuboidBaseShard(scan.getCuboid().getId()); - for (short i = 0; i < cuboidShardNum; ++i) { - short newShard = ShardingHash.normalize(cuboidShardBase, i, segment.getTotalShards(scan.getCuboid().getId())); - byte[] newStartKey = duplicateKeyAndChangeShard(newShard, startKey); - byte[] newStopKey = duplicateKeyAndChangeShard(newShard, stopKey); - HBaseKeyRange newRange = new HBaseKeyRange(segment, scan.getCuboid(), newStartKey, newStopKey, // - scan.getFuzzyKeys(), scan.getFlatOrAndFilter(), scan.getPartitionColumnStartDate(), scan.getPartitionColumnEndDate()); - ret.add(newRange); - } - } - - Collections.sort(ret, new Comparator<HBaseKeyRange>() { - @Override - public int compare(HBaseKeyRange o1, HBaseKeyRange o2) { - return Bytes.compareTo(o1.getStartKey(), o2.getStartKey()); - } - }); - - return ret; - } - - private byte[] duplicateKeyAndChangeShard(short newShard, byte[] bytes) { - byte[] ret = Arrays.copyOf(bytes, 
bytes.length); - BytesUtil.writeShort(newShard, ret, 0, RowConstants.ROWKEY_SHARDID_LEN); - return ret; - } - - private void setThreshold(Collection<TblColRef> dimensions, List<RowValueDecoder> valueDecoders, StorageContext context) { - if (RowValueDecoder.hasMemHungryMeasures(valueDecoders) == false) { - return; - } - - int rowSizeEst = dimensions.size() * 3; - for (RowValueDecoder decoder : valueDecoders) { - MeasureDesc[] measures = decoder.getMeasures(); - BitSet projectionIndex = decoder.getProjectionIndex(); - for (int i = projectionIndex.nextSetBit(0); i >= 0; i = projectionIndex.nextSetBit(i + 1)) { - FunctionDesc func = measures[i].getFunction(); - // FIXME getStorageBytesEstimate() is not appropriate as here we want size in memory (not in storage) - rowSizeEst += func.getReturnDataType().getStorageBytesEstimate(); - } - } - - long rowEst = this.cubeInstance.getConfig().getQueryMemBudget() / rowSizeEst; - if (rowEst > 0) { - logger.info("Memory budget is set to: " + rowEst); - context.setThreshold((int) rowEst); - } else { - logger.info("Memory budget is not set."); - } - } - - private void setLimit(TupleFilter filter, StorageContext context) { - boolean goodAggr = context.isExactAggregation(); - boolean goodFilter = filter == null || (TupleFilter.isEvaluableRecursively(filter) && context.isCoprocessorEnabled()); - boolean goodSort = !context.hasSort(); - if (goodAggr && goodFilter && goodSort) { - logger.info("Enable limit " + context.getLimit()); - context.enableLimit(); - } - } - - private void setCoprocessor(Set<TblColRef> groupsCopD, List<RowValueDecoder> valueDecoders, StorageContext context) { - ObserverEnabler.enableCoprocessorIfBeneficial(cubeInstance, groupsCopD, valueDecoders, context); - } - - private void notifyBeforeStorageQuery(SQLDigest sqlDigest) { - - Map<String, List<MeasureDesc>> map = Maps.newHashMap(); - for (MeasureDesc measure : cubeDesc.getMeasures()) { - MeasureType<?> measureType = measure.getFunction().getMeasureType(); - - String key = measureType.getClass().getCanonicalName(); - List<MeasureDesc> temp = null; - if ((temp = map.get(key)) != null) { - temp.add(measure); - } else { - map.put(key, Lists.<MeasureDesc> newArrayList(measure)); - } - } - - for (List<MeasureDesc> sublist : map.values()) { - sublist.get(0).getFunction().getMeasureType().adjustSqlDigest(sublist, sqlDigest); - } - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/545201f6/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java deleted file mode 100644 index 64feff0..0000000 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java +++ /dev/null @@ -1,271 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.storage.hbase.cube.v1; - -import java.io.IOException; -import java.util.BitSet; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; - -import org.apache.hadoop.hbase.client.Result; -import org.apache.kylin.common.util.Array; -import org.apache.kylin.common.util.Dictionary; -import org.apache.kylin.common.util.Pair; -import org.apache.kylin.cube.CubeManager; -import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.cube.cuboid.Cuboid; -import org.apache.kylin.cube.kv.RowKeyDecoder; -import org.apache.kylin.cube.model.CubeDesc.DeriveInfo; -import org.apache.kylin.dict.lookup.LookupStringTable; -import org.apache.kylin.measure.MeasureType; -import org.apache.kylin.metadata.model.FunctionDesc; -import org.apache.kylin.metadata.model.MeasureDesc; -import org.apache.kylin.metadata.model.TblColRef; -import org.apache.kylin.metadata.tuple.Tuple; -import org.apache.kylin.metadata.tuple.TupleInfo; -import org.apache.kylin.storage.hbase.steps.RowValueDecoder; - -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; - -public class CubeTupleConverter { - - final CubeSegment cubeSeg; - final Cuboid cuboid; - final TupleInfo tupleInfo; - final RowKeyDecoder rowKeyDecoder; - final List<RowValueDecoder> rowValueDecoders; - final List<IDerivedColumnFiller> derivedColFillers; - final int[] dimensionTupleIdx; - final int[][] metricsMeasureIdx; - final int[][] metricsTupleIdx; - - final List<MeasureType<?>> measureTypes; - - final List<MeasureType.IAdvMeasureFiller> advMeasureFillers; - final List<Pair<Integer, Integer>> advMeasureIndexInRV;//first=> which rowValueDecoders,second => metric index - - public CubeTupleConverter(CubeSegment cubeSeg, Cuboid cuboid, List<RowValueDecoder> rowValueDecoders, TupleInfo tupleInfo) { - this.cubeSeg = cubeSeg; - this.cuboid = cuboid; - this.tupleInfo = tupleInfo; - this.rowKeyDecoder = new RowKeyDecoder(this.cubeSeg); - this.rowValueDecoders = rowValueDecoders; - this.derivedColFillers = Lists.newArrayList(); - - List<TblColRef> dimCols = cuboid.getColumns(); - - measureTypes = Lists.newArrayList(); - advMeasureFillers = Lists.newArrayListWithCapacity(1); - advMeasureIndexInRV = Lists.newArrayListWithCapacity(1); - - // pre-calculate dimension index mapping to tuple - dimensionTupleIdx = new int[dimCols.size()]; - for (int i = 0; i < dimCols.size(); i++) { - TblColRef col = dimCols.get(i); - dimensionTupleIdx[i] = tupleInfo.hasColumn(col) ? 
tupleInfo.getColumnIndex(col) : -1; - } - - // pre-calculate metrics index mapping to tuple - metricsMeasureIdx = new int[rowValueDecoders.size()][]; - metricsTupleIdx = new int[rowValueDecoders.size()][]; - for (int i = 0; i < rowValueDecoders.size(); i++) { - RowValueDecoder decoder = rowValueDecoders.get(i); - MeasureDesc[] measures = decoder.getMeasures(); - BitSet selectedMeasures = decoder.getProjectionIndex(); - metricsMeasureIdx[i] = new int[selectedMeasures.cardinality()]; - metricsTupleIdx[i] = new int[selectedMeasures.cardinality()]; - for (int j = 0, mi = selectedMeasures.nextSetBit(0); j < metricsMeasureIdx[i].length; j++, mi = selectedMeasures.nextSetBit(mi + 1)) { - FunctionDesc aggrFunc = measures[mi].getFunction(); - - int tupleIdx; - if (aggrFunc.needRewrite()) { - // a rewrite metrics is identified by its rewrite field name - String rewriteFieldName = aggrFunc.getRewriteFieldName(); - tupleIdx = tupleInfo.hasField(rewriteFieldName) ? tupleInfo.getFieldIndex(rewriteFieldName) : -1; - } else { - // a non-rewrite metrics (like sum, or dimension playing as metrics) is like a dimension column - TblColRef col = aggrFunc.getParameter().getColRefs().get(0); - tupleIdx = tupleInfo.hasColumn(col) ? tupleInfo.getColumnIndex(col) : -1; - } - metricsMeasureIdx[i][j] = mi; - metricsTupleIdx[i][j] = tupleIdx; - - MeasureType<?> measureType = aggrFunc.getMeasureType(); - if (measureType.needAdvancedTupleFilling()) { - Map<TblColRef, Dictionary<String>> dictionaryMap = buildDictionaryMap(measureType.getColumnsNeedDictionary(aggrFunc)); - advMeasureFillers.add(measureType.getAdvancedTupleFiller(aggrFunc, tupleInfo, dictionaryMap)); - advMeasureIndexInRV.add(Pair.newPair(i, mi)); - measureTypes.add(null); - } else { - measureTypes.add(measureType); - } - } - } - - // prepare derived columns and filler - Map<Array<TblColRef>, List<DeriveInfo>> hostToDerivedInfo = cuboid.getCubeDesc().getHostToDerivedInfo(dimCols, null); - for (Entry<Array<TblColRef>, List<DeriveInfo>> entry : hostToDerivedInfo.entrySet()) { - TblColRef[] hostCols = entry.getKey().data; - for (DeriveInfo deriveInfo : entry.getValue()) { - IDerivedColumnFiller filler = newDerivedColumnFiller(hostCols, deriveInfo); - if (filler != null) { - derivedColFillers.add(filler); - } - } - } - } - - // load only needed dictionaries - private Map<TblColRef, Dictionary<String>> buildDictionaryMap(List<TblColRef> columnsNeedDictionary) { - Map<TblColRef, Dictionary<String>> result = Maps.newHashMap(); - for (TblColRef col : columnsNeedDictionary) { - result.put(col, cubeSeg.getDictionary(col)); - } - return result; - } - - public List<MeasureType.IAdvMeasureFiller> translateResult(Result hbaseRow, Tuple tuple) { - try { - byte[] rowkey = hbaseRow.getRow(); - rowKeyDecoder.decode(rowkey); - } catch (IOException ex) { - throw new RuntimeException("Cannot translate hbase result " + hbaseRow); - } - - // dimensions - List<String> dimensionValues = rowKeyDecoder.getValues(); - for (int i = 0; i < dimensionValues.size(); i++) { - int tupleIdx = dimensionTupleIdx[i]; - if (tupleIdx >= 0) { - tuple.setDimensionValue(tupleIdx, dimensionValues.get(i)); - } - } - - // derived - for (IDerivedColumnFiller filler : derivedColFillers) { - filler.fillDerivedColumns(dimensionValues, tuple); - } - - // measures - int index = 0; - for (int i = 0; i < rowValueDecoders.size(); i++) { - RowValueDecoder rowValueDecoder = rowValueDecoders.get(i); - rowValueDecoder.decodeAndConvertJavaObj(hbaseRow); - Object[] measureValues = rowValueDecoder.getValues(); - - 
int[] measureIdx = metricsMeasureIdx[i]; - int[] tupleIdx = metricsTupleIdx[i]; - for (int j = 0; j < measureIdx.length; j++) { - if (measureTypes.get(index++) != null) { - tuple.setMeasureValue(tupleIdx[j], measureValues[measureIdx[j]]); - } - } - } - - // advanced measure filling, due to possible row split, will complete at caller side - if (advMeasureFillers.isEmpty()) { - return null; - } else { - for (int i = 0; i < advMeasureFillers.size(); i++) { - Pair<Integer, Integer> metricLocation = advMeasureIndexInRV.get(i); - Object measureValue = rowValueDecoders.get(metricLocation.getFirst()).getValues()[metricLocation.getSecond()]; - advMeasureFillers.get(i).reload(measureValue); - } - return advMeasureFillers; - } - } - - private interface IDerivedColumnFiller { - public void fillDerivedColumns(List<String> rowValues, Tuple tuple); - } - - private IDerivedColumnFiller newDerivedColumnFiller(TblColRef[] hostCols, final DeriveInfo deriveInfo) { - List<TblColRef> rowColumns = cuboid.getColumns(); - - final int[] hostColIdx = new int[hostCols.length]; - for (int i = 0; i < hostCols.length; i++) { - hostColIdx[i] = rowColumns.indexOf(hostCols[i]); - } - - boolean needCopyDerived = false; - final int[] derivedTupleIdx = new int[deriveInfo.columns.length]; - for (int i = 0; i < deriveInfo.columns.length; i++) { - TblColRef col = deriveInfo.columns[i]; - derivedTupleIdx[i] = tupleInfo.hasColumn(col) ? tupleInfo.getColumnIndex(col) : -1; - needCopyDerived = needCopyDerived || derivedTupleIdx[i] >= 0; - } - - if (needCopyDerived == false) - return null; - - switch (deriveInfo.type) { - case LOOKUP: - return new IDerivedColumnFiller() { - CubeManager cubeMgr = CubeManager.getInstance(cubeSeg.getCubeInstance().getConfig()); - LookupStringTable lookupTable = cubeMgr.getLookupTable(cubeSeg, deriveInfo.join); - int[] derivedColIdx = initDerivedColIdx(); - Array<String> lookupKey = new Array<String>(new String[hostColIdx.length]); - - private int[] initDerivedColIdx() { - int[] idx = new int[deriveInfo.columns.length]; - for (int i = 0; i < idx.length; i++) { - idx[i] = deriveInfo.columns[i].getColumnDesc().getZeroBasedIndex(); - } - return idx; - } - - @Override - public void fillDerivedColumns(List<String> rowValues, Tuple tuple) { - for (int i = 0; i < hostColIdx.length; i++) { - lookupKey.data[i] = rowValues.get(hostColIdx[i]); - } - - String[] lookupRow = lookupTable.getRow(lookupKey); - - if (lookupRow != null) { - for (int i = 0; i < derivedTupleIdx.length; i++) { - if (derivedTupleIdx[i] >= 0) { - String value = lookupRow[derivedColIdx[i]]; - tuple.setDimensionValue(derivedTupleIdx[i], value); - } - } - } else { - for (int i = 0; i < derivedTupleIdx.length; i++) { - if (derivedTupleIdx[i] >= 0) { - tuple.setDimensionValue(derivedTupleIdx[i], null); - } - } - } - } - }; - case PK_FK: - return new IDerivedColumnFiller() { - @Override - public void fillDerivedColumns(List<String> rowValues, Tuple tuple) { - // composite keys are split, so only copy [0] is enough, see CubeDesc.initDimensionColumns() - tuple.setDimensionValue(derivedTupleIdx[0], rowValues.get(hostColIdx[0])); - } - }; - default: - throw new IllegalArgumentException(); - } - } -} http://git-wip-us.apache.org/repos/asf/kylin/blob/545201f6/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/RegionScannerAdapter.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/RegionScannerAdapter.java 
http://git-wip-us.apache.org/repos/asf/kylin/blob/545201f6/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/RegionScannerAdapter.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/RegionScannerAdapter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/RegionScannerAdapter.java
deleted file mode 100644
index 8a20c65..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/RegionScannerAdapter.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.storage.hbase.cube.v1;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.regionserver.RegionScanner;
-
-/**
- * @author yangli9
- *
- */
-public class RegionScannerAdapter implements RegionScanner {
-
-    private ResultScanner scanner;
-
-    public RegionScannerAdapter(ResultScanner scanner) {
-        this.scanner = scanner;
-    }
-
-    @Override
-    public boolean next(List<Cell> results) throws IOException {
-        Result result = scanner.next();
-        if (result == null) // EOF
-            return false;
-
-        results.addAll(result.listCells());
-        return true;
-    }
-
-    @Override
-    public boolean next(List<Cell> result, int limit) throws IOException {
-        return next(result);
-    }
-
-    @Override
-    public boolean nextRaw(List<Cell> result) throws IOException {
-        return next(result);
-    }
-
-    @Override
-    public boolean nextRaw(List<Cell> result, int limit) throws IOException {
-        return next(result);
-    }
-
-    @Override
-    public void close() throws IOException {
-        scanner.close();
-    }
-
-    @Override
-    public HRegionInfo getRegionInfo() {
-        return null;
-    }
-
-    @Override
-    public long getMaxResultSize() {
-        return Long.MAX_VALUE;
-    }
-
-    @Override
-    public boolean isFilterDone() throws IOException {
-        return false;
-    }
-
-    @Override
-    public boolean reseek(byte[] row) throws IOException {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public long getMvccReadPoint() {
-        return Long.MAX_VALUE;
-    }
-
-}
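The adapter deleted above bridges HBase's client-side ResultScanner into the server-side RegionScanner interface: calls that map cleanly delegate, the two EOF conventions (null result vs. boolean false) are translated, and calls with no sensible mapping return safe defaults or throw. A hedged sketch of the same pattern, with stand-in interfaces so it compiles without HBase on the classpath:

    import java.io.IOException;
    import java.util.List;

    /** Stand-in for the target interface (RegionScanner-like, batch pull). */
    interface BatchScanner {
        boolean next(List<String> out) throws IOException; // false at EOF
        void close() throws IOException;
    }

    /** Stand-in for the adaptee (ResultScanner-like, row pull, null at EOF). */
    interface RowScanner {
        String next() throws IOException;
        void close() throws IOException;
    }

    /** Same shape as the deleted RegionScannerAdapter: delegate what maps
     *  cleanly, translate the EOF convention, refuse what cannot be honored. */
    class RowToBatchAdapter implements BatchScanner {
        private final RowScanner scanner;

        RowToBatchAdapter(RowScanner scanner) {
            this.scanner = scanner;
        }

        @Override
        public boolean next(List<String> out) throws IOException {
            String row = scanner.next();
            if (row == null) // EOF: null on one side, false on the other
                return false;
            out.add(row);
            return true;
        }

        @Override
        public void close() throws IOException {
            scanner.close();
        }

        // a call with no sensible mapping throws, as the deleted reseek() did
        public void reseek(byte[] rowKey) {
            throw new UnsupportedOperationException();
        }
    }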
http://git-wip-us.apache.org/repos/asf/kylin/blob/545201f6/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/ResultScannerAdapter.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/ResultScannerAdapter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/ResultScannerAdapter.java
deleted file mode 100644
index 99058e7..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/ResultScannerAdapter.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.storage.hbase.cube.v1;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.regionserver.RegionScanner;
-
-import com.google.common.collect.Lists;
-
-/**
- * @author yangli9
- *
- */
-public class ResultScannerAdapter implements ResultScanner {
-
-    private RegionScanner scanner;
-
-    public ResultScannerAdapter(RegionScanner scanner) {
-        this.scanner = scanner;
-    }
-
-    @Override
-    public Iterator<Result> iterator() {
-        return new Iterator<Result>() {
-
-            Result next = null;
-
-            @Override
-            public boolean hasNext() {
-                if (next == null) {
-                    try {
-                        next = ResultScannerAdapter.this.next();
-                    } catch (IOException e) {
-                        throw new RuntimeException(e);
-                    }
-                }
-                return next != null;
-            }
-
-            @Override
-            public Result next() {
-                Result r = next;
-                next = null;
-                return r;
-            }
-
-            @Override
-            public void remove() {
-                throw new UnsupportedOperationException();
-            }
-        };
-    }
-
-    @Override
-    public Result next() throws IOException {
-        List<Cell> cells = Lists.newArrayList();
-        scanner.next(cells);
-        if (cells.isEmpty())
-            return null;
-        else
-            return Result.create(cells);
-    }
-
-    @Override
-    public Result[] next(int nbRows) throws IOException {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void close() {
-        try {
-            scanner.close();
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-}
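Both deleted adapters rely on the buffer-one-element iterator idiom: hasNext() pulls ahead into a field, next() hands the buffered value out and clears it. A generic sketch follows; note that unlike the anonymous iterator above, it throws NoSuchElementException when next() is called past the end, where the original would silently return null. Names are illustrative:

    import java.util.Iterator;
    import java.util.NoSuchElementException;
    import java.util.function.Supplier;

    /** Buffer-one-element iterator; the source returns null at end of data,
     *  e.g. new PrefetchIterator<>(queue::poll) over a java.util.Queue. */
    class PrefetchIterator<T> implements Iterator<T> {
        private final Supplier<T> source;
        private T buffered;

        PrefetchIterator(Supplier<T> source) {
            this.source = source;
        }

        @Override
        public boolean hasNext() {
            if (buffered == null)
                buffered = source.get(); // pull ahead exactly one element
            return buffered != null;
        }

        @Override
        public T next() {
            if (!hasNext()) // guard the deleted version lacked
                throw new NoSuchElementException();
            T r = buffered;
            buffered = null;
            return r;
        }
    }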
http://git-wip-us.apache.org/repos/asf/kylin/blob/545201f6/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java
deleted file mode 100644
index e8dd5b9..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.storage.hbase.cube.v1;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.Set;
-
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.metadata.filter.TupleFilter;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.tuple.ITuple;
-import org.apache.kylin.metadata.tuple.ITupleIterator;
-import org.apache.kylin.metadata.tuple.TupleInfo;
-import org.apache.kylin.storage.StorageContext;
-import org.apache.kylin.storage.exception.ScanOutOfLimitException;
-import org.apache.kylin.storage.hbase.steps.RowValueDecoder;
-import org.apache.kylin.storage.translate.HBaseKeyRange;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-/**
- * @author xjiang
- */
-public class SerializedHBaseTupleIterator implements ITupleIterator {
-
-    private static final int PARTIAL_DEFAULT_LIMIT = 10000;
-
-    private final StorageContext context;
-    private final int partialResultLimit;
-    private final List<CubeSegmentTupleIterator> segmentIteratorList;
-    private final Iterator<CubeSegmentTupleIterator> segmentIteratorIterator;
-
-    private ITupleIterator segmentIterator;
-    private int scanCount;
-    private ITuple next;
-
-    public SerializedHBaseTupleIterator(HConnection conn, List<HBaseKeyRange> segmentKeyRanges, CubeInstance cube, //
-            Set<TblColRef> dimensions, TupleFilter filter, Set<TblColRef> groupBy, List<RowValueDecoder> rowValueDecoders, //
-            StorageContext context, TupleInfo returnTupleInfo) {
-
-        this.context = context;
-        int limit = context.getLimit();
-        this.partialResultLimit = Math.max(limit, PARTIAL_DEFAULT_LIMIT);
-
-        this.segmentIteratorList = new ArrayList<CubeSegmentTupleIterator>(segmentKeyRanges.size());
-        Map<CubeSegment, List<HBaseKeyRange>> rangesMap = makeRangesMap(segmentKeyRanges);
-
-        for (Map.Entry<CubeSegment, List<HBaseKeyRange>> entry : rangesMap.entrySet()) {
-            CubeSegmentTupleIterator it = new CubeSegmentTupleIterator(entry.getKey(), entry.getValue(), conn, dimensions, filter, groupBy, rowValueDecoders, context, returnTupleInfo);
-            this.segmentIteratorList.add(it);
-        }
-
-        this.segmentIteratorIterator = this.segmentIteratorList.iterator();
-        if (this.segmentIteratorIterator.hasNext()) {
-            this.segmentIterator = this.segmentIteratorIterator.next();
-        } else {
-            this.segmentIterator = ITupleIterator.EMPTY_TUPLE_ITERATOR;
-        }
-    }
-
-    private Map<CubeSegment, List<HBaseKeyRange>> makeRangesMap(List<HBaseKeyRange> segmentKeyRanges) {
-        Map<CubeSegment, List<HBaseKeyRange>> map = Maps.newHashMap();
-        for (HBaseKeyRange range : segmentKeyRanges) {
-            List<HBaseKeyRange> list = map.get(range.getCubeSegment());
-            if (list == null) {
-                list = Lists.newArrayList();
-                map.put(range.getCubeSegment(), list);
-            }
-            list.add(range);
-        }
-        return map;
-    }
-
-    @Override
-    public boolean hasNext() {
-        if (next != null)
-            return true;
-
-        // 1. check limit
-        if (context.isLimitEnabled() && scanCount >= context.getLimit() + context.getOffset()) {
-            return false;
-        }
-        // 2. check partial result
-        if (context.isAcceptPartialResult() && scanCount > partialResultLimit) {
-            context.setPartialResultReturned(true);
-            return false;
-        }
-        // 3. check threshold
-        if (scanCount >= context.getThreshold()) {
-            throw new ScanOutOfLimitException("Scan row count exceeded threshold: " + context.getThreshold() + ", please add filter condition to narrow down backend scan range, like where clause.");
-        }
-        // 4. check cube segments
-        if (segmentIterator.hasNext()) {
-            next = segmentIterator.next();
-            scanCount++;
-            return true;
-        } else if (segmentIteratorIterator.hasNext()) {
-            segmentIterator.close();
-            segmentIterator = segmentIteratorIterator.next();
-            return hasNext();
-        }
-        return false;
-    }
-
-    @Override
-    public ITuple next() {
-        if (next == null) {
-            hasNext();
-            if (next == null)
-                throw new NoSuchElementException();
-        }
-        ITuple r = next;
-        next = null;
-        return r;
-    }
-
-    @Override
-    public void remove() {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void close() {
-        // hasNext() loop may exit because of limit, threshold, etc.
-        // close all the remaining segmentIterator
-        segmentIterator.close();
-        while (segmentIteratorIterator.hasNext()) {
-            segmentIterator = segmentIteratorIterator.next();
-            segmentIterator.close();
-        }
-    }
-
-}
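The iterator deleted above chains per-segment iterators in order and guards every step with the four checks in hasNext(); a threshold breach surfaces as ScanOutOfLimitException. A simplified, self-contained sketch of the chaining-plus-threshold logic, with generic element types and a hypothetical class name (the exception is swapped for a plain IllegalStateException to stay dependency-free):

    import java.util.Collections;
    import java.util.Iterator;
    import java.util.List;
    import java.util.NoSuchElementException;

    /** Walks a list of per-segment iterators in order; a global scan counter
     *  enforces the threshold the way check 3 above did. */
    class ChainedScanIterator<T> implements Iterator<T> {
        private final Iterator<Iterator<T>> segments;
        private final int threshold;
        private Iterator<T> current = Collections.emptyIterator();
        private int scanCount;

        ChainedScanIterator(List<Iterator<T>> segmentList, int threshold) {
            this.segments = segmentList.iterator();
            this.threshold = threshold;
        }

        @Override
        public boolean hasNext() {
            if (scanCount >= threshold)
                throw new IllegalStateException(
                        "Scan row count exceeded threshold: " + threshold);
            while (!current.hasNext() && segments.hasNext())
                current = segments.next(); // advance to the next segment
            return current.hasNext();
        }

        @Override
        public T next() {
            if (!hasNext())
                throw new NoSuchElementException();
            scanCount++;
            return current.next();
        }
    }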
http://git-wip-us.apache.org/repos/asf/kylin/blob/545201f6/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserver.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserver.java
deleted file mode 100644
index 7139ca7..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserver.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
-import org.apache.hadoop.hbase.coprocessor.ObserverContext;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
-import org.apache.hadoop.hbase.regionserver.RegionScanner;
-import org.apache.kylin.gridtable.StorageSideBehavior;
-import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorFilter;
-import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorProjector;
-import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorRowType;
-
-/**
- * @author yangli9
- */
-public class AggregateRegionObserver extends BaseRegionObserver {
-
-    // HBase uses common logging (vs. Kylin uses slf4j)
-    static final Log LOG = LogFactory.getLog(AggregateRegionObserver.class);
-
-    static final String COPROCESSOR_ENABLE = "_Coprocessor_Enable";
-    static final String TYPE = "_Type";
-    static final String PROJECTOR = "_Projector";
-    static final String AGGREGATORS = "_Aggregators";
-    static final String FILTER = "_Filter";
-    static final String BEHAVIOR = "_Behavior";
-
-    @Override
-    public final RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> ctxt, final Scan scan, final RegionScanner innerScanner) throws IOException {
-
-        boolean copAbortOnError = ctxt.getEnvironment().getConfiguration().getBoolean(RegionCoprocessorHost.ABORT_ON_ERROR_KEY, RegionCoprocessorHost.DEFAULT_ABORT_ON_ERROR);
-
-        // never throw out exception that could abort region server
-        if (copAbortOnError) {
-            try {
-                return doPostScannerObserver(ctxt, scan, innerScanner);
-            } catch (Throwable e) {
-                LOG.error("Kylin Coprocessor Error", e);
-                return innerScanner;
-            }
-        } else {
-            return doPostScannerObserver(ctxt, scan, innerScanner);
-        }
-    }
-
-    private RegionScanner doPostScannerObserver(final ObserverContext<RegionCoprocessorEnvironment> ctxt, final Scan scan, final RegionScanner innerScanner) throws IOException {
-        byte[] coprocessorEnableBytes = scan.getAttribute(COPROCESSOR_ENABLE);
-        if (coprocessorEnableBytes == null || coprocessorEnableBytes.length == 0 || coprocessorEnableBytes[0] == 0) {
-            return innerScanner;
-        }
-
-        byte[] typeBytes = scan.getAttribute(TYPE);
-        CoprocessorRowType type = CoprocessorRowType.deserialize(typeBytes);
-
-        byte[] projectorBytes = scan.getAttribute(PROJECTOR);
-        CoprocessorProjector projector = CoprocessorProjector.deserialize(projectorBytes);
-
-        byte[] aggregatorBytes = scan.getAttribute(AGGREGATORS);
-        ObserverAggregators aggregators = ObserverAggregators.deserialize(aggregatorBytes);
-
-        byte[] filterBytes = scan.getAttribute(FILTER);
-        CoprocessorFilter filter = CoprocessorFilter.deserialize(filterBytes);
-
-        StorageSideBehavior storageSideBehavior = StorageSideBehavior.SCAN_FILTER_AGGR_CHECKMEM;
-        try {
-            byte[] behavior = scan.getAttribute(BEHAVIOR);
-            if (behavior != null && behavior.length != 0) {
-                storageSideBehavior = StorageSideBehavior.valueOf(new String(behavior));
-            }
-        } catch (Exception e) {
-            LOG.error("failed to parse behavior,using default behavior SCAN_FILTER_AGGR_CHECKMEM", e);
-            storageSideBehavior = StorageSideBehavior.SCAN_FILTER_AGGR_CHECKMEM;
-        }
-
-        // start/end region operation & sync on scanner is suggested by the
-        // javadoc of RegionScanner.nextRaw()
-        // FIXME: will the lock still work when a iterator is returned? is it safe? Is readonly attribute helping here? by mhb
-        HRegion region = ctxt.getEnvironment().getRegion();
-        region.startRegionOperation();
-        try {
-            synchronized (innerScanner) {
-                return new AggregationScanner(type, filter, projector, aggregators, innerScanner, storageSideBehavior);
-            }
-        } finally {
-            region.closeRegionOperation();
-        }
-    }
-}
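The observer deleted above reads its entire scan context from Scan attributes, set client-side by the likewise-deleted ObserverEnabler. A small sketch of how such a toggle travels on the Scan, using the attribute names from the deleted class; Scan.setAttribute/getAttribute are standard hbase-client API, while the helper class itself is hypothetical:

    import org.apache.hadoop.hbase.client.Scan;

    public class ObserverToggleSketch {
        // attribute names taken from the deleted AggregateRegionObserver
        static final String COPROCESSOR_ENABLE = "_Coprocessor_Enable";
        static final String BEHAVIOR = "_Behavior";

        /** Client side: mark the scan so the observer would aggregate it. */
        public static void enableObserver(Scan scan, String behavior) {
            scan.setAttribute(COPROCESSOR_ENABLE, new byte[] { (byte) 1 });
            scan.setAttribute(BEHAVIOR, behavior.getBytes());
        }

        /** Server side: the same null/empty/zero test doPostScannerObserver ran. */
        public static boolean isEnabled(Scan scan) {
            byte[] flag = scan.getAttribute(COPROCESSOR_ENABLE);
            return flag != null && flag.length > 0 && flag[0] != 0;
        }
    }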
