Repository: incubator-gearpump Updated Branches: refs/heads/master aebc09a8d -> ae55efbdd
[GEARPUMP-285] Fix false alarm of shutting down executor time out Author: huafengw <[email protected]> Closes #169 from huafengw/timeout. Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/ae55efbd Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/ae55efbd Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/ae55efbd Branch: refs/heads/master Commit: ae55efbddaf176b87d20f26bc57fa17ff1cbe2a5 Parents: aebc09a Author: huafengw <[email protected]> Authored: Fri Mar 10 15:58:28 2017 +0800 Committer: manuzhang <[email protected]> Committed: Fri Mar 10 15:58:36 2017 +0800 ---------------------------------------------------------------------- .../gearpump/cluster/AppDescription.scala | 24 ++++++-- .../gearpump/cluster/master/AppManager.scala | 59 +++++++++++--------- .../cluster/ApplicationStatusSpec.scala | 42 ++++++++++++++ .../master/ApplicationMetaDataSpec.scala | 37 ++++++++++++ .../cluster/master/ApplicationStateSpec.scala | 37 ------------ 5 files changed, 131 insertions(+), 68 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ae55efbd/core/src/main/scala/org/apache/gearpump/cluster/AppDescription.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/AppDescription.scala b/core/src/main/scala/org/apache/gearpump/cluster/AppDescription.scala index c31f01f..0c46aca 100644 --- a/core/src/main/scala/org/apache/gearpump/cluster/AppDescription.scala +++ b/core/src/main/scala/org/apache/gearpump/cluster/AppDescription.scala @@ -144,15 +144,29 @@ case class ExecutorJVMConfig( sealed abstract class ApplicationStatus(val status: String) extends Serializable{ override def toString: String = status + + def canTransitTo(newStatus: ApplicationStatus): Boolean + } sealed 
abstract class ApplicationTerminalStatus(override val status: String) - extends ApplicationStatus(status) + extends ApplicationStatus(status) { + + override def canTransitTo(newStatus: ApplicationStatus): Boolean = false +} object ApplicationStatus { - case object PENDING extends ApplicationStatus("pending") + case object PENDING extends ApplicationStatus("pending") { + override def canTransitTo(newStatus: ApplicationStatus): Boolean = { + !newStatus.equals(NONEXIST) + } + } - case object ACTIVE extends ApplicationStatus("active") + case object ACTIVE extends ApplicationStatus("active") { + override def canTransitTo(newStatus: ApplicationStatus): Boolean = { + !newStatus.equals(NONEXIST) && !newStatus.equals(ACTIVE) + } + } case object SUCCEEDED extends ApplicationTerminalStatus("succeeded") @@ -160,5 +174,7 @@ object ApplicationStatus { case object TERMINATED extends ApplicationTerminalStatus("terminated") - case object NONEXIST extends ApplicationStatus("nonexist") + case object NONEXIST extends ApplicationStatus("nonexist") { + override def canTransitTo(newStatus: ApplicationStatus): Boolean = false + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ae55efbd/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala b/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala index 049d11d..e41a2c5 100644 --- a/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala +++ b/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala @@ -231,35 +231,40 @@ private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLaunch timeStamp: TimeStamp, error: Throwable): Unit = { applicationRegistry.get(appId) match { case Some(appRuntimeInfo) => - var updatedStatus: ApplicationRuntimeInfo = null - 
LOG.info(s"Application $appId change to ${newStatus.toString} at $timeStamp") - newStatus match { - case ApplicationStatus.ACTIVE => - updatedStatus = appRuntimeInfo.onAppMasterActivated(timeStamp) - sender ! AppMasterActivated(appId) - case succeeded@ApplicationStatus.SUCCEEDED => - killAppMaster(appId, appRuntimeInfo.worker) - updatedStatus = appRuntimeInfo.onFinalStatus(timeStamp, succeeded) - appResultListeners.getOrElse(appId, List.empty).foreach{ client => - client ! ApplicationSucceeded(appId) - } - case failed@ApplicationStatus.FAILED => - killAppMaster(appId, appRuntimeInfo.worker) - updatedStatus = appRuntimeInfo.onFinalStatus(timeStamp, failed) - appResultListeners.getOrElse(appId, List.empty).foreach{ client => - client ! ApplicationFailed(appId, error) - } - case terminated@ApplicationStatus.TERMINATED => - updatedStatus = appRuntimeInfo.onFinalStatus(timeStamp, terminated) - case status => - LOG.error(s"App $appId should not change it's status to $status") - } + if (appRuntimeInfo.status.canTransitTo(newStatus)) { + var updatedStatus: ApplicationRuntimeInfo = null + LOG.info(s"Application $appId change to ${newStatus.toString} at $timeStamp") + newStatus match { + case ApplicationStatus.ACTIVE => + updatedStatus = appRuntimeInfo.onAppMasterActivated(timeStamp) + sender ! AppMasterActivated(appId) + case succeeded@ApplicationStatus.SUCCEEDED => + killAppMaster(appId, appRuntimeInfo.worker) + updatedStatus = appRuntimeInfo.onFinalStatus(timeStamp, succeeded) + appResultListeners.getOrElse(appId, List.empty).foreach { client => + client ! ApplicationSucceeded(appId) + } + case failed@ApplicationStatus.FAILED => + killAppMaster(appId, appRuntimeInfo.worker) + updatedStatus = appRuntimeInfo.onFinalStatus(timeStamp, failed) + appResultListeners.getOrElse(appId, List.empty).foreach { client => + client ! 
ApplicationFailed(appId, error) + } + case terminated@ApplicationStatus.TERMINATED => + updatedStatus = appRuntimeInfo.onFinalStatus(timeStamp, terminated) + case status => + LOG.error(s"App $appId should not change it's status to $status") + } - if (newStatus.isInstanceOf[ApplicationTerminalStatus]) { - kvService ! DeleteKVGroup(appId.toString) + if (newStatus.isInstanceOf[ApplicationTerminalStatus]) { + kvService ! DeleteKVGroup(appId.toString) + } + applicationRegistry += appId -> updatedStatus + kvService ! PutKV(MASTER_GROUP, MASTER_STATE, MasterState(nextAppId, applicationRegistry)) + } else { + LOG.error(s"Application $appId tries to switch status ${appRuntimeInfo.status} " + + s"to $newStatus") } - applicationRegistry += appId -> updatedStatus - kvService ! PutKV(MASTER_GROUP, MASTER_STATE, MasterState(nextAppId, applicationRegistry)) case None => LOG.error(s"Can not find application runtime info for appId $appId when it's " + s"status changed to ${newStatus.toString}") http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ae55efbd/core/src/test/scala/org/apache/gearpump/cluster/ApplicationStatusSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/gearpump/cluster/ApplicationStatusSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/ApplicationStatusSpec.scala new file mode 100644 index 0000000..743fe34 --- /dev/null +++ b/core/src/test/scala/org/apache/gearpump/cluster/ApplicationStatusSpec.scala @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.cluster + +import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers} + +class ApplicationStatusSpec extends FlatSpec with Matchers with BeforeAndAfterEach { + + "ApplicationStatus" should "check status transition properly" in { + val pending = ApplicationStatus.PENDING + assert(!pending.canTransitTo(ApplicationStatus.NONEXIST)) + assert(pending.canTransitTo(ApplicationStatus.PENDING)) + assert(pending.canTransitTo(ApplicationStatus.ACTIVE)) + assert(pending.canTransitTo(ApplicationStatus.SUCCEEDED)) + + val active = ApplicationStatus.ACTIVE + assert(active.canTransitTo(ApplicationStatus.SUCCEEDED)) + assert(active.canTransitTo(ApplicationStatus.PENDING)) + assert(!active.canTransitTo(ApplicationStatus.ACTIVE)) + assert(!active.canTransitTo(ApplicationStatus.NONEXIST)) + + val succeed = ApplicationStatus.SUCCEEDED + assert(!succeed.canTransitTo(ApplicationStatus.NONEXIST)) + assert(!succeed.canTransitTo(ApplicationStatus.SUCCEEDED)) + assert(!succeed.canTransitTo(ApplicationStatus.FAILED)) + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ae55efbd/core/src/test/scala/org/apache/gearpump/cluster/master/ApplicationMetaDataSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/gearpump/cluster/master/ApplicationMetaDataSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/master/ApplicationMetaDataSpec.scala new file mode 100644 index 0000000..664fc9c --- /dev/null +++ 
b/core/src/test/scala/org/apache/gearpump/cluster/master/ApplicationMetaDataSpec.scala @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.cluster.master + +import org.apache.gearpump.cluster.AppDescription +import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers} +import org.apache.gearpump.cluster.appmaster.ApplicationMetaData + +class ApplicationMetaDataSpec extends FlatSpec with Matchers with BeforeAndAfterEach { + + "ApplicationMetaData" should "check equal with respect to only appId and attemptId" in { + val appDescription = AppDescription("app", "AppMaster", null) + val metaDataA = ApplicationMetaData(0, 0, appDescription, null, null) + val metaDataB = ApplicationMetaData(0, 0, appDescription, null, null) + val metaDataC = ApplicationMetaData(0, 1, appDescription, null, null) + + assert(metaDataA == metaDataB) + assert(metaDataA.hashCode == metaDataB.hashCode) + assert(metaDataA != metaDataC) + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ae55efbd/core/src/test/scala/org/apache/gearpump/cluster/master/ApplicationStateSpec.scala ---------------------------------------------------------------------- diff --git 
a/core/src/test/scala/org/apache/gearpump/cluster/master/ApplicationStateSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/master/ApplicationStateSpec.scala deleted file mode 100644 index 6593836..0000000 --- a/core/src/test/scala/org/apache/gearpump/cluster/master/ApplicationStateSpec.scala +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gearpump.cluster.master - -import org.apache.gearpump.cluster.AppDescription -import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers} -import org.apache.gearpump.cluster.appmaster.ApplicationMetaData - -class ApplicationStateSpec extends FlatSpec with Matchers with BeforeAndAfterEach { - - "ApplicationState" should "check equal with respect to only appId and attemptId" in { - val appDescription = AppDescription("app", "AppMaster", null) - val stateA = ApplicationMetaData(0, 0, appDescription, null, null) - val stateB = ApplicationMetaData(0, 0, appDescription, null, null) - val stateC = ApplicationMetaData(0, 1, appDescription, null, null) - - assert(stateA == stateB) - assert(stateA.hashCode == stateB.hashCode) - assert(stateA != stateC) - } -}
