Add mapping reading and initialization for aerospike module

Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/1ad1cc9c
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/1ad1cc9c
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/1ad1cc9c

Branch: refs/heads/master
Commit: 1ad1cc9c96d0e2a12612a9efa76503218a935316
Parents: e796c8c
Author: nishadi <ndime...@gmail.com>
Authored: Sat Jun 17 22:26:05 2017 +0530
Committer: nishadi <ndime...@gmail.com>
Committed: Sat Jun 17 22:26:05 2017 +0530

----------------------------------------------------------------------
 gora-aerospike/pom.xml                          |   6 +
 .../gora/aerospike/store/AerospikeMapping.java  |  64 +++++
 .../store/AerospikeMappingBuilder.java          | 246 +++++++++++++++++++
 .../aerospike/store/AerospikeParameters.java    | 127 ++++++++++
 .../gora/aerospike/store/AerospikeStore.java    |  49 ++--
 5 files changed, 477 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/gora/blob/1ad1cc9c/gora-aerospike/pom.xml
----------------------------------------------------------------------
diff --git a/gora-aerospike/pom.xml b/gora-aerospike/pom.xml
index 1a557f4..ddf17a1 100644
--- a/gora-aerospike/pom.xml
+++ b/gora-aerospike/pom.xml
@@ -124,6 +124,12 @@
       <artifactId>hadoop-client</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>org.jdom</groupId>
+      <artifactId>jdom</artifactId>
+      <scope>compile</scope>
+    </dependency>
+
     <!-- Logging Dependencies -->
     <dependency>
       <groupId>org.slf4j</groupId>

http://git-wip-us.apache.org/repos/asf/gora/blob/1ad1cc9c/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeMapping.java
----------------------------------------------------------------------
diff --git 
a/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeMapping.java
 
b/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeMapping.java
new file mode 100644
index 0000000..5df8a92
--- /dev/null
+++ 
b/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeMapping.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.aerospike.store;
+
+import com.aerospike.client.policy.Policy;
+import com.aerospike.client.policy.WritePolicy;
+
+/**
+ * Plain holder for the values read from the Aerospike mapping file: the
+ * namespace and set a persistent class is stored in, plus the write and
+ * read policies. Both policies start out as freshly constructed defaults
+ * and stay that way unless replaced through the setters.
+ */
+public class AerospikeMapping {
+  private String namespace;
+  private String set;
+  // Default policies; replaced when the mapping file declares its own.
+  private WritePolicy writePolicy = new WritePolicy();
+  private Policy readPolicy = new Policy();
+
+  /**
+   * Creates a mapping with no namespace or set and default policies.
+   */
+  public AerospikeMapping() {
+  }
+
+  /**
+   * @return the namespace, or null if none has been set
+   */
+  public String getNamespace() {
+    return this.namespace;
+  }
+
+  /**
+   * @param namespace the namespace to store
+   */
+  public void setNamespace(String namespace) {
+    this.namespace = namespace;
+  }
+
+  /**
+   * @return the set name, or null if none has been set
+   */
+  public String getSet() {
+    return this.set;
+  }
+
+  /**
+   * @param set the set name to store
+   */
+  public void setSet(String set) {
+    this.set = set;
+  }
+
+  /**
+   * @return the write policy currently held by this mapping
+   */
+  public WritePolicy getWritePolicy() {
+    return this.writePolicy;
+  }
+
+  /**
+   * @param writePolicy the write policy to store
+   */
+  public void setWritePolicy(WritePolicy writePolicy) {
+    this.writePolicy = writePolicy;
+  }
+
+  /**
+   * @return the read policy currently held by this mapping
+   */
+  public Policy getReadPolicy() {
+    return this.readPolicy;
+  }
+
+  /**
+   * @param readPolicy the read policy to store
+   */
+  public void setReadPolicy(Policy readPolicy) {
+    this.readPolicy = readPolicy;
+  }
+}

http://git-wip-us.apache.org/repos/asf/gora/blob/1ad1cc9c/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeMappingBuilder.java
----------------------------------------------------------------------
diff --git 
a/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeMappingBuilder.java
 
b/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeMappingBuilder.java
new file mode 100644
index 0000000..4e2b997
--- /dev/null
+++ 
b/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeMappingBuilder.java
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.aerospike.store;
+
+import com.aerospike.client.policy.GenerationPolicy;
+import com.aerospike.client.policy.Policy;
+import com.aerospike.client.policy.RecordExistsAction;
+import com.aerospike.client.policy.WritePolicy;
+import org.jdom.Document;
+import org.jdom.Element;
+import org.jdom.input.SAXBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.naming.ConfigurationException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import java.util.Locale;
+
+public class AerospikeMappingBuilder {
+
+  public static final Logger LOG = 
LoggerFactory.getLogger(AerospikeMappingBuilder.class);
+
+  private AerospikeMapping aerospikeMapping;
+
+  public AerospikeMappingBuilder(String mappingFile, Class<?> keyClass, 
Class<?> persistentClass) throws IOException {
+    this.aerospikeMapping = new AerospikeMapping();
+    this.readMappingFile(mappingFile, keyClass, persistentClass);
+  }
+
+  public AerospikeMapping getAerospikeMapping() {
+    return this.aerospikeMapping;
+  }
+
+  private void readMappingFile(String fileName, Class<?> keyClass, Class<?> 
persistentClass) throws IOException {
+    try {
+      SAXBuilder saxBuilder = new SAXBuilder();
+      InputStream inputStream = 
getClass().getClassLoader().getResourceAsStream(fileName);
+      if (inputStream == null) {
+        LOG.warn("Mapping file '" + fileName + "' could not be found!");
+        throw new IOException("Mapping file '" + fileName + "' could not be 
found!");
+      }
+      Document document = saxBuilder.build(inputStream);
+      if (document == null) {
+        LOG.warn("Mapping file '" + fileName + "' could not be found!");
+        throw new IOException("Mapping file '" + fileName + "' could not be 
found!");
+      }
+
+      Element root = document.getRootElement();
+
+      // Mapping the defined policies
+      List<Element> policyElements = root.getChildren("policy");
+
+      for (Element policyElement : policyElements) {
+
+        String policy = policyElement.getAttributeValue("name");
+
+        if (policy.equals("write")) {
+
+          WritePolicy writePolicy = new WritePolicy();
+          if (policyElement.getAttributeValue("gen") != null)
+            writePolicy.generationPolicy = 
getGenerationPolicyMapping(policyElement.getAttributeValue
+              ("gen").toUpperCase(Locale.getDefault()));
+          if (policyElement.getAttributeValue("exists") != null)
+            writePolicy.recordExistsAction = 
getRecordExistsAction(policyElement.getAttributeValue
+              ("exists").toUpperCase(Locale.getDefault()));
+          if (policyElement.getAttributeValue("key") != null)
+            writePolicy.sendKey = 
getKeyUsagePolicy(policyElement.getAttributeValue("key").toUpperCase
+              (Locale.getDefault()));
+          if (policyElement.getAttributeValue("retry") != null)
+            writePolicy.retryOnTimeout = 
getRetryOnTimeoutPolicy(policyElement.getAttributeValue
+              ("retry").toUpperCase(Locale.getDefault()));
+          if (policyElement.getAttributeValue("timeout") != null)
+            writePolicy.timeout = 
getTimeoutValue(policyElement.getAttributeValue("timeout"));
+          aerospikeMapping.setWritePolicy(writePolicy);
+        } else if (policy.equals("read")) {
+
+          Policy readPolicy = new Policy();
+          if (policyElement.getAttributeValue("key") != null)
+            readPolicy.sendKey = 
getKeyUsagePolicy(policyElement.getAttributeValue("key").toUpperCase(Locale
+              .getDefault()));
+          if (policyElement.getAttributeValue("timeout") != null)
+            readPolicy.timeout = 
getTimeoutValue(policyElement.getAttributeValue("timeout"));
+          aerospikeMapping.setReadPolicy(readPolicy);
+        }
+      }
+
+      // Mapping the defined classes
+      List<Element> classElements = root.getChildren("policy");
+
+      boolean persistentAndKeyClassMatches = false;
+      for (Element classElement : classElements) {
+        if 
(classElement.getAttributeValue("keyClass").equals(keyClass.getCanonicalName())
+          && 
classElement.getAttributeValue("name").equals(persistentClass.getCanonicalName()))
 {
+          persistentAndKeyClassMatches = true;
+
+          String nameSpace = classElement.getAttributeValue("namespace");
+          if (nameSpace == null || nameSpace.isEmpty()) {
+            throw new ConfigurationException("Gora-aerospike-mapping does not 
include the relevant namespace for the " +
+              "class");
+          }
+          aerospikeMapping.setNamespace(nameSpace);
+
+          String set = classElement.getAttributeValue("set");
+          if (set != null && !set.isEmpty()) {
+            //ToDo : check for schema set name
+            aerospikeMapping.setSet(set);
+          }
+        }
+      }
+      if (!persistentAndKeyClassMatches)
+        throw new ConfigurationException("Gora-aerospike-mapping does not 
include the name and keyClass in the " +
+          "databean");
+    } catch (IOException e) {
+      LOG.error(e.getMessage(), e);
+      throw new IOException(e);
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+      throw new IOException(e);
+    }
+  }
+
+  private GenerationPolicy getGenerationPolicyMapping(String genPolicy) {
+
+    if (genPolicy == null)
+      return GenerationPolicy.NONE;
+
+    GenerationPolicy generationPolicy;
+    switch (genPolicy) {
+      case "IGNORE":
+        generationPolicy = GenerationPolicy.NONE;
+        break;
+      case "EQ":
+        generationPolicy = GenerationPolicy.EXPECT_GEN_EQUAL;
+        break;
+      case "GT":
+        generationPolicy = GenerationPolicy.EXPECT_GEN_GT;
+        break;
+      default: {
+        LOG.warn("Invalid generation policy provided, using the default 
generation policy");
+        generationPolicy = GenerationPolicy.NONE;
+      }
+    }
+    return generationPolicy;
+  }
+
+  private RecordExistsAction getRecordExistsAction(String existsPolicy) {
+    if (existsPolicy == null)
+      return RecordExistsAction.UPDATE;
+
+    RecordExistsAction recordExistsAction;
+    switch (existsPolicy) {
+      case "UPDATE":
+        recordExistsAction = RecordExistsAction.UPDATE;
+        break;
+      case "UPDATE_ONLY":
+        recordExistsAction = RecordExistsAction.UPDATE_ONLY;
+        break;
+      case "REPLACE":
+        recordExistsAction = RecordExistsAction.REPLACE;
+        break;
+      case "REPLACE_ONLY":
+        recordExistsAction = RecordExistsAction.REPLACE_ONLY;
+        break;
+      case "CREATE_ONLY":
+        recordExistsAction = RecordExistsAction.CREATE_ONLY;
+        break;
+      default: {
+        LOG.warn("Invalid record exists action provided, using the default 
record exists action");
+        recordExistsAction = RecordExistsAction.UPDATE;
+      }
+    }
+    return recordExistsAction;
+  }
+
+  private boolean getKeyUsagePolicy(String keyPolicy) {
+
+    if (keyPolicy == null)
+      return false;
+
+    boolean sendKey;
+    switch (keyPolicy) {
+      case "DIGEST":
+        sendKey = false;
+        break;
+      case "SEND":
+        sendKey = true;
+        break;
+      default: {
+        LOG.warn("Invalid key action policy provided, using the default key 
action policy");
+        sendKey = false;
+      }
+    }
+    return sendKey;
+  }
+
+  private boolean getRetryOnTimeoutPolicy(String retry) {
+
+    if (retry == null)
+      return false;
+
+    boolean retryOnTimeout;
+    switch (retry) {
+      case "NONE":
+        retryOnTimeout = false;
+        break;
+      case "ONCE":
+        retryOnTimeout = true;
+        break;
+      default: {
+        LOG.warn("Invalid key retry policy provided, using the default retry 
policy");
+        retryOnTimeout = false;
+      }
+    }
+    return retryOnTimeout;
+  }
+
+  private int getTimeoutValue(String timeout) {
+
+    if (timeout == null)
+      return 0;
+    int timeoutInt = 0;
+    try {
+      timeoutInt = Integer.valueOf(timeout);
+    } catch (NumberFormatException e) {
+      LOG.warn("Invalid timeout value provided, using the default timeout 
value");
+    }
+    return timeoutInt;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/gora/blob/1ad1cc9c/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeParameters.java
----------------------------------------------------------------------
diff --git 
a/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeParameters.java
 
b/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeParameters.java
new file mode 100644
index 0000000..539e67d
--- /dev/null
+++ 
b/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeParameters.java
@@ -0,0 +1,127 @@
+package org.apache.gora.aerospike.store;
+
+import com.aerospike.client.AerospikeClient;
+import com.aerospike.client.Info;
+import com.aerospike.client.cluster.Node;
+import com.aerospike.client.policy.Policy;
+import com.aerospike.client.policy.WritePolicy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+
+public class AerospikeParameters {
+  private String host;
+  private int port;
+  private String user;
+  private String password;
+  private AerospikeMapping aerospikeMapping;
+  private boolean singleBin;
+
+  // Property names
+  private static final String AS_SERVER_IP = "server.ip";
+  private static final String AS_SERVER_port = "server.port";
+
+  // Default property values
+  private static final String DEFAULT_SERVER_IP = "localhost";
+  private static final String DEFAULT_SERVER_PORT = "3000";
+
+  public static final Logger LOG = 
LoggerFactory.getLogger(AerospikeParameters.class);
+
+  public AerospikeParameters(AerospikeMapping aerospikeMapping, Properties 
properties) throws NumberFormatException {
+    this.aerospikeMapping = aerospikeMapping;
+    this.host = properties.getProperty(AS_SERVER_IP, DEFAULT_SERVER_IP);
+    try {
+      this.port = Integer.parseInt(properties.getProperty(AS_SERVER_port, 
DEFAULT_SERVER_PORT));
+    } catch (NumberFormatException e) {
+      LOG.error(e.getMessage(), e);
+      throw e;
+    }
+  }
+
+  public String getHost() {
+    return host;
+  }
+
+  public void setHost(String host) {
+    this.host = host;
+  }
+
+  public int getPort() {
+    return port;
+  }
+
+  public void setPort(int port) {
+    this.port = port;
+  }
+
+  public String getUser() {
+    return user;
+  }
+
+  public void setUser(String user) {
+    this.user = user;
+  }
+
+  public String getPassword() {
+    return password;
+  }
+
+  public void setPassword(String password) {
+    this.password = password;
+  }
+
+  public AerospikeMapping getAerospikeMapping() {
+    return aerospikeMapping;
+  }
+
+  public void setAerospikeMapping(AerospikeMapping aerospikeMapping) {
+    this.aerospikeMapping = aerospikeMapping;
+  }
+
+  public boolean isSingleBin() {
+    return singleBin;
+  }
+
+  public void setSingleBin(boolean singleBin) {
+    this.singleBin = singleBin;
+  }
+
+
+  // Some database calls need to know how the server is configured.
+  public void setServerSpecificParameters(AerospikeClient client) throws 
Exception {
+    Node node = client.getNodes()[0];
+    String namespaceFilter = "namespace/" + aerospikeMapping.getNamespace();
+    String namespaceTokens = Info.request(null, node, namespaceFilter);
+
+    if (namespaceTokens == null) {
+      throw new Exception("Failed to get namespace info");
+    }
+
+    singleBin = parseBoolean(namespaceTokens, "single-bin");
+  }
+
+  private static boolean parseBoolean(String namespaceTokens, String name) {
+    String search = name + '=';
+    int begin = namespaceTokens.indexOf(search);
+
+    if (begin < 0) {
+      return false;
+    }
+
+    begin += search.length();
+    int end = namespaceTokens.indexOf(';', begin);
+
+    if (end < 0) {
+      end = namespaceTokens.length();
+    }
+
+    String value = namespaceTokens.substring(begin, end);
+    return Boolean.parseBoolean(value);
+  }
+
+  // ToDo : check
+  public String getBinName(String name) {
+    return singleBin ? "" : name;
+  }
+}

http://git-wip-us.apache.org/repos/asf/gora/blob/1ad1cc9c/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeStore.java
----------------------------------------------------------------------
diff --git 
a/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeStore.java
 
b/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeStore.java
index 8c31773..6ed1a76 100644
--- 
a/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeStore.java
+++ 
b/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeStore.java
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -21,6 +21,7 @@ import java.util.List;
 import java.util.Properties;
 
 import com.aerospike.client.*;
+import com.aerospike.client.policy.ClientPolicy;
 import org.apache.gora.persistency.impl.PersistentBase;
 import org.apache.gora.query.PartitionQuery;
 import org.apache.gora.query.Query;
@@ -29,23 +30,40 @@ import org.apache.gora.store.DataStoreFactory;
 import org.apache.gora.store.impl.DataStoreBase;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 /**
  * Implementation of a Aerospike data store to be used by gora.
  *
- * @param <K>
- *            class to be used for the key
- * @param <T>
- *            class to be persisted within the store
+ * @param <K> class to be used for the key
+ * @param <T> class to be persisted within the store
  */
-public class AerospikeStore<K,T extends PersistentBase> extends 
DataStoreBase<K,T> {
+public class AerospikeStore<K, T extends PersistentBase> extends 
DataStoreBase<K, T> {
 
   public static final Logger LOG = 
LoggerFactory.getLogger(AerospikeStore.class);
+  private static final String PARSE_MAPPING_FILE_KEY = 
"gora.aerospike.mapping.file";
+  private static final String DEFAULT_MAPPING_FILE = 
"gora-aerospike-mapping.xml";
+
 
   private AerospikeClient aerospikeClient;
+  private AerospikeParameters aerospikeParameters;
 
   @Override
   public void initialize(Class<K> keyClass, Class<T> persistentClass, 
Properties properties) {
-      super.initialize(keyClass, persistentClass, properties);
+    super.initialize(keyClass, persistentClass, properties);
+
+    try {
+      AerospikeMappingBuilder aerospikeMappingBuilder = new 
AerospikeMappingBuilder(getConf().get
+        (PARSE_MAPPING_FILE_KEY, DEFAULT_MAPPING_FILE), keyClass,
+        persistentClass);
+      aerospikeParameters = new 
AerospikeParameters(aerospikeMappingBuilder.getAerospikeMapping(), properties);
+      ClientPolicy policy = new ClientPolicy();
+      policy.writePolicyDefault = 
aerospikeParameters.getAerospikeMapping().getWritePolicy();
+      policy.readPolicyDefault = 
aerospikeParameters.getAerospikeMapping().getReadPolicy();
+      aerospikeClient = new AerospikeClient(aerospikeParameters.getHost(), 
aerospikeParameters.getPort());
+      aerospikeParameters.setServerSpecificParameters(aerospikeClient);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
   }
 
   @Override
@@ -68,11 +86,12 @@ public class AerospikeStore<K,T extends PersistentBase> 
extends DataStoreBase<K,
 
   @Override
   public T get(K key, String[] fields) {
-    return  null;
+    return null;
   }
 
   @Override
-  public void put(K key, T val) {
+  public void put(K key, T value) {
+
   }
 
   @Override
@@ -81,22 +100,22 @@ public class AerospikeStore<K,T extends PersistentBase> 
extends DataStoreBase<K,
   }
 
   @Override
-  public long deleteByQuery(Query<K,T> query) {
+  public long deleteByQuery(Query<K, T> query) {
     return 0;
   }
 
   @Override
-  public Result<K,T> execute(Query<K,T> query) {
+  public Result<K, T> execute(Query<K, T> query) {
     return null;
   }
 
   @Override
-  public Query<K,T> newQuery() {
+  public Query<K, T> newQuery() {
     return null;
   }
 
   @Override
-  public List<PartitionQuery<K,T>> getPartitions(Query<K,T> query) throws 
IOException {
+  public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query) throws 
IOException {
     return null;
   }
 

Reply via email to