APACHE-KYLIN-1872: Make query visible and interruptible Signed-off-by: Zhong <[email protected]> Signed-off-by: lidongsjtu <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/3dbcf587 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/3dbcf587 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/3dbcf587 Branch: refs/heads/KYLIN-2881-review Commit: 3dbcf587bfcf82941d68dcb84b7ec46daa61b6da Parents: 18ac702 Author: Ma Gang <[email protected]> Authored: Wed Sep 20 19:46:38 2017 +0800 Committer: lidongsjtu <[email protected]> Committed: Mon Jan 8 13:30:49 2018 +0800 ---------------------------------------------------------------------- .../org/apache/kylin/common/QueryContext.java | 76 +++++++++---- .../kylin/common/QueryContextManager.java | 109 +++++++++++++++++++ .../gtrecord/GTCubeStorageQueryBase.java | 6 +- .../gtrecord/SequentialCubeTupleIterator.java | 4 +- .../apache/kylin/query/ITFailfastQueryTest.java | 4 +- .../kylin/query/enumerator/OLAPQuery.java | 5 +- .../kylin/rest/controller/QueryController.java | 25 +++++ .../kylin/rest/metrics/QueryMetricsFacade.java | 4 +- .../apache/kylin/rest/service/QueryService.java | 18 +-- .../kylin/rest/metrics/QueryMetricsTest.java | 4 +- .../kylin/rest/service/QueryServiceTest.java | 5 +- .../storage/hbase/cube/v2/CubeHBaseRPC.java | 3 +- 12 files changed, 223 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/3dbcf587/core-common/src/main/java/org/apache/kylin/common/QueryContext.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java index 1af90f4..d36b332 100644 --- a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java +++ b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import org.apache.kylin.common.exceptions.KylinTimeoutException; @@ -39,37 +40,30 @@ public class QueryContext { private static final Logger logger = LoggerFactory.getLogger(QueryContext.class); - private static final ThreadLocal<QueryContext> contexts = new ThreadLocal<QueryContext>() { - @Override - protected QueryContext initialValue() { - return new QueryContext(); - } - }; + public interface QueryStopListener { + void stop(QueryContext query); + } private long queryStartMillis; private long deadline = Long.MAX_VALUE; - private String queryId; + private final String queryId; private String username; private Set<String> groups; private AtomicLong scannedRows = new AtomicLong(); private AtomicLong scannedBytes = new AtomicLong(); + private AtomicBoolean isRunning = new AtomicBoolean(true); + private volatile Throwable throwable; + private String stopReason; + private List<QueryStopListener> stopListeners = Lists.newCopyOnWriteArrayList(); + private List<RPCStatistics> rpcStatisticsList = Lists.newCopyOnWriteArrayList(); private Map<Integer, CubeSegmentStatisticsResult> cubeSegmentStatisticsResultMap = Maps.newConcurrentMap(); - private QueryContext() { - // use QueryContext.current() instead - queryStartMillis = System.currentTimeMillis(); + QueryContext() { queryId = UUID.randomUUID().toString(); - } - - public static QueryContext current() { - return contexts.get(); - } - - public static void reset() { - contexts.remove(); + queryStartMillis = System.currentTimeMillis(); } public long getQueryStartMillis() { @@ -102,8 +96,8 @@ public class QueryContext { return queryId == null ? "" : queryId; } - public void setQueryId(String queryId) { - this.queryId = queryId; + public long getAccumulatedMillis() { + return System.currentTimeMillis() - queryStartMillis; } public String getUsername() { @@ -138,6 +132,48 @@ public class QueryContext { return scannedBytes.addAndGet(deltaBytes); } + public void addQueryStopListener(QueryStopListener listener) { + this.stopListeners.add(listener); + } + + public boolean isStopped() { + return !isRunning.get(); + } + + public String getStopReason() { + return stopReason; + } + + /** + * stop the whole query and related sub threads + */ + public void stop(Throwable t) { + stopQuery(t, t.getMessage()); + } + + /** + * stop the whole query by rest call + */ + public void stopEarly(String reason) { + stopQuery(null, reason); + } + + private void stopQuery(Throwable t, String reason) { + if (isStopped()) { + return; + } + isRunning.set(false); + this.throwable = t; + this.stopReason = reason; + for (QueryStopListener stopListener : stopListeners) { + stopListener.stop(this); + } + } + + public Throwable getThrowable() { + return throwable; + } + public void addContext(int ctxId, String type, boolean ifCube) { Map<String, Map<String, CubeSegmentStatistics>> cubeSegmentStatisticsMap = null; if (ifCube) { http://git-wip-us.apache.org/repos/asf/kylin/blob/3dbcf587/core-common/src/main/java/org/apache/kylin/common/QueryContextManager.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/QueryContextManager.java b/core-common/src/main/java/org/apache/kylin/common/QueryContextManager.java new file mode 100644 index 0000000..d08557e --- /dev/null +++ b/core-common/src/main/java/org/apache/kylin/common/QueryContextManager.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.common; + +import java.util.Comparator; +import java.util.List; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentMap; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +public class QueryContextManager { + + private static final Logger logger = LoggerFactory.getLogger(QueryContextManager.class); + + private static final ConcurrentMap<String, QueryContext> idContextMap = Maps.newConcurrentMap(); + private static final ThreadLocal<QueryContext> contexts = new ThreadLocal<QueryContext>() { + @Override + protected QueryContext initialValue() { + QueryContext queryContext = new QueryContext(); + idContextMap.put(queryContext.getQueryId(), queryContext); + return queryContext; + } + }; + + public static QueryContext current() { + return contexts.get(); + } + + /** + * invoked by program + */ + public static void resetCurrent() { + QueryContext queryContext = contexts.get(); + if (queryContext != null) { + idContextMap.remove(queryContext.getQueryId()); + contexts.remove(); + } + } + + /** + * invoked by user to let query stop early + * @link resetCurrent() should be finally invoked + */ + public static void stopQuery(String queryId, String info) { + QueryContext queryContext = idContextMap.get(queryId); + if (queryContext != null) { + queryContext.stopEarly(info); + } else { + logger.info("the query:{} is not existed", queryId); + } + } + + public static List<QueryContext> getAllRunningQueries() { + // Sort by descending order + TreeSet<QueryContext> queriesSet = new TreeSet<>(new Comparator<QueryContext>() { + @Override + public int compare(QueryContext o1, QueryContext o2) { + if (o2.getAccumulatedMillis() > o1.getAccumulatedMillis()) { + return 1; + } else if (o2.getAccumulatedMillis() < o1.getAccumulatedMillis()) { + return -1; + } else { + return 0; + } + } + }); + + for (QueryContext runningQuery : idContextMap.values()) { + queriesSet.add(runningQuery); + } + return Lists.newArrayList(queriesSet); + } + + /** + * @param runningTime in milliseconds + * @return running queries that have run more than specified time + */ + public static List<QueryContext> getLongRunningQueries(int runningTime) { + List<QueryContext> allRunningQueries = getAllRunningQueries(); + int i = 0; + for (; i < allRunningQueries.size(); i++) { + if (allRunningQueries.get(i).getAccumulatedMillis() < runningTime) { + break; + } + } + return allRunningQueries.subList(0, i); + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/3dbcf587/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java index 11ad8bb..483facd 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java @@ -26,7 +26,7 @@ import java.util.List; import java.util.Map; import java.util.Set; -import org.apache.kylin.common.QueryContext; +import org.apache.kylin.common.QueryContextManager; import org.apache.kylin.common.util.Pair; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; @@ -161,8 +161,8 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery { // set whether to aggregate results from multiple partitions enableStreamAggregateIfBeneficial(cuboid, groupsD, context); // set and check query deadline - QueryContext.current().setDeadline(cubeInstance.getConfig().getQueryTimeoutSeconds() * 1000); - QueryContext.current().checkMillisBeforeDeadline(); + QueryContextManager.current().setDeadline(cubeInstance.getConfig().getQueryTimeoutSeconds() * 1000); + QueryContextManager.current().checkMillisBeforeDeadline(); // push down having clause filter if possible TupleFilter havingFilter = checkHavingCanPushDown(sqlDigest.havingFilter, groupsD, sqlDigest.aggregations, http://git-wip-us.apache.org/repos/asf/kylin/blob/3dbcf587/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java index ede5ff9..f45f02b 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java @@ -25,7 +25,7 @@ import java.util.List; import java.util.Set; import java.util.TreeSet; -import org.apache.kylin.common.QueryContext; +import org.apache.kylin.common.QueryContextManager; import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.TblColRef; @@ -142,7 +142,7 @@ public class SequentialCubeTupleIterator implements ITupleIterator { @Override public ITuple next() { if (scanCount++ % 100 == 1) { - QueryContext.current().checkMillisBeforeDeadline(); + QueryContextManager.current().checkMillisBeforeDeadline(); } if (++scanCountDelta >= 1000) http://git-wip-us.apache.org/repos/asf/kylin/blob/3dbcf587/kylin-it/src/test/java/org/apache/kylin/query/ITFailfastQueryTest.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/query/ITFailfastQueryTest.java b/kylin-it/src/test/java/org/apache/kylin/query/ITFailfastQueryTest.java index 17b804a..e4b8b43 100644 --- a/kylin-it/src/test/java/org/apache/kylin/query/ITFailfastQueryTest.java +++ b/kylin-it/src/test/java/org/apache/kylin/query/ITFailfastQueryTest.java @@ -21,7 +21,7 @@ import java.io.File; import java.util.Map; import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.QueryContext; +import org.apache.kylin.common.QueryContextManager; import org.apache.kylin.common.exceptions.ResourceLimitExceededException; import org.apache.kylin.metadata.realization.RealizationType; import org.apache.kylin.query.routing.Candidate; @@ -57,7 +57,7 @@ public class ITFailfastQueryTest extends KylinTestBase { @After public void cleanUp() { - QueryContext.reset(); + QueryContextManager.resetCurrent(); } @AfterClass http://git-wip-us.apache.org/repos/asf/kylin/blob/3dbcf587/query/src/main/java/org/apache/kylin/query/enumerator/OLAPQuery.java ---------------------------------------------------------------------- diff --git a/query/src/main/java/org/apache/kylin/query/enumerator/OLAPQuery.java b/query/src/main/java/org/apache/kylin/query/enumerator/OLAPQuery.java index f6bd3f8..f0759ab 100644 --- a/query/src/main/java/org/apache/kylin/query/enumerator/OLAPQuery.java +++ b/query/src/main/java/org/apache/kylin/query/enumerator/OLAPQuery.java @@ -22,7 +22,7 @@ import org.apache.calcite.DataContext; import org.apache.calcite.linq4j.AbstractEnumerable; import org.apache.calcite.linq4j.Enumerable; import org.apache.calcite.linq4j.Enumerator; -import org.apache.kylin.common.QueryContext; +import org.apache.kylin.common.QueryContextManager; import org.apache.kylin.common.debug.BackdoorToggles; import org.apache.kylin.query.relnode.OLAPContext; import org.slf4j.Logger; @@ -49,7 +49,8 @@ public class OLAPQuery extends AbstractEnumerable<Object[]> implements Enumerabl this.type = type; this.contextId = ctxId; - QueryContext.current().addContext(ctxId, type.toString(), type == EnumeratorTypeEnum.OLAP); + QueryContextManager.current().addContext(ctxId, type.toString(), + type == EnumeratorTypeEnum.OLAP); } public OLAPQuery(EnumeratorTypeEnum type, int ctxSeq) { http://git-wip-us.apache.org/repos/asf/kylin/blob/3dbcf587/server-base/src/main/java/org/apache/kylin/rest/controller/QueryController.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/QueryController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/QueryController.java index edb2dd5..b7c5650 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/controller/QueryController.java +++ b/server-base/src/main/java/org/apache/kylin/rest/controller/QueryController.java @@ -29,6 +29,8 @@ import java.util.Map; import javax.servlet.http.HttpServletResponse; import org.apache.commons.io.IOUtils; +import org.apache.kylin.common.QueryContext; +import org.apache.kylin.common.QueryContextManager; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.debug.BackdoorToggles; import org.apache.kylin.metadata.querymeta.SelectedColumnMeta; @@ -172,6 +174,29 @@ public class QueryController extends BasicController { } } + /** + * + * @param runTimeMoreThan in seconds + * @return + */ + @RequestMapping(value = "/query/runningQueries", method = RequestMethod.GET) + @ResponseBody + public List<QueryContext> getRunningQueries(@RequestParam(value = "runTimeMoreThan", required = false, defaultValue = "-1") int runTimeMoreThan) { + if (runTimeMoreThan == -1) { + return QueryContextManager.getAllRunningQueries(); + }else { + return QueryContextManager.getLongRunningQueries(runTimeMoreThan * 1000); + } + } + + @RequestMapping(value = "/query/{queryId}/stop", method = RequestMethod.PUT) + @ResponseBody + public void stopQuery(@PathVariable String queryId) { + final String user = SecurityContextHolder.getContext().getAuthentication().getName(); + logger.info("{} stop the query: {}", new Object[] { user, queryId }); + QueryContextManager.stopQuery(queryId, "stopped by " + user); + } + public void setQueryService(QueryService queryService) { this.queryService = queryService; } http://git-wip-us.apache.org/repos/asf/kylin/blob/3dbcf587/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java b/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java index e595804..09ccc07 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java +++ b/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java @@ -28,6 +28,8 @@ import org.apache.hadoop.metrics2.MetricsException; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.QueryContext; +import org.apache.kylin.common.QueryContextManager; +import org.apache.kylin.metadata.project.ProjectInstance; import org.apache.kylin.metrics.MetricsManager; import org.apache.kylin.metrics.lib.impl.RecordEvent; import org.apache.kylin.metrics.lib.impl.TimedRecordEvent; @@ -98,7 +100,7 @@ public class QueryMetricsFacade { if (user == null) { user = "unknown"; } - for (QueryContext.RPCStatistics entry : QueryContext.current().getRpcStatisticsList()) { + for (QueryContext.RPCStatistics entry : QueryContextManager.current().getRpcStatisticsList()) { RecordEvent rpcMetricsEvent = new TimedRecordEvent( KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectQueryRpcCall()); setRPCWrapper(rpcMetricsEvent, // http://git-wip-us.apache.org/repos/asf/kylin/blob/3dbcf587/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java ---------------------------------------------------------------------- diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java index 311ad9c..8e6642c 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java @@ -58,6 +58,7 @@ import org.apache.commons.lang.StringUtils; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.QueryContext; +import org.apache.kylin.common.QueryContextManager; import org.apache.kylin.common.debug.BackdoorToggles; import org.apache.kylin.common.exceptions.ResourceLimitExceededException; import org.apache.kylin.common.htrace.HtraceInit; @@ -273,7 +274,7 @@ public class QueryService extends BasicService { return queries; } - public void logQuery(final SQLRequest request, final SQLResponse response) { + public void logQuery(final String queryId, final SQLRequest request, final SQLResponse response) { final String user = aclEvaluate.getCurrentUserName(); final List<String> realizationNames = new LinkedList<>(); final Set<Long> cuboidIds = new HashSet<Long>(); @@ -305,7 +306,7 @@ public class QueryService extends BasicService { StringBuilder stringBuilder = new StringBuilder(); stringBuilder.append(newLine); stringBuilder.append("==========================[QUERY]===============================").append(newLine); - stringBuilder.append("Query Id: ").append(QueryContext.current().getQueryId()).append(newLine); + stringBuilder.append("Query Id: ").append(queryId).append(newLine); stringBuilder.append("SQL: ").append(request.getSql()).append(newLine); stringBuilder.append("User: ").append(user).append(newLine); stringBuilder.append("Success: ").append((null == response.getExceptionMessage())).append(newLine); @@ -407,7 +408,7 @@ public class QueryService extends BasicService { if (sqlRequest.getBackdoorToggles() != null) BackdoorToggles.addToggles(sqlRequest.getBackdoorToggles()); - final QueryContext queryContext = QueryContext.current(); + final QueryContext queryContext = QueryContextManager.current(); TraceScope scope = null; if (kylinConfig.isHtraceTracingEveryQuery() || BackdoorToggles.getHtraceEnabled()) { @@ -504,7 +505,7 @@ public class QueryService extends BasicService { long durationThreshold = kylinConfig.getQueryDurationCacheThreshold(); long scanCountThreshold = kylinConfig.getQueryScanCountCacheThreshold(); long scanBytesThreshold = kylinConfig.getQueryScanBytesCacheThreshold(); - sqlResponse.setDuration(System.currentTimeMillis() - startTime); + sqlResponse.setDuration(queryContext.getAccumulatedMillis()); logger.info("Stats of SQL response: isException: {}, duration: {}, total scan count {}", // String.valueOf(sqlResponse.getIsException()), String.valueOf(sqlResponse.getDuration()), String.valueOf(sqlResponse.getTotalScanCount())); @@ -620,7 +621,7 @@ public class QueryService extends BasicService { conn = QueryConnection.getConnection(sqlRequest.getProject()); String userInfo = SecurityContextHolder.getContext().getAuthentication().getName(); - QueryContext context = QueryContext.current(); + QueryContext context = QueryContextManager.current(); context.setUsername(userInfo); context.setGroups(AclPermissionUtil.getCurrentUserGroups()); final Collection<? extends GrantedAuthority> grantedAuthorities = SecurityContextHolder.getContext() @@ -1042,6 +1043,7 @@ public class QueryService extends BasicService { boolean isPartialResult = false; StringBuilder cubeSb = new StringBuilder(); StringBuilder logSb = new StringBuilder("Processed rows for each storageContext: "); + QueryContext queryContext = QueryContextManager.current(); if (OLAPContext.getThreadLocalContexts() != null) { // contexts can be null in case of 'explain plan for' for (OLAPContext ctx : OLAPContext.getThreadLocalContexts()) { if (ctx.realization != null) { @@ -1052,14 +1054,16 @@ public class QueryService extends BasicService { cubeSb.append(ctx.realization.getCanonicalName()); logSb.append(ctx.storageContext.getProcessedRowCount()).append(" "); } + queryContext.setContextRealization(ctx.id, realizationName, realizationType); } } logger.info(logSb.toString()); SQLResponse response = new SQLResponse(columnMetas, results, cubeSb.toString(), 0, false, null, isPartialResult, isPushDown); - response.setTotalScanCount(QueryContext.current().getScannedRows()); - response.setTotalScanBytes(QueryContext.current().getScannedBytes()); + response.setTotalScanCount(queryContext.getScannedRows()); + response.setTotalScanBytes(queryContext.getScannedBytes()); + response.setCubeSegmentStatisticsList(queryContext.getCubeSegmentStatisticsResultList()); return response; } http://git-wip-us.apache.org/repos/asf/kylin/blob/3dbcf587/server/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsTest.java ---------------------------------------------------------------------- diff --git a/server/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsTest.java b/server/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsTest.java index e23fc20..d4a16f8 100644 --- a/server/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsTest.java +++ b/server/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsTest.java @@ -26,6 +26,7 @@ import javax.management.MBeanServer; import javax.management.ObjectName; import org.apache.kylin.common.QueryContext; +import org.apache.kylin.common.QueryContextManager; import org.apache.kylin.rest.request.SQLRequest; import org.apache.kylin.rest.response.SQLResponse; import org.apache.kylin.rest.service.ServiceTestBase; @@ -121,6 +122,8 @@ public class QueryMetricsTest extends ServiceTestBase { sqlRequest.setSql("select * from TEST_KYLIN_FACT"); sqlRequest.setProject("default"); + QueryContext context = QueryContextManager.current(); + SQLResponse sqlResponse = new SQLResponse(); sqlResponse.setDuration(10); sqlResponse.setCube("test_cube"); @@ -138,7 +141,6 @@ public class QueryMetricsTest extends ServiceTestBase { sqlResponse.setResults(results); sqlResponse.setStorageCacheUsed(true); - QueryContext context = QueryContext.current(); int ctxId = 0; context.addContext(ctxId, "OLAP", true); context.addRPCStatistics(ctxId, "sandbox", "test_cube", "20100101000000_20150101000000", 3L, 3L, 3L, null, 80L, http://git-wip-us.apache.org/repos/asf/kylin/blob/3dbcf587/server/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java ---------------------------------------------------------------------- diff --git a/server/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java b/server/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java index 7dc9994..061e622 100644 --- a/server/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java +++ b/server/src/test/java/org/apache/kylin/rest/service/QueryServiceTest.java @@ -21,6 +21,8 @@ package org.apache.kylin.rest.service; import java.io.IOException; import java.sql.SQLException; +import org.apache.kylin.common.QueryContext; +import org.apache.kylin.common.QueryContextManager; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.job.exception.JobException; import org.apache.kylin.metadata.project.ProjectInstance; @@ -63,9 +65,10 @@ public class QueryServiceTest extends ServiceTestBase { SQLRequest request = new SQLRequest(); request.setSql("select * from test_table"); request.setAcceptPartial(true); + QueryContext queryContext = QueryContextManager.current(); SQLResponse response = new SQLResponse(); response.setHitExceptionCache(true); - queryService.logQuery(request, response); + queryService.logQuery(queryContext.getQueryId(), request, response); } @Test http://git-wip-us.apache.org/repos/asf/kylin/blob/3dbcf587/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java index 6b4ac32..c660cad 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.filter.FuzzyRowFilter; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.QueryContext; +import org.apache.kylin.common.QueryContextManager; import org.apache.kylin.common.debug.BackdoorToggles; import org.apache.kylin.common.util.Bytes; import org.apache.kylin.common.util.ImmutableBitSet; @@ -76,7 +77,7 @@ public abstract class CubeHBaseRPC implements IGTStorage { this.cubeSeg = (CubeSegment) segment; this.cuboid = cuboid; this.fullGTInfo = fullGTInfo; - this.queryContext = QueryContext.current(); + this.queryContext = QueryContextManager.current(); this.storageContext = context; this.fuzzyKeyEncoder = new FuzzyKeyEncoder(cubeSeg, cuboid);
