Add put functionality for aerospike module

Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/8571ad90
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/8571ad90
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/8571ad90

Branch: refs/heads/master
Commit: 8571ad90db93507118897e7fb51500ac4821c47c
Parents: 5608731
Author: nishadi <[email protected]>
Authored: Sun Jun 18 14:10:27 2017 +0530
Committer: nishadi <[email protected]>
Committed: Sun Jun 18 14:12:57 2017 +0530

----------------------------------------------------------------------
 .../store/AerospikeMappingBuilder.java          | 92 +++++++++++---------
 .../gora/aerospike/store/AerospikeStore.java    | 43 +++++++++
 2 files changed, 92 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/gora/blob/8571ad90/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeMappingBuilder.java
----------------------------------------------------------------------
diff --git a/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeMappingBuilder.java b/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeMappingBuilder.java
index 4e2b997..8744709 100644
--- a/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeMappingBuilder.java
+++ b/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeMappingBuilder.java
@@ -69,59 +69,65 @@ public class AerospikeMappingBuilder {
       for (Element policyElement : policyElements) {
 
         String policy = policyElement.getAttributeValue("name");
-
-        if (policy.equals("write")) {
-
-          WritePolicy writePolicy = new WritePolicy();
-          if (policyElement.getAttributeValue("gen") != null)
-            writePolicy.generationPolicy = getGenerationPolicyMapping(policyElement.getAttributeValue
-              ("gen").toUpperCase(Locale.getDefault()));
-          if (policyElement.getAttributeValue("exists") != null)
-            writePolicy.recordExistsAction = getRecordExistsAction(policyElement.getAttributeValue
-              ("exists").toUpperCase(Locale.getDefault()));
-          if (policyElement.getAttributeValue("key") != null)
-            writePolicy.sendKey = getKeyUsagePolicy(policyElement.getAttributeValue("key").toUpperCase
-              (Locale.getDefault()));
-          if (policyElement.getAttributeValue("retry") != null)
-            writePolicy.retryOnTimeout = getRetryOnTimeoutPolicy(policyElement.getAttributeValue
-              ("retry").toUpperCase(Locale.getDefault()));
-          if (policyElement.getAttributeValue("timeout") != null)
-            writePolicy.timeout = getTimeoutValue(policyElement.getAttributeValue("timeout"));
-          aerospikeMapping.setWritePolicy(writePolicy);
-        } else if (policy.equals("read")) {
-
-          Policy readPolicy = new Policy();
-          if (policyElement.getAttributeValue("key") != null)
-            readPolicy.sendKey = getKeyUsagePolicy(policyElement.getAttributeValue("key").toUpperCase(Locale
-              .getDefault()));
-          if (policyElement.getAttributeValue("timeout") != null)
-            readPolicy.timeout = getTimeoutValue(policyElement.getAttributeValue("timeout"));
-          aerospikeMapping.setReadPolicy(readPolicy);
+        if (policy != null) {
+          if (policy.equals("write")) {
+            WritePolicy writePolicy = new WritePolicy();
+            if (policyElement.getAttributeValue("gen") != null)
+              writePolicy.generationPolicy = getGenerationPolicyMapping(policyElement.getAttributeValue
+                ("gen").toUpperCase(Locale.getDefault()));
+            if (policyElement.getAttributeValue("exists") != null)
+              writePolicy.recordExistsAction = getRecordExistsAction(policyElement.getAttributeValue
+                ("exists").toUpperCase(Locale.getDefault()));
+            if (policyElement.getAttributeValue("key") != null)
+              writePolicy.sendKey = getKeyUsagePolicy(policyElement.getAttributeValue("key").toUpperCase
+                (Locale.getDefault()));
+            if (policyElement.getAttributeValue("retry") != null)
+              writePolicy.retryOnTimeout = getRetryOnTimeoutPolicy(policyElement.getAttributeValue
+                ("retry").toUpperCase(Locale.getDefault()));
+            if (policyElement.getAttributeValue("timeout") != null)
+              writePolicy.timeout = getTimeoutValue(policyElement.getAttributeValue("timeout"));
+            aerospikeMapping.setWritePolicy(writePolicy);
+          } else if (policy.equals("read")) {
+            Policy readPolicy = new Policy();
+            if (policyElement.getAttributeValue("key") != null)
+              readPolicy.sendKey = getKeyUsagePolicy(policyElement.getAttributeValue("key").toUpperCase(Locale
+                .getDefault()));
+            if (policyElement.getAttributeValue("timeout") != null)
+              readPolicy.timeout = getTimeoutValue(policyElement.getAttributeValue("timeout"));
+            aerospikeMapping.setReadPolicy(readPolicy);
+          }
         }
       }
 
       // Mapping the defined classes
-      List<Element> classElements = root.getChildren("policy");
+      List<Element> classElements = root.getChildren("class");
 
       boolean persistentAndKeyClassMatches = false;
       for (Element classElement : classElements) {
-        if (classElement.getAttributeValue("keyClass").equals(keyClass.getCanonicalName())
-          && classElement.getAttributeValue("name").equals(persistentClass.getCanonicalName())) {
-          persistentAndKeyClassMatches = true;
-
-          String nameSpace = classElement.getAttributeValue("namespace");
-          if (nameSpace == null || nameSpace.isEmpty()) {
-            throw new ConfigurationException("Gora-aerospike-mapping does not include the relevant namespace for the " +
-              "class");
-          }
-          aerospikeMapping.setNamespace(nameSpace);
 
-          String set = classElement.getAttributeValue("set");
-          if (set != null && !set.isEmpty()) {
-            //ToDo : check for schema set name
-            aerospikeMapping.setSet(set);
+        String mappingKeyClass = classElement.getAttributeValue("keyClass");
+        String mappingClassName = classElement.getAttributeValue("name");
+
+        if (mappingKeyClass != null && mappingClassName != null) {
+          if (mappingKeyClass.equals(keyClass.getCanonicalName())
+            && mappingClassName.equals(persistentClass.getCanonicalName())) {
+
+            persistentAndKeyClassMatches = true;
+
+            String nameSpace = classElement.getAttributeValue("namespace");
+            if (nameSpace == null || nameSpace.isEmpty()) {
+              throw new ConfigurationException("Gora-aerospike-mapping does not include the relevant namespace for " +
+                "the class");
+            }
+            aerospikeMapping.setNamespace(nameSpace);
+
+            String set = classElement.getAttributeValue("set");
+            if (set != null && !set.isEmpty()) {
+              aerospikeMapping.setSet(set);
+            }
           }
         }
+
       }
       if (!persistentAndKeyClassMatches)
         throw new ConfigurationException("Gora-aerospike-mapping does not include the name and keyClass in the " +

http://git-wip-us.apache.org/repos/asf/gora/blob/8571ad90/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeStore.java
----------------------------------------------------------------------
diff --git a/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeStore.java b/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeStore.java
index 6ed1a76..c8610e1 100644
--- a/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeStore.java
+++ b/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeStore.java
@@ -22,6 +22,8 @@ import java.util.Properties;
 
 import com.aerospike.client.*;
 import com.aerospike.client.policy.ClientPolicy;
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
 import org.apache.gora.persistency.impl.PersistentBase;
 import org.apache.gora.query.PartitionQuery;
 import org.apache.gora.query.Query;
@@ -92,6 +94,18 @@ public class AerospikeStore<K, T extends PersistentBase> extends DataStoreBase<K
   @Override
   public void put(K key, T value) {
 
+    Key recordKey = new Key(aerospikeParameters.getAerospikeMapping().getNamespace(), aerospikeParameters
+      .getAerospikeMapping().getSet(), Value.get(key));
+
+    List<Field> fields = value.getSchema().getFields();
+
+    for (int i = 0; i < fields.size(); i++) {
+
+      // In retrieving the bin name, it is checked whether the server is single bin valued
+      String binName = aerospikeParameters.getBinName(fields.get(i).name());
+      Bin bin = getBin(binName, value.get(i), fields.get(i));
+      aerospikeClient.put(aerospikeParameters.getAerospikeMapping().getWritePolicy(), recordKey, bin);
+    }
   }
 
   @Override
@@ -126,4 +140,33 @@ public class AerospikeStore<K, T extends PersistentBase> extends DataStoreBase<K
   public void close() {
     aerospikeClient.close();
   }
+
+  /**
+   * Aerospike does not support Utf8 format returned from Avro.
+   * This method provides those utf8 valued bin values as strings
+   * for aerospike Value to obtain the corresponding bin value,
+   * and returns the Bin
+   *
+   * @param binName name of the bin
+   * @param value value of the bin
+   * @param field field corresponding to bin
+   * @return
+   */
+  private Bin getBin(String binName, Object value, Field field){
+    
+    boolean isStringType = false;
+    if (field.schema().getType().equals(Schema.Type.STRING))
+      isStringType = true;
+    if (field.schema().getType().equals(Schema.Type.UNION)){
+      for (Schema schema :field.schema().getTypes()) {
+        if (schema.getName().equals("string"))
+          isStringType = true;
+      }
+    }
+
+    if (isStringType)
+      return new Bin(binName, Value.get(value.toString()));
+    else
+      return new Bin(binName, Value.get(value));
+  }
 }

Reply via email to