This is an automated email from the ASF dual-hosted git repository.
bdoyle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 60ca6605b Add Scheduler Queue Metric for Not Processing Any Activations (#5386)
60ca6605b is described below
commit 60ca6605bb081f99906cff1a21caf75d47e414fa
Author: Brendan Doyle <[email protected]>
AuthorDate: Tue Mar 7 10:17:34 2023 -0800
Add Scheduler Queue Metric for Not Processing Any Activations (#5386)
* Add Scheduler Queue Metric for Not Processing Any Activations
* fix timeout comparison
* account for action timeout being longer than queue retention
---------
Co-authored-by: Brendan Doyle <[email protected]>
---
.../scala/org/apache/openwhisk/common/Logging.scala | 7 +++++++
.../openwhisk/core/scheduler/queue/MemoryQueue.scala | 19 ++++++++++++++++---
2 files changed, 23 insertions(+), 3 deletions(-)
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
index 2bc3b1c81..c2c4bbf59 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
@@ -612,6 +612,13 @@ object LoggingMarkers {
counter,
Some(actionWithoutVersion),
Map("namespace" -> namespace, "action" -> actionWithVersion))(MeasurementUnit.none)
+ def SCHEDULER_QUEUE_NOT_PROCESSING(namespace: String, actionWithVersion: String, actionWithoutVersion: String) =
+ LogMarkerToken(
+ scheduler,
+ "queueNotProcessing",
+ counter,
+ Some(actionWithoutVersion),
+ Map("namespace" -> namespace, "action" -> actionWithVersion))(MeasurementUnit.none)
/*
* General markers
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala
index 8602702dd..d21a6104d 100644
--- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala
@@ -46,10 +46,10 @@ import org.apache.openwhisk.http.Messages.{namespaceLimitUnderZero, tooManyConcu
import pureconfig.loadConfigOrThrow
import spray.json._
import pureconfig.generic.auto._
-import scala.collection.JavaConverters._
+import scala.collection.JavaConverters._
import java.time.{Duration, Instant}
-import java.util.concurrent.atomic.AtomicInteger
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
import scala.annotation.tailrec
import scala.collection.immutable.Queue
import scala.collection.mutable
@@ -139,6 +139,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
checkToDropStaleActivation: (Clock,
Queue[TimeSeriesActivationEntry],
Long,
+ AtomicLong,
String,
WhiskActionMetaData,
MemoryQueueState,
@@ -173,6 +174,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
private[queue] var queue = Queue.empty[TimeSeriesActivationEntry]
private[queue] var in = new AtomicInteger(0)
+ private[queue] val lastActivationPulledTime = new AtomicLong(Instant.now.toEpochMilli)
private[queue] val namespaceContainerCount = NamespaceContainerCount(invocationNamespace, etcdClient, watcherService)
private[queue] var averageDuration: Option[Double] = None
private[queue] var averageDurationBuffer = AverageRingBuffer(queueConfig.durationBufferSize)
@@ -574,7 +576,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
case Event(DropOld, _) =>
if (queue.nonEmpty && Duration
.between(queue.head.timestamp, clock.now())
- .compareTo(Duration.ofMillis(actionRetentionTimeout)) < 0) {
+ .compareTo(Duration.ofMillis(actionRetentionTimeout)) >= 0) {
logging.error(
this,
s"[$invocationNamespace:$action:$stateName] Drop some stale activations for $revision, existing container is ${containers.size}, inProgress container is ${creationIds.size}, state data: $stateData, in is $in, current: ${queue.size}.")
@@ -920,6 +922,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
clock,
queue,
actionRetentionTimeout,
+ lastActivationPulledTime,
invocationNamespace,
actionMetaData,
stateName,
@@ -1024,6 +1027,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
MetricEmitter.emitHistogramMetric(
LoggingMarkers.SCHEDULER_WAIT_TIME(action.asString, action.toStringWithoutVersion),
totalTimeInScheduler.toMillis)
+ lastActivationPulledTime.set(Instant.now.toEpochMilli)
res.trySuccess(Right(msg))
in.decrementAndGet()
stay
@@ -1049,6 +1053,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
MetricEmitter.emitHistogramMetric(
LoggingMarkers.SCHEDULER_WAIT_TIME(action.asString, action.toStringWithoutVersion),
totalTimeInScheduler.toMillis)
+ lastActivationPulledTime.set(Instant.now.toEpochMilli)
sender ! GetActivationResponse(Right(msg))
tryDisableActionThrottling()
@@ -1186,6 +1191,7 @@ object MemoryQueue {
def checkToDropStaleActivation(clock: Clock,
queue: Queue[TimeSeriesActivationEntry],
maxRetentionMs: Long,
+ lastActivationExecutedTime: AtomicLong,
invocationNamespace: String,
actionMetaData: WhiskActionMetaData,
stateName: MemoryQueueState,
@@ -1201,6 +1207,13 @@ object MemoryQueue {
logging.info(
this,
s"[$invocationNamespace:$action:$stateName] some activations are stale msg: ${queue.head.msg.activationId}.")
+ val timeSinceLastActivationGrabbed = clock.now().toEpochMilli - lastActivationExecutedTime.get()
+ if (timeSinceLastActivationGrabbed > maxRetentionMs && timeSinceLastActivationGrabbed > actionMetaData.limits.timeout.millis) {
+ MetricEmitter.emitGaugeMetric(
+ LoggingMarkers
+ .SCHEDULER_QUEUE_NOT_PROCESSING(invocationNamespace, action.asString, action.toStringWithoutVersion),
+ 1)
+ }
queueRef ! DropOld
}