Repository: ranger
Updated Branches:
  refs/heads/ranger-1.0 b64b66e37 -> 50957cdf3


RANGER-2117:RangerKafkaAuthorizer to support new resources and operations which are in Apache Kafka 1.0.0

Signed-off-by: rmani <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/ranger/repo
Commit: http://git-wip-us.apache.org/repos/asf/ranger/commit/50957cdf
Tree: http://git-wip-us.apache.org/repos/asf/ranger/tree/50957cdf
Diff: http://git-wip-us.apache.org/repos/asf/ranger/diff/50957cdf

Branch: refs/heads/ranger-1.0
Commit: 50957cdf33b313ac62fd1bdd6e9058ebf1895bfa
Parents: b64b66e
Author: rmani <[email protected]>
Authored: Mon Jun 4 14:46:40 2018 -0700
Committer: rmani <[email protected]>
Committed: Tue Jun 5 11:21:44 2018 -0700

----------------------------------------------------------------------
 .../service-defs/ranger-servicedef-kafka.json   |  37 ++-
 .../kafka/authorizer/RangerKafkaAuthorizer.java |  12 +
 .../KafkaRangerAuthorizerGSSTest.java           |  22 ++
 .../src/test/resources/kafka-policies.json      | 146 +++++++++++-
 .../PatchForKafkaServiceDefUpdate_J10015.java   | 236 +++++++++++++++++++
 5 files changed, 449 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ranger/blob/50957cdf/agents-common/src/main/resources/service-defs/ranger-servicedef-kafka.json
----------------------------------------------------------------------
diff --git a/agents-common/src/main/resources/service-defs/ranger-servicedef-kafka.json b/agents-common/src/main/resources/service-defs/ranger-servicedef-kafka.json
index 839d780..ca3e0fe 100644
--- a/agents-common/src/main/resources/service-defs/ranger-servicedef-kafka.json
+++ b/agents-common/src/main/resources/service-defs/ranger-servicedef-kafka.json
@@ -24,6 +24,20 @@
                        "uiHint":"",
                        "label":"Topic",
                        "description":"Topic"
+               },
+               {
+                       "itemId":2,
+                       "name":"transactionalid",
+                       "type":"string",
+                       "level":1,
+                       "excludesSupported":true,
+                       "matcher":"org.apache.ranger.plugin.resourcematcher.RangerDefaultResourceMatcher",
+                       "matcherOptions":{
+                               "wildCard":true,
+                               "ignoreCase":true
+                       },
+                       "label":"Transactional Id",
+                       "description":"Transactional Id"
                }
                
        ],
@@ -68,7 +82,10 @@
                {
                        "itemId":9,
                        "name":"delete",
-                       "label":"Delete"
+                       "label":"Delete",
+                       "impliedGrants":[
+                               "describe"
+                       ]
                },
                {
                        "itemId":7,
@@ -83,6 +100,24 @@
                                "delete"
                        ]
                        
+               },
+               {
+                       "itemId":10,
+                       "name":"idempotent_write",
+                       "label":"Idempotent Write"
+               },
+               {
+                       "itemId":11,
+                       "name":"describe_configs",
+                       "label":"Describe Configs"
+               },
+               {
+                       "itemId":12,
+                       "name":"alter_configs",
+                       "label":"Alter Configs",
+                       "impliedGrants":[
+                               "describe_configs"
+                       ]
                }
        ],
        "configs":[

http://git-wip-us.apache.org/repos/asf/ranger/blob/50957cdf/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
----------------------------------------------------------------------
diff --git a/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java b/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
index 630d1af..b5d151e 100644
--- a/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
+++ b/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java
@@ -57,6 +57,7 @@ public class RangerKafkaAuthorizer implements Authorizer {
        public static final String KEY_TOPIC = "topic";
        public static final String KEY_CLUSTER = "cluster";
        public static final String KEY_CONSUMER_GROUP = "consumer_group";
+       public static final String KEY_TRANSACTIONALID = "transactionalid";
 
        public static final String ACCESS_TYPE_READ = "consume";
        public static final String ACCESS_TYPE_WRITE = "publish";
@@ -65,6 +66,9 @@ public class RangerKafkaAuthorizer implements Authorizer {
        public static final String ACCESS_TYPE_CONFIGURE = "configure";
        public static final String ACCESS_TYPE_DESCRIBE = "describe";
        public static final String ACCESS_TYPE_KAFKA_ADMIN = "kafka_admin";
+       public static final String ACCESS_TYPE_DESCRIBE_CONFIGS = 
"describe_configs";
+       public static final String ACCESS_TYPE_ALTER_CONFIGS    = 
"alter_configs";
+       public static final String ACCESS_TYPE_IDEMPOTENT_WRITE = 
"idempotent_write";
 
        private static volatile RangerBasePlugin rangerPlugin = null;
 
@@ -198,6 +202,8 @@ public class RangerKafkaAuthorizer implements Authorizer {
                        // rangerResource.setValue(KEY_CLUSTER, 
resource.name());
                } else if (resource.resourceType().equals(Group$.MODULE$)) {
                        rangerResource.setValue(KEY_CONSUMER_GROUP, 
resource.name());
+               } else if 
(resource.resourceType().equals(TransactionalId$.MODULE$)) {
+                       
rangerResource.setValue(KEY_TRANSACTIONALID,resource.name());
                } else {
                        logger.fatal("Unsupported resourceType=" + 
resource.resourceType());
                        validationFailed = true;
@@ -327,6 +333,12 @@ public class RangerKafkaAuthorizer implements Authorizer {
                        return ACCESS_TYPE_CREATE;
                } else if (operation.equals(Delete$.MODULE$)) {
                        return ACCESS_TYPE_DELETE;
+               } else if (operation.equals(DescribeConfigs$.MODULE$)) {
+                       return ACCESS_TYPE_DESCRIBE_CONFIGS;
+               } else if (operation.equals(AlterConfigs$.MODULE$)) {
+                       return ACCESS_TYPE_ALTER_CONFIGS;
+               } else if (operation.equals(IdempotentWrite$.MODULE$)) {
+                       return ACCESS_TYPE_IDEMPOTENT_WRITE;
                }
                return null;
        }

http://git-wip-us.apache.org/repos/asf/ranger/blob/50957cdf/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerGSSTest.java
----------------------------------------------------------------------
diff --git a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerGSSTest.java b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerGSSTest.java
index 23b9299..2624478 100644
--- a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerGSSTest.java
+++ b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerGSSTest.java
@@ -316,4 +316,26 @@ public class KafkaRangerAuthorizerGSSTest {
         producer.close();
     }
 
+
+    @Test
+    public void testAuthorizedIdempotentWrite() throws Exception {
+        // Create the Producer
+        Properties producerProps = new Properties();
+        producerProps.put("bootstrap.servers", "localhost:" + port);
+        producerProps.put("acks", "all");
+        producerProps.put("key.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
+        producerProps.put("value.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
+        producerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
"SASL_PLAINTEXT");
+        producerProps.put("sasl.mechanism", "GSSAPI");
+        producerProps.put("sasl.kerberos.service.name", "kafka");
+        producerProps.put("enable.idempotence", "true");
+
+        final Producer<String, String> producer = new 
KafkaProducer<>(producerProps);
+
+        // Send a message
+        Future<RecordMetadata> record =
+                producer.send(new ProducerRecord<String, String>("test", 
"somekey", "somevalue"));
+        producer.flush();
+        producer.close();
+    }
 }

http://git-wip-us.apache.org/repos/asf/ranger/blob/50957cdf/plugin-kafka/src/test/resources/kafka-policies.json
----------------------------------------------------------------------
diff --git a/plugin-kafka/src/test/resources/kafka-policies.json b/plugin-kafka/src/test/resources/kafka-policies.json
index d0e469a..0c07604 100644
--- a/plugin-kafka/src/test/resources/kafka-policies.json
+++ b/plugin-kafka/src/test/resources/kafka-policies.json
@@ -49,10 +49,22 @@
             {
               "type": "kafka_admin",
               "isAllowed": true
+            },
+            {
+              "type": "idempotent_write",
+              "isAllowed": true
+            },
+            {
+              "type": "describe_configs",
+              "isAllowed": true
+            },
+            {
+              "type": "alter_configs",
+              "isAllowed": true
             }
           ],
           "users": [
-            "admin"
+            "admin","kafka"
           ],
           "groups": [
             "IT"
@@ -99,6 +111,18 @@
             {
               "type": "describe",
               "isAllowed": true
+            },
+            {
+              "type": "idempotent_write",
+              "isAllowed": true
+            },
+            {
+              "type": "describe_configs",
+              "isAllowed": true
+            },
+            {
+              "type": "alter_configs",
+              "isAllowed": true
             }
           ],
           "users": [],
@@ -120,6 +144,64 @@
     },
     {
       "service": "cl1_kafka",
+      "name": "ClusterLevelPolicy",
+      "policyType": 0,
+      "description": "",
+      "isAuditEnabled": true,
+      "resources": {
+        "topic": {
+          "values": [
+            "test"
+          ],
+          "isExcludes": false,
+          "isRecursive": false
+        }
+      },
+      "policyItems": [
+        {
+          "accesses": [
+            {
+              "type": "publish",
+              "isAllowed": true
+            },
+            {
+              "type": "consume",
+              "isAllowed": true
+            },
+            {
+              "type": "describe",
+              "isAllowed": true
+            },
+            {
+              "type": "idempotent_write",
+              "isAllowed": true
+            },
+            {
+              "type": "describe_configs",
+              "isAllowed": true
+            },
+            {
+              "type": "alter_configs",
+              "isAllowed": true
+            }
+          ],
+          "users": ["kafka"],
+          "groups": [],
+          "conditions": [],
+          "delegateAdmin": false
+        }
+      ],
+      "denyPolicyItems": [],
+      "allowExceptions": [],
+      "denyExceptions": [],
+      "dataMaskPolicyItems": [],
+      "rowFilterPolicyItems": [],
+      "id": 25,
+      "isEnabled": true,
+      "version": 1
+    },
+    {
+      "service": "cl1_kafka",
       "name": "DevPolicy",
       "policyType": 0,
       "description": "",
@@ -220,6 +302,26 @@
         "uiHint": "",
         "label": "Topic",
         "description": "Topic"
+      },
+      {
+        "itemId":2,
+        "name":"transactionalid",
+        "type":"string",
+        "level":1,
+        "mandatory":true,
+        "lookupSupported":false,
+        "recursiveSupported":false,
+        "excludesSupported":true,
+        "matcher":"org.apache.ranger.plugin.resourcematcher.RangerDefaultResourceMatcher",
+        "matcherOptions":{
+          "wildCard":true,
+          "ignoreCase":true
+        },
+        "validationRegEx":"",
+        "validationMessage":"",
+        "uiHint":"",
+        "label":"Transactional Id",
+        "description":"Transactional Id"
       }
     ],
     "accessTypes": [
@@ -263,7 +365,7 @@
         "itemId": 9,
         "name": "delete",
         "label": "Delete",
-        "impliedGrants": []
+        "impliedGrants": ["describe"]
       },
       {
         "itemId": 7,
@@ -277,6 +379,24 @@
           "create",
           "delete"
         ]
+      },
+      {
+        "itemId":10,
+        "name":"idempotent_write",
+        "label":"Idempotent Write"
+      },
+      {
+        "itemId":11,
+        "name":"describe_configs",
+        "label":"Describe Configs"
+      },
+      {
+        "itemId":12,
+        "name":"alter_configs",
+        "label":"Alter Configs",
+        "impliedGrants":[
+          "describe_configs"
+        ]
       }
     ],
     "policyConditions": [
@@ -976,7 +1096,9 @@
           "itemId": 9018,
           "name": "kafka:delete",
           "label": "Delete",
-          "impliedGrants": []
+          "impliedGrants": [
+            "kafka:describe"
+          ]
         },
         {
           "itemId": 9016,
@@ -992,6 +1114,24 @@
           ]
         },
         {
+          "itemId":9019,
+          "name":"idempotent_write",
+          "label":"Idempotent Write"
+        },
+        {
+          "itemId":9020,
+          "name":"describe_configs",
+          "label":"Describe Configs"
+        },
+        {
+          "itemId":9021,
+          "name":"alter_configs",
+          "label":"Alter Configs",
+          "impliedGrants":[
+            "describe_configs"
+          ]
+        },
+        {
           "itemId": 11012,
           "name": "atlas:read",
           "label": "read",

http://git-wip-us.apache.org/repos/asf/ranger/blob/50957cdf/security-admin/src/main/java/org/apache/ranger/patch/PatchForKafkaServiceDefUpdate_J10015.java
----------------------------------------------------------------------
diff --git a/security-admin/src/main/java/org/apache/ranger/patch/PatchForKafkaServiceDefUpdate_J10015.java b/security-admin/src/main/java/org/apache/ranger/patch/PatchForKafkaServiceDefUpdate_J10015.java
new file mode 100644
index 0000000..cb829a7
--- /dev/null
+++ b/security-admin/src/main/java/org/apache/ranger/patch/PatchForKafkaServiceDefUpdate_J10015.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ranger.patch;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.log4j.Logger;
+import org.apache.ranger.biz.RangerBizUtil;
+import org.apache.ranger.biz.ServiceDBStore;
+import org.apache.ranger.common.JSONUtil;
+import org.apache.ranger.common.RangerValidatorFactory;
+import org.apache.ranger.common.StringUtil;
+import org.apache.ranger.db.RangerDaoManager;
+import org.apache.ranger.entity.XXServiceDef;
+import org.apache.ranger.plugin.model.RangerServiceDef;
+import org.apache.ranger.plugin.model.validation.RangerServiceDefValidator;
+import org.apache.ranger.plugin.model.validation.RangerValidator.Action;
+import org.apache.ranger.plugin.store.EmbeddedServiceDefsUtil;
+import org.apache.ranger.service.RangerPolicyService;
+import org.apache.ranger.service.XPermMapService;
+import org.apache.ranger.service.XPolicyService;
+import org.apache.ranger.util.CLIUtil;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@Component
+public class PatchForKafkaServiceDefUpdate_J10015 extends BaseLoader {
+       private static final Logger logger = 
Logger.getLogger(PatchForKafkaServiceDefUpdate_J10015.class);
+       public static final String SERVICEDBSTORE_SERVICEDEFBYNAME_KAFKA_NAME  
= "kafka";
+       public static final String TRANSACTIONALID_RESOURCE_NAME 
="transactionalid";
+
+       @Autowired
+       RangerDaoManager daoMgr;
+
+       @Autowired
+       ServiceDBStore svcDBStore;
+
+       @Autowired
+       JSONUtil jsonUtil;
+
+       @Autowired
+       RangerPolicyService policyService;
+
+       @Autowired
+       StringUtil stringUtil;
+
+       @Autowired
+       XPolicyService xPolService;
+
+       @Autowired
+       XPermMapService xPermMapService;
+
+       @Autowired
+       RangerBizUtil bizUtil;
+
+       @Autowired
+       RangerValidatorFactory validatorFactory;
+
+       @Autowired
+       ServiceDBStore svcStore;
+
+       public static void main(String[] args) {
+               logger.info("main()");
+               try {
+                       PatchForKafkaServiceDefUpdate_J10015 loader = 
(PatchForKafkaServiceDefUpdate_J10015) 
CLIUtil.getBean(PatchForKafkaServiceDefUpdate_J10015.class);
+                       loader.init();
+                       while (loader.isMoreToProcess()) {
+                               loader.load();
+                       }
+                       logger.info("Load complete. Exiting!!!");
+                       System.exit(0);
+               } catch (Exception e) {
+                       logger.error("Error loading", e);
+                       System.exit(1);
+               }
+       }
+
+       @Override
+       public void init() throws Exception {
+               // Do Nothing
+       }
+
+       @Override
+       public void execLoad() {
+               logger.info("==> 
PatchForKafkaServiceDefUpdate_J10015.execLoad()");
+               try {
+                       updateHiveServiceDef();
+               } catch (Exception e) {
+                       logger.error("Error while applying 
PatchForKafkaServiceDefUpdate_J10015...", e);
+               }
+               logger.info("<== 
PatchForKafkaServiceDefUpdate_J10015.execLoad()");
+       }
+
+       @Override
+       public void printStats() {
+               logger.info("PatchForKafkaServiceDefUpdate_J10015 ");
+       }
+
+       private void updateHiveServiceDef(){
+               RangerServiceDef ret                                     = null;
+               RangerServiceDef embeddedKafkaServiceDef = null;
+               RangerServiceDef dbKafkaServiceDef               = null;
+               List<RangerServiceDef.RangerResourceDef>        
embeddedKafkaResourceDefs  = null;
+               List<RangerServiceDef.RangerAccessTypeDef>      
embeddedKafkaAccessTypes   = null;
+               XXServiceDef xXServiceDefObj                    = null;
+               try{
+                       
embeddedKafkaServiceDef=EmbeddedServiceDefsUtil.instance().getEmbeddedServiceDef(SERVICEDBSTORE_SERVICEDEFBYNAME_KAFKA_NAME);
+                       if(embeddedKafkaServiceDef!=null){
+
+                               xXServiceDefObj = 
daoMgr.getXXServiceDef().findByName(SERVICEDBSTORE_SERVICEDEFBYNAME_KAFKA_NAME);
+                               Map<String, String> 
serviceDefOptionsPreUpdate=null;
+                               String jsonStrPreUpdate=null;
+                               if(xXServiceDefObj!=null) {
+                                       
jsonStrPreUpdate=xXServiceDefObj.getDefOptions();
+                                       
serviceDefOptionsPreUpdate=jsonStringToMap(jsonStrPreUpdate);
+                                       xXServiceDefObj=null;
+                               }
+                               
dbKafkaServiceDef=svcDBStore.getServiceDefByName(SERVICEDBSTORE_SERVICEDEFBYNAME_KAFKA_NAME);
+
+                               if(dbKafkaServiceDef!=null){
+                                       embeddedKafkaResourceDefs = 
embeddedKafkaServiceDef.getResources();
+                                       embeddedKafkaAccessTypes  = 
embeddedKafkaServiceDef.getAccessTypes();
+
+                                       if 
(checkNewKafkaresourcePresent(embeddedKafkaResourceDefs)) {
+                                               // This is to check if URL def 
is added to the resource definition, if so update the resource def and 
accessType def
+                                               if (embeddedKafkaResourceDefs 
!= null) {
+                                                       
dbKafkaServiceDef.setResources(embeddedKafkaResourceDefs);
+                                               }
+                                               if (embeddedKafkaAccessTypes != 
null) {
+                                                       
if(!embeddedKafkaAccessTypes.toString().equalsIgnoreCase(dbKafkaServiceDef.getAccessTypes().toString()))
 {
+                                                               
dbKafkaServiceDef.setAccessTypes(embeddedKafkaAccessTypes);
+                                                       }
+                                               }
+                                       }
+
+                                       RangerServiceDefValidator validator = 
validatorFactory.getServiceDefValidator(svcStore);
+                                       validator.validate(dbKafkaServiceDef, 
Action.UPDATE);
+
+                                       ret = 
svcStore.updateServiceDef(dbKafkaServiceDef);
+                                       if(ret==null){
+                                               logger.error("Error while 
updating "+SERVICEDBSTORE_SERVICEDEFBYNAME_KAFKA_NAME+"service-def");
+                                               throw new 
RuntimeException("Error while updating 
"+SERVICEDBSTORE_SERVICEDEFBYNAME_KAFKA_NAME+"service-def");
+                                       }
+                                       xXServiceDefObj = 
daoMgr.getXXServiceDef().findByName(SERVICEDBSTORE_SERVICEDEFBYNAME_KAFKA_NAME);
+                                       if(xXServiceDefObj!=null) {
+                                               String 
jsonStrPostUpdate=xXServiceDefObj.getDefOptions();
+                                               Map<String, String> 
serviceDefOptionsPostUpdate=jsonStringToMap(jsonStrPostUpdate);
+                                               if (serviceDefOptionsPostUpdate 
!= null && 
serviceDefOptionsPostUpdate.containsKey(RangerServiceDef.OPTION_ENABLE_DENY_AND_EXCEPTIONS_IN_POLICIES))
 {
+                                                       
if(serviceDefOptionsPreUpdate == null || 
!serviceDefOptionsPreUpdate.containsKey(RangerServiceDef.OPTION_ENABLE_DENY_AND_EXCEPTIONS_IN_POLICIES))
 {
+                                                               String 
preUpdateValue = serviceDefOptionsPreUpdate == null ? null : 
serviceDefOptionsPreUpdate.get(RangerServiceDef.OPTION_ENABLE_DENY_AND_EXCEPTIONS_IN_POLICIES);
+                                                               if 
(preUpdateValue == null) {
+                                                                       
serviceDefOptionsPostUpdate.remove(RangerServiceDef.OPTION_ENABLE_DENY_AND_EXCEPTIONS_IN_POLICIES);
+                                                               } else {
+                                                                       
serviceDefOptionsPostUpdate.put(RangerServiceDef.OPTION_ENABLE_DENY_AND_EXCEPTIONS_IN_POLICIES,
 preUpdateValue);
+                                                               }
+                                                               
xXServiceDefObj.setDefOptions(mapToJsonString(serviceDefOptionsPostUpdate));
+                                                               
daoMgr.getXXServiceDef().update(xXServiceDefObj);
+                                                       }
+                                               }
+                                       }
+                               }
+                       }
+                       }catch(Exception e)
+                       {
+                               logger.error("Error while updating 
"+SERVICEDBSTORE_SERVICEDEFBYNAME_KAFKA_NAME+"service-def", e);
+                       }
+       }
+
+       private boolean 
checkNewKafkaresourcePresent(List<RangerServiceDef.RangerResourceDef> 
resourceDefs) {
+               boolean ret = false;
+               for(RangerServiceDef.RangerResourceDef resourceDef : 
resourceDefs) {
+                       if 
(TRANSACTIONALID_RESOURCE_NAME.equals(resourceDef.getName()) ) {
+                               ret = true ;
+                               break;
+                       }
+               }
+               return ret;
+       }
+
+       private String mapToJsonString(Map<String, String> map) {
+               String ret = null;
+               if(map != null) {
+                       try {
+                               ret = jsonUtil.readMapToString(map);
+                       } catch(Exception excp) {
+                               logger.warn("mapToJsonString() failed to 
convert map: " + map, excp);
+                       }
+               }
+               return ret;
+       }
+
+       protected Map<String, String> jsonStringToMap(String jsonStr) {
+               Map<String, String> ret = null;
+               if(!StringUtils.isEmpty(jsonStr)) {
+                       try {
+                               ret = jsonUtil.jsonToMap(jsonStr);
+                       } catch(Exception excp) {
+                               // fallback to earlier format: 
"name1=value1;name2=value2"
+                               for(String optionString : jsonStr.split(";")) {
+                                       if(StringUtils.isEmpty(optionString)) {
+                                               continue;
+                                       }
+                                       String[] nvArr = 
optionString.split("=");
+                                       String name  = (nvArr != null && 
nvArr.length > 0) ? nvArr[0].trim() : null;
+                                       String value = (nvArr != null && 
nvArr.length > 1) ? nvArr[1].trim() : null;
+                                       if(StringUtils.isEmpty(name)) {
+                                               continue;
+                                       }
+                                       if(ret == null) {
+                                               ret = new HashMap<String, 
String>();
+                                       }
+                                       ret.put(name, value);
+                               }
+                       }
+               }
+               return ret;
+       }
+}
\ No newline at end of file

Reply via email to