This is an automated email from the ASF dual-hosted git repository. nic pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kylin.git
commit eb65877a0556d0f45bb7876b10a924304053ef97 Author: nichunen <[email protected]> AuthorDate: Thu Jan 23 11:49:21 2020 +0800 Revert "KYLIN-4305 Streaming Receiver cannot limit income query request or cancel long-running query" This reverts commit 61e319b524464ed2cba21fe659d057362a954b6d. --- .../org/apache/kylin/common/KylinConfigBase.java | 7 +- .../stream/rpc/HttpStreamDataSearchClient.java | 16 ++-- .../kylin/stream/core/model/DataRequest.java | 9 -- .../core/query/MultiThreadsResultCollector.java | 106 +++++++-------------- .../core/query/StreamingCubeDataSearcher.java | 17 ++-- .../stream/core/query/StreamingSearchContext.java | 11 --- .../server/rest/controller/DataController.java | 4 +- 7 files changed, 49 insertions(+), 121 deletions(-) diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 9de676b..d9561fd 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -55,7 +55,6 @@ import java.util.regex.Pattern; public abstract class KylinConfigBase implements Serializable { private static final long serialVersionUID = 1L; private static final Logger logger = LoggerFactory.getLogger(KylinConfigBase.class); - private static final int AVAILABLE_PROCESSORS = Runtime.getRuntime().availableProcessors(); private static final String FALSE = "false"; private static final String TRUE = "true"; @@ -2347,13 +2346,11 @@ public abstract class KylinConfigBase implements Serializable { } public int getStreamingReceiverQueryCoreThreads() { - int def = getStreamingReceiverQueryMaxThreads() - 1; - return Integer.parseInt(getOptional("kylin.stream.receiver.query-core-threads", def + "")); + return Integer.parseInt(getOptional("kylin.stream.receiver.query-core-threads", "50")); } public int getStreamingReceiverQueryMaxThreads() { - int def = Math.max(2, AVAILABLE_PROCESSORS - 1); - return Integer.parseInt(getOptional("kylin.stream.receiver.query-max-threads", def + "")); + return Integer.parseInt(getOptional("kylin.stream.receiver.query-max-threads", "200")); } public int getStreamingReceiverUseThreadsPerQuery() { diff --git a/storage-stream/src/main/java/org/apache/kylin/storage/stream/rpc/HttpStreamDataSearchClient.java b/storage-stream/src/main/java/org/apache/kylin/storage/stream/rpc/HttpStreamDataSearchClient.java index e1119a8..bc075ce 100644 --- a/storage-stream/src/main/java/org/apache/kylin/storage/stream/rpc/HttpStreamDataSearchClient.java +++ b/storage-stream/src/main/java/org/apache/kylin/storage/stream/rpc/HttpStreamDataSearchClient.java @@ -72,11 +72,10 @@ import com.google.common.collect.Sets; */ public class HttpStreamDataSearchClient implements IStreamDataSearchClient { public static final Logger logger = LoggerFactory.getLogger(HttpStreamDataSearchClient.class); - public static final long WAIT_DURATION = 2 * 60000 ; private static ExecutorService executorService; static { - executorService = new ThreadPoolExecutor(20, 100, 60L, TimeUnit.SECONDS, + executorService = new ThreadPoolExecutor(20, 100, 60L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory("stream-rpc-pool-t")); } private AssignmentsCache assignmentsCache; @@ -96,7 +95,7 @@ public class HttpStreamDataSearchClient implements IStreamDataSearchClient { final TupleFilter tupleFilter, final Set<TblColRef> dimensions, final Set<TblColRef> groups, final Set<FunctionDesc> metrics, final int storagePushDownLimit, final boolean allowStorageAggregation) { List<ReplicaSet> replicaSetsOfCube = assignmentsCache.getReplicaSetsByCube(cube.getName()); - int timeout = cube.getConfig().getStreamingRPCHttpReadTimeout() * 2; + int timeout = 120 * 1000; // timeout should be configurable final QueuedStreamingTupleIterator result = new QueuedStreamingTupleIterator(replicaSetsOfCube.size(), timeout); final QueryContext query = QueryContextFacade.current(); @@ -106,6 +105,7 @@ public class HttpStreamDataSearchClient implements IStreamDataSearchClient { final RecordsSerializer recordsSerializer = new RecordsSerializer(schema); final DataRequest dataRequest = createDataRequest(query.getQueryId(), cube.getName(), minSegmentTime, tupleInfo, tupleFilter, dimensions, groups, metrics, storagePushDownLimit, allowStorageAggregation); + logger.info("Query-{}:send request to stream receivers", query.getQueryId()); for (final ReplicaSet rs : replicaSetsOfCube) { executorService.submit(new Runnable() { @@ -165,7 +165,7 @@ public class HttpStreamDataSearchClient implements IStreamDataSearchClient { return foundReceiver; } - if (System.currentTimeMillis() - lastFailTime > WAIT_DURATION) { // retry every 2 minutes + if (System.currentTimeMillis() - lastFailTime > 2 * 60 * 1000) { // retry every 2 minutes return foundReceiver; } @@ -175,18 +175,18 @@ public class HttpStreamDataSearchClient implements IStreamDataSearchClient { public Iterator<ITuple> doSearch(DataRequest dataRequest, CubeInstance cube, StreamingTupleConverter tupleConverter, RecordsSerializer recordsSerializer, Node receiver, TupleInfo tupleInfo) throws Exception { String queryId = dataRequest.getQueryId(); + logger.info("send query to receiver " + receiver + " with query id:" + queryId); String url = "http://" + receiver.getHost() + ":" + receiver.getPort() + "/kylin/api/data/query"; try { - int connTimeout = cube.getConfig().getStreamingRPCHttpConnTimeout(); - int readTimeout = cube.getConfig().getStreamingRPCHttpReadTimeout(); - dataRequest.setDeadline(System.currentTimeMillis() + (int)(readTimeout * 1.5)); String content = JsonUtil.writeValueAsString(dataRequest); Stopwatch sw = new Stopwatch(); sw.start(); + int connTimeout = cube.getConfig().getStreamingRPCHttpConnTimeout(); + int readTimeout = cube.getConfig().getStreamingRPCHttpReadTimeout(); String msg = restService.postRequest(url, content, connTimeout, readTimeout); - logger.info("query-{}: receive response from {} take time:{}", queryId, receiver, sw.elapsed(TimeUnit.MILLISECONDS)); + logger.info("query-{}: receive response from {} take time:{}", queryId, receiver, sw.elapsedMillis()); if (failedReceivers.containsKey(receiver)) { failedReceivers.remove(receiver); } diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/model/DataRequest.java b/stream-core/src/main/java/org/apache/kylin/stream/core/model/DataRequest.java index f32b751..07c9028 100644 --- a/stream-core/src/main/java/org/apache/kylin/stream/core/model/DataRequest.java +++ b/stream-core/src/main/java/org/apache/kylin/stream/core/model/DataRequest.java @@ -36,7 +36,6 @@ public class DataRequest { private boolean allowStorageAggregation; private long requestSendTime; - private long deadline; private boolean enableDetailProfile; private String storageBehavior; @@ -143,12 +142,4 @@ public class DataRequest { public void setHavingFilter(String havingFilter) { this.havingFilter = havingFilter; } - - public long getDeadline() { - return deadline; - } - - public void setDeadline(long deadline) { - this.deadline = deadline; - } } diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/query/MultiThreadsResultCollector.java b/stream-core/src/main/java/org/apache/kylin/stream/core/query/MultiThreadsResultCollector.java index 46f52a5..5ffe5f2 100644 --- a/stream-core/src/main/java/org/apache/kylin/stream/core/query/MultiThreadsResultCollector.java +++ b/stream-core/src/main/java/org/apache/kylin/stream/core/query/MultiThreadsResultCollector.java @@ -14,19 +14,18 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - */ +*/ package org.apache.kylin.stream.core.query; import java.util.Iterator; import java.util.List; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Future; +import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.apache.kylin.common.KylinConfig; @@ -39,39 +38,28 @@ import com.google.common.collect.Lists; public class MultiThreadsResultCollector extends ResultCollector { private static Logger logger = LoggerFactory.getLogger(MultiThreadsResultCollector.class); - private static ThreadPoolExecutor scannerThreadPool; - private static int MAX_RUNNING_THREAD_COUNT; - + private static ExecutorService executor; static { KylinConfig config = KylinConfig.getInstanceFromEnv(); - MAX_RUNNING_THREAD_COUNT = config.getStreamingReceiverQueryMaxThreads(); - scannerThreadPool = new ThreadPoolExecutor(config.getStreamingReceiverQueryCoreThreads(), - MAX_RUNNING_THREAD_COUNT, 60L, TimeUnit.SECONDS, - new LinkedBlockingQueue<>(), new NamedThreadFactory("query-worker")); + executor = new ThreadPoolExecutor(config.getStreamingReceiverQueryCoreThreads(), + config.getStreamingReceiverQueryMaxThreads(), 60L, TimeUnit.SECONDS, + new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory("query-worker")); } - private long deadline; - private String queryId; - /** - * if current query beyond the deadline - */ - private AtomicBoolean cancelFlag = new AtomicBoolean(false); + private int timeout; private Semaphore workersSemaphore; + final BlockingQueue<Record> queue = new LinkedBlockingQueue<>(10000); private AtomicInteger notCompletedWorkers; - final BlockingQueue<Record> recordCachePool = new LinkedBlockingQueue<>(10000); - - public MultiThreadsResultCollector(int numOfWorkers, long deadline) { + public MultiThreadsResultCollector(int numOfWorkers, int timeout) { this.workersSemaphore = new Semaphore(numOfWorkers); - this.deadline = deadline; - this.queryId = StreamingQueryProfile.get().getQueryId(); + this.timeout = timeout; } @Override public Iterator<Record> iterator() { notCompletedWorkers = new AtomicInteger(searchResults.size()); - Thread masterThread = new Thread(new WorkSubmitter(), "MultiThreadsResultCollector_" + queryId); - masterThread.start(); + executor.submit(new WorkSubmitter()); final int batchSize = 100; final long startTime = System.currentTimeMillis(); @@ -81,41 +69,38 @@ public class MultiThreadsResultCollector extends ResultCollector { @Override public boolean hasNext() { - boolean exits = (internalIT.hasNext() || !recordCachePool.isEmpty()); + boolean exits = (internalIT.hasNext() || queue.size() > 0); if (!exits) { while (notCompletedWorkers.get() > 0) { Thread.yield(); - if (System.currentTimeMillis() > deadline) { - masterThread.interrupt(); // notify main thread - cancelFlag.set(true); - logger.warn("Beyond the deadline for {}.", queryId); + long takeTime = System.currentTimeMillis() - startTime; + if (takeTime > timeout) { throw new RuntimeException("Timeout when iterate search result"); } - if (internalIT.hasNext() || !recordCachePool.isEmpty()) { + if (internalIT.hasNext() || queue.size() > 0) { return true; } } } + return exits; } @Override public Record next() { try { - if (System.currentTimeMillis() > deadline) { + long takeTime = System.currentTimeMillis() - startTime; + if (takeTime > timeout) { throw new RuntimeException("Timeout when iterate search result"); } if (!internalIT.hasNext()) { recordList.clear(); - Record one = recordCachePool.poll(deadline - startTime, TimeUnit.MILLISECONDS); + Record one = queue.poll(timeout - takeTime, TimeUnit.MILLISECONDS); if (one == null) { - masterThread.interrupt(); // notify main thread - cancelFlag.set(true); - logger.warn("Beyond the deadline for {}.", queryId); throw new RuntimeException("Timeout when iterate search result"); } recordList.add(one); - recordCachePool.drainTo(recordList, batchSize - 1); + queue.drainTo(recordList, batchSize - 1); internalIT = recordList.iterator(); } return internalIT.next(); @@ -143,13 +128,15 @@ public class MultiThreadsResultCollector extends ResultCollector { try { result.startRead(); for (Record record : result) { - recordCachePool.put(record.copy()); + try { + queue.put(record.copy()); + } catch (InterruptedException e) { + throw new RuntimeException("Timeout when visiting streaming segmenent", e); + } } result.endRead(); - } catch (InterruptedException inter) { - logger.warn("Cancelled scan streaming segment", inter); } catch (Exception e) { - logger.error("Error when iterate search result", e); + logger.error("error when iterate search result", e); } finally { notCompletedWorkers.decrementAndGet(); workersSemaphore.release(); @@ -160,44 +147,15 @@ public class MultiThreadsResultCollector extends ResultCollector { private class WorkSubmitter implements Runnable { @Override public void run() { - List<Future> futureList = Lists.newArrayListWithExpectedSize(searchResults.size()); - int cancelTimes = 0; - try { - for (final IStreamingSearchResult result : searchResults) { - Future f = scannerThreadPool.submit(new ResultIterateWorker(result)); - futureList.add(f); - workersSemaphore.acquire(); // Throw InterruptedException when interrupted - } - while (notCompletedWorkers.get() > 0) { - Thread.sleep(100); - if (cancelFlag.get() || Thread.currentThread().isInterrupted()) { - break; - } - } - } catch (InterruptedException inter) { - logger.warn("Interrupted", inter); - } finally { - for (Future f : futureList) { - if (!f.isCancelled() || !f.isDone()) { - if (f.cancel(true)) { - cancelTimes++; - } - } + for (final IStreamingSearchResult result : searchResults) { + executor.submit(new ResultIterateWorker(result)); + try { + workersSemaphore.acquire(); + } catch (InterruptedException e) { + logger.error("interrupted", e); } } - logger.debug("Finish MultiThreadsResultCollector for queryId {}, cancel {}. Current thread pool: {}.", - queryId, cancelTimes, scannerThreadPool); } } - /** - * block query if return true - */ - public static boolean isOccupied() { - boolean occupied = scannerThreadPool.getActiveCount() >= MAX_RUNNING_THREAD_COUNT; - if (occupied) { - logger.debug("ThreadPool {}", scannerThreadPool); - } - return occupied; - } } diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingCubeDataSearcher.java b/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingCubeDataSearcher.java index 1d38b90..f89ddec 100644 --- a/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingCubeDataSearcher.java +++ b/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingCubeDataSearcher.java @@ -43,6 +43,9 @@ import org.slf4j.LoggerFactory; public class StreamingCubeDataSearcher { private static Logger logger = LoggerFactory.getLogger(StreamingCubeDataSearcher.class); + + private static int TIMEOUT = Integer.MAX_VALUE; + private StreamingSegmentManager streamingSegmentManager; private String cubeName; private CubeDesc cubeDesc; @@ -70,15 +73,7 @@ public class StreamingCubeDataSearcher { try { logger.info("query-{}: use cuboid {} to serve the query", queryProfile.getQueryId(), searchRequest.getHitCuboid()); - ResultCollector resultCollector = getResultCollector(searchRequest); - if (resultCollector instanceof MultiThreadsResultCollector) { - while (MultiThreadsResultCollector.isOccupied() && System.currentTimeMillis() < searchRequest.getDeadline()) { - Thread.sleep(50); - } - if (System.currentTimeMillis() >= searchRequest.getDeadline()) { - throw new RuntimeException("Timeout for " + queryProfile.getQueryId()); - } - } + ResultCollector resultCollector = getResultCollector(); Collection<StreamingCubeSegment> segments = streamingSegmentManager.getAllSegments(); StreamingDataQueryPlanner scanRangePlanner = searchRequest.getQueryPlanner(); for (StreamingCubeSegment queryableSegment : segments) { @@ -110,10 +105,10 @@ public class StreamingCubeDataSearcher { } } - private ResultCollector getResultCollector(StreamingSearchContext searchRequest) { + private ResultCollector getResultCollector() { int useThreads = cubeDesc.getConfig().getStreamingReceiverUseThreadsPerQuery(); if (useThreads > 1) { - return new MultiThreadsResultCollector(useThreads, searchRequest.getDeadline()); + return new MultiThreadsResultCollector(useThreads, TIMEOUT); } else { return new SingleThreadResultCollector(); } diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingSearchContext.java b/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingSearchContext.java index 307c2be..9da8e5b 100644 --- a/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingSearchContext.java +++ b/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingSearchContext.java @@ -45,8 +45,6 @@ public class StreamingSearchContext { private long hitCuboid; private long basicCuboid; - private long deadline = Long.MAX_VALUE; - public StreamingSearchContext(CubeDesc cubeDesc, Set<TblColRef> dimensions, Set<TblColRef> groups, Set<FunctionDesc> metrics, TupleFilter filter, TupleFilter havingFilter) { this.cubeDesc = cubeDesc; @@ -58,7 +56,6 @@ public class StreamingSearchContext { this.respResultSchema = new ResponseResultSchema(cubeDesc, dimensions, metrics); this.queryPlanner = new StreamingDataQueryPlanner(cubeDesc, filter); this.addedGroups = Sets.newHashSet(); - this.deadline = deadline; calculateHitCuboid(); } @@ -161,12 +158,4 @@ public class StreamingSearchContext { sortedSet.addAll(cubeDesc.getMandatoryCuboids()); return sortedSet; } - - public long getDeadline() { - return deadline; - } - - public void setDeadline(long deadline) { - this.deadline = deadline; - } } diff --git a/stream-receiver/src/main/java/org/apache/kylin/stream/server/rest/controller/DataController.java b/stream-receiver/src/main/java/org/apache/kylin/stream/server/rest/controller/DataController.java index 51d0cba..45c6307 100644 --- a/stream-receiver/src/main/java/org/apache/kylin/stream/server/rest/controller/DataController.java +++ b/stream-receiver/src/main/java/org/apache/kylin/stream/server/rest/controller/DataController.java @@ -24,7 +24,6 @@ import java.util.Set; import org.apache.commons.codec.binary.Base64; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.Pair; -import org.apache.kylin.common.util.SetThreadName; import org.apache.kylin.cube.CubeDescManager; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.gridtable.StorageSideBehavior; @@ -80,7 +79,7 @@ public class DataController extends BasicController { } StreamingQueryProfile.set(queryProfile); logger.info("receive query request queryId:{}", queryId); - try (SetThreadName changeName = new SetThreadName("Query %s", queryId)) { + try { final Stopwatch sw = new Stopwatch(); sw.start(); String cubeName = dataRequest.getCubeName(); @@ -106,7 +105,6 @@ public class DataController extends BasicController { StreamingSearchContext gtSearchRequest = new StreamingSearchContext(cubeDesc, dimensions, groups, metrics, tupleFilter, havingFilter); - gtSearchRequest.setDeadline(dataRequest.getDeadline()); searchResult = dataSearcher.doSearch(gtSearchRequest, minSegmentTime, dataRequest.isAllowStorageAggregation());
