Repository: tajo Updated Branches: refs/heads/master cc1efb66d -> 1515e388a
TAJO-2022: Add AsyncTaskServer to TajoMaster. Closes #913 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/1515e388 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/1515e388 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/1515e388 Branch: refs/heads/master Commit: 1515e388a764a55b7b37ce2c23ddfab21fe8115d Parents: cc1efb6 Author: Hyunsik Choi <[email protected]> Authored: Wed Dec 16 19:12:39 2015 -0800 Committer: Hyunsik Choi <[email protected]> Committed: Wed Dec 16 19:12:39 2015 -0800 ---------------------------------------------------------------------- CHANGES | 2 + .../java/org/apache/tajo/conf/TajoConf.java | 7 ++ .../tajo/cli/tsql/TestTajoCliNegatives.java | 2 +- .../org/apache/tajo/io/AsyncTaskService.java | 109 +++++++++++++++++++ .../java/org/apache/tajo/master/TajoMaster.java | 9 ++ .../tajo/master/TajoMasterClientService.java | 24 ++-- .../exec/NonForwardQueryResultFileScanner.java | 102 ++++++++--------- .../apache/tajo/master/exec/QueryExecutor.java | 10 +- .../ws/rs/resources/QueryResultResource.java | 11 +- 9 files changed, 206 insertions(+), 70 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/1515e388/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 65353b1..406e56a 100644 --- a/CHANGES +++ b/CHANGES @@ -8,6 +8,8 @@ Release 0.12.0 - unreleased IMPROVEMENT + TAJO-2022: Add AsyncTaskServer to TajoMaster. (hyunsik) + TAJO-1990: Refine some parts in HBaseTablespace. (hyunsik) TAJO-2005: Add TableStatUpdateRewriter. (hyunsik) http://git-wip-us.apache.org/repos/asf/tajo/blob/1515e388/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java index 9f788eb..b2e08bd 100644 --- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java +++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java @@ -152,6 +152,13 @@ public class TajoConf extends Configuration { DEFAULT_SERVICE_TRACKER_CLASS("tajo.discovery.service-tracker.class", BaseServiceTracker.class.getCanonicalName()), HA_SERVICE_TRACKER_CLASS("tajo.discovery.ha-service-tracker.class", "org.apache.tajo.ha.HdfsServiceTracker"), + // Async IO Task Service + + /** The number of threads for async tasks */ + MASTER_ASYNC_TASK_THREAD_NUM("tajo.master.async-task.thread-num", 4), + /** How long it will wait for termination */ + MASTER_ASYNC_TASK_TERMINATION_WAIT_TIME("tajo.master.async-task.wait-time-sec", 60), // 1 min + // Resource tracker service RESOURCE_TRACKER_RPC_ADDRESS("tajo.resource-tracker.rpc.address", "localhost:26003", Validators.networkAddr()), http://git-wip-us.apache.org/repos/asf/tajo/blob/1515e388/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/TestTajoCliNegatives.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/TestTajoCliNegatives.java b/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/TestTajoCliNegatives.java index bf4ffcf..6d939de 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/TestTajoCliNegatives.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/TestTajoCliNegatives.java @@ -134,7 +134,7 @@ public class TestTajoCliNegatives extends QueryTestCaseBase { assertScriptFailure("select fail(3, l_orderkey, 'testQueryFailureOfSimpleQuery') from default.lineitem" , "?fail\n" + "-------------------------------\n" + - "ERROR: internal error: internal error: testQueryFailureOfSimpleQuery\n"); + "ERROR: internal error: internal error: internal error: testQueryFailureOfSimpleQuery\n"); } @Test http://git-wip-us.apache.org/repos/asf/tajo/blob/1515e388/tajo-core/src/main/java/org/apache/tajo/io/AsyncTaskService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/io/AsyncTaskService.java b/tajo-core/src/main/java/org/apache/tajo/io/AsyncTaskService.java new file mode 100644 index 0000000..71faaee --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/io/AsyncTaskService.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tajo.io; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.AbstractService; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.conf.TajoConf.ConfVars; +import org.apache.tajo.master.TajoMaster; +import org.apache.tajo.util.TUtil; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +/** + * AsyncTaskService executes some blocking tasks in TajoMaster + * + * @See https://issues.apache.org/jira/browse/TAJO-2022 + */ +public class AsyncTaskService extends AbstractService { + private final TajoMaster.MasterContext context; + private long TERMINATION_WAIT_TIME_SEC; + private ExecutorService executor; + + /** + * Construct the service. + * + * @param context + */ + public AsyncTaskService(TajoMaster.MasterContext context) { + super("MasterAsyncTaskExecutor"); + this.context = context; + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + TajoConf systemConf = TUtil.checkTypeAndGet(conf, TajoConf.class); + TERMINATION_WAIT_TIME_SEC = systemConf.getLongVar(ConfVars.MASTER_ASYNC_TASK_TERMINATION_WAIT_TIME); + executor = Executors.newFixedThreadPool(systemConf.getIntVar(ConfVars.MASTER_ASYNC_TASK_THREAD_NUM)); + + super.serviceInit(conf); + } + + @Override + protected void serviceStart() throws Exception { + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + executor.shutdown(); + boolean terminated = false; + try { + terminated = executor.awaitTermination(TERMINATION_WAIT_TIME_SEC, TimeUnit.SECONDS); + } catch (InterruptedException e) { + } + if (!terminated) { + executor.shutdownNow(); + } + + super.serviceStop(); + } + + public TajoMaster.MasterContext getMasterContext() { + return this.context; + } + + /** + * Returns a new CompletableFuture that is asynchronously completed + * by a task running in AsyncTaskService. + * + * @param task Task + * @param <T> Return Type + * @return CompletableFuture + */ + public <T> CompletableFuture<T> supply(Supplier<T> task) { + return CompletableFuture.supplyAsync(task, executor); + } + + /** + * Returns a new CompletableFuture that is asynchronously completed + * by a task running in the given executor after it runs the given + * action. + * + * @param task Task + * @return CompletableFuture + */ + public CompletableFuture<Void> run(Runnable task) { + return CompletableFuture.runAsync(task, executor); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/1515e388/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java index a74573e..97e9613 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java @@ -48,6 +48,7 @@ import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.engine.function.FunctionLoader; import org.apache.tajo.exception.*; import org.apache.tajo.function.FunctionSignature; +import org.apache.tajo.io.AsyncTaskService; import org.apache.tajo.master.rm.TajoResourceManager; import org.apache.tajo.metrics.ClusterResourceMetricSet; import org.apache.tajo.metrics.Master; @@ -119,6 +120,7 @@ public class TajoMaster extends CompositeService { private CatalogServer catalogServer; private CatalogService catalog; private GlobalEngine globalEngine; + private AsyncTaskService asyncTaskService; private AsyncDispatcher dispatcher; private TajoMasterClientService tajoMasterClientService; private QueryCoordinatorService tajoMasterService; @@ -196,6 +198,9 @@ public class TajoMaster extends CompositeService { tajoMasterClientService = new TajoMasterClientService(context); addIfService(tajoMasterClientService); + asyncTaskService = new AsyncTaskService(context); + addIfService(asyncTaskService); + tajoMasterService = new QueryCoordinatorService(context); addIfService(tajoMasterService); @@ -501,6 +506,10 @@ public class TajoMaster extends CompositeService { return globalEngine; } + public AsyncTaskService asyncTaskExecutor() { + return asyncTaskService; + } + public QueryCoordinatorService getTajoMasterService() { return tajoMasterService; } http://git-wip-us.apache.org/repos/asf/tajo/blob/1515e388/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java index 017b17f..bb04229 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java @@ -30,6 +30,7 @@ import org.apache.hadoop.service.AbstractService; import org.apache.tajo.QueryId; import org.apache.tajo.QueryIdFactory; import org.apache.tajo.TajoIdProtos; +import org.apache.tajo.TajoProtos; import org.apache.tajo.TajoProtos.QueryState; import org.apache.tajo.catalog.*; import org.apache.tajo.catalog.partition.PartitionMethodDesc; @@ -62,10 +63,7 @@ import org.apache.tajo.util.NetUtils; import org.apache.tajo.util.ProtoUtil; import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; +import java.util.*; import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME; import static org.apache.tajo.exception.ExceptionUtil.printStackTraceIfError; @@ -558,13 +556,17 @@ public class TajoMasterClientService extends AbstractService { scanNode.init(resultTableDesc); } - if(request.hasCompressCodec()) { - queryResultScanner = new NonForwardQueryResultFileScanner(context.getConf(), session.getSessionId(), - queryId, scanNode, Integer.MAX_VALUE, request.getCompressCodec()); - } else { - queryResultScanner = new NonForwardQueryResultFileScanner(context.getConf(), - session.getSessionId(), queryId, scanNode, Integer.MAX_VALUE); - } + Optional<TajoProtos.CodecType> codecType = + request.hasCompressCodec() ? Optional.of(request.getCompressCodec()) : Optional.empty(); + + queryResultScanner = new NonForwardQueryResultFileScanner( + context.asyncTaskExecutor(), + context.getConf(), + session.getSessionId(), + queryId, + scanNode, + Integer.MAX_VALUE, + codecType); queryResultScanner.init(); session.addNonForwardQueryResultScanner(queryResultScanner); http://git-wip-us.apache.org/repos/asf/tajo/blob/1515e388/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java index 80275ce..8953315 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java @@ -40,6 +40,7 @@ import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.exception.TajoException; import org.apache.tajo.exception.TajoInternalError; import org.apache.tajo.ipc.ClientProtos.SerializedResultSet; +import org.apache.tajo.io.AsyncTaskService; import org.apache.tajo.plan.logical.ScanNode; import org.apache.tajo.querymaster.Repartitioner; import org.apache.tajo.storage.*; @@ -56,38 +57,34 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.Optional; import java.util.concurrent.Future; public class NonForwardQueryResultFileScanner implements NonForwardQueryResultScanner { private final static Log LOG = LogFactory.getLog(NonForwardQueryResultFileScanner.class); - private QueryId queryId; - private String sessionId; + final private AsyncTaskService asyncTaskService; + final private QueryId queryId; + final private String sessionId; private ScanExec scanExec; - private TableDesc tableDesc; - private RowStoreEncoder rowEncoder; - private int maxRow; + final private TableDesc tableDesc; + final private RowStoreEncoder rowEncoder; + final private int maxRow; private boolean eof; private volatile long totalRows; private volatile int currentNumRows; private volatile boolean isStopped; private TaskAttemptContext taskContext; - private TajoConf tajoConf; - private ScanNode scanNode; - private CodecType codecType; - private ExecutorService executor; + final private TajoConf tajoConf; + final private ScanNode scanNode; + final private Optional<CodecType> codecType; private MemoryRowBlock rowBlock; private Future<MemoryRowBlock> nextFetch; - public NonForwardQueryResultFileScanner(TajoConf tajoConf, String sessionId, QueryId queryId, ScanNode scanNode, - int maxRow) throws IOException { - this(tajoConf, sessionId, queryId, scanNode, maxRow, null); - } - - public NonForwardQueryResultFileScanner(TajoConf tajoConf, String sessionId, QueryId queryId, ScanNode scanNode, - int maxRow, CodecType codecType) throws IOException { + public NonForwardQueryResultFileScanner(AsyncTaskService asyncTaskService, + TajoConf tajoConf, String sessionId, QueryId queryId, ScanNode scanNode, + int maxRow, Optional<CodecType> codecType) throws IOException { + this.asyncTaskService = asyncTaskService; this.tajoConf = tajoConf; this.sessionId = sessionId; this.queryId = queryId; @@ -159,19 +156,22 @@ public class NonForwardQueryResultFileScanner implements NonForwardQueryResultSc rowBlock = null; } - if(executor != null) { - executor.shutdown(); - } - //remove temporal final output if (!tajoConf.getBoolVar(TajoConf.ConfVars.$DEBUG_ENABLED)) { Path temporalResultDir = TajoConf.getTemporalResultDir(tajoConf, queryId); + if (tableDesc.getUri().equals(temporalResultDir.toUri())) { - temporalResultDir.getFileSystem(tajoConf).delete(temporalResultDir.getParent(), true); + asyncTaskService.run(() -> { + try { + temporalResultDir.getFileSystem(tajoConf).delete(temporalResultDir.getParent(), true); + } catch (IOException e) { + LOG.error(e); + } + } + ); } } - LOG.info(String.format("\"Sent result to client for %s, queryId: %s %s rows: %d", sessionId, queryId, codecType != null ? ", compression: " + codecType : "", @@ -229,13 +229,13 @@ public class NonForwardQueryResultFileScanner implements NonForwardQueryResultSc resultSetBuilder.setRows(rowBlock.rows()); MemoryBlock memoryBlock = rowBlock.getMemory(); - if (codecType != null) { + if (codecType.isPresent()) { byte[] uncompressedBytes = new byte[memoryBlock.readableBytes()]; memoryBlock.getBuffer().getBytes(0, uncompressedBytes); - byte[] compressedBytes = CompressionUtil.compress(codecType, uncompressedBytes); + byte[] compressedBytes = CompressionUtil.compress(codecType.get(), uncompressedBytes); resultSetBuilder.setDecompressedLength(uncompressedBytes.length); - resultSetBuilder.setDecompressCodec(codecType); + resultSetBuilder.setDecompressCodec(codecType.get()); resultSetBuilder.setSerializedTuples(ByteString.copyFrom(compressedBytes)); } else { ByteBuffer uncompressed = memoryBlock.getBuffer().nioBuffer(0, memoryBlock.readableBytes()); @@ -272,42 +272,34 @@ public class NonForwardQueryResultFileScanner implements NonForwardQueryResultSc return future; } - if (executor == null) { - executor = Executors.newSingleThreadExecutor(); - } - - executor.submit(new Runnable() { - @Override - public void run() { - try { - rowBlock.clear(); - int endRow = currentNumRows + fetchRowNum; - while (currentNumRows < endRow) { - Tuple tuple = scanExec.next(); - if (tuple == null) { + return asyncTaskService.supply(() -> { + try { + rowBlock.clear(); + int endRow = currentNumRows + fetchRowNum; + while (currentNumRows < endRow) { + Tuple tuple = scanExec.next(); + if (tuple == null) { + eof = true; + break; + } else { + rowBlock.getWriter().addTuple(tuple); + currentNumRows++; + if (currentNumRows >= maxRow) { eof = true; break; - } else { - rowBlock.getWriter().addTuple(tuple); - currentNumRows++; - if (currentNumRows >= maxRow) { - eof = true; - break; - } } } + } - if (rowBlock.rows() > 0) { - totalRows += rowBlock.rows(); - } - - future.set(rowBlock); - } catch (Throwable t) { - future.setException(t); + if (rowBlock.rows() > 0) { + totalRows += rowBlock.rows(); } + + return rowBlock; + } catch (Throwable t) { + throw new TajoInternalError(t); } }); - return future; } @Override http://git-wip-us.apache.org/repos/asf/tajo/blob/1515e388/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java index 6afa67a..8dddf2c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java @@ -72,6 +72,7 @@ import java.net.URI; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; +import java.util.Optional; import static org.apache.tajo.exception.ReturnStateUtil.OK; import static org.apache.tajo.exception.ReturnStateUtil.errUndefinedDatabase; @@ -302,7 +303,14 @@ public class QueryExecutor { plan.getRootBlock().getRoot()); final NonForwardQueryResultScanner queryResultScanner = new NonForwardQueryResultFileScanner( - context.getConf(), session.getSessionId(), queryInfo.getQueryId(), scanNode, maxRow); + context.asyncTaskExecutor(), + context.getConf(), + session.getSessionId(), + queryInfo.getQueryId(), + scanNode, + maxRow, + Optional.empty()); + queryResultScanner.init(); session.addNonForwardQueryResultScanner(queryResultScanner); http://git-wip-us.apache.org/repos/asf/tajo/blob/1515e388/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResultResource.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResultResource.java b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResultResource.java index 2438060..93d397a 100644 --- a/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResultResource.java +++ b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResultResource.java @@ -50,6 +50,7 @@ import java.net.URI; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.List; +import java.util.Optional; public class QueryResultResource { @@ -133,8 +134,14 @@ public class QueryResultResource { scanNode.init(resultTableDesc); } - resultScanner = new NonForwardQueryResultFileScanner(masterContext.getConf(), session.getSessionId(), queryId, - scanNode, Integer.MAX_VALUE); + resultScanner = new NonForwardQueryResultFileScanner( + masterContext.asyncTaskExecutor(), + masterContext.getConf(), + session.getSessionId(), + queryId, + scanNode, + Integer.MAX_VALUE, + Optional.empty()); resultScanner.init(); session.addNonForwardQueryResultScanner(resultScanner); }
