Repository: incubator-gearpump Updated Branches: refs/heads/master 25d1fce6b -> c55604f02
fix GEARPUMP-124 URI too long when changing SourceTask's parallelism Author: huafengw <[email protected]> Closes #18 from huafengw/GEARPUMP-124. Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/c55604f0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/c55604f0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/c55604f0 Branch: refs/heads/master Commit: c55604f02a68f1e5eba2440ef28823fb16fdd995 Parents: 25d1fce Author: huafengw <[email protected]> Authored: Tue May 17 23:39:51 2016 +0800 Committer: manuzhang <[email protected]> Committed: Tue May 17 23:39:51 2016 +0800 ---------------------------------------------------------------------- .../scala/org/apache/gearpump/cluster/UserConfig.scala | 4 ++-- .../integrationtest/minicluster/RestClient.scala | 9 +++++++-- services/dashboard/services/restapi.js | 5 +++-- .../dashboard/views/apps/streamingapp/popups/dag_edit.js | 11 +++-------- .../apache/gearpump/streaming/appmaster/DagManager.scala | 10 ++++++++-- .../gearpump/streaming/appmaster/DagManagerSpec.scala | 6 +++--- 6 files changed, 26 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c55604f0/core/src/main/scala/org/apache/gearpump/cluster/UserConfig.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/UserConfig.scala b/core/src/main/scala/org/apache/gearpump/cluster/UserConfig.scala index 0570f03..28a4907 100644 --- a/core/src/main/scala/org/apache/gearpump/cluster/UserConfig.scala +++ b/core/src/main/scala/org/apache/gearpump/cluster/UserConfig.scala @@ -116,7 +116,7 @@ final class UserConfig(private val _config: Map[String, String]) extends Seriali def getValue[T](key: String)(implicit system: ActorSystem): Option[T] = { val serializer = new JavaSerializer(system.asInstanceOf[ExtendedActorSystem]) - _config.get(key).map(BaseEncoding.base64Url().decode(_)) + _config.get(key).map(BaseEncoding.base64().decode(_)) .map(serializer.fromBinary(_).asInstanceOf[T]) } @@ -137,7 +137,7 @@ final class UserConfig(private val _config: Map[String, String]) extends Seriali } else { val serializer = new JavaSerializer(system.asInstanceOf[ExtendedActorSystem]) val bytes = serializer.toBinary(value) - val encoded = BaseEncoding.base64Url().encode(bytes) + val encoded = BaseEncoding.base64().encode(bytes) this.withString(key, encoded) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c55604f0/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala index a2d8f99..1b143af 100644 --- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala +++ b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala @@ -199,8 +199,13 @@ class RestClient(host: String, port: Int) { decodeAs[AppJar](resp) } - def replaceStreamingAppProcessor(appId: Int, replaceMe: ProcessorDescription): Boolean = try { - val replaceOperation = new ReplaceProcessor(replaceMe.id, replaceMe) + def replaceStreamingAppProcessor(appId: Int, replaceMe: ProcessorDescription): Boolean = { + replaceStreamingAppProcessor(appId, replaceMe, false) + } + + def replaceStreamingAppProcessor( + appId: Int, replaceMe: ProcessorDescription, inheritConf: Boolean): Boolean = try { + val replaceOperation = new ReplaceProcessor(replaceMe.id, replaceMe, inheritConf) val args = upickle.default.write(replaceOperation) val resp = callApi(s"appmaster/$appId/dynamicdag?args=" + Util.encodeUriComponent(args), CRUD_POST) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c55604f0/services/dashboard/services/restapi.js ---------------------------------------------------------------------- diff --git a/services/dashboard/services/restapi.js b/services/dashboard/services/restapi.js index 3bad10c..e4dce2a 100644 --- a/services/dashboard/services/restapi.js +++ b/services/dashboard/services/restapi.js @@ -217,14 +217,15 @@ angular.module('dashboard') }, /** Replace a dag processor at runtime */ - replaceDagProcessor: function (files, formFormNames, appId, oldProcessorId, newProcessorDescription, onComplete) { + replaceDagProcessor: function (files, formFormNames, appId, oldProcessorId, newProcessorDescription, inheritConf, onComplete) { var url = restapiV1Root + 'appmaster/' + appId + '/dynamicdag'; var args = { "$type": 'org.apache.gearpump.streaming.appmaster.DagManager.ReplaceProcessor', oldProcessorId: oldProcessorId, newProcessorDescription: angular.merge({ id: oldProcessorId - }, newProcessorDescription) + }, newProcessorDescription), + inheritConf: inheritConf }; url += '?args=' + encodeURIComponent(angular.toJson(args)); http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c55604f0/services/dashboard/views/apps/streamingapp/popups/dag_edit.js ---------------------------------------------------------------------- diff --git a/services/dashboard/views/apps/streamingapp/popups/dag_edit.js b/services/dashboard/views/apps/streamingapp/popups/dag_edit.js index 539a981..e2ad5bf 100644 --- a/services/dashboard/views/apps/streamingapp/popups/dag_edit.js +++ b/services/dashboard/views/apps/streamingapp/popups/dag_edit.js @@ -39,13 +39,6 @@ angular.module('dashboard') parallelism: $scope.parallelism }; - //If only change processor's parallelism, inherit old processor's configuration - if ($scope.changeParallelismOnly) { - newProcessor = angular.merge(newProcessor, { - taskConf: $scope.taskConf - }) - } - if (Array.isArray($scope.transitTime) && $scope.transitTime.length === 2) { var tuple = [$scope.transitTime[0] || '', $scope.transitTime[1] || '']; var format = 'YYYY-MM-DD'; @@ -60,7 +53,9 @@ angular.module('dashboard') }; } - $scope.dag.replaceProcessor(files, fileFormNames, $scope.app.appId, $scope.processorId, newProcessor, function (response) { + //If only change processor's parallelism, inherit old processor's configuration + var inheritConf = $scope.changeParallelismOnly || false; + $scope.dag.replaceProcessor(files, fileFormNames, $scope.app.appId, $scope.processorId, newProcessor, inheritConf, function (response) { $scope.shouldNoticeSubmitFailed = !response.success; if (response.success) { $scope.$hide(); http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c55604f0/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/DagManager.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/DagManager.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/DagManager.scala index 3341d4f..24f16ad 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/DagManager.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/DagManager.scala @@ -102,7 +102,7 @@ class DagManager(appId: Int, userConfig: UserConfig, store: AppDataStore, dag: O LOG.info(s"Get task launcher data for processor: $processorId, dagVersion: $version") sender ! taskLaunchData(dag, processorId, context) } - case ReplaceProcessor(oldProcessorId, inputNewProcessor) => + case ReplaceProcessor(oldProcessorId, inputNewProcessor, inheritConfig) => // Replace a processor with new implementation. The upstream processors and downstream // processors are NOT changed. var newProcessor = inputNewProcessor.copy(id = nextProcessorId) @@ -110,6 +110,12 @@ class DagManager(appId: Int, userConfig: UserConfig, store: AppDataStore, dag: O val oldJar = dags.last.processors.get(oldProcessorId).get newProcessor = newProcessor.copy(jar = oldJar.jar) } + + if (inheritConfig) { + val oldConf = dags.last.processors.get(oldProcessorId).get.taskConf + newProcessor = newProcessor.copy(taskConf = oldConf) + } + if (dags.length > 1) { sender ! DAGOperationFailed( "We are in the process of handling previous dynamic dag change") @@ -171,7 +177,7 @@ object DagManager { sealed trait DAGOperation case class ReplaceProcessor(oldProcessorId: ProcessorId, - newProcessorDescription: ProcessorDescription) extends DAGOperation + newProcessorDescription: ProcessorDescription, inheritConf: Boolean) extends DAGOperation sealed trait DAGOperationResult case object DAGOperationSuccess extends DAGOperationResult http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c55604f0/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/DagManagerSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/DagManagerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/DagManagerSpec.scala index 258c7ff..2caab4f 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/DagManagerSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/DagManagerSpec.scala @@ -65,7 +65,7 @@ class DagManagerSpec extends WordSpecLike with Matchers with BeforeAndAfterAll { client.send(dagManager, WatchChange(watcher.ref)) val task3 = task2.copy(id = 3, life = LifeTime(100, Long.MaxValue)) - client.send(dagManager, ReplaceProcessor(task2.id, task3)) + client.send(dagManager, ReplaceProcessor(task2.id, task3, false)) client.expectMsg(DAGOperationSuccess) client.send(dagManager, GetLatestDAG) @@ -76,11 +76,11 @@ class DagManagerSpec extends WordSpecLike with Matchers with BeforeAndAfterAll { watcher.expectMsgType[LatestDAG] val task4 = task3.copy(id = 4) - client.send(dagManager, ReplaceProcessor(task3.id, task4)) + client.send(dagManager, ReplaceProcessor(task3.id, task4, false)) client.expectMsgType[DAGOperationFailed] client.send(dagManager, NewDAGDeployed(newDag.version)) - client.send(dagManager, ReplaceProcessor(task3.id, task4)) + client.send(dagManager, ReplaceProcessor(task3.id, task4, false)) client.expectMsg(DAGOperationSuccess) }
