This is an automated email from the ASF dual-hosted git repository.
nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
new 61e319b KYLIN-4305 Streaming Receiver cannot limit incoming query request or cancel long-running query
61e319b is described below
commit 61e319b524464ed2cba21fe659d057362a954b6d
Author: XiaoxiangYu <[email protected]>
AuthorDate: Thu Dec 19 21:15:44 2019 +0800
KYLIN-4305 Streaming Receiver cannot limit incoming query request or cancel long-running query
---
.../org/apache/kylin/common/KylinConfigBase.java | 7 +-
.../stream/rpc/HttpStreamDataSearchClient.java | 16 ++--
.../kylin/stream/core/model/DataRequest.java | 9 ++
.../core/query/MultiThreadsResultCollector.java | 106 ++++++++++++++-------
.../core/query/StreamingCubeDataSearcher.java | 17 ++--
.../stream/core/query/StreamingSearchContext.java | 11 +++
.../server/rest/controller/DataController.java | 4 +-
7 files changed, 121 insertions(+), 49 deletions(-)
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index d9561fd..9de676b 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -55,6 +55,7 @@ import java.util.regex.Pattern;
public abstract class KylinConfigBase implements Serializable {
private static final long serialVersionUID = 1L;
private static final Logger logger =
LoggerFactory.getLogger(KylinConfigBase.class);
+ private static final int AVAILABLE_PROCESSORS =
Runtime.getRuntime().availableProcessors();
private static final String FALSE = "false";
private static final String TRUE = "true";
@@ -2346,11 +2347,13 @@ public abstract class KylinConfigBase implements Serializable {
}
public int getStreamingReceiverQueryCoreThreads() {
- return
Integer.parseInt(getOptional("kylin.stream.receiver.query-core-threads", "50"));
+ int def = getStreamingReceiverQueryMaxThreads() - 1;
+ return
Integer.parseInt(getOptional("kylin.stream.receiver.query-core-threads", def +
""));
}
public int getStreamingReceiverQueryMaxThreads() {
- return
Integer.parseInt(getOptional("kylin.stream.receiver.query-max-threads", "200"));
+ int def = Math.max(2, AVAILABLE_PROCESSORS - 1);
+ return
Integer.parseInt(getOptional("kylin.stream.receiver.query-max-threads", def +
""));
}
public int getStreamingReceiverUseThreadsPerQuery() {
diff --git a/storage-stream/src/main/java/org/apache/kylin/storage/stream/rpc/HttpStreamDataSearchClient.java b/storage-stream/src/main/java/org/apache/kylin/storage/stream/rpc/HttpStreamDataSearchClient.java
index bc075ce..e1119a8 100644
--- a/storage-stream/src/main/java/org/apache/kylin/storage/stream/rpc/HttpStreamDataSearchClient.java
+++ b/storage-stream/src/main/java/org/apache/kylin/storage/stream/rpc/HttpStreamDataSearchClient.java
@@ -72,10 +72,11 @@ import com.google.common.collect.Sets;
*/
public class HttpStreamDataSearchClient implements IStreamDataSearchClient {
public static final Logger logger =
LoggerFactory.getLogger(HttpStreamDataSearchClient.class);
+ public static final long WAIT_DURATION = 2 * 60000 ;
private static ExecutorService executorService;
static {
- executorService = new ThreadPoolExecutor(20, 100, 60L,
TimeUnit.MILLISECONDS,
+ executorService = new ThreadPoolExecutor(20, 100, 60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(), new
NamedThreadFactory("stream-rpc-pool-t"));
}
private AssignmentsCache assignmentsCache;
@@ -95,7 +96,7 @@ public class HttpStreamDataSearchClient implements IStreamDataSearchClient {
final TupleFilter tupleFilter, final Set<TblColRef> dimensions,
final Set<TblColRef> groups,
final Set<FunctionDesc> metrics, final int storagePushDownLimit,
final boolean allowStorageAggregation) {
List<ReplicaSet> replicaSetsOfCube =
assignmentsCache.getReplicaSetsByCube(cube.getName());
- int timeout = 120 * 1000; // timeout should be configurable
+ int timeout = cube.getConfig().getStreamingRPCHttpReadTimeout() * 2;
final QueuedStreamingTupleIterator result = new
QueuedStreamingTupleIterator(replicaSetsOfCube.size(), timeout);
final QueryContext query = QueryContextFacade.current();
@@ -105,7 +106,6 @@ public class HttpStreamDataSearchClient implements IStreamDataSearchClient {
final RecordsSerializer recordsSerializer = new
RecordsSerializer(schema);
final DataRequest dataRequest = createDataRequest(query.getQueryId(),
cube.getName(), minSegmentTime, tupleInfo,
tupleFilter, dimensions, groups, metrics,
storagePushDownLimit, allowStorageAggregation);
-
logger.info("Query-{}:send request to stream receivers",
query.getQueryId());
for (final ReplicaSet rs : replicaSetsOfCube) {
executorService.submit(new Runnable() {
@@ -165,7 +165,7 @@ public class HttpStreamDataSearchClient implements IStreamDataSearchClient {
return foundReceiver;
}
- if (System.currentTimeMillis() - lastFailTime > 2 * 60 * 1000) { //
retry every 2 minutes
+ if (System.currentTimeMillis() - lastFailTime > WAIT_DURATION) { //
retry every 2 minutes
return foundReceiver;
}
@@ -175,18 +175,18 @@ public class HttpStreamDataSearchClient implements IStreamDataSearchClient {
public Iterator<ITuple> doSearch(DataRequest dataRequest, CubeInstance
cube, StreamingTupleConverter tupleConverter,
RecordsSerializer recordsSerializer, Node receiver, TupleInfo
tupleInfo) throws Exception {
String queryId = dataRequest.getQueryId();
- logger.info("send query to receiver " + receiver + " with query id:" +
queryId);
String url = "http://" + receiver.getHost() + ":" + receiver.getPort()
+ "/kylin/api/data/query";
try {
+ int connTimeout =
cube.getConfig().getStreamingRPCHttpConnTimeout();
+ int readTimeout =
cube.getConfig().getStreamingRPCHttpReadTimeout();
+ dataRequest.setDeadline(System.currentTimeMillis() +
(int)(readTimeout * 1.5));
String content = JsonUtil.writeValueAsString(dataRequest);
Stopwatch sw = new Stopwatch();
sw.start();
- int connTimeout =
cube.getConfig().getStreamingRPCHttpConnTimeout();
- int readTimeout =
cube.getConfig().getStreamingRPCHttpReadTimeout();
String msg = restService.postRequest(url, content, connTimeout,
readTimeout);
- logger.info("query-{}: receive response from {} take time:{}",
queryId, receiver, sw.elapsedMillis());
+ logger.info("query-{}: receive response from {} take time:{}",
queryId, receiver, sw.elapsed(TimeUnit.MILLISECONDS));
if (failedReceivers.containsKey(receiver)) {
failedReceivers.remove(receiver);
}
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/model/DataRequest.java b/stream-core/src/main/java/org/apache/kylin/stream/core/model/DataRequest.java
index 07c9028..f32b751 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/model/DataRequest.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/model/DataRequest.java
@@ -36,6 +36,7 @@ public class DataRequest {
private boolean allowStorageAggregation;
private long requestSendTime;
+ private long deadline;
private boolean enableDetailProfile;
private String storageBehavior;
@@ -142,4 +143,12 @@ public class DataRequest {
public void setHavingFilter(String havingFilter) {
this.havingFilter = havingFilter;
}
+
+ public long getDeadline() {
+ return deadline;
+ }
+
+ public void setDeadline(long deadline) {
+ this.deadline = deadline;
+ }
}
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/query/MultiThreadsResultCollector.java b/stream-core/src/main/java/org/apache/kylin/stream/core/query/MultiThreadsResultCollector.java
index 5ffe5f2..46f52a5 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/query/MultiThreadsResultCollector.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/query/MultiThreadsResultCollector.java
@@ -14,18 +14,19 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
-*/
+ */
package org.apache.kylin.stream.core.query;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kylin.common.KylinConfig;
@@ -38,28 +39,39 @@ import com.google.common.collect.Lists;
public class MultiThreadsResultCollector extends ResultCollector {
private static Logger logger =
LoggerFactory.getLogger(MultiThreadsResultCollector.class);
- private static ExecutorService executor;
+ private static ThreadPoolExecutor scannerThreadPool;
+ private static int MAX_RUNNING_THREAD_COUNT;
+
static {
KylinConfig config = KylinConfig.getInstanceFromEnv();
- executor = new
ThreadPoolExecutor(config.getStreamingReceiverQueryCoreThreads(),
- config.getStreamingReceiverQueryMaxThreads(), 60L,
TimeUnit.SECONDS,
- new LinkedBlockingQueue<Runnable>(), new
NamedThreadFactory("query-worker"));
+ MAX_RUNNING_THREAD_COUNT =
config.getStreamingReceiverQueryMaxThreads();
+ scannerThreadPool = new
ThreadPoolExecutor(config.getStreamingReceiverQueryCoreThreads(),
+ MAX_RUNNING_THREAD_COUNT, 60L, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(), new
NamedThreadFactory("query-worker"));
}
- private int timeout;
+ private long deadline;
+ private String queryId;
+ /**
+ * if current query beyond the deadline
+ */
+ private AtomicBoolean cancelFlag = new AtomicBoolean(false);
private Semaphore workersSemaphore;
- final BlockingQueue<Record> queue = new LinkedBlockingQueue<>(10000);
private AtomicInteger notCompletedWorkers;
- public MultiThreadsResultCollector(int numOfWorkers, int timeout) {
+ final BlockingQueue<Record> recordCachePool = new
LinkedBlockingQueue<>(10000);
+
+ public MultiThreadsResultCollector(int numOfWorkers, long deadline) {
this.workersSemaphore = new Semaphore(numOfWorkers);
- this.timeout = timeout;
+ this.deadline = deadline;
+ this.queryId = StreamingQueryProfile.get().getQueryId();
}
@Override
public Iterator<Record> iterator() {
notCompletedWorkers = new AtomicInteger(searchResults.size());
- executor.submit(new WorkSubmitter());
+ Thread masterThread = new Thread(new WorkSubmitter(),
"MultiThreadsResultCollector_" + queryId);
+ masterThread.start();
final int batchSize = 100;
final long startTime = System.currentTimeMillis();
@@ -69,38 +81,41 @@ public class MultiThreadsResultCollector extends ResultCollector {
@Override
public boolean hasNext() {
- boolean exits = (internalIT.hasNext() || queue.size() > 0);
+ boolean exits = (internalIT.hasNext() ||
!recordCachePool.isEmpty());
if (!exits) {
while (notCompletedWorkers.get() > 0) {
Thread.yield();
- long takeTime = System.currentTimeMillis() - startTime;
- if (takeTime > timeout) {
+ if (System.currentTimeMillis() > deadline) {
+ masterThread.interrupt(); // notify main thread
+ cancelFlag.set(true);
+ logger.warn("Beyond the deadline for {}.",
queryId);
throw new RuntimeException("Timeout when iterate
search result");
}
- if (internalIT.hasNext() || queue.size() > 0) {
+ if (internalIT.hasNext() ||
!recordCachePool.isEmpty()) {
return true;
}
}
}
-
return exits;
}
@Override
public Record next() {
try {
- long takeTime = System.currentTimeMillis() - startTime;
- if (takeTime > timeout) {
+ if (System.currentTimeMillis() > deadline) {
throw new RuntimeException("Timeout when iterate
search result");
}
if (!internalIT.hasNext()) {
recordList.clear();
- Record one = queue.poll(timeout - takeTime,
TimeUnit.MILLISECONDS);
+ Record one = recordCachePool.poll(deadline -
startTime, TimeUnit.MILLISECONDS);
if (one == null) {
+ masterThread.interrupt(); // notify main thread
+ cancelFlag.set(true);
+ logger.warn("Beyond the deadline for {}.",
queryId);
throw new RuntimeException("Timeout when iterate
search result");
}
recordList.add(one);
- queue.drainTo(recordList, batchSize - 1);
+ recordCachePool.drainTo(recordList, batchSize - 1);
internalIT = recordList.iterator();
}
return internalIT.next();
@@ -128,15 +143,13 @@ public class MultiThreadsResultCollector extends ResultCollector {
try {
result.startRead();
for (Record record : result) {
- try {
- queue.put(record.copy());
- } catch (InterruptedException e) {
- throw new RuntimeException("Timeout when visiting
streaming segmenent", e);
- }
+ recordCachePool.put(record.copy());
}
result.endRead();
+ } catch (InterruptedException inter) {
+ logger.warn("Cancelled scan streaming segment", inter);
} catch (Exception e) {
- logger.error("error when iterate search result", e);
+ logger.error("Error when iterate search result", e);
} finally {
notCompletedWorkers.decrementAndGet();
workersSemaphore.release();
@@ -147,15 +160,44 @@ public class MultiThreadsResultCollector extends ResultCollector {
private class WorkSubmitter implements Runnable {
@Override
public void run() {
- for (final IStreamingSearchResult result : searchResults) {
- executor.submit(new ResultIterateWorker(result));
- try {
- workersSemaphore.acquire();
- } catch (InterruptedException e) {
- logger.error("interrupted", e);
+ List<Future> futureList =
Lists.newArrayListWithExpectedSize(searchResults.size());
+ int cancelTimes = 0;
+ try {
+ for (final IStreamingSearchResult result : searchResults) {
+ Future f = scannerThreadPool.submit(new
ResultIterateWorker(result));
+ futureList.add(f);
+ workersSemaphore.acquire(); // Throw InterruptedException
when interrupted
+ }
+ while (notCompletedWorkers.get() > 0) {
+ Thread.sleep(100);
+ if (cancelFlag.get() ||
Thread.currentThread().isInterrupted()) {
+ break;
+ }
+ }
+ } catch (InterruptedException inter) {
+ logger.warn("Interrupted", inter);
+ } finally {
+ for (Future f : futureList) {
+ if (!f.isCancelled() || !f.isDone()) {
+ if (f.cancel(true)) {
+ cancelTimes++;
+ }
+ }
}
}
+ logger.debug("Finish MultiThreadsResultCollector for queryId {},
cancel {}. Current thread pool: {}.",
+ queryId, cancelTimes, scannerThreadPool);
}
}
+ /**
+ * block query if return true
+ */
+ public static boolean isOccupied() {
+ boolean occupied = scannerThreadPool.getActiveCount() >=
MAX_RUNNING_THREAD_COUNT;
+ if (occupied) {
+ logger.debug("ThreadPool {}", scannerThreadPool);
+ }
+ return occupied;
+ }
}
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingCubeDataSearcher.java b/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingCubeDataSearcher.java
index f89ddec..1d38b90 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingCubeDataSearcher.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingCubeDataSearcher.java
@@ -43,9 +43,6 @@ import org.slf4j.LoggerFactory;
public class StreamingCubeDataSearcher {
private static Logger logger =
LoggerFactory.getLogger(StreamingCubeDataSearcher.class);
-
- private static int TIMEOUT = Integer.MAX_VALUE;
-
private StreamingSegmentManager streamingSegmentManager;
private String cubeName;
private CubeDesc cubeDesc;
@@ -73,7 +70,15 @@ public class StreamingCubeDataSearcher {
try {
logger.info("query-{}: use cuboid {} to serve the query",
queryProfile.getQueryId(),
searchRequest.getHitCuboid());
- ResultCollector resultCollector = getResultCollector();
+ ResultCollector resultCollector =
getResultCollector(searchRequest);
+ if (resultCollector instanceof MultiThreadsResultCollector) {
+ while (MultiThreadsResultCollector.isOccupied() &&
System.currentTimeMillis() < searchRequest.getDeadline()) {
+ Thread.sleep(50);
+ }
+ if (System.currentTimeMillis() >= searchRequest.getDeadline())
{
+ throw new RuntimeException("Timeout for " +
queryProfile.getQueryId());
+ }
+ }
Collection<StreamingCubeSegment> segments =
streamingSegmentManager.getAllSegments();
StreamingDataQueryPlanner scanRangePlanner =
searchRequest.getQueryPlanner();
for (StreamingCubeSegment queryableSegment : segments) {
@@ -105,10 +110,10 @@ public class StreamingCubeDataSearcher {
}
}
- private ResultCollector getResultCollector() {
+ private ResultCollector getResultCollector(StreamingSearchContext
searchRequest) {
int useThreads =
cubeDesc.getConfig().getStreamingReceiverUseThreadsPerQuery();
if (useThreads > 1) {
- return new MultiThreadsResultCollector(useThreads, TIMEOUT);
+ return new MultiThreadsResultCollector(useThreads,
searchRequest.getDeadline());
} else {
return new SingleThreadResultCollector();
}
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingSearchContext.java b/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingSearchContext.java
index 9da8e5b..307c2be 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingSearchContext.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingSearchContext.java
@@ -45,6 +45,8 @@ public class StreamingSearchContext {
private long hitCuboid;
private long basicCuboid;
+ private long deadline = Long.MAX_VALUE;
+
public StreamingSearchContext(CubeDesc cubeDesc, Set<TblColRef>
dimensions, Set<TblColRef> groups,
Set<FunctionDesc> metrics, TupleFilter
filter, TupleFilter havingFilter) {
this.cubeDesc = cubeDesc;
@@ -56,6 +58,7 @@ public class StreamingSearchContext {
this.respResultSchema = new ResponseResultSchema(cubeDesc, dimensions,
metrics);
this.queryPlanner = new StreamingDataQueryPlanner(cubeDesc, filter);
this.addedGroups = Sets.newHashSet();
+ this.deadline = deadline;
calculateHitCuboid();
}
@@ -158,4 +161,12 @@ public class StreamingSearchContext {
sortedSet.addAll(cubeDesc.getMandatoryCuboids());
return sortedSet;
}
+
+ public long getDeadline() {
+ return deadline;
+ }
+
+ public void setDeadline(long deadline) {
+ this.deadline = deadline;
+ }
}
diff --git a/stream-receiver/src/main/java/org/apache/kylin/stream/server/rest/controller/DataController.java b/stream-receiver/src/main/java/org/apache/kylin/stream/server/rest/controller/DataController.java
index 45c6307..51d0cba 100644
--- a/stream-receiver/src/main/java/org/apache/kylin/stream/server/rest/controller/DataController.java
+++ b/stream-receiver/src/main/java/org/apache/kylin/stream/server/rest/controller/DataController.java
@@ -24,6 +24,7 @@ import java.util.Set;
import org.apache.commons.codec.binary.Base64;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.common.util.SetThreadName;
import org.apache.kylin.cube.CubeDescManager;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.gridtable.StorageSideBehavior;
@@ -79,7 +80,7 @@ public class DataController extends BasicController {
}
StreamingQueryProfile.set(queryProfile);
logger.info("receive query request queryId:{}", queryId);
- try {
+ try (SetThreadName changeName = new SetThreadName("Query %s",
queryId)) {
final Stopwatch sw = new Stopwatch();
sw.start();
String cubeName = dataRequest.getCubeName();
@@ -105,6 +106,7 @@ public class DataController extends BasicController {
StreamingSearchContext gtSearchRequest = new
StreamingSearchContext(cubeDesc, dimensions, groups,
metrics, tupleFilter, havingFilter);
+ gtSearchRequest.setDeadline(dataRequest.getDeadline());
searchResult = dataSearcher.doSearch(gtSearchRequest,
minSegmentTime,
dataRequest.isAllowStorageAggregation());