This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit eb65877a0556d0f45bb7876b10a924304053ef97
Author: nichunen <[email protected]>
AuthorDate: Thu Jan 23 11:49:21 2020 +0800

    Revert "KYLIN-4305 Streaming Receiver cannot limit income query request or 
cancel long-running query"
    
    This reverts commit 61e319b524464ed2cba21fe659d057362a954b6d.
---
 .../org/apache/kylin/common/KylinConfigBase.java   |   7 +-
 .../stream/rpc/HttpStreamDataSearchClient.java     |  16 ++--
 .../kylin/stream/core/model/DataRequest.java       |   9 --
 .../core/query/MultiThreadsResultCollector.java    | 106 +++++++--------------
 .../core/query/StreamingCubeDataSearcher.java      |  17 ++--
 .../stream/core/query/StreamingSearchContext.java  |  11 ---
 .../server/rest/controller/DataController.java     |   4 +-
 7 files changed, 49 insertions(+), 121 deletions(-)

diff --git 
a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java 
b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 9de676b..d9561fd 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -55,7 +55,6 @@ import java.util.regex.Pattern;
 public abstract class KylinConfigBase implements Serializable {
     private static final long serialVersionUID = 1L;
     private static final Logger logger = 
LoggerFactory.getLogger(KylinConfigBase.class);
-    private static final int AVAILABLE_PROCESSORS = 
Runtime.getRuntime().availableProcessors();
 
     private static final String FALSE = "false";
     private static final String TRUE = "true";
@@ -2347,13 +2346,11 @@ public abstract class KylinConfigBase implements 
Serializable {
     }
 
     public int getStreamingReceiverQueryCoreThreads() {
-        int def = getStreamingReceiverQueryMaxThreads() - 1;
-        return 
Integer.parseInt(getOptional("kylin.stream.receiver.query-core-threads", def + 
""));
+        return 
Integer.parseInt(getOptional("kylin.stream.receiver.query-core-threads", "50"));
     }
 
     public int getStreamingReceiverQueryMaxThreads() {
-        int def = Math.max(2, AVAILABLE_PROCESSORS - 1);
-        return 
Integer.parseInt(getOptional("kylin.stream.receiver.query-max-threads", def + 
""));
+        return 
Integer.parseInt(getOptional("kylin.stream.receiver.query-max-threads", "200"));
     }
 
     public int getStreamingReceiverUseThreadsPerQuery() {
diff --git 
a/storage-stream/src/main/java/org/apache/kylin/storage/stream/rpc/HttpStreamDataSearchClient.java
 
b/storage-stream/src/main/java/org/apache/kylin/storage/stream/rpc/HttpStreamDataSearchClient.java
index e1119a8..bc075ce 100644
--- 
a/storage-stream/src/main/java/org/apache/kylin/storage/stream/rpc/HttpStreamDataSearchClient.java
+++ 
b/storage-stream/src/main/java/org/apache/kylin/storage/stream/rpc/HttpStreamDataSearchClient.java
@@ -72,11 +72,10 @@ import com.google.common.collect.Sets;
  */
 public class HttpStreamDataSearchClient implements IStreamDataSearchClient {
     public static final Logger logger = 
LoggerFactory.getLogger(HttpStreamDataSearchClient.class);
-    public static final long WAIT_DURATION = 2 * 60000 ;
 
     private static ExecutorService executorService;
     static {
-        executorService = new ThreadPoolExecutor(20, 100, 60L, 
TimeUnit.SECONDS,
+        executorService = new ThreadPoolExecutor(20, 100, 60L, 
TimeUnit.MILLISECONDS,
                 new LinkedBlockingQueue<Runnable>(), new 
NamedThreadFactory("stream-rpc-pool-t"));
     }
     private AssignmentsCache assignmentsCache;
@@ -96,7 +95,7 @@ public class HttpStreamDataSearchClient implements 
IStreamDataSearchClient {
             final TupleFilter tupleFilter, final Set<TblColRef> dimensions, 
final Set<TblColRef> groups,
             final Set<FunctionDesc> metrics, final int storagePushDownLimit, 
final boolean allowStorageAggregation) {
         List<ReplicaSet> replicaSetsOfCube = 
assignmentsCache.getReplicaSetsByCube(cube.getName());
-        int timeout = cube.getConfig().getStreamingRPCHttpReadTimeout() * 2;
+        int timeout = 120 * 1000; // timeout should be configurable
         final QueuedStreamingTupleIterator result = new 
QueuedStreamingTupleIterator(replicaSetsOfCube.size(), timeout);
         final QueryContext query = QueryContextFacade.current();
 
@@ -106,6 +105,7 @@ public class HttpStreamDataSearchClient implements 
IStreamDataSearchClient {
         final RecordsSerializer recordsSerializer = new 
RecordsSerializer(schema);
         final DataRequest dataRequest = createDataRequest(query.getQueryId(), 
cube.getName(), minSegmentTime, tupleInfo,
                 tupleFilter, dimensions, groups, metrics, 
storagePushDownLimit, allowStorageAggregation);
+
         logger.info("Query-{}:send request to stream receivers", 
query.getQueryId());
         for (final ReplicaSet rs : replicaSetsOfCube) {
             executorService.submit(new Runnable() {
@@ -165,7 +165,7 @@ public class HttpStreamDataSearchClient implements 
IStreamDataSearchClient {
             return foundReceiver;
         }
 
-        if (System.currentTimeMillis() - lastFailTime > WAIT_DURATION) { // 
retry every 2 minutes
+        if (System.currentTimeMillis() - lastFailTime > 2 * 60 * 1000) { // 
retry every 2 minutes
             return foundReceiver;
         }
 
@@ -175,18 +175,18 @@ public class HttpStreamDataSearchClient implements 
IStreamDataSearchClient {
     public Iterator<ITuple> doSearch(DataRequest dataRequest, CubeInstance 
cube, StreamingTupleConverter tupleConverter,
             RecordsSerializer recordsSerializer, Node receiver, TupleInfo 
tupleInfo) throws Exception {
         String queryId = dataRequest.getQueryId();
+        logger.info("send query to receiver " + receiver + " with query id:" + 
queryId);
         String url = "http://"; + receiver.getHost() + ":" + receiver.getPort() 
+ "/kylin/api/data/query";
 
         try {
-            int connTimeout = 
cube.getConfig().getStreamingRPCHttpConnTimeout();
-            int readTimeout = 
cube.getConfig().getStreamingRPCHttpReadTimeout();
-            dataRequest.setDeadline(System.currentTimeMillis() + 
(int)(readTimeout * 1.5));
             String content = JsonUtil.writeValueAsString(dataRequest);
             Stopwatch sw = new Stopwatch();
             sw.start();
+            int connTimeout = 
cube.getConfig().getStreamingRPCHttpConnTimeout();
+            int readTimeout = 
cube.getConfig().getStreamingRPCHttpReadTimeout();
             String msg = restService.postRequest(url, content, connTimeout, 
readTimeout);
 
-            logger.info("query-{}: receive response from {} take time:{}", 
queryId, receiver, sw.elapsed(TimeUnit.MILLISECONDS));
+            logger.info("query-{}: receive response from {} take time:{}", 
queryId, receiver, sw.elapsedMillis());
             if (failedReceivers.containsKey(receiver)) {
                 failedReceivers.remove(receiver);
             }
diff --git 
a/stream-core/src/main/java/org/apache/kylin/stream/core/model/DataRequest.java 
b/stream-core/src/main/java/org/apache/kylin/stream/core/model/DataRequest.java
index f32b751..07c9028 100644
--- 
a/stream-core/src/main/java/org/apache/kylin/stream/core/model/DataRequest.java
+++ 
b/stream-core/src/main/java/org/apache/kylin/stream/core/model/DataRequest.java
@@ -36,7 +36,6 @@ public class DataRequest {
     private boolean allowStorageAggregation;
 
     private long requestSendTime;
-    private long deadline;
     private boolean enableDetailProfile;
     private String storageBehavior;
 
@@ -143,12 +142,4 @@ public class DataRequest {
     public void setHavingFilter(String havingFilter) {
         this.havingFilter = havingFilter;
     }
-
-    public long getDeadline() {
-        return deadline;
-    }
-
-    public void setDeadline(long deadline) {
-        this.deadline = deadline;
-    }
 }
diff --git 
a/stream-core/src/main/java/org/apache/kylin/stream/core/query/MultiThreadsResultCollector.java
 
b/stream-core/src/main/java/org/apache/kylin/stream/core/query/MultiThreadsResultCollector.java
index 46f52a5..5ffe5f2 100644
--- 
a/stream-core/src/main/java/org/apache/kylin/stream/core/query/MultiThreadsResultCollector.java
+++ 
b/stream-core/src/main/java/org/apache/kylin/stream/core/query/MultiThreadsResultCollector.java
@@ -14,19 +14,18 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- */
+*/
 
 package org.apache.kylin.stream.core.query;
 
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Future;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.kylin.common.KylinConfig;
@@ -39,39 +38,28 @@ import com.google.common.collect.Lists;
 
 public class MultiThreadsResultCollector extends ResultCollector {
     private static Logger logger = 
LoggerFactory.getLogger(MultiThreadsResultCollector.class);
-    private static ThreadPoolExecutor scannerThreadPool;
-    private static int MAX_RUNNING_THREAD_COUNT;
-
+    private static ExecutorService executor;
     static {
         KylinConfig config = KylinConfig.getInstanceFromEnv();
-        MAX_RUNNING_THREAD_COUNT = 
config.getStreamingReceiverQueryMaxThreads();
-        scannerThreadPool = new 
ThreadPoolExecutor(config.getStreamingReceiverQueryCoreThreads(),
-                MAX_RUNNING_THREAD_COUNT, 60L, TimeUnit.SECONDS,
-                new LinkedBlockingQueue<>(), new 
NamedThreadFactory("query-worker"));
+        executor = new 
ThreadPoolExecutor(config.getStreamingReceiverQueryCoreThreads(),
+                config.getStreamingReceiverQueryMaxThreads(), 60L, 
TimeUnit.SECONDS,
+                new LinkedBlockingQueue<Runnable>(), new 
NamedThreadFactory("query-worker"));
     }
 
-    private long deadline;
-    private String queryId;
-    /**
-     * if current query beyond the deadline
-     */
-    private AtomicBoolean cancelFlag = new AtomicBoolean(false);
+    private int timeout;
     private Semaphore workersSemaphore;
+    final BlockingQueue<Record> queue = new LinkedBlockingQueue<>(10000);
     private AtomicInteger notCompletedWorkers;
 
-    final BlockingQueue<Record> recordCachePool = new 
LinkedBlockingQueue<>(10000);
-
-    public MultiThreadsResultCollector(int numOfWorkers, long deadline) {
+    public MultiThreadsResultCollector(int numOfWorkers, int timeout) {
         this.workersSemaphore = new Semaphore(numOfWorkers);
-        this.deadline = deadline;
-        this.queryId = StreamingQueryProfile.get().getQueryId();
+        this.timeout = timeout;
     }
 
     @Override
     public Iterator<Record> iterator() {
         notCompletedWorkers = new AtomicInteger(searchResults.size());
-        Thread masterThread = new Thread(new WorkSubmitter(), 
"MultiThreadsResultCollector_" + queryId);
-        masterThread.start();
+        executor.submit(new WorkSubmitter());
 
         final int batchSize = 100;
         final long startTime = System.currentTimeMillis();
@@ -81,41 +69,38 @@ public class MultiThreadsResultCollector extends 
ResultCollector {
 
             @Override
             public boolean hasNext() {
-                boolean exits = (internalIT.hasNext() || 
!recordCachePool.isEmpty());
+                boolean exits = (internalIT.hasNext() || queue.size() > 0);
                 if (!exits) {
                     while (notCompletedWorkers.get() > 0) {
                         Thread.yield();
-                        if (System.currentTimeMillis() > deadline) {
-                            masterThread.interrupt(); // notify main thread
-                            cancelFlag.set(true);
-                            logger.warn("Beyond the deadline for {}.", 
queryId);
+                        long takeTime = System.currentTimeMillis() - startTime;
+                        if (takeTime > timeout) {
                             throw new RuntimeException("Timeout when iterate 
search result");
                         }
-                        if (internalIT.hasNext() || 
!recordCachePool.isEmpty()) {
+                        if (internalIT.hasNext() || queue.size() > 0) {
                             return true;
                         }
                     }
                 }
+
                 return exits;
             }
 
             @Override
             public Record next() {
                 try {
-                    if (System.currentTimeMillis() > deadline) {
+                    long takeTime = System.currentTimeMillis() - startTime;
+                    if (takeTime > timeout) {
                         throw new RuntimeException("Timeout when iterate 
search result");
                     }
                     if (!internalIT.hasNext()) {
                         recordList.clear();
-                        Record one = recordCachePool.poll(deadline - 
startTime, TimeUnit.MILLISECONDS);
+                        Record one = queue.poll(timeout - takeTime, 
TimeUnit.MILLISECONDS);
                         if (one == null) {
-                            masterThread.interrupt(); // notify main thread
-                            cancelFlag.set(true);
-                            logger.warn("Beyond the deadline for {}.", 
queryId);
                             throw new RuntimeException("Timeout when iterate 
search result");
                         }
                         recordList.add(one);
-                        recordCachePool.drainTo(recordList, batchSize - 1);
+                        queue.drainTo(recordList, batchSize - 1);
                         internalIT = recordList.iterator();
                     }
                     return internalIT.next();
@@ -143,13 +128,15 @@ public class MultiThreadsResultCollector extends 
ResultCollector {
             try {
                 result.startRead();
                 for (Record record : result) {
-                    recordCachePool.put(record.copy());
+                    try {
+                        queue.put(record.copy());
+                    } catch (InterruptedException e) {
+                        throw new RuntimeException("Timeout when visiting 
streaming segmenent", e);
+                    }
                 }
                 result.endRead();
-            } catch (InterruptedException inter) {
-                logger.warn("Cancelled scan streaming segment", inter);
             } catch (Exception e) {
-                logger.error("Error when iterate search result", e);
+                logger.error("error when iterate search result", e);
             } finally {
                 notCompletedWorkers.decrementAndGet();
                 workersSemaphore.release();
@@ -160,44 +147,15 @@ public class MultiThreadsResultCollector extends 
ResultCollector {
     private class WorkSubmitter implements Runnable {
         @Override
         public void run() {
-            List<Future> futureList = 
Lists.newArrayListWithExpectedSize(searchResults.size());
-            int cancelTimes = 0;
-            try {
-                for (final IStreamingSearchResult result : searchResults) {
-                    Future f = scannerThreadPool.submit(new 
ResultIterateWorker(result));
-                    futureList.add(f);
-                    workersSemaphore.acquire(); // Throw InterruptedException 
when interrupted
-                }
-                while (notCompletedWorkers.get() > 0) {
-                    Thread.sleep(100);
-                    if (cancelFlag.get() || 
Thread.currentThread().isInterrupted()) {
-                        break;
-                    }
-                }
-            } catch (InterruptedException inter) {
-                logger.warn("Interrupted", inter);
-            } finally {
-                for (Future f : futureList) {
-                    if (!f.isCancelled() || !f.isDone()) {
-                        if (f.cancel(true)) {
-                            cancelTimes++;
-                        }
-                    }
+            for (final IStreamingSearchResult result : searchResults) {
+                executor.submit(new ResultIterateWorker(result));
+                try {
+                    workersSemaphore.acquire();
+                } catch (InterruptedException e) {
+                    logger.error("interrupted", e);
                 }
             }
-            logger.debug("Finish MultiThreadsResultCollector for queryId {}, 
cancel {}. Current thread pool: {}.",
-                    queryId, cancelTimes, scannerThreadPool);
         }
     }
 
-    /**
-     * block query if return true
-     */
-    public static boolean isOccupied() {
-        boolean occupied = scannerThreadPool.getActiveCount() >= 
MAX_RUNNING_THREAD_COUNT;
-        if (occupied) {
-            logger.debug("ThreadPool {}", scannerThreadPool);
-        }
-        return occupied;
-    }
 }
diff --git 
a/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingCubeDataSearcher.java
 
b/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingCubeDataSearcher.java
index 1d38b90..f89ddec 100644
--- 
a/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingCubeDataSearcher.java
+++ 
b/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingCubeDataSearcher.java
@@ -43,6 +43,9 @@ import org.slf4j.LoggerFactory;
 
 public class StreamingCubeDataSearcher {
     private static Logger logger = 
LoggerFactory.getLogger(StreamingCubeDataSearcher.class);
+
+    private static int TIMEOUT = Integer.MAX_VALUE;
+
     private StreamingSegmentManager streamingSegmentManager;
     private String cubeName;
     private CubeDesc cubeDesc;
@@ -70,15 +73,7 @@ public class StreamingCubeDataSearcher {
         try {
             logger.info("query-{}: use cuboid {} to serve the query", 
queryProfile.getQueryId(),
                     searchRequest.getHitCuboid());
-            ResultCollector resultCollector = 
getResultCollector(searchRequest);
-            if (resultCollector instanceof MultiThreadsResultCollector) {
-                while (MultiThreadsResultCollector.isOccupied() && 
System.currentTimeMillis() < searchRequest.getDeadline()) {
-                    Thread.sleep(50);
-                }
-                if (System.currentTimeMillis() >= searchRequest.getDeadline()) 
{
-                    throw new RuntimeException("Timeout for " + 
queryProfile.getQueryId());
-                }
-            }
+            ResultCollector resultCollector = getResultCollector();
             Collection<StreamingCubeSegment> segments = 
streamingSegmentManager.getAllSegments();
             StreamingDataQueryPlanner scanRangePlanner = 
searchRequest.getQueryPlanner();
             for (StreamingCubeSegment queryableSegment : segments) {
@@ -110,10 +105,10 @@ public class StreamingCubeDataSearcher {
         }
     }
 
-    private ResultCollector getResultCollector(StreamingSearchContext 
searchRequest) {
+    private ResultCollector getResultCollector() {
         int useThreads = 
cubeDesc.getConfig().getStreamingReceiverUseThreadsPerQuery();
         if (useThreads > 1) {
-            return new MultiThreadsResultCollector(useThreads, 
searchRequest.getDeadline());
+            return new MultiThreadsResultCollector(useThreads, TIMEOUT);
         } else {
             return new SingleThreadResultCollector();
         }
diff --git 
a/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingSearchContext.java
 
b/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingSearchContext.java
index 307c2be..9da8e5b 100644
--- 
a/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingSearchContext.java
+++ 
b/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingSearchContext.java
@@ -45,8 +45,6 @@ public class StreamingSearchContext {
     private long hitCuboid;
     private long basicCuboid;
 
-    private long deadline = Long.MAX_VALUE;
-
     public StreamingSearchContext(CubeDesc cubeDesc, Set<TblColRef> 
dimensions, Set<TblColRef> groups,
                                   Set<FunctionDesc> metrics, TupleFilter 
filter, TupleFilter havingFilter) {
         this.cubeDesc = cubeDesc;
@@ -58,7 +56,6 @@ public class StreamingSearchContext {
         this.respResultSchema = new ResponseResultSchema(cubeDesc, dimensions, 
metrics);
         this.queryPlanner = new StreamingDataQueryPlanner(cubeDesc, filter);
         this.addedGroups = Sets.newHashSet();
-        this.deadline = deadline;
         calculateHitCuboid();
     }
 
@@ -161,12 +158,4 @@ public class StreamingSearchContext {
         sortedSet.addAll(cubeDesc.getMandatoryCuboids());
         return sortedSet;
     }
-
-    public long getDeadline() {
-        return deadline;
-    }
-
-    public void setDeadline(long deadline) {
-        this.deadline =  deadline;
-    }
 }
diff --git 
a/stream-receiver/src/main/java/org/apache/kylin/stream/server/rest/controller/DataController.java
 
b/stream-receiver/src/main/java/org/apache/kylin/stream/server/rest/controller/DataController.java
index 51d0cba..45c6307 100644
--- 
a/stream-receiver/src/main/java/org/apache/kylin/stream/server/rest/controller/DataController.java
+++ 
b/stream-receiver/src/main/java/org/apache/kylin/stream/server/rest/controller/DataController.java
@@ -24,7 +24,6 @@ import java.util.Set;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.common.util.SetThreadName;
 import org.apache.kylin.cube.CubeDescManager;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.gridtable.StorageSideBehavior;
@@ -80,7 +79,7 @@ public class DataController extends BasicController {
         }
         StreamingQueryProfile.set(queryProfile);
         logger.info("receive query request queryId:{}", queryId);
-        try (SetThreadName changeName = new SetThreadName("Query %s", 
queryId)) {
+        try {
             final Stopwatch sw = new Stopwatch();
             sw.start();
             String cubeName = dataRequest.getCubeName();
@@ -106,7 +105,6 @@ public class DataController extends BasicController {
 
             StreamingSearchContext gtSearchRequest = new 
StreamingSearchContext(cubeDesc, dimensions, groups,
                     metrics, tupleFilter, havingFilter);
-            gtSearchRequest.setDeadline(dataRequest.getDeadline());
             searchResult = dataSearcher.doSearch(gtSearchRequest, 
minSegmentTime,
                     dataRequest.isAllowStorageAggregation());
 

Reply via email to