This is an automated email from the ASF dual-hosted git repository.

bdoyle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new daeadbf11 Fix missing attachment stuck actions (#5355)
daeadbf11 is described below

commit daeadbf11fb46d0f4471fef8e56c1741e1249ab8
Author: Brendan Doyle <[email protected]>
AuthorDate: Thu Dec 8 14:19:24 2022 -0800

    Fix missing attachment stuck actions (#5355)
    
    * init
    
    * compile
    
    * scalafmt
    
    * fix test compilation
    
    Co-authored-by: Brendan Doyle <[email protected]>
---
 .../openwhisk/core/database/DocumentFactory.scala    |  3 ++-
 .../apache/openwhisk/core/entity/WhiskAction.scala   | 20 +++++++++++---------
 .../apache/openwhisk/core/controller/ApiUtils.scala  |  4 ++--
 .../v2/FunctionPullingContainerProxy.scala           |  8 ++++----
 .../core/scheduler/queue/QueueManager.scala          | 16 ++++++++++++----
 .../v2/test/FunctionPullingContainerProxyTests.scala |  2 +-
 .../scheduler/queue/test/QueueManagerTests.scala     |  4 ++--
 7 files changed, 34 insertions(+), 23 deletions(-)

diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/core/database/DocumentFactory.scala
 
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/DocumentFactory.scala
index 75f2d70a2..b6bbef095 100644
--- 
a/common/scala/src/main/scala/org/apache/openwhisk/core/database/DocumentFactory.scala
+++ 
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/DocumentFactory.scala
@@ -164,7 +164,8 @@ trait DocumentFactory[W <: DocumentRevisionProvider] 
extends MultipleReadersSing
     db: ArtifactStore[Wsuper],
     doc: DocId,
     rev: DocRevision = DocRevision.empty,
-    fromCache: Boolean = cacheEnabled)(implicit transid: TransactionId, mw: 
Manifest[W]): Future[W] = {
+    fromCache: Boolean = cacheEnabled,
+    ignoreMissingAttachment: Boolean = false)(implicit transid: TransactionId, 
mw: Manifest[W]): Future[W] = {
     implicit val logger = db.logging
     implicit val ec = db.executionContext
     val key = doc.asDocInfo(rev)
diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/WhiskAction.scala
 
b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/WhiskAction.scala
index a51e55278..e06057367 100644
--- 
a/common/scala/src/main/scala/org/apache/openwhisk/core/entity/WhiskAction.scala
+++ 
b/common/scala/src/main/scala/org/apache/openwhisk/core/entity/WhiskAction.scala
@@ -21,7 +21,6 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
 import java.nio.charset.StandardCharsets.UTF_8
 import java.time.Instant
 import java.util.Base64
-
 import akka.http.scaladsl.model.ContentTypes
 
 import scala.concurrent.ExecutionContext
@@ -30,9 +29,7 @@ import scala.util.{Failure, Success, Try}
 import spray.json._
 import spray.json.DefaultJsonProtocol._
 import org.apache.openwhisk.common.TransactionId
-import org.apache.openwhisk.core.database.ArtifactStore
-import org.apache.openwhisk.core.database.DocumentFactory
-import org.apache.openwhisk.core.database.CacheChangeNotification
+import org.apache.openwhisk.core.database.{ArtifactStore, 
CacheChangeNotification, DocumentFactory, NoDocumentException}
 import org.apache.openwhisk.core.entity.Attachments._
 import org.apache.openwhisk.core.entity.types.EntityStore
 
@@ -421,11 +418,13 @@ object WhiskAction extends DocumentFactory[WhiskAction] 
with WhiskEntityQueries[
   }
 
   // overridden to retrieve attached code
-  override def get[A >: WhiskAction](
-    db: ArtifactStore[A],
-    doc: DocId,
-    rev: DocRevision = DocRevision.empty,
-    fromCache: Boolean)(implicit transid: TransactionId, mw: 
Manifest[WhiskAction]): Future[WhiskAction] = {
+  override def get[A >: WhiskAction](db: ArtifactStore[A],
+                                     doc: DocId,
+                                     rev: DocRevision = DocRevision.empty,
+                                     fromCache: Boolean,
+                                     ignoreMissingAttachment: Boolean = false)(
+    implicit transid: TransactionId,
+    mw: Manifest[WhiskAction]): Future[WhiskAction] = {
 
     implicit val ec = db.executionContext
 
@@ -439,6 +438,9 @@ object WhiskAction extends DocumentFactory[WhiskAction] 
with WhiskEntityQueries[
           val newAction = a.copy(exec = exec.inline(boas.toByteArray))
           newAction.revision(a.rev)
           newAction
+        }).recover({
+          case _: NoDocumentException if ignoreMissingAttachment =>
+            action
         })
       }
 
diff --git 
a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/ApiUtils.scala
 
b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/ApiUtils.scala
index ef2dc5110..ec08f5b30 100644
--- 
a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/ApiUtils.scala
+++ 
b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/ApiUtils.scala
@@ -320,7 +320,7 @@ trait WriteOps extends Directives {
     // marker to return an existing doc with status OK rather than conflict if 
overwrite is false
     case class IdentityPut(self: A) extends Throwable
 
-    onComplete(factory.get(datastore, docid) flatMap { doc =>
+    onComplete(factory.get(datastore, docid, ignoreMissingAttachment = 
overwrite) flatMap { doc =>
       if (overwrite) {
         logging.debug(this, s"[PUT] entity exists, will try to update '$doc'")
         update(doc).map(updatedDoc => (Some(doc), updatedDoc))
@@ -392,7 +392,7 @@ trait WriteOps extends Directives {
     format: RootJsonFormat[A],
     notifier: Option[CacheChangeNotification],
     ma: Manifest[A]) = {
-    onComplete(factory.get(datastore, docid) flatMap { entity =>
+    onComplete(factory.get(datastore, docid, ignoreMissingAttachment = true) 
flatMap { entity =>
       confirm(entity) flatMap {
         case _ =>
           factory.del(datastore, entity.docinfo) map { _ =>
diff --git 
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala
 
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala
index 988416fc1..896455c04 100644
--- 
a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala
+++ 
b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/v2/FunctionPullingContainerProxy.scala
@@ -188,7 +188,7 @@ class FunctionPullingContainerProxy(
             Option[ExecutableWhiskAction]) => Future[Container],
   entityStore: ArtifactStore[WhiskEntity],
   namespaceBlacklist: NamespaceBlacklist,
-  get: (ArtifactStore[WhiskEntity], DocId, DocRevision, Boolean) => 
Future[WhiskAction],
+  get: (ArtifactStore[WhiskEntity], DocId, DocRevision, Boolean, Boolean) => 
Future[WhiskAction],
   dataManagementService: ActorRef,
   clientProxyFactory: (ActorRefFactory,
                        String,
@@ -786,7 +786,7 @@ class FunctionPullingContainerProxy(
   whenUnhandled {
     case Event(PingCache, data: WarmData) =>
       val actionId = 
data.action.fullyQualifiedName(false).toDocId.asDocInfo(data.revision)
-      get(entityStore, actionId.id, actionId.rev, true).map(_ => {
+      get(entityStore, actionId.id, actionId.rev, true, false).map(_ => {
         logging.debug(
           this,
           s"Refreshed function cache for action ${data.action} from container 
${data.container.containerId}.")
@@ -913,7 +913,7 @@ class FunctionPullingContainerProxy(
       if (actionid.rev == DocRevision.empty)
         logging.warn(this, s"revision was not provided for ${actionid.id}")
 
-      get(entityStore, actionid.id, actionid.rev, actionid.rev != 
DocRevision.empty)
+      get(entityStore, actionid.id, actionid.rev, actionid.rev != 
DocRevision.empty, false)
         .flatMap { action =>
           action.toExecutableWhiskAction match {
             case Some(executable) =>
@@ -1264,7 +1264,7 @@ object FunctionPullingContainerProxy {
                       Option[ExecutableWhiskAction]) => Future[Container],
             entityStore: ArtifactStore[WhiskEntity],
             namespaceBlacklist: NamespaceBlacklist,
-            get: (ArtifactStore[WhiskEntity], DocId, DocRevision, Boolean) => 
Future[WhiskAction],
+            get: (ArtifactStore[WhiskEntity], DocId, DocRevision, Boolean, 
Boolean) => Future[WhiskAction],
             dataManagementService: ActorRef,
             clientProxyFactory: (ActorRefFactory,
                                  String,
diff --git 
a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala
 
b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala
index ad7b17103..e2857f498 100644
--- 
a/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala
+++ 
b/core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala
@@ -63,7 +63,11 @@ case class QueueManagerConfig(maxRetriesToGetQueue: Int, 
maxSchedulingTime: Fini
 
 class QueueManager(
   entityStore: ArtifactStore[WhiskEntity],
-  getWhiskActionMetaData: (ArtifactStore[WhiskEntity], DocId, DocRevision, 
Boolean) => Future[WhiskActionMetaData],
+  getWhiskActionMetaData: (ArtifactStore[WhiskEntity],
+                           DocId,
+                           DocRevision,
+                           Boolean,
+                           Boolean) => Future[WhiskActionMetaData],
   etcdClient: EtcdClient,
   schedulerEndpoints: SchedulerEndpoints,
   schedulerId: SchedulerInstanceId,
@@ -340,7 +344,7 @@ class QueueManager(
   private def recoverQueue(msg: ActivationMessage)(implicit transid: 
TransactionId): Unit = {
     val start = transid.started(this, LoggingMarkers.SCHEDULER_QUEUE_RECOVER)
     logging.info(this, s"Recover a queue for ${msg.action},")
-    getWhiskActionMetaData(entityStore, msg.action.toDocId, msg.revision, 
false)
+    getWhiskActionMetaData(entityStore, msg.action.toDocId, msg.revision, 
false, false)
       .map { actionMetaData: WhiskActionMetaData =>
         actionMetaData.toExecutableWhiskAction match {
           case Some(_) =>
@@ -370,7 +374,7 @@ class QueueManager(
 
     logging.info(this, s"Create a new queue for ${newAction}, rev: 
${msg.revision}")
 
-    getWhiskActionMetaData(entityStore, newAction.toDocId, msg.revision, 
msg.revision != DocRevision.empty)
+    getWhiskActionMetaData(entityStore, newAction.toDocId, msg.revision, 
msg.revision != DocRevision.empty, false)
       .map { actionMetaData: WhiskActionMetaData =>
         actionMetaData.toExecutableWhiskAction match {
           // Always use revision got from Database, there can be 2 cases for 
the actionMetaData.rev
@@ -668,7 +672,11 @@ object QueueManager {
 
   def props(
     entityStore: ArtifactStore[WhiskEntity],
-    getWhiskActionMetaData: (ArtifactStore[WhiskEntity], DocId, DocRevision, 
Boolean) => Future[WhiskActionMetaData],
+    getWhiskActionMetaData: (ArtifactStore[WhiskEntity],
+                             DocId,
+                             DocRevision,
+                             Boolean,
+                             Boolean) => Future[WhiskActionMetaData],
     etcdClient: EtcdClient,
     schedulerEndpoints: SchedulerEndpoints,
     schedulerId: SchedulerInstanceId,
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala
 
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala
index a3dbae818..2a51d3cad 100644
--- 
a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala
+++ 
b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/v2/test/FunctionPullingContainerProxyTests.scala
@@ -209,7 +209,7 @@ class FunctionPullingContainerProxyTests
 
   /** get WhiskAction*/
   def getWhiskAction(response: Future[WhiskAction]) = LoggedFunction {
-    (_: ArtifactStore[WhiskEntity], _: DocId, _: DocRevision, _: Boolean) =>
+    (_: ArtifactStore[WhiskEntity], _: DocId, _: DocRevision, _: Boolean, _: 
Boolean) =>
       response
   }
 
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/QueueManagerTests.scala
 
b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/QueueManagerTests.scala
index 6d8d80f72..ebf5a5cd2 100644
--- 
a/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/QueueManagerTests.scala
+++ 
b/tests/src/test/scala/org/apache/openwhisk/core/scheduler/queue/test/QueueManagerTests.scala
@@ -154,7 +154,7 @@ class QueueManagerTests
 
   /**get WhiskActionMetaData*/
   def getWhiskActionMetaData(meta: Future[WhiskActionMetaData]) = 
LoggedFunction {
-    (_: ArtifactStore[WhiskEntity], _: DocId, _: DocRevision, _: Boolean) =>
+    (_: ArtifactStore[WhiskEntity], _: DocId, _: DocRevision, _: Boolean, _: 
Boolean) =>
       meta
   }
 
@@ -496,7 +496,7 @@ class QueueManagerTests
     val finalFqn = newFqn.copy(version = Some(SemVer(0, 0, 3)))
     val finalRevision = DocRevision("3-test-revision")
     // simulate the case that action is updated again while fetch it from 
database
-    def newGet(store: ArtifactStore[WhiskEntity], docId: DocId, docRevision: 
DocRevision, fromCache: Boolean) = {
+    def newGet(store: ArtifactStore[WhiskEntity], docId: DocId, docRevision: 
DocRevision, fromCache: Boolean, ignoreMissingAttachment: Boolean) = {
       if (docRevision == DocRevision.empty) {
         Future(convertToMetaData(action.copy(version = SemVer(0, 0, 
3)).toWhiskAction.revision(finalRevision)))
       } else

Reply via email to