Add put functionality for the Aerospike module
Project: http://git-wip-us.apache.org/repos/asf/gora/repo Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/8571ad90 Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/8571ad90 Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/8571ad90 Branch: refs/heads/master Commit: 8571ad90db93507118897e7fb51500ac4821c47c Parents: 5608731 Author: nishadi <[email protected]> Authored: Sun Jun 18 14:10:27 2017 +0530 Committer: nishadi <[email protected]> Committed: Sun Jun 18 14:12:57 2017 +0530 ---------------------------------------------------------------------- .../store/AerospikeMappingBuilder.java | 92 +++++++++++--------- .../gora/aerospike/store/AerospikeStore.java | 43 +++++++++ 2 files changed, 92 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/gora/blob/8571ad90/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeMappingBuilder.java ---------------------------------------------------------------------- diff --git a/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeMappingBuilder.java b/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeMappingBuilder.java index 4e2b997..8744709 100644 --- a/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeMappingBuilder.java +++ b/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeMappingBuilder.java @@ -69,59 +69,65 @@ public class AerospikeMappingBuilder { for (Element policyElement : policyElements) { String policy = policyElement.getAttributeValue("name"); - - if (policy.equals("write")) { - - WritePolicy writePolicy = new WritePolicy(); - if (policyElement.getAttributeValue("gen") != null) - writePolicy.generationPolicy = getGenerationPolicyMapping(policyElement.getAttributeValue - ("gen").toUpperCase(Locale.getDefault())); - if (policyElement.getAttributeValue("exists") != null) - writePolicy.recordExistsAction = 
getRecordExistsAction(policyElement.getAttributeValue - ("exists").toUpperCase(Locale.getDefault())); - if (policyElement.getAttributeValue("key") != null) - writePolicy.sendKey = getKeyUsagePolicy(policyElement.getAttributeValue("key").toUpperCase - (Locale.getDefault())); - if (policyElement.getAttributeValue("retry") != null) - writePolicy.retryOnTimeout = getRetryOnTimeoutPolicy(policyElement.getAttributeValue - ("retry").toUpperCase(Locale.getDefault())); - if (policyElement.getAttributeValue("timeout") != null) - writePolicy.timeout = getTimeoutValue(policyElement.getAttributeValue("timeout")); - aerospikeMapping.setWritePolicy(writePolicy); - } else if (policy.equals("read")) { - - Policy readPolicy = new Policy(); - if (policyElement.getAttributeValue("key") != null) - readPolicy.sendKey = getKeyUsagePolicy(policyElement.getAttributeValue("key").toUpperCase(Locale - .getDefault())); - if (policyElement.getAttributeValue("timeout") != null) - readPolicy.timeout = getTimeoutValue(policyElement.getAttributeValue("timeout")); - aerospikeMapping.setReadPolicy(readPolicy); + if (policy != null) { + if (policy.equals("write")) { + WritePolicy writePolicy = new WritePolicy(); + if (policyElement.getAttributeValue("gen") != null) + writePolicy.generationPolicy = getGenerationPolicyMapping(policyElement.getAttributeValue + ("gen").toUpperCase(Locale.getDefault())); + if (policyElement.getAttributeValue("exists") != null) + writePolicy.recordExistsAction = getRecordExistsAction(policyElement.getAttributeValue + ("exists").toUpperCase(Locale.getDefault())); + if (policyElement.getAttributeValue("key") != null) + writePolicy.sendKey = getKeyUsagePolicy(policyElement.getAttributeValue("key").toUpperCase + (Locale.getDefault())); + if (policyElement.getAttributeValue("retry") != null) + writePolicy.retryOnTimeout = getRetryOnTimeoutPolicy(policyElement.getAttributeValue + ("retry").toUpperCase(Locale.getDefault())); + if (policyElement.getAttributeValue("timeout") != 
null) + writePolicy.timeout = getTimeoutValue(policyElement.getAttributeValue("timeout")); + aerospikeMapping.setWritePolicy(writePolicy); + } else if (policy.equals("read")) { + Policy readPolicy = new Policy(); + if (policyElement.getAttributeValue("key") != null) + readPolicy.sendKey = getKeyUsagePolicy(policyElement.getAttributeValue("key").toUpperCase(Locale + .getDefault())); + if (policyElement.getAttributeValue("timeout") != null) + readPolicy.timeout = getTimeoutValue(policyElement.getAttributeValue("timeout")); + aerospikeMapping.setReadPolicy(readPolicy); + } } } // Mapping the defined classes - List<Element> classElements = root.getChildren("policy"); + List<Element> classElements = root.getChildren("class"); boolean persistentAndKeyClassMatches = false; for (Element classElement : classElements) { - if (classElement.getAttributeValue("keyClass").equals(keyClass.getCanonicalName()) - && classElement.getAttributeValue("name").equals(persistentClass.getCanonicalName())) { - persistentAndKeyClassMatches = true; - - String nameSpace = classElement.getAttributeValue("namespace"); - if (nameSpace == null || nameSpace.isEmpty()) { - throw new ConfigurationException("Gora-aerospike-mapping does not include the relevant namespace for the " + - "class"); - } - aerospikeMapping.setNamespace(nameSpace); - String set = classElement.getAttributeValue("set"); - if (set != null && !set.isEmpty()) { - //ToDo : check for schema set name - aerospikeMapping.setSet(set); + String mappingKeyClass = classElement.getAttributeValue("keyClass"); + String mappingClassName = classElement.getAttributeValue("name"); + + if (mappingKeyClass != null && mappingClassName != null) { + if (mappingKeyClass.equals(keyClass.getCanonicalName()) + && mappingClassName.equals(persistentClass.getCanonicalName())) { + + persistentAndKeyClassMatches = true; + + String nameSpace = classElement.getAttributeValue("namespace"); + if (nameSpace == null || nameSpace.isEmpty()) { + throw new 
ConfigurationException("Gora-aerospike-mapping does not include the relevant namespace for " + + "the class"); + } + aerospikeMapping.setNamespace(nameSpace); + + String set = classElement.getAttributeValue("set"); + if (set != null && !set.isEmpty()) { + aerospikeMapping.setSet(set); + } } } + } if (!persistentAndKeyClassMatches) throw new ConfigurationException("Gora-aerospike-mapping does not include the name and keyClass in the " + http://git-wip-us.apache.org/repos/asf/gora/blob/8571ad90/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeStore.java ---------------------------------------------------------------------- diff --git a/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeStore.java b/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeStore.java index 6ed1a76..c8610e1 100644 --- a/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeStore.java +++ b/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeStore.java @@ -22,6 +22,8 @@ import java.util.Properties; import com.aerospike.client.*; import com.aerospike.client.policy.ClientPolicy; +import org.apache.avro.Schema; +import org.apache.avro.Schema.Field; import org.apache.gora.persistency.impl.PersistentBase; import org.apache.gora.query.PartitionQuery; import org.apache.gora.query.Query; @@ -92,6 +94,18 @@ public class AerospikeStore<K, T extends PersistentBase> extends DataStoreBase<K @Override public void put(K key, T value) { + Key recordKey = new Key(aerospikeParameters.getAerospikeMapping().getNamespace(), aerospikeParameters + .getAerospikeMapping().getSet(), Value.get(key)); + + List<Field> fields = value.getSchema().getFields(); + + for (int i = 0; i < fields.size(); i++) { + + // In retrieving the bin name, it is checked whether the server is single bin valued + String binName = aerospikeParameters.getBinName(fields.get(i).name()); + Bin bin = getBin(binName, value.get(i), fields.get(i)); + 
aerospikeClient.put(aerospikeParameters.getAerospikeMapping().getWritePolicy(), recordKey, bin); + } } @Override @@ -126,4 +140,33 @@ public class AerospikeStore<K, T extends PersistentBase> extends DataStoreBase<K public void close() { aerospikeClient.close(); } + + /** + * Aerospike does not support Utf8 format returned from Avro. + * This method provides those utf8 valued bin values as strings + * for aerospike Value to obtain the corresponding bin value, + * and returns the Bin + * + * @param binName name of the bin + * @param value value of the bin + * @param field field corresponding to bin + * @return + */ + private Bin getBin(String binName, Object value, Field field){ + + boolean isStringType = false; + if (field.schema().getType().equals(Schema.Type.STRING)) + isStringType = true; + if (field.schema().getType().equals(Schema.Type.UNION)){ + for (Schema schema :field.schema().getTypes()) { + if (schema.getName().equals("string")) + isStringType = true; + } + } + + if (isStringType) + return new Bin(binName, Value.get(value.toString())); + else + return new Bin(binName, Value.get(value)); + } }
