This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit f139a63ddea92edb8efbf7034bb8c80d060ff24b Author: Ali Alsuliman <[email protected]> AuthorDate: Fri Oct 24 15:12:06 2025 -0700 [ASTERIXDB-3667][OTH] Log counts of currently running jobs based on kind - user model changes: no - storage format changes: no - interface changes: no Ext-ref: MB-69127 Change-Id: I80e1270a514fb9683f3f429e54aeaa2c5ee77a9a Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20523 Reviewed-by: Ali Alsuliman <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: Murtadha Hubail <[email protected]> Tested-by: Ali Alsuliman <[email protected]> --- .../asterix/optimizer/base/AnalysisUtil.java | 4 +- .../app/active/ActiveNotificationHandler.java | 6 +-- .../asterix/app/active/FeedEventsListener.java | 7 ++- .../org/apache/asterix/app/cc/GlobalTxManager.java | 4 +- .../app/external/ExternalLibraryJobUtils.java | 8 +++- .../asterix/app/translator/QueryTranslator.java | 54 +++++++++++++--------- .../org/apache/asterix/utils/DataverseUtil.java | 3 ++ .../org/apache/asterix/utils/RebalanceUtil.java | 3 ++ .../asterix/test/active/ActiveStatsTest.java | 3 +- .../test/active/TestClusterControllerActor.java | 4 +- .../asterix/common/utils/AsterixJobProperty.java | 26 +++++++++++ .../apache/asterix/metadata/utils/DatasetUtil.java | 9 +++- .../apache/asterix/metadata/utils/IndexUtil.java | 18 ++++++-- .../apache/hyracks/api/job/HyracksJobProperty.java | 23 +++++++++ .../org/apache/hyracks/api/job/IJobProperty.java | 24 ++++++++++ .../java/org/apache/hyracks/api/job/JobKind.java | 29 ++++++++++++ .../apache/hyracks/api/job/JobSpecification.java | 10 ++-- .../apache/hyracks/control/cc/job/JobManager.java | 45 ++++++++++++++++-- 18 files changed, 230 insertions(+), 50 deletions(-) diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java index d8775c8741..191010312b 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java @@ -68,7 +68,9 @@ import org.apache.hyracks.algebricks.core.rewriter.base.IRuleSetKind; import org.apache.hyracks.api.comm.IFrame; import org.apache.hyracks.api.comm.VSizeFrame; import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; +import org.apache.hyracks.api.job.HyracksJobProperty; import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.api.job.JobKind; import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.api.result.IResultSetReader; import org.apache.hyracks.api.result.ResultSetId; @@ -281,7 +283,7 @@ public class AnalysisUtil { compiler.optimize(); JobSpecification jobSpec = compiler.createJob(appCtx, new JobEventListenerFactory(newTxnId, false)); - + jobSpec.setProperty(HyracksJobProperty.JOB_KIND, JobKind.SYS_QUERY); JobId jobId = JobUtils.runJobIfActive(appCtx.getHcc(), jobSpec, true); IResultSetReader resultSetReader = appCtx.getResultSet().createReader(jobId, resultSetId); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java index 01e47b5ecd..347a49e475 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java @@ -33,6 +33,7 @@ import org.apache.asterix.active.IActiveNotificationHandler; import org.apache.asterix.active.message.ActivePartitionMessage; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.exceptions.RuntimeDataException; +import org.apache.asterix.common.utils.AsterixJobProperty; import org.apache.asterix.metadata.declared.MetadataProvider; import org.apache.asterix.metadata.entities.Dataset; import org.apache.commons.lang3.tuple.Pair; @@ -54,7 +55,6 @@ public class ActiveNotificationHandler extends SingleThreadEventProcessor<Active private static final Logger LOGGER = LogManager.getLogger(); private static final Level level = Level.DEBUG; - public static final String ACTIVE_ENTITY_PROPERTY_NAME = "ActiveJob"; private final Map<EntityId, IActiveEntityEventsListener> entityEventListeners; private final Map<JobId, EntityId> jobId2EntityId; private boolean suspended = false; @@ -97,10 +97,10 @@ public class ActiveNotificationHandler extends SingleThreadEventProcessor<Active @Override public void notifyJobCreation(JobId jobId, JobSpecification jobSpecification, IJobCapacityController.JobSubmissionStatus status) throws HyracksDataException { - Object property = jobSpecification.getProperty(ACTIVE_ENTITY_PROPERTY_NAME); + Object property = jobSpecification.getProperty(AsterixJobProperty.ACTIVE_ENTITY); if (!(property instanceof EntityId)) { if (property != null) { - LOGGER.debug("{} is not an ingestion job. job property={}", jobId, property); + LOGGER.debug("{} is not an ingestion job. found entity={}", jobId, property); } return; } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java index 4674f7ee4c..7f045a3137 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java @@ -18,6 +18,9 @@ */ package org.apache.asterix.app.active; +import static org.apache.asterix.common.utils.AsterixJobProperty.ACTIVE_ENTITY; +import static org.apache.hyracks.api.job.HyracksJobProperty.JOB_KIND; + import java.util.EnumSet; import java.util.List; @@ -45,6 +48,7 @@ import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartit import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.api.job.JobKind; import org.apache.hyracks.api.job.JobSpecification; public class FeedEventsListener extends ActiveEntityEventsListener { @@ -103,7 +107,8 @@ public class FeedEventsListener extends ActiveEntityEventsListener { Pair<JobSpecification, AlgebricksAbsolutePartitionConstraint> jobInfo = FeedOperations.buildStartFeedJob(mdProvider, feed, feedConnections, statementExecutor, hcc); JobSpecification feedJob = jobInfo.getLeft(); - feedJob.setProperty(ActiveNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME, entityId); + feedJob.setProperty(ACTIVE_ENTITY, entityId); + feedJob.setProperty(JOB_KIND, JobKind.INGESTION); // TODO(Yingyi): currently we do not check IFrameWriter protocol violations for Feed jobs. // We will need to design general exception handling mechanism for feeds. setLocations(jobInfo.getRight()); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java index d509ba685d..2ae8a0a83a 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/GlobalTxManager.java @@ -32,6 +32,7 @@ import org.apache.asterix.common.cluster.IGlobalTxManager; import org.apache.asterix.common.exceptions.ACIDException; import org.apache.asterix.common.messaging.api.ICCMessageBroker; import org.apache.asterix.common.transactions.IGlobalTransactionContext; +import org.apache.asterix.common.utils.AsterixJobProperty; import org.apache.asterix.common.utils.StorageConstants; import org.apache.asterix.transaction.management.service.transaction.GlobalTransactionContext; import org.apache.asterix.transaction.management.service.transaction.GlobalTxInfo; @@ -56,7 +57,6 @@ public class GlobalTxManager implements IGlobalTxManager { private final Map<JobId, IGlobalTransactionContext> txnContextRepository = new ConcurrentHashMap<>(); private final ICCServiceContext serviceContext; private final IOManager ioManager; - public static final String GlOBAL_TX_PROPERTY_NAME = "GlobalTxProperty"; public GlobalTxManager(ICCServiceContext serviceContext, IOManager ioManager) { this.serviceContext = serviceContext; @@ -242,7 +242,7 @@ public class GlobalTxManager implements IGlobalTxManager { @Override public void notifyJobCreation(JobId jobId, JobSpecification spec, IJobCapacityController.JobSubmissionStatus status) throws HyracksException { - GlobalTxInfo globalTxInfo = (GlobalTxInfo) spec.getProperty(GlOBAL_TX_PROPERTY_NAME); + GlobalTxInfo globalTxInfo = (GlobalTxInfo) spec.getProperty(AsterixJobProperty.GLOBAL_TX); if (globalTxInfo != null) { beginTransaction(jobId, globalTxInfo.getNumNodes(), globalTxInfo.getNumPartitions(), globalTxInfo.getDatasetIds()); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryJobUtils.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryJobUtils.java index f32b8ef246..bd9b322baf 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryJobUtils.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalLibraryJobUtils.java @@ -42,6 +42,8 @@ import org.apache.hyracks.algebricks.common.utils.Pair; import org.apache.hyracks.algebricks.common.utils.Triple; import org.apache.hyracks.api.dataflow.IOperatorDescriptor; import org.apache.hyracks.api.io.FileSplit; +import org.apache.hyracks.api.job.HyracksJobProperty; +import org.apache.hyracks.api.job.JobKind; import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.dataflow.std.file.IFileSplitProvider; @@ -66,6 +68,10 @@ public class ExternalLibraryJobUtils { JobSpecification abortJobSpec = createLibraryAbortJobSpec(namespace, libraryName, appCtx, splitsAndConstraint); + prepareJobSpec.setProperty(HyracksJobProperty.JOB_KIND, JobKind.DDL); + commitJobSpec.setProperty(HyracksJobProperty.JOB_KIND, JobKind.DDL); + abortJobSpec.setProperty(HyracksJobProperty.JOB_KIND, JobKind.DDL); + return new Triple<>(prepareJobSpec, commitJobSpec, abortJobSpec); } @@ -110,7 +116,7 @@ public class ExternalLibraryJobUtils { AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(jobSpec, opDesc, splitsAndConstraint.second); jobSpec.addRoot(opDesc); - + jobSpec.setProperty(HyracksJobProperty.JOB_KIND, JobKind.DDL); return jobSpec; } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java index 5e4706c8a8..731f4111c1 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java @@ -22,6 +22,7 @@ import static org.apache.asterix.common.api.IIdentifierMapper.Modifier.PLURAL; import static org.apache.asterix.common.utils.IdentifierUtil.dataset; import static org.apache.asterix.common.utils.IdentifierUtil.dataverse; import static org.apache.asterix.lang.common.statement.CreateFullTextFilterStatement.FIELD_TYPE_STOPWORDS; +import static org.apache.hyracks.api.job.HyracksJobProperty.JOB_KIND; import static org.apache.hyracks.control.nc.result.ResultState.UNLIMITED_READS; import java.io.FileInputStream; @@ -63,7 +64,6 @@ import org.apache.asterix.api.http.server.ApiServlet; import org.apache.asterix.app.active.ActiveEntityEventsListener; import org.apache.asterix.app.active.ActiveNotificationHandler; import org.apache.asterix.app.active.FeedEventsListener; -import org.apache.asterix.app.cc.GlobalTxManager; import org.apache.asterix.app.external.ExternalLibraryJobUtils; import org.apache.asterix.app.result.ExecutionError; import org.apache.asterix.app.result.ResultHandle; @@ -108,6 +108,7 @@ import org.apache.asterix.common.metadata.IMetadataLockUtil; import org.apache.asterix.common.metadata.MetadataConstants; import org.apache.asterix.common.metadata.MetadataUtil; import org.apache.asterix.common.metadata.Namespace; +import org.apache.asterix.common.utils.AsterixJobProperty; import org.apache.asterix.common.utils.JobUtils; import org.apache.asterix.common.utils.JobUtils.ProgressState; import org.apache.asterix.common.utils.StorageConstants; @@ -286,6 +287,7 @@ import org.apache.hyracks.api.exceptions.Warning; import org.apache.hyracks.api.io.FileSplit; import org.apache.hyracks.api.job.JobFlag; import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.api.job.JobKind; import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.api.job.profiling.IOperatorStats; import org.apache.hyracks.api.result.IResultSet; @@ -4136,6 +4138,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); bActiveTxn = false; if (spec != null && !isCompileOnly()) { + spec.setProperty(JOB_KIND, JobKind.DML); runJob(hcc, spec); } } catch (Exception e) { @@ -4223,13 +4226,14 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.class); List<Integer> participatingDatasetIds = new ArrayList<>(); participatingDatasetIds.add(dataset.getDatasetId()); - spec.setProperty(GlobalTxManager.GlOBAL_TX_PROPERTY_NAME, new GlobalTxInfo(participatingDatasetIds, + spec.setProperty(AsterixJobProperty.GLOBAL_TX, new GlobalTxInfo(participatingDatasetIds, numParticipatingNodes, numParticipatingPartitions)); } String reqId = requestParameters.getRequestReference().getUuid(); final IRequestTracker requestTracker = appCtx.getRequestTracker(); final ClientRequest clientRequest = (ClientRequest) requestTracker.get(reqId); - jobId = runTrackJob(hcc, spec, jobFlags, reqId, requestParameters.getClientContextId(), clientRequest); + jobId = runTrackJob(hcc, spec, jobFlags, reqId, requestParameters.getClientContextId(), clientRequest, + JobKind.DML); clientRequest.markCancellable(); String nameBefore = Thread.currentThread().getName(); try { @@ -4357,7 +4361,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen }; deliverResult(hcc, resultSet, compiler, metadataProvider, locker, resultDelivery, outMetadata, stats, - requestParameters, true, null, clientRequest); + requestParameters, true, null, clientRequest, JobKind.DML); } public JobSpecification handleInsertUpsertStatement(MetadataProvider metadataProvider, Statement stmt, @@ -4409,7 +4413,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen ClientRequest clientRequest = (ClientRequest) requestTracker.get(reqId); if (stmtInsertUpsert.getReturnExpression() != null) { deliverResult(hcc, resultSet, compiler, metadataProvider, locker, resultDelivery, outMetadata, stats, - reqParams, true, stmt, clientRequest); + reqParams, true, stmt, clientRequest, JobKind.DML); } else { locker.lock(); JobId jobId = null; @@ -4429,10 +4433,11 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.class); List<Integer> participatingDatasetIds = new ArrayList<>(); participatingDatasetIds.add(ds.getDatasetId()); - jobSpec.setProperty(GlobalTxManager.GlOBAL_TX_PROPERTY_NAME, new GlobalTxInfo( - participatingDatasetIds, numParticipatingNodes, numParticipatingPartitions)); + jobSpec.setProperty(AsterixJobProperty.GLOBAL_TX, new GlobalTxInfo(participatingDatasetIds, + numParticipatingNodes, numParticipatingPartitions)); } - jobId = runTrackJob(hcc, jobSpec, jobFlags, reqId, reqParams.getClientContextId(), clientRequest); + jobId = runTrackJob(hcc, jobSpec, jobFlags, reqId, reqParams.getClientContextId(), clientRequest, + JobKind.DML); clientRequest.markCancellable(); String nameBefore = Thread.currentThread().getName(); try { @@ -4497,14 +4502,14 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.class); List<Integer> participatingDatasetIds = new ArrayList<>(); participatingDatasetIds.add(ds.getDatasetId()); - jobSpec.setProperty(GlobalTxManager.GlOBAL_TX_PROPERTY_NAME, new GlobalTxInfo( - participatingDatasetIds, numParticipatingNodes, numParticipatingPartitions)); + jobSpec.setProperty(AsterixJobProperty.GLOBAL_TX, new GlobalTxInfo(participatingDatasetIds, + numParticipatingNodes, numParticipatingPartitions)); } String reqId = requestParameters.getRequestReference().getUuid(); final IRequestTracker requestTracker = appCtx.getRequestTracker(); final ClientRequest clientRequest = (ClientRequest) requestTracker.get(reqId); jobId = runTrackJob(hcc, jobSpec, jobFlags, reqId, requestParameters.getClientContextId(), - clientRequest); + clientRequest, JobKind.DML); clientRequest.markCancellable(); String nameBefore = Thread.currentThread().getName(); try { @@ -4533,8 +4538,9 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen } private static JobId runTrackJob(IHyracksClientConnection hcc, JobSpecification jobSpec, EnumSet<JobFlag> jobFlags, - String reqId, String clientCtxId, ClientRequest clientRequest) throws Exception { + String reqId, String clientCtxId, ClientRequest clientRequest, JobKind jobKind) throws Exception { jobSpec.setRequestId(reqId); + jobSpec.setProperty(JOB_KIND, jobKind); JobId jobId = JobUtils.runJobIfActive(hcc, jobSpec, jobFlags, false); LOGGER.info("Created job {} for uuid:{}, clientContextID:{}", jobId, reqId, clientCtxId); clientRequest.setJobId(jobId); @@ -5541,19 +5547,19 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen } }; deliverResult(hcc, resultSet, compiler, metadataProvider, locker, resultDelivery, outMetadata, stats, - requestParameters, true, null, clientRequest); + requestParameters, true, null, clientRequest, JobKind.USER_QUERY); } private void deliverResult(IHyracksClientConnection hcc, IResultSet resultSet, IStatementCompiler compiler, MetadataProvider metadataProvider, IMetadataLocker locker, ResultDelivery resultDelivery, ResultMetadata outMetadata, Stats stats, IRequestParameters requestParameters, boolean cancellable, - Statement atomicStmt, ClientRequest clientRequest) throws Exception { + Statement atomicStmt, ClientRequest clientRequest, JobKind jobKind) throws Exception { final ResultSetId resultSetId = metadataProvider.getResultSetId(); switch (resultDelivery) { case ASYNC: MutableBoolean printed = new MutableBoolean(false); executorService.submit(() -> asyncCreateAndRunJob(hcc, compiler, locker, resultDelivery, - requestParameters, cancellable, resultSetId, printed, metadataProvider, atomicStmt)); + requestParameters, cancellable, resultSetId, printed, metadataProvider, atomicStmt, jobKind)); synchronized (printed) { while (!printed.booleanValue()) { printed.wait(); @@ -5567,7 +5573,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen responsePrinter.addResultPrinter(new ResultsPrinter(appCtx, resultReader, metadataProvider.findOutputRecordType(), stats, sessionOutput)); responsePrinter.printResults(); - }, requestParameters, cancellable, appCtx, metadataProvider, atomicStmt); + }, requestParameters, cancellable, appCtx, metadataProvider, atomicStmt, jobKind); break; case DEFERRED: createAndRunJob(hcc, jobFlags, null, compiler, locker, resultDelivery, id -> { @@ -5584,7 +5590,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen outMetadata.getResultSets() .add(new ResultSetInfo(id, resultSetId, metadataProvider.findOutputRecordType())); } - }, requestParameters, cancellable, appCtx, metadataProvider, atomicStmt); + }, requestParameters, cancellable, appCtx, metadataProvider, atomicStmt, jobKind); break; default: break; @@ -5616,7 +5622,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen private void asyncCreateAndRunJob(IHyracksClientConnection hcc, IStatementCompiler compiler, IMetadataLocker locker, ResultDelivery resultDelivery, IRequestParameters requestParameters, boolean cancellable, - ResultSetId resultSetId, MutableBoolean printed, MetadataProvider metadataProvider, Statement atomicStmt) { + ResultSetId resultSetId, MutableBoolean printed, MetadataProvider metadataProvider, Statement atomicStmt, + JobKind jobKind) { Mutable<JobId> jobId = new MutableObject<>(JobId.INVALID); final CompletableFuture<JobId> jobIdFuture = new CompletableFuture<>(); Future<?> jobSubmitFuture = executorService.submit(() -> { @@ -5632,7 +5639,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen printed.setTrue(); printed.notify(); } - }, requestParameters, cancellable, appCtx, metadataProvider, atomicStmt); + }, requestParameters, cancellable, appCtx, metadataProvider, atomicStmt, jobKind); } catch (Exception e) { jobIdFuture.completeExceptionally(e); throw new RuntimeException(e); @@ -5717,7 +5724,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen private void createAndRunJob(IHyracksClientConnection hcc, EnumSet<JobFlag> jobFlags, Mutable<JobId> jId, IStatementCompiler compiler, IMetadataLocker locker, ResultDelivery resultDelivery, IResultPrinter printer, IRequestParameters requestParameters, boolean cancellable, ICcApplicationContext appCtx, - MetadataProvider metadataProvider, Statement atomicStatement) throws Exception { + MetadataProvider metadataProvider, Statement atomicStatement, JobKind jobKind) throws Exception { String reqId = requestParameters.getRequestReference().getUuid(); final IRequestTracker requestTracker = appCtx.getRequestTracker(); final ClientRequest clientRequest = (ClientRequest) requestTracker.get(reqId); @@ -5750,12 +5757,13 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen LSMTreeIndexInsertUpdateDeleteOperatorDescriptor.class); List<Integer> participatingDatasetIds = new ArrayList<>(); participatingDatasetIds.add(ds.getDatasetId()); - jobSpec.setProperty(GlobalTxManager.GlOBAL_TX_PROPERTY_NAME, new GlobalTxInfo( - participatingDatasetIds, numParticipatingNodes, numParticipatingPartitions)); + jobSpec.setProperty(AsterixJobProperty.GLOBAL_TX, new GlobalTxInfo(participatingDatasetIds, + numParticipatingNodes, numParticipatingPartitions)); } } - jobId = runTrackJob(hcc, jobSpec, jobFlags, reqId, requestParameters.getClientContextId(), clientRequest); + jobId = runTrackJob(hcc, jobSpec, jobFlags, reqId, requestParameters.getClientContextId(), clientRequest, + jobKind); if (jId != null) { jId.setValue(jobId); } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/DataverseUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/DataverseUtil.java index d132b6b295..fd3e59b8e7 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/DataverseUtil.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/DataverseUtil.java @@ -23,6 +23,8 @@ import org.apache.asterix.metadata.declared.MetadataProvider; import org.apache.asterix.metadata.entities.Dataverse; import org.apache.asterix.runtime.utils.RuntimeUtils; import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper; +import org.apache.hyracks.api.job.HyracksJobProperty; +import org.apache.hyracks.api.job.JobKind; import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.dataflow.std.file.FileRemoveOperatorDescriptor; @@ -47,6 +49,7 @@ public class DataverseUtil { new FileRemoveOperatorDescriptor(jobSpec, pp.getSplitsProvider(), false, pp.getComputeStorageMap()); AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(jobSpec, frod, pp.getConstraints()); jobSpec.addRoot(frod); + jobSpec.setProperty(HyracksJobProperty.JOB_KIND, JobKind.DDL); return jobSpec; } } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java index a2cbaa5d79..ca9f1d499b 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java @@ -63,6 +63,8 @@ import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.dataflow.IConnectorDescriptor; import org.apache.hyracks.api.dataflow.IOperatorDescriptor; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.apache.hyracks.api.job.HyracksJobProperty; +import org.apache.hyracks.api.job.JobKind; import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory; import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor; @@ -348,6 +350,7 @@ public class RebalanceUtil { spec.connect(new OneToOneConnectorDescriptor(spec), upsertOp, 0, commitOp, 0); // Executes the job. + spec.setProperty(HyracksJobProperty.JOB_KIND, JobKind.DML); JobUtils.forceRunJob(hcc, spec, true); } diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java index 9ff0514b8d..53b4796e81 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java @@ -43,6 +43,7 @@ import org.apache.asterix.app.result.ResponsePrinter; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.metadata.DataverseName; import org.apache.asterix.common.metadata.MetadataUtil; +import org.apache.asterix.common.utils.AsterixJobProperty; import org.apache.asterix.external.feed.watch.WaitForStateSubscriber; import org.apache.asterix.external.operators.FeedIntakeOperatorNodePushable; import org.apache.asterix.metadata.declared.MetadataProvider; @@ -98,7 +99,7 @@ public class ActiveStatsTest { // Mock JobSpecification JobSpecification jobSpec = Mockito.mock(JobSpecification.class); - Mockito.when(jobSpec.getProperty(ActiveNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME)).thenReturn(entityId); + Mockito.when(jobSpec.getProperty(AsterixJobProperty.ACTIVE_ENTITY)).thenReturn(entityId); // Mock MetadataProvider CCExtensionManager extensionManager = (CCExtensionManager) appCtx.getExtensionManager(); diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestClusterControllerActor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestClusterControllerActor.java index 883a0cb069..9ea8573dd5 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestClusterControllerActor.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestClusterControllerActor.java @@ -23,6 +23,7 @@ import java.util.List; import org.apache.asterix.active.ActiveEvent; import org.apache.asterix.active.EntityId; import org.apache.asterix.app.active.ActiveNotificationHandler; +import org.apache.asterix.common.utils.AsterixJobProperty; import org.apache.asterix.metadata.declared.MetadataProvider; import org.apache.asterix.metadata.entities.Dataset; import org.apache.hyracks.api.job.JobId; @@ -48,8 +49,7 @@ public class TestClusterControllerActor extends Actor { protected void doExecute(MetadataProvider actorMdProvider) throws Exception { // succeed JobSpecification jobSpecification = Mockito.mock(JobSpecification.class); - Mockito.when(jobSpecification.getProperty(ActiveNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME)) - .thenReturn(entityId); + Mockito.when(jobSpecification.getProperty(AsterixJobProperty.ACTIVE_ENTITY)).thenReturn(entityId); handler.notifyJobCreation(jobId, jobSpecification, IJobCapacityController.JobSubmissionStatus.EXECUTE); handler.notifyJobStart(jobId, null); } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/AsterixJobProperty.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/AsterixJobProperty.java new file mode 100644 index 0000000000..760c5f7184 --- /dev/null +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/AsterixJobProperty.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.common.utils; + +import org.apache.hyracks.api.job.IJobProperty; + +public enum AsterixJobProperty implements IJobProperty { + ACTIVE_ENTITY, + GLOBAL_TX +} diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java index c39395b094..e637f28308 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java @@ -94,6 +94,8 @@ import org.apache.hyracks.api.dataflow.value.ITypeTraits; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.FileSplit; +import org.apache.hyracks.api.job.HyracksJobProperty; +import org.apache.hyracks.api.job.JobKind; import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; @@ -322,10 +324,11 @@ public class DatasetUtil { public static JobSpecification dropDatasetJobSpec(Dataset dataset, MetadataProvider metadataProvider, Set<IndexDropOperatorDescriptor.DropOption> options) throws AlgebricksException, ACIDException { LOGGER.info("DROP DATASET: " + dataset); + JobSpecification specPrimary = RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext()); + specPrimary.setProperty(HyracksJobProperty.JOB_KIND, JobKind.DDL); if (dataset.getDatasetType() == DatasetType.EXTERNAL) { - return RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext()); + return specPrimary; } - JobSpecification specPrimary = RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext()); PartitioningProperties partitioningProperties = metadataProvider.getPartitioningProperties(dataset); IIndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory(metadataProvider.getStorageComponentProvider().getStorageManager(), @@ -370,6 +373,7 @@ public class DatasetUtil { AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, indexCreateOp, partitioningProperties.getConstraints()); spec.addRoot(indexCreateOp); + spec.setProperty(HyracksJobProperty.JOB_KIND, JobKind.DDL); return spec; } @@ -766,6 +770,7 @@ public class DatasetUtil { IOperatorDescriptor truncateOp = new TruncateOperatorDescriptor(job, nc2Resources); AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(job, truncateOp, nodeSet); hcc = metadataProvider.getApplicationContext().getHcc(); + job.setProperty(HyracksJobProperty.JOB_KIND, JobKind.DML); JobUtils.runJobIfActive(hcc, job, true); } else { // check should have been done by caller diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java index dfefb8ff9e..79db1a0ea4 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java @@ -75,7 +75,9 @@ import org.apache.hyracks.algebricks.core.algebra.metadata.IProjectionFiltration import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext; import org.apache.hyracks.api.dataflow.value.ITypeTraits; import org.apache.hyracks.api.exceptions.SourceLocation; +import org.apache.hyracks.api.job.HyracksJobProperty; import org.apache.hyracks.api.job.IJobletEventListenerFactory; +import org.apache.hyracks.api.job.JobKind; import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.storage.am.common.impls.DefaultTupleProjectorFactory; import org.apache.hyracks.storage.am.common.impls.NoOpTupleProjectorFactory; @@ -162,21 +164,27 @@ public class IndexUtil { Dataset dataset, SourceLocation sourceLoc) throws AlgebricksException { ISecondaryIndexOperationsHelper secondaryIndexHelper = SecondaryIndexOperationsHelper.createIndexOperationsHelper(dataset, index, metadataProvider, sourceLoc); - return secondaryIndexHelper.buildDropJobSpec(EnumSet.noneOf(DropOption.class)); + JobSpecification spec = secondaryIndexHelper.buildDropJobSpec(EnumSet.noneOf(DropOption.class)); + spec.setProperty(HyracksJobProperty.JOB_KIND, JobKind.DDL); + return spec; } public static JobSpecification buildDropIndexJobSpec(Index index, MetadataProvider metadataProvider, Dataset dataset, Set<DropOption> options, SourceLocation sourceLoc) throws AlgebricksException { ISecondaryIndexOperationsHelper secondaryIndexHelper = SecondaryIndexOperationsHelper.createIndexOperationsHelper(dataset, index, metadataProvider, sourceLoc); - return secondaryIndexHelper.buildDropJobSpec(options); + JobSpecification spec = secondaryIndexHelper.buildDropJobSpec(options); + spec.setProperty(HyracksJobProperty.JOB_KIND, JobKind.DDL); + return spec; } public static JobSpecification buildSecondaryIndexCreationJobSpec(Dataset dataset, Index index, MetadataProvider metadataProvider, SourceLocation sourceLoc) throws AlgebricksException { ISecondaryIndexOperationsHelper secondaryIndexHelper = SecondaryIndexOperationsHelper.createIndexOperationsHelper(dataset, index, metadataProvider, sourceLoc); - return secondaryIndexHelper.buildCreationJobSpec(); + JobSpecification spec = secondaryIndexHelper.buildCreationJobSpec(); + spec.setProperty(HyracksJobProperty.JOB_KIND, JobKind.DDL); + return spec; } public static JobSpecification buildSecondaryIndexLoadingJobSpec(Dataset dataset, Index index, @@ -195,7 +203,9 @@ public class IndexUtil { secondaryIndexHelper = SecondaryTreeIndexOperationsHelper.createIndexOperationsHelper(dataset, index, metadataProvider, sourceLoc); } - return secondaryIndexHelper.buildLoadingJobSpec(); + JobSpecification spec = secondaryIndexHelper.buildLoadingJobSpec(); + spec.setProperty(HyracksJobProperty.JOB_KIND, JobKind.DML); + return spec; } private static boolean supportsCorrelated(DatasetConfig.IndexType indexType) { diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/HyracksJobProperty.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/HyracksJobProperty.java new file mode 100644 index 0000000000..d8bf5cba2d --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/HyracksJobProperty.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.api.job; + +public enum HyracksJobProperty implements IJobProperty { + JOB_KIND +} diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobProperty.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobProperty.java new file mode 100644 index 0000000000..6d575796af --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobProperty.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.api.job; + +import java.io.Serializable; + +public interface IJobProperty extends Serializable { +} diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobKind.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobKind.java new file mode 100644 index 0000000000..4c5e8692a9 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobKind.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.api.job; + +import java.io.Serializable; + +public enum JobKind implements Serializable { + USER_QUERY, + SYS_QUERY, + DDL, + DML, + INGESTION +} diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java index f644703b1b..7761ceca8e 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java @@ -70,7 +70,7 @@ public class JobSpecification implements Serializable, IOperatorDescriptorRegist private transient Map<Object, String> logical2PhysicalMap; - private final Map<String, Serializable> properties; + private final Map<IJobProperty, Serializable> properties; private final Set<Constraint> userConstraints; @@ -155,12 +155,12 @@ public class JobSpecification implements Serializable, IOperatorDescriptorRegist Pair.of(Pair.of(producerOp, producerPort), Pair.of(consumerOp, consumerPort))); } - public void setProperty(String name, Serializable value) { - properties.put(name, value); + public void setProperty(IJobProperty property, Serializable value) { + properties.put(property, value); } - public Serializable getProperty(String name) { - return properties.get(name); + public Serializable getProperty(IJobProperty property) { + return properties.get(property); } private <T> void extend(List<T> list, int index) { diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java index 9ecd165120..269478a5bb 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java @@ -19,6 +19,7 @@ package org.apache.hyracks.control.cc.job; +import java.io.Serializable; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import java.time.ZoneId; @@ -38,7 +39,9 @@ import org.apache.hyracks.api.exceptions.HyracksException; import org.apache.hyracks.api.exceptions.IError; import org.apache.hyracks.api.exceptions.IFormattedException; import org.apache.hyracks.api.job.ActivityClusterGraph; +import org.apache.hyracks.api.job.HyracksJobProperty; import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.api.job.JobKind; import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.api.job.JobStatus; import org.apache.hyracks.api.job.resource.IClusterCapacity; @@ -75,6 +78,11 @@ public class JobManager implements IJobManager { private final AtomicLong totalFailedJobs; private final AtomicLong totalCancelledJobs; private final AtomicLong totalRejectedJobs; + private long queryJobs; + private long sysQueryJobs; + private long ddlJobs; + private long dmlJobs; + private long ingestionJobs; private IJobQueue jobQueue; public JobManager(CCConfig ccConfig, ClusterControllerService ccs, IJobCapacityController jobCapacityController) { @@ -244,9 +252,9 @@ public class JobManager implements IJobManager { JobId jobId = run.getJobId(); Throwable caughtException = null; CCServiceContext serviceCtx = ccs.getContext(); + JobSpecification spec = run.getJobSpecification(); try { - serviceCtx.notifyJobFinish(jobId, run.getJobSpecification(), run.getPendingStatus(), - run.getPendingExceptions()); + serviceCtx.notifyJobFinish(jobId, spec, run.getPendingStatus(), run.getPendingExceptions()); } catch (Exception e) { LOGGER.error("Exception notifying job finish {}", jobId, e); caughtException = e; @@ -254,6 +262,7 @@ public class JobManager implements IJobManager { run.setStatus(run.getPendingStatus(), run.getPendingExceptions()); run.setEndTime(System.currentTimeMillis()); if (activeRunMap.remove(jobId) != null) { + updateActiveJobCounts(spec, -1); incrementJobCounters(run, successful); // non-active jobs have zero capacity @@ -283,6 +292,31 @@ public class JobManager implements IJobManager { } } + private void updateActiveJobCounts(JobSpecification spec, int delta) { + Serializable property = spec.getProperty(HyracksJobProperty.JOB_KIND); + if (property instanceof JobKind) { + switch ((JobKind) property) { + case USER_QUERY: + queryJobs += delta; + break; + case SYS_QUERY: + sysQueryJobs += delta; + break; + case DDL: + ddlJobs += delta; + break; + case DML: + dmlJobs += delta; + break; + case INGESTION: + ingestionJobs += delta; + break; + default: + break; + } + } + } + /** * Increments the job counters depending on the status * @@ -394,6 +428,7 @@ public class JobManager implements IJobManager { run.setStartTimeZoneId(ZoneId.systemDefault().getId()); JobId jobId = run.getJobId(); logJobCapacity(run, "running", Level.INFO); + updateActiveJobCounts(run.getJobSpecification(), 1); activeRunMap.put(jobId, run); run.setStatus(JobStatus.RUNNING, null); executeJobInternal(run); @@ -452,10 +487,10 @@ public class JobManager implements IJobManager { } IReadOnlyClusterCapacity clusterCapacity = jobCapacityController.getClusterCapacity(); LOGGER.log(lvl, - "{} {}, job memory={}, cpu={}, (new) cluster memory={}, cpu={}, currently running={}, queued={}", + "{} {}, job memory={} & cpu={}, (new) cluster memory={}, cpu={}, queued={}, currently running={}, user queries={}, system queries={}, ddls={}, dmls={}, ingestions={}", jobStateDesc, jobRun.getJobId(), requiredMemory, requiredCPUs, - clusterCapacity.getAggregatedMemoryByteSize(), clusterCapacity.getAggregatedCores(), - getRunningJobsCount(), jobQueue.size()); + clusterCapacity.getAggregatedMemoryByteSize(), clusterCapacity.getAggregatedCores(), jobQueue.size(), + getRunningJobsCount(), queryJobs, sysQueryJobs, ddlJobs, dmlJobs, ingestionJobs); } private void handleException(HyracksException ex) {
