This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch xianyi in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 46d8f741e1eeab9b11b53e2e16c192490ed2ed13 Author: Steve Yurong Su <[email protected]> AuthorDate: Wed Nov 24 11:20:49 2021 +0800 xianyi in p --- .../org/apache/iotdb/db/service/TSServiceImpl.java | 113 +++++++++++++++++---- 1 file changed, 94 insertions(+), 19 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java index 505c5a9..b8a0a6b 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java @@ -54,6 +54,7 @@ import org.apache.iotdb.db.qp.physical.crud.MeasurementInfo; import org.apache.iotdb.db.qp.physical.crud.QueryPlan; import org.apache.iotdb.db.qp.physical.crud.SelectIntoPlan; import org.apache.iotdb.db.qp.physical.crud.UDFPlan; +import org.apache.iotdb.db.qp.physical.crud.UDTFPlan; import org.apache.iotdb.db.qp.physical.sys.AppendTemplatePlan; import org.apache.iotdb.db.qp.physical.sys.AuthorPlan; import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan; @@ -133,11 +134,6 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; import org.apache.iotdb.tsfile.read.common.Path; import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; -import com.google.common.primitives.Bytes; -import org.apache.thrift.TException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.IOException; import java.nio.ByteBuffer; import java.sql.SQLException; @@ -151,8 +147,16 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.ForkJoinTask; +import java.util.concurrent.RecursiveTask; import java.util.stream.Collectors; +import com.google.common.primitives.Bytes; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onIoTDBException; import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNPEOrUnexpectedException; import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNonQueryException; @@ -935,22 +939,29 @@ public class TSServiceImpl extends BasicServiceProvider implements TSIService.If return new TSExecuteStatementResp(status); } - final long startTime = System.currentTimeMillis(); - final long queryId = sessionManager.requestQueryId(statementId, true); - QueryContext context = - genQueryContext(queryId, physicalPlan.isDebug(), startTime, statement, timeout); final SelectIntoPlan selectIntoPlan = (SelectIntoPlan) physicalPlan; final QueryPlan queryPlan = selectIntoPlan.getQueryPlan(); - queryFrequencyRecorder.incrementAndGet(); - AUDIT_LOGGER.debug( - "Session {} execute select into: {}", sessionManager.getCurrSessionId(), statement); - if (physicalPlan instanceof QueryPlan && ((QueryPlan) physicalPlan).isEnableTracing()) { - tracingManager.setSeriesPathNum(queryId, queryPlan.getPaths().size()); + if (queryPlan instanceof UDTFPlan + && queryPlan + .getResultColumns() + .get(0) + .getExpression() + .isTimeSeriesGeneratingFunctionExpression() + // && ((FunctionExpression) queryPlan.getResultColumns().get(0).getExpression()) + // .getFunctionName() + // .equalsIgnoreCase("en") + ) { + return executeSelectIntoStatementXianyi( + (UDTFPlan) queryPlan, this, statement, statementId, timeout, fetchSize, sessionId); } - try { + final long startTime = System.currentTimeMillis(); + final long queryId = sessionManager.requestQueryId(statementId, true); + QueryContext context = + genQueryContext(queryId, physicalPlan.isDebug(), startTime, statement, timeout); + try { InsertTabletPlansIterator insertTabletPlansIterator = new InsertTabletPlansIterator( queryPlan, @@ -969,10 +980,74 @@ public class TSServiceImpl extends BasicServiceProvider implements TSIService.If return RpcUtils.getTSExecuteStatementResp(TSStatusCode.SUCCESS_STATUS).setQueryId(queryId); } finally { sessionManager.releaseQueryResourceNoExceptions(queryId); - Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_SELECT_INTO, startTime); - long costTime = System.currentTimeMillis() - startTime; - if (costTime >= CONFIG.getSlowQueryThreshold()) { - SLOW_SQL_LOGGER.info("Cost: {} ms, sql is {}", costTime, statement); + } + } + + private TSExecuteStatementResp executeSelectIntoStatementXianyi( + UDTFPlan udtfPlan, + TSServiceImpl tsService, + String statement, + long statementId, + long timeout, + int fetchSize, + long sessionId) { + ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors() << 1); + List<ForkJoinTask<Void>> futures = new ArrayList<>(); + for (String subStatement : split(udtfPlan, statement)) { + futures.add( + forkJoinPool.submit( + new InsertTabletPlanTask( + tsService, statementId, timeout, fetchSize, sessionId, subStatement))); + } + for (ForkJoinTask<Void> v : futures) { + v.join(); + } + forkJoinPool.shutdown(); + return RpcUtils.getTSExecuteStatementResp(TSStatusCode.SUCCESS_STATUS); + } + + private List<String> split(UDTFPlan udtfPlan, String statement) { + List<String> statements = new ArrayList<>(); + + String prefix = statement.split("where")[0] + " where "; + + return statements; + } + + private class InsertTabletPlanTask extends RecursiveTask<Void> { + + private final TSServiceImpl tsService; + private final long statementId; + private final long timeout; + private final int fetchSize; + private final long sessionId; + private final String statement; + + InsertTabletPlanTask( + TSServiceImpl tsService, + long statementId, + long timeout, + int fetchSize, + long sessionId, + String statement) { + this.tsService = tsService; + this.statementId = statementId; + this.timeout = timeout; + this.fetchSize = fetchSize; + this.sessionId = sessionId; + this.statement = statement; + } + + @Override + protected Void compute() { + try { + PhysicalPlan physicalPlan = + processor.parseSQLToPhysicalPlan(statement, sessionManager.getZoneId(sessionId)); + tsService.executeSelectIntoStatement( + statement, statementId, physicalPlan, fetchSize, timeout, sessionId); + return null; + } catch (Exception e) { + throw new RuntimeException(e.getMessage()); } } }
