This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 9607ea5af9cfd42e5c78ec687b79ac149db91904 Merge: 9b6164e7d2 6c1e847f19 Author: Michael Blow <[email protected]> AuthorDate: Sat Nov 18 09:02:05 2023 -0500 Merge branch 'gerrit/trinity' into 'master' Change-Id: I7e0d18ecfdd8d1ea72078f13997c1754fdb8b1cd .../apache/asterix/api/common/APIFramework.java | 12 +- .../asterix/app/result/JobResultCallback.java | 2 +- .../asterix/app/translator/QueryTranslator.java | 10 +- .../test/runtime/ProfiledExecutionTest.java | 3 - .../src/test/resources/runtimets/profiled.xml | 6 + .../non-unary-subplan.1.ddl.sqlpp} | 14 +- .../non-unary-subplan.2.update.sqlpp} | 15 +- .../non-unary-subplan.3.profile.sqlpp} | 18 +- .../profile/sleep/sleep.5.profile.sqlpp | 2 +- .../profile/full-scan/full-scan.3.regexjson | 18 ++ .../non-unary-subplan.3.regexjson | 241 +++++++++++++++++++++ .../results/profile/sleep/sleep.3.regexjson | 28 ++- .../results/profile/sleep/sleep.4.regexjson | 54 ++++- .../results/profile/sleep/sleep.5.regexjson | 60 ++++- .../common/ClosedRecordConstructorEvalFactory.java | 5 + .../DatasetStreamStatsOperatorDescriptor.java | 4 +- .../api/HeuristicCompilerFactoryBuilder.java | 13 ++ .../hyracks/algebricks/compiler/api/ICompiler.java | 6 + .../LogicalOperatorPrettyPrintVisitorJson.java | 118 ++++++++-- .../algebricks/core/jobgen/impl/JobBuilder.java | 46 +++- .../algebricks/core/jobgen/impl/PlanCompiler.java | 9 + .../runtime/base/IPushRuntimeFactory.java | 1 + .../runtime/base/ProfiledPushRuntime.java | 95 ++++++++ .../meta/AlgebricksMetaOperatorDescriptor.java | 113 +++++++++- .../runtime/operators/meta/PipelineAssembler.java | 24 +- .../operators/meta/SubplanRuntimeFactory.java | 29 ++- .../api/dataflow/ISelfProfilingNodePushable.java | 15 +- .../api/dataflow/IStatsContainingNodePushable.java | 12 +- .../apache/hyracks/api/dataflow/ITimedWriter.java | 15 +- .../hyracks/api/dataflow/ProfiledFrameWriter.java | 59 ++--- .../api/dataflow/ProfiledOperatorNodePushable.java | 49 +++-- .../hyracks/api/job/profiling/IOperatorStats.java | 16 +- .../api/job/profiling/NoOpOperatorStats.java | 16 +- .../hyracks/api/job/profiling/OperatorStats.java | 45 ++-- .../runtime/SuperActivityOperatorNodePushable.java | 17 +- .../common/job/profiling/StatsCollector.java | 12 +- .../common/job/profiling/om/JobProfile.java | 4 +- .../common/job/profiling/om/TaskProfile.java | 6 +- ...aryOutputIntrospectingOperatorNodePushable.java | 14 +- 39 files changed, 1002 insertions(+), 224 deletions(-) diff --cc asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java index 9517cf06ca,0cc27903e4..9bd65a694e --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java @@@ -203,9 -204,10 +205,10 @@@ public class APIFramework } public JobSpecification compileQuery(IClusterInfoCollector clusterInfoCollector, MetadataProvider metadataProvider, - Query query, int varCounter, String outputDatasetName, SessionOutput output, - ICompiledDmlStatement statement, Map<VarIdentifier, IAObject> externalVars, IResponsePrinter printer, - IWarningCollector warningCollector, IRequestParameters requestParameters, EnumSet<JobFlag> runtimeFlags) + Query query, int varCounter, String outputDatasetName, SessionOutput output, ICompiledStatement statement, + Map<VarIdentifier, IAObject> externalVars, IResponsePrinter printer, IWarningCollector warningCollector, - IRequestParameters requestParameters) throws AlgebricksException, ACIDException { ++ IRequestParameters requestParameters, EnumSet<JobFlag> runtimeFlags) + throws AlgebricksException, ACIDException { // establish facts final boolean isQuery = query != null; @@@ -345,10 -346,13 +348,13 @@@ } if (conf.is(SessionConfig.OOB_OPTIMIZED_LOGICAL_PLAN) || isExplainOnly) { - if (isQuery || isLoad) { + if (isQuery || isLoad || isCopy) { generateOptimizedLogicalPlan(plan, spec.getLogical2PhysicalMap(), output.config().getPlanFormat(), cboMode); - lastPlan = new PlanInfo(plan, spec.getLogical2PhysicalMap(), cboMode, output.config().getPlanFormat()); + if (runtimeFlags.contains(JobFlag.PROFILE_RUNTIME)) { + lastPlan = + new PlanInfo(plan, spec.getLogical2PhysicalMap(), cboMode, output.config().getPlanFormat()); + } } } diff --cc asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java index d52429f1d6,e602b761d8..a99fc22472 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java @@@ -3887,181 -3680,6 +3887,181 @@@ public class QueryTranslator extends Ab } } + protected Map<String, String> createExternalDataPropertiesForCopyFromStmt(String databaseName, + DataverseName dataverseName, CopyFromStatement copyFromStatement, Datatype itemType, + MetadataTransactionContext mdTxnCtx, MetadataProvider md) throws AlgebricksException { + ExternalDetailsDecl edd = copyFromStatement.getExternalDetails(); + Map<String, String> properties = copyFromStatement.getExternalDetails().getProperties(); + String path = copyFromStatement.getPath(); + String pathKey = ExternalDataUtils.getPathKey(edd.getAdapter()); + properties.put(pathKey, path); + return properties; + } + + protected void handleCopyFromStatement(MetadataProvider metadataProvider, Statement stmt, + IHyracksClientConnection hcc) throws Exception { + CopyFromStatement copyStmt = (CopyFromStatement) stmt; + String datasetName = copyStmt.getDatasetName(); + metadataProvider.validateDatabaseObjectName(copyStmt.getNamespace(), datasetName, copyStmt.getSourceLocation()); + Namespace stmtActiveNamespace = getActiveNamespace(copyStmt.getNamespace()); + DataverseName dataverseName = stmtActiveNamespace.getDataverseName(); + String databaseName = stmtActiveNamespace.getDatabaseName(); + MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); + boolean bActiveTxn = true; + metadataProvider.setMetadataTxnContext(mdTxnCtx); + lockUtil.insertDeleteUpsertBegin(lockManager, metadataProvider.getLocks(), databaseName, dataverseName, + datasetName); + JobId jobId = null; + boolean atomic = false; + try { + metadataProvider.setWriteTransaction(true); + Dataset dataset = metadataProvider.findDataset(databaseName, dataverseName, copyStmt.getDatasetName()); + if (dataset == null) { + throw new CompilationException(ErrorCode.UNKNOWN_DATASET_IN_DATAVERSE, stmt.getSourceLocation(), + datasetName, + MetadataUtil.dataverseName(databaseName, dataverseName, metadataProvider.isUsingDatabase())); + } + Datatype itemType = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataset.getItemTypeDatabaseName(), + dataset.getItemTypeDataverseName(), dataset.getItemTypeName()); + // Copy statement with csv files will have a type expression + if (copyStmt.getTypeExpr() != null) { + TypeExpression itemTypeExpr = copyStmt.getTypeExpr(); + Triple<Namespace, String, Boolean> itemTypeQualifiedName = extractDatasetItemTypeName( + stmtActiveNamespace, datasetName, itemTypeExpr, false, stmt.getSourceLocation()); + Namespace itemTypeNamespace = itemTypeQualifiedName.first; + DataverseName itemTypeDataverseName = itemTypeNamespace.getDataverseName(); + String itemTypeName = itemTypeQualifiedName.second; + String itemTypeDatabaseName = itemTypeNamespace.getDatabaseName(); + IAType itemTypeEntity = translateType(itemTypeDatabaseName, itemTypeDataverseName, itemTypeName, + itemTypeExpr, mdTxnCtx); + itemType = + new Datatype(itemTypeDatabaseName, itemTypeDataverseName, itemTypeName, itemTypeEntity, true); + } + ExternalDetailsDecl externalDetails = copyStmt.getExternalDetails(); + Map<String, String> properties = createExternalDataPropertiesForCopyFromStmt(databaseName, dataverseName, + copyStmt, itemType, mdTxnCtx, metadataProvider); + ExternalDataUtils.normalize(properties); + ExternalDataUtils.validate(properties); + validateExternalDatasetProperties(externalDetails, properties, copyStmt.getSourceLocation(), mdTxnCtx, + appCtx); + CompiledCopyFromFileStatement cls = new CompiledCopyFromFileStatement(databaseName, dataverseName, + copyStmt.getDatasetName(), itemType, externalDetails.getAdapter(), properties); + cls.setSourceLocation(stmt.getSourceLocation()); + JobSpecification spec = apiFramework.compileQuery(hcc, metadataProvider, null, 0, null, sessionOutput, cls, - null, responsePrinter, warningCollector, null); ++ null, responsePrinter, warningCollector, null, jobFlags); + afterCompile(); + MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); + bActiveTxn = false; + if (spec != null && !isCompileOnly()) { + atomic = dataset.isAtomic(); + if (atomic) { + int numParticipatingNodes = appCtx.getNodeJobTracker() + .getJobParticipatingNodes(spec, LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.class) + .size(); + int numParticipatingPartitions = appCtx.getNodeJobTracker().getNumParticipatingPartitions(spec, + LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.class); + List<Integer> participatingDatasetIds = new ArrayList<>(); + participatingDatasetIds.add(dataset.getDatasetId()); + spec.setProperty(GlobalTxManager.GlOBAL_TX_PROPERTY_NAME, new GlobalTxInfo(participatingDatasetIds, + numParticipatingNodes, numParticipatingPartitions)); + } + jobId = JobUtils.runJob(hcc, spec, jobFlags, false); + + String nameBefore = Thread.currentThread().getName(); + try { + Thread.currentThread().setName(nameBefore + " : WaitForCompletionForJobId: " + jobId); + hcc.waitForCompletion(jobId); + } finally { + Thread.currentThread().setName(nameBefore); + } + if (atomic) { + globalTxManager.commitTransaction(jobId); + } + } + } catch (Exception e) { + if (atomic && jobId != null) { + globalTxManager.abortTransaction(jobId); + } + if (bActiveTxn) { + abort(e, e, mdTxnCtx); + } + throw e; + } finally { + metadataProvider.getLocks().unlock(); + } + } + + protected void handleCopyToStatement(MetadataProvider metadataProvider, Statement stmt, + IHyracksClientConnection hcc, IResultSet resultSet, ResultDelivery resultDelivery, + ResultMetadata outMetadata, IRequestParameters requestParameters, Map<String, IAObject> stmtParams, + Stats stats) throws Exception { + CopyToStatement copyTo = (CopyToStatement) stmt; + final IRequestTracker requestTracker = appCtx.getRequestTracker(); + final ClientRequest clientRequest = + (ClientRequest) requestTracker.get(requestParameters.getRequestReference().getUuid()); + final IMetadataLocker locker = new IMetadataLocker() { + @Override + public void lock() throws RuntimeDataException, InterruptedException { + try { + compilationLock.readLock().lockInterruptibly(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + ensureNotCancelled(clientRequest); + throw e; + } + } + + @Override + public void unlock() { + metadataProvider.getLocks().unlock(); + compilationLock.readLock().unlock(); + } + }; + final IStatementCompiler compiler = () -> { + long compileStart = System.nanoTime(); + MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); + boolean bActiveTxn = true; + metadataProvider.setMetadataTxnContext(mdTxnCtx); + try { + ExternalDetailsDecl edd = copyTo.getExternalDetailsDecl(); + edd.setProperties(createAndValidateAdapterConfigurationForCopyToStmt(edd, + ExternalDataConstants.WRITER_SUPPORTED_ADAPTERS, copyTo.getSourceLocation(), mdTxnCtx, + metadataProvider)); + + Map<VarIdentifier, IAObject> externalVars = createExternalVariables(copyTo, stmtParams); + // Query Rewriting (happens under the same ongoing metadata transaction) + LangRewritingContext langRewritingContext = createLangRewritingContext(metadataProvider, + declaredFunctions, null, warningCollector, copyTo.getVarCounter()); + Pair<IReturningStatement, Integer> rewrittenResult = apiFramework.reWriteQuery(langRewritingContext, + copyTo, sessionOutput, true, true, externalVars.keySet()); + + CompiledStatements.CompiledCopyToStatement compiledCopyToStatement = + new CompiledStatements.CompiledCopyToStatement(copyTo); + + // Query Compilation (happens under the same ongoing metadata transaction) + final JobSpecification jobSpec = apiFramework.compileQuery(hcc, metadataProvider, copyTo.getQuery(), + rewrittenResult.second, null, sessionOutput, compiledCopyToStatement, externalVars, - responsePrinter, warningCollector, requestParameters); ++ responsePrinter, warningCollector, requestParameters, jobFlags); + // update stats with count of compile-time warnings. needs to be adapted for multi-statement. + stats.updateTotalWarningsCount(warningCollector.getTotalWarningsCount()); + afterCompile(); + MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); + stats.setCompileTime(System.nanoTime() - compileStart); + bActiveTxn = false; + return isCompileOnly() ? null : jobSpec; + } catch (Exception e) { + LOGGER.log(Level.INFO, e.getMessage(), e); + if (bActiveTxn) { + abort(e, e, mdTxnCtx); + } + throw e; + } + }; + + deliverResult(hcc, resultSet, compiler, metadataProvider, locker, resultDelivery, outMetadata, stats, + requestParameters, true, null); + } + public JobSpecification handleInsertUpsertStatement(MetadataProvider metadataProvider, Statement stmt, IHyracksClientConnection hcc, IResultSet resultSet, ResultDelivery resultDelivery, ResultMetadata outMetadata, Stats stats, IRequestParameters requestParameters, diff --cc asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/DatasetStreamStatsOperatorDescriptor.java index f712bdd61e,f9d75b510c..6c7f2f0d63 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/DatasetStreamStatsOperatorDescriptor.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/DatasetStreamStatsOperatorDescriptor.java @@@ -85,12 -85,30 +87,12 @@@ public final class DatasetStreamStatsOp writer.open(); IStatsCollector coll = ctx.getStatsCollector(); if (coll != null) { - coll.add(new OperatorStats(operatorName)); + coll.add(new OperatorStats(operatorName, INVALID_ODID)); } INCServiceContext serviceCtx = ctx.getJobletContext().getServiceContext(); - indexStats = new HashMap<>(); - for (int i = 0; i < indexes.length; i++) { - IIndexDataflowHelper idxFlowHelper = indexes[i].create(serviceCtx, partition); - try { - idxFlowHelper.open(); - ILSMIndex indexInstance = (ILSMIndex) idxFlowHelper.getIndexInstance(); - long numPages = 0; - synchronized (indexInstance.getOperationTracker()) { - for (ILSMDiskComponent component : indexInstance.getDiskComponents()) { - long componentSize = component.getComponentSize(); - if (component instanceof AbstractLSMWithBloomFilterDiskComponent) { - componentSize -= ((AbstractLSMWithBloomFilterDiskComponent) component) - .getBloomFilter().getFileReference().getFile().length(); - } - numPages += componentSize / indexInstance.getBufferCache().getPageSize(); - } - } - indexStats.put(indexesNames[i], new IndexStats(indexesNames[i], numPages)); - } finally { - idxFlowHelper.close(); - } + indexesStats = new HashMap<>(); + if (indexes.length > 0) { + gatherIndexesStats(serviceCtx, partitionsMap[partition]); } } diff --cc hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java index 3e4e09f74d,5c532d3665..d1a356cb45 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java @@@ -80,7 -82,8 +81,7 @@@ import org.apache.hyracks.algebricks.co import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator; - import org.apache.hyracks.api.dataflow.ActivityId; -import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator; + import org.apache.hyracks.api.dataflow.OperatorDescriptorId; import org.apache.hyracks.api.exceptions.ErrorCode; import com.fasterxml.jackson.core.JsonFactory;
