This is an automated email from the ASF dual-hosted git repository.
bdoyle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new daeadbf11 Fix missing attachment stuck actions (#5355)
daeadbf11 is described below
commit daeadbf11fb46d0f4471fef8e56c1741e1249ab8
Author: Brendan Doyle <[email protected]>
AuthorDate: Thu Dec 8 14:19:24 2022 -0800
Fix missing attachment stuck actions (#5355)
* init
* compile
* scalafmt
* fix test compilation
Co-authored-by: Brendan Doyle <[email protected]>
---
.../openwhisk/core/database/DocumentFactory.scala | 3 ++-
.../apache/openwhisk/core/entity/WhiskAction.scala | 20 +++++++++++---------
.../apache/openwhisk/core/controller/ApiUtils.scala | 4 ++--
.../v2/FunctionPullingContainerProxy.scala | 8 ++++----
.../core/scheduler/queue/QueueManager.scala | 16 ++++++++++++----
.../v2/test/FunctionPullingContainerProxyTests.scala | 2 +-
.../scheduler/queue/test/QueueManagerTests.scala | 4 ++--
7 files changed, 34 insertions(+), 23 deletions(-)
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/DocumentFactory.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/DocumentFactory.scala
index 75f2d70a2..b6bbef095 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/DocumentFactory.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/DocumentFactory.scala
@@ -164,7 +164,8 @@ trait DocumentFactory[W <: DocumentRevisionProvider]
extends MultipleReadersSing
db: ArtifactStore[Wsuper],
doc: DocId,
rev: DocRevision = DocRevision.empty,
- fromCache: Boolean = cacheEnabled)(implicit transid: TransactionId, mw:
Manifest[W]): Future[W] = {
+ fromCache: Boolean = cacheEnabled,
+ ignoreMissingAttachment: Boolean = false)(implicit transid: TransactionId,
mw: Manifest[W]): Future[W] = {
implicit val logger = db.logging
implicit val ec = db.executionContext
val key = doc.asDocInfo(rev)
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/WhiskAction.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/WhiskAction.scala
index a51e55278..e06057367 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/WhiskAction.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/WhiskAction.scala
@@ -21,7 +21,6 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.nio.charset.StandardCharsets.UTF_8
import java.time.Instant
import java.util.Base64
-
import akka.http.scaladsl.model.ContentTypes
import scala.concurrent.ExecutionContext
@@ -30,9 +29,7 @@ import scala.util.{Failure, Success, Try}
import spray.json._
import spray.json.DefaultJsonProtocol._
import org.apache.openwhisk.common.TransactionId
-import org.apache.openwhisk.core.database.ArtifactStore
-import org.apache.openwhisk.core.database.DocumentFactory
-import org.apache.openwhisk.core.database.CacheChangeNotification
+import org.apache.openwhisk.core.database.{ArtifactStore,
CacheChangeNotification, DocumentFactory, NoDocumentException}
import org.apache.openwhisk.core.entity.Attachments._
import org.apache.openwhisk.core.entity.types.EntityStore
@@ -421,11 +418,13 @@ object WhiskAction extends DocumentFactory[WhiskAction]
with WhiskEntityQueries[
}
// overridden to retrieve attached code
- override def get[A >: WhiskAction](
- db: ArtifactStore[A],
- doc: DocId,
- rev: DocRevision = DocRevision.empty,
- fromCache: Boolean)(implicit transid: TransactionId, mw:
Manifest[WhiskAction]): Future[WhiskAction] = {
+ override def get[A >: WhiskAction](db: ArtifactStore[A],
+ doc: DocId,
+ rev: DocRevision = DocRevision.empty,
+ fromCache: Boolean,
+ ignoreMissingAttachment: Boolean = false)(
+ implicit transid: TransactionId,
+ mw: Manifest[WhiskAction]): Future[WhiskAction] = {
implicit val ec = db.executionContext
@@ -439,6 +438,9 @@ object WhiskAction extends DocumentFactory[WhiskAction]
with WhiskEntityQueries[
val newAction = a.copy(exec = exec.inline(boas.toByteArray))
newAction.revision(a.rev)
newAction
+ }).recover({
+ case _: NoDocumentException if ignoreMissingAttachment =>
+ action
})
}
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/ApiUtils.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/ApiUtils.scala
index ef2dc5110..ec08f5b30 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/ApiUtils.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/ApiUtils.scala
@@ -320,7 +320,7 @@ trait WriteOps extends Directives {
// marker to return an existing doc with status OK rather than conflict if
overwrite is false
case class IdentityPut(self: A) extends Throwable
- onComplete(factory.get(datastore, docid) flatMap { doc =>
+ onComplete(factory.get(datastore, docid, ignoreMissingAttachment =
overwrite) flatMap { doc =>
if (overwrite) {
logging.debug(this, s"[PUT] entity exists, will try to update '$doc'")
update(doc).map(updatedDoc => (Some(doc), updatedDoc))
@@ -392,7 +392,7 @@ trait WriteOps extends Directives {
format: RootJsonFormat[A],
notifier: Option[CacheChangeNotification],
ma: Manifest[A]) = {
- onComplete(factory.get(datastore, docid) flatMap { entity =>
+ onComplete(factory.get(datastore, docid, ignoreMissingAttachment = true)
flatMap { entity =>
confirm(entity) flatMap {
case _ =>
factory.del(datastore, entity.docinfo) map { _ =>
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala
index 988416fc1..896455c04 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala
@@ -188,7 +188,7 @@ class FunctionPullingContainerProxy(
Option[ExecutableWhiskAction]) => Future[Container],
entityStore: ArtifactStore[WhiskEntity],
namespaceBlacklist: NamespaceBlacklist,
- get: (ArtifactStore[WhiskEntity], DocId, DocRevision, Boolean) =>
Future[WhiskAction],
+ get: (ArtifactStore[WhiskEntity], DocId, DocRevision, Boolean, Boolean) =>
Future[WhiskAction],
dataManagementService: ActorRef,
clientProxyFactory: (ActorRefFactory,
String,
@@ -786,7 +786,7 @@ class FunctionPullingContainerProxy(
whenUnhandled {
case Event(PingCache, data: WarmData) =>
val actionId =
data.action.fullyQualifiedName(false).toDocId.asDocInfo(data.revision)
- get(entityStore, actionId.id, actionId.rev, true).map(_ => {
+ get(entityStore, actionId.id, actionId.rev, true, false).map(_ => {
logging.debug(
this,
s"Refreshed function cache for action ${data.action} from container
${data.container.containerId}.")
@@ -913,7 +913,7 @@ class FunctionPullingContainerProxy(
if (actionid.rev == DocRevision.empty)
logging.warn(this, s"revision was not provided for ${actionid.id}")
- get(entityStore, actionid.id, actionid.rev, actionid.rev !=
DocRevision.empty)
+ get(entityStore, actionid.id, actionid.rev, actionid.rev !=
DocRevision.empty, false)
.flatMap { action =>
action.toExecutableWhiskAction match {
case Some(executable) =>
@@ -1264,7 +1264,7 @@ object FunctionPullingContainerProxy {
Option[ExecutableWhiskAction]) => Future[Container],
entityStore: ArtifactStore[WhiskEntity],
namespaceBlacklist: NamespaceBlacklist,
- get: (ArtifactStore[WhiskEntity], DocId, DocRevision, Boolean) =>
Future[WhiskAction],
+ get: (ArtifactStore[WhiskEntity], DocId, DocRevision, Boolean,
Boolean) => Future[WhiskAction],
dataManagementService: ActorRef,
clientProxyFactory: (ActorRefFactory,
String,
diff --git a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala
index ad7b17103..e2857f498 100644
--- a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala
+++ b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala
@@ -63,7 +63,11 @@ case class QueueManagerConfig(maxRetriesToGetQueue: Int,
maxSchedulingTime: Fini
class QueueManager(
entityStore: ArtifactStore[WhiskEntity],
- getWhiskActionMetaData: (ArtifactStore[WhiskEntity], DocId, DocRevision,
Boolean) => Future[WhiskActionMetaData],
+ getWhiskActionMetaData: (ArtifactStore[WhiskEntity],
+ DocId,
+ DocRevision,
+ Boolean,
+ Boolean) => Future[WhiskActionMetaData],
etcdClient: EtcdClient,
schedulerEndpoints: SchedulerEndpoints,
schedulerId: SchedulerInstanceId,
@@ -340,7 +344,7 @@ class QueueManager(
private def recoverQueue(msg: ActivationMessage)(implicit transid:
TransactionId): Unit = {
val start = transid.started(this, LoggingMarkers.SCHEDULER_QUEUE_RECOVER)
logging.info(this, s"Recover a queue for ${msg.action},")
- getWhiskActionMetaData(entityStore, msg.action.toDocId, msg.revision,
false)
+ getWhiskActionMetaData(entityStore, msg.action.toDocId, msg.revision,
false, false)
.map { actionMetaData: WhiskActionMetaData =>
actionMetaData.toExecutableWhiskAction match {
case Some(_) =>
@@ -370,7 +374,7 @@ class QueueManager(
logging.info(this, s"Create a new queue for ${newAction}, rev:
${msg.revision}")
- getWhiskActionMetaData(entityStore, newAction.toDocId, msg.revision,
msg.revision != DocRevision.empty)
+ getWhiskActionMetaData(entityStore, newAction.toDocId, msg.revision,
msg.revision != DocRevision.empty, false)
.map { actionMetaData: WhiskActionMetaData =>
actionMetaData.toExecutableWhiskAction match {
// Always use revision got from Database, there can be 2 cases for
the actionMetaData.rev
@@ -668,7 +672,11 @@ object QueueManager {
def props(
entityStore: ArtifactStore[WhiskEntity],
- getWhiskActionMetaData: (ArtifactStore[WhiskEntity], DocId, DocRevision,
Boolean) => Future[WhiskActionMetaData],
+ getWhiskActionMetaData: (ArtifactStore[WhiskEntity],
+ DocId,
+ DocRevision,
+ Boolean,
+ Boolean) => Future[WhiskActionMetaData],
etcdClient: EtcdClient,
schedulerEndpoints: SchedulerEndpoints,
schedulerId: SchedulerInstanceId,
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala
index a3dbae818..2a51d3cad 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala
@@ -209,7 +209,7 @@ class FunctionPullingContainerProxyTests
/** get WhiskAction*/
def getWhiskAction(response: Future[WhiskAction]) = LoggedFunction {
- (_: ArtifactStore[WhiskEntity], _: DocId, _: DocRevision, _: Boolean) =>
+ (_: ArtifactStore[WhiskEntity], _: DocId, _: DocRevision, _: Boolean, _:
Boolean) =>
response
}
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/QueueManagerTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/QueueManagerTests.scala
index 6d8d80f72..ebf5a5cd2 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/QueueManagerTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/QueueManagerTests.scala
@@ -154,7 +154,7 @@ class QueueManagerTests
/**get WhiskActionMetaData*/
def getWhiskActionMetaData(meta: Future[WhiskActionMetaData]) =
LoggedFunction {
- (_: ArtifactStore[WhiskEntity], _: DocId, _: DocRevision, _: Boolean) =>
+ (_: ArtifactStore[WhiskEntity], _: DocId, _: DocRevision, _: Boolean, _:
Boolean) =>
meta
}
@@ -496,7 +496,7 @@ class QueueManagerTests
val finalFqn = newFqn.copy(version = Some(SemVer(0, 0, 3)))
val finalRevision = DocRevision("3-test-revision")
// simulate the case that action is updated again while fetch it from
database
- def newGet(store: ArtifactStore[WhiskEntity], docId: DocId, docRevision:
DocRevision, fromCache: Boolean) = {
+ def newGet(store: ArtifactStore[WhiskEntity], docId: DocId, docRevision:
DocRevision, fromCache: Boolean, ignoreMissingAttachment: Boolean) = {
if (docRevision == DocRevision.empty) {
Future(convertToMetaData(action.copy(version = SemVer(0, 0,
3)).toWhiskAction.revision(finalRevision)))
} else