Repository: incubator-gearpump
Updated Branches:
  refs/heads/master 25d1fce6b -> c55604f02


Fix GEARPUMP-124: URI too long when changing SourceTask's parallelism

Author: huafengw <[email protected]>

Closes #18 from huafengw/GEARPUMP-124.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/c55604f0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/c55604f0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/c55604f0

Branch: refs/heads/master
Commit: c55604f02a68f1e5eba2440ef28823fb16fdd995
Parents: 25d1fce
Author: huafengw <[email protected]>
Authored: Tue May 17 23:39:51 2016 +0800
Committer: manuzhang <[email protected]>
Committed: Tue May 17 23:39:51 2016 +0800

----------------------------------------------------------------------
 .../scala/org/apache/gearpump/cluster/UserConfig.scala   |  4 ++--
 .../integrationtest/minicluster/RestClient.scala         |  9 +++++++--
 services/dashboard/services/restapi.js                   |  5 +++--
 .../dashboard/views/apps/streamingapp/popups/dag_edit.js | 11 +++--------
 .../apache/gearpump/streaming/appmaster/DagManager.scala | 10 ++++++++--
 .../gearpump/streaming/appmaster/DagManagerSpec.scala    |  6 +++---
 6 files changed, 26 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c55604f0/core/src/main/scala/org/apache/gearpump/cluster/UserConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/UserConfig.scala 
b/core/src/main/scala/org/apache/gearpump/cluster/UserConfig.scala
index 0570f03..28a4907 100644
--- a/core/src/main/scala/org/apache/gearpump/cluster/UserConfig.scala
+++ b/core/src/main/scala/org/apache/gearpump/cluster/UserConfig.scala
@@ -116,7 +116,7 @@ final class UserConfig(private val _config: Map[String, 
String]) extends Seriali
 
   def getValue[T](key: String)(implicit system: ActorSystem): Option[T] = {
     val serializer = new 
JavaSerializer(system.asInstanceOf[ExtendedActorSystem])
-    _config.get(key).map(BaseEncoding.base64Url().decode(_))
+    _config.get(key).map(BaseEncoding.base64().decode(_))
       .map(serializer.fromBinary(_).asInstanceOf[T])
   }
 
@@ -137,7 +137,7 @@ final class UserConfig(private val _config: Map[String, 
String]) extends Seriali
     } else {
       val serializer = new 
JavaSerializer(system.asInstanceOf[ExtendedActorSystem])
       val bytes = serializer.toBinary(value)
-      val encoded = BaseEncoding.base64Url().encode(bytes)
+      val encoded = BaseEncoding.base64().encode(bytes)
       this.withString(key, encoded)
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c55604f0/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala
 
b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala
index a2d8f99..1b143af 100644
--- 
a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala
+++ 
b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala
@@ -199,8 +199,13 @@ class RestClient(host: String, port: Int) {
     decodeAs[AppJar](resp)
   }
 
-  def replaceStreamingAppProcessor(appId: Int, replaceMe: 
ProcessorDescription): Boolean = try {
-    val replaceOperation = new ReplaceProcessor(replaceMe.id, replaceMe)
+  def replaceStreamingAppProcessor(appId: Int, replaceMe: 
ProcessorDescription): Boolean = {
+    replaceStreamingAppProcessor(appId, replaceMe, false)
+  }
+
+  def replaceStreamingAppProcessor(
+      appId: Int, replaceMe: ProcessorDescription, inheritConf: Boolean): 
Boolean = try {
+    val replaceOperation = new ReplaceProcessor(replaceMe.id, replaceMe, 
inheritConf)
     val args = upickle.default.write(replaceOperation)
     val resp = callApi(s"appmaster/$appId/dynamicdag?args=" + 
Util.encodeUriComponent(args),
       CRUD_POST)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c55604f0/services/dashboard/services/restapi.js
----------------------------------------------------------------------
diff --git a/services/dashboard/services/restapi.js 
b/services/dashboard/services/restapi.js
index 3bad10c..e4dce2a 100644
--- a/services/dashboard/services/restapi.js
+++ b/services/dashboard/services/restapi.js
@@ -217,14 +217,15 @@ angular.module('dashboard')
         },
 
         /** Replace a dag processor at runtime */
-        replaceDagProcessor: function (files, formFormNames, appId, 
oldProcessorId, newProcessorDescription, onComplete) {
+        replaceDagProcessor: function (files, formFormNames, appId, 
oldProcessorId, newProcessorDescription, inheritConf, onComplete) {
           var url = restapiV1Root + 'appmaster/' + appId + '/dynamicdag';
           var args = {
             "$type": 
'org.apache.gearpump.streaming.appmaster.DagManager.ReplaceProcessor',
             oldProcessorId: oldProcessorId,
             newProcessorDescription: angular.merge({
               id: oldProcessorId
-            }, newProcessorDescription)
+            }, newProcessorDescription),
+            inheritConf: inheritConf
           };
           url += '?args=' + encodeURIComponent(angular.toJson(args));
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c55604f0/services/dashboard/views/apps/streamingapp/popups/dag_edit.js
----------------------------------------------------------------------
diff --git a/services/dashboard/views/apps/streamingapp/popups/dag_edit.js 
b/services/dashboard/views/apps/streamingapp/popups/dag_edit.js
index 539a981..e2ad5bf 100644
--- a/services/dashboard/views/apps/streamingapp/popups/dag_edit.js
+++ b/services/dashboard/views/apps/streamingapp/popups/dag_edit.js
@@ -39,13 +39,6 @@ angular.module('dashboard')
           parallelism: $scope.parallelism
         };
 
-        //If only change processor's parallelism, inherit old processor's 
configuration
-        if ($scope.changeParallelismOnly) {
-          newProcessor = angular.merge(newProcessor, {
-            taskConf: $scope.taskConf
-          })
-        }
-
         if (Array.isArray($scope.transitTime) && $scope.transitTime.length === 
2) {
           var tuple = [$scope.transitTime[0] || '', $scope.transitTime[1] || 
''];
           var format = 'YYYY-MM-DD';
@@ -60,7 +53,9 @@ angular.module('dashboard')
           };
         }
 
-        $scope.dag.replaceProcessor(files, fileFormNames, $scope.app.appId, 
$scope.processorId, newProcessor, function (response) {
+        //If only change processor's parallelism, inherit old processor's 
configuration
+        var inheritConf = $scope.changeParallelismOnly || false;
+        $scope.dag.replaceProcessor(files, fileFormNames, $scope.app.appId, 
$scope.processorId, newProcessor, inheritConf, function (response) {
           $scope.shouldNoticeSubmitFailed = !response.success;
           if (response.success) {
             $scope.$hide();

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c55604f0/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/DagManager.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/DagManager.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/DagManager.scala
index 3341d4f..24f16ad 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/DagManager.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/DagManager.scala
@@ -102,7 +102,7 @@ class DagManager(appId: Int, userConfig: UserConfig, store: 
AppDataStore, dag: O
         LOG.info(s"Get task launcher data for processor: $processorId, 
dagVersion: $version")
         sender ! taskLaunchData(dag, processorId, context)
       }
-    case ReplaceProcessor(oldProcessorId, inputNewProcessor) =>
+    case ReplaceProcessor(oldProcessorId, inputNewProcessor, inheritConfig) =>
       // Replace a processor with new implementation. The upstream processors 
and downstream
       // processors are NOT changed.
       var newProcessor = inputNewProcessor.copy(id = nextProcessorId)
@@ -110,6 +110,12 @@ class DagManager(appId: Int, userConfig: UserConfig, 
store: AppDataStore, dag: O
         val oldJar = dags.last.processors.get(oldProcessorId).get
         newProcessor = newProcessor.copy(jar = oldJar.jar)
       }
+
+      if (inheritConfig) {
+        val oldConf = dags.last.processors.get(oldProcessorId).get.taskConf
+        newProcessor = newProcessor.copy(taskConf = oldConf)
+      }
+
       if (dags.length > 1) {
         sender ! DAGOperationFailed(
           "We are in the process of handling previous dynamic dag change")
@@ -171,7 +177,7 @@ object DagManager {
   sealed trait DAGOperation
 
   case class ReplaceProcessor(oldProcessorId: ProcessorId,
-      newProcessorDescription: ProcessorDescription) extends DAGOperation
+      newProcessorDescription: ProcessorDescription, inheritConf: Boolean) 
extends DAGOperation
 
   sealed trait DAGOperationResult
   case object DAGOperationSuccess extends DAGOperationResult

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c55604f0/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/DagManagerSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/DagManagerSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/DagManagerSpec.scala
index 258c7ff..2caab4f 100644
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/DagManagerSpec.scala
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/DagManagerSpec.scala
@@ -65,7 +65,7 @@ class DagManagerSpec extends WordSpecLike with Matchers with 
BeforeAndAfterAll {
       client.send(dagManager, WatchChange(watcher.ref))
       val task3 = task2.copy(id = 3, life = LifeTime(100, Long.MaxValue))
 
-      client.send(dagManager, ReplaceProcessor(task2.id, task3))
+      client.send(dagManager, ReplaceProcessor(task2.id, task3, false))
       client.expectMsg(DAGOperationSuccess)
 
       client.send(dagManager, GetLatestDAG)
@@ -76,11 +76,11 @@ class DagManagerSpec extends WordSpecLike with Matchers 
with BeforeAndAfterAll {
       watcher.expectMsgType[LatestDAG]
 
       val task4 = task3.copy(id = 4)
-      client.send(dagManager, ReplaceProcessor(task3.id, task4))
+      client.send(dagManager, ReplaceProcessor(task3.id, task4, false))
       client.expectMsgType[DAGOperationFailed]
 
       client.send(dagManager, NewDAGDeployed(newDag.version))
-      client.send(dagManager, ReplaceProcessor(task3.id, task4))
+      client.send(dagManager, ReplaceProcessor(task3.id, task4, false))
       client.expectMsg(DAGOperationSuccess)
     }
 

Reply via email to