Repository: incubator-gearpump
Updated Branches:
  refs/heads/master dac6953a2 -> d82717512


Fix GEARPUMP-124: SinkTask fails to start after changing parallelism

Author: huafengw <[email protected]>

Closes #17 from huafengw/fix_124.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/d8271751
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/d8271751
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/d8271751

Branch: refs/heads/master
Commit: d82717512e09cedb99e203969aa49d8431f65f22
Parents: dac6953
Author: huafengw <[email protected]>
Authored: Sat May 14 16:53:56 2016 +0800
Committer: manuzhang <[email protected]>
Committed: Sat May 14 16:53:56 2016 +0800

----------------------------------------------------------------------
 .../main/scala/org/apache/gearpump/cluster/UserConfig.scala  | 4 ++--
 .../dashboard/views/apps/streamingapp/popups/dag_edit.js     | 8 ++++++++
 2 files changed, 10 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/d8271751/core/src/main/scala/org/apache/gearpump/cluster/UserConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/UserConfig.scala b/core/src/main/scala/org/apache/gearpump/cluster/UserConfig.scala
index 28a4907..0570f03 100644
--- a/core/src/main/scala/org/apache/gearpump/cluster/UserConfig.scala
+++ b/core/src/main/scala/org/apache/gearpump/cluster/UserConfig.scala
@@ -116,7 +116,7 @@ final class UserConfig(private val _config: Map[String, String]) extends Seriali
 
   def getValue[T](key: String)(implicit system: ActorSystem): Option[T] = {
     val serializer = new JavaSerializer(system.asInstanceOf[ExtendedActorSystem])
-    _config.get(key).map(BaseEncoding.base64().decode(_))
+    _config.get(key).map(BaseEncoding.base64Url().decode(_))
       .map(serializer.fromBinary(_).asInstanceOf[T])
   }
 
@@ -137,7 +137,7 @@ final class UserConfig(private val _config: Map[String, String]) extends Seriali
     } else {
       val serializer = new JavaSerializer(system.asInstanceOf[ExtendedActorSystem])
       val bytes = serializer.toBinary(value)
-      val encoded = BaseEncoding.base64().encode(bytes)
+      val encoded = BaseEncoding.base64Url().encode(bytes)
       this.withString(key, encoded)
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/d8271751/services/dashboard/views/apps/streamingapp/popups/dag_edit.js
----------------------------------------------------------------------
diff --git a/services/dashboard/views/apps/streamingapp/popups/dag_edit.js b/services/dashboard/views/apps/streamingapp/popups/dag_edit.js
index 205328a..539a981 100644
--- a/services/dashboard/views/apps/streamingapp/popups/dag_edit.js
+++ b/services/dashboard/views/apps/streamingapp/popups/dag_edit.js
@@ -16,6 +16,7 @@ angular.module('dashboard')
       $scope.taskClass = processor.taskClass;
       $scope.description = processor.description;
       $scope.parallelism = processor.parallelism;
+      $scope.taskConf = processor.taskConf;
 
       $scope.invalid = {};
       $scope.canReplace = function () {
@@ -38,6 +39,13 @@ angular.module('dashboard')
           parallelism: $scope.parallelism
         };
 
+        //If only change processor's parallelism, inherit old processor's configuration
+        if ($scope.changeParallelismOnly) {
+          newProcessor = angular.merge(newProcessor, {
+            taskConf: $scope.taskConf
+          })
+        }
+
         if (Array.isArray($scope.transitTime) && $scope.transitTime.length === 2) {
           var tuple = [$scope.transitTime[0] || '', $scope.transitTime[1] || ''];
           var format = 'YYYY-MM-DD';

Reply via email to