This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit b4120d1672115d1551340cccea230b67fb1c258f Merge: aca87de0fe 5b4e88e3d9 Author: Michael Blow <[email protected]> AuthorDate: Fri Jun 7 15:22:49 2024 -0400 Merge branch 'gerrit/neo' into 'gerrit/trinity' Change-Id: Ia97d6afc9f854fa76ed1591ac083f3a34ca26281 .../app/active/ActiveEntityEventsListener.java | 20 ++++++---------- .../asterix/app/message/CancelQueryRequest.java | 4 ++-- .../asterix/app/message/CancelQueryResponse.java | 2 +- .../query-ASTERIXDB-3418.1.query.sqlpp | 26 ++++---------------- .../query-ASTERIXDB-3418.2.query.sqlpp | 26 ++++---------------- .../query-ASTERIXDB-3418.3.query.sqlpp | 26 ++++---------------- .../query-ASTERIXDB-3418.4.query.sqlpp | 26 ++++---------------- .../results/comparison/arrays/arrays.020.adm | 2 +- .../query-ASTERIXDB-3418.1.adm | 1 + .../query-ASTERIXDB-3418.2.adm | 1 + .../query-ASTERIXDB-3418.3.adm | 1 + .../query-ASTERIXDB-3418.4.adm | 1 + .../test/resources/runtimets/testsuite_sqlpp.xml | 5 ++++ .../data/nontagged/comparators/ComparatorUtil.java | 20 +++++++++++----- .../evaluators/functions/SleepDescriptor.java | 8 +++---- .../job/resource/JobCapacityController.java | 5 ++++ ...stractOneInputOneOutputOneFramePushRuntime.java | 2 +- .../job/resource/DefaultJobCapacityController.java | 10 ++++++++ .../api/job/resource/IJobCapacityController.java | 6 +++++ .../apache/hyracks/control/cc/job/JobManager.java | 22 +++++++++++++++++ .../hyracks/control/cc/scheduler/FIFOJobQueue.java | 5 ++++ .../hyracks/control/cc/scheduler/IJobQueue.java | 7 ++++++ .../control/cc/work/RemoveDeadNodesWork.java | 2 +- .../MaterializingPipelinedPartition.java | 7 ------ .../control/nc/work/NotifyTaskFailureWork.java | 5 +++- .../OptimizedHybridHashJoinOperatorDescriptor.java | 28 +++++----------------- .../dataflow/std/sort/TupleSorterHeapSort.java | 9 ------- .../AbstractMultiNCIntegrationTest.java | 10 ++++++++ 28 files changed, 135 insertions(+), 152 deletions(-) diff --cc hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java index 2b03da50d0,234744933a..ac1a9d9619 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java @@@ -433,32 -358,23 +437,50 @@@ public class JobManager implements IJob private void releaseJobCapacity(JobRun jobRun) { final JobSpecification job = jobRun.getJobSpecification(); jobCapacityController.release(job); + logJobCapacity(jobRun, "released"); + } + + private void logJobCapacity(JobRun jobRun, String jobStateDesc) { + IClusterCapacity requiredResources = jobRun.getJobSpecification().getRequiredClusterCapacity(); + if (requiredResources == null) { + return; + } + long requiredMemory = requiredResources.getAggregatedMemoryByteSize(); + int requiredCPUs = requiredResources.getAggregatedCores(); + if (requiredMemory == 0 && requiredCPUs == 0) { + return; + } + IReadOnlyClusterCapacity clusterCapacity = jobCapacityController.getClusterCapacity(); + LOGGER.info("{} {}, memory={}, cpu={}, (new) cluster memory={}, cpu={}, currently running={}, queued={}", + jobStateDesc, jobRun.getJobId(), requiredMemory, requiredCPUs, + clusterCapacity.getAggregatedMemoryByteSize(), clusterCapacity.getAggregatedCores(), + getRunningJobsCount(), jobQueue.size()); } + + private void handleException(HyracksException ex) { + if (ex.getError().isPresent()) { + IError error = ex.getError().get(); + switch ((ErrorCode) error) { + case JOB_QUEUE_FULL: + case JOB_REQUIREMENTS_EXCEED_CAPACITY: + incrementRejectedJobs(); + } + } + } + + private void incrementSuccessfulJobs() { + successfulJobs.incrementAndGet(); + } + + private void incrementFailedJobs() { + totalFailedJobs.incrementAndGet(); + } + + private void incrementCancelledJobs() { + totalCancelledJobs.incrementAndGet(); + } + + private void incrementRejectedJobs() { + totalRejectedJobs.incrementAndGet(); + } } diff --cc hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java index e04eebe5fa,0f31491feb..ed53a7ec7e --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java @@@ -412,11 -399,6 +409,7 @@@ public class OptimizedHybridHashJoinOpe writer.open(); state.hybridHJ.initProbe(probComp); + state.hybridHJ.setOperatorStats(stats); - - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("OptimizedHybridHashJoin is starting the probe phase."); - } } @Override @@@ -496,17 -475,6 +486,11 @@@ } } + @Override + public void setOperatorStats(IOperatorStats stats) { + this.stats = stats; + } + - private void logProbeComplete() { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("OptimizedHybridHashJoin closed its probe phase"); - } - } - //The buildSideReader should be always the original buildSideReader, so should the probeSideReader private void joinPartitionPair(RunFileReader buildSideReader, RunFileReader probeSideReader, int buildSizeInTuple, int probeSizeInTuple, int level) throws HyracksDataException {
