Add mapping reading and initialization for aerospike module
Project: http://git-wip-us.apache.org/repos/asf/gora/repo Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/1ad1cc9c Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/1ad1cc9c Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/1ad1cc9c Branch: refs/heads/master Commit: 1ad1cc9c96d0e2a12612a9efa76503218a935316 Parents: e796c8c Author: nishadi <ndime...@gmail.com> Authored: Sat Jun 17 22:26:05 2017 +0530 Committer: nishadi <ndime...@gmail.com> Committed: Sat Jun 17 22:26:05 2017 +0530 ---------------------------------------------------------------------- gora-aerospike/pom.xml | 6 + .../gora/aerospike/store/AerospikeMapping.java | 64 +++++ .../store/AerospikeMappingBuilder.java | 246 +++++++++++++++++++ .../aerospike/store/AerospikeParameters.java | 127 ++++++++++ .../gora/aerospike/store/AerospikeStore.java | 49 ++-- 5 files changed, 477 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/gora/blob/1ad1cc9c/gora-aerospike/pom.xml ---------------------------------------------------------------------- diff --git a/gora-aerospike/pom.xml b/gora-aerospike/pom.xml index 1a557f4..ddf17a1 100644 --- a/gora-aerospike/pom.xml +++ b/gora-aerospike/pom.xml @@ -124,6 +124,12 @@ <artifactId>hadoop-client</artifactId> </dependency> + <dependency> + <groupId>org.jdom</groupId> + <artifactId>jdom</artifactId> + <scope>compile</scope> + </dependency> + <!-- Logging Dependencies --> <dependency> <groupId>org.slf4j</groupId> http://git-wip-us.apache.org/repos/asf/gora/blob/1ad1cc9c/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeMapping.java ---------------------------------------------------------------------- diff --git a/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeMapping.java b/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeMapping.java new file mode 100644 index 0000000..5df8a92 --- /dev/null +++ 
b/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeMapping.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.aerospike.store;
+
+import com.aerospike.client.policy.Policy;
+import com.aerospike.client.policy.WritePolicy;
+
+/**
+ * Value holder for the namespace, set and read/write policies that
+ * {@link AerospikeMappingBuilder} extracts from the mapping file.
+ */
+public class AerospikeMapping {
+
+  /** Aerospike namespace; {@code null} until one is assigned. */
+  private String namespace;
+
+  /** Aerospike set name; {@code null} until one is assigned. */
+  private String set;
+
+  /** Policy object used for writes. */
+  private WritePolicy writePolicy;
+
+  /** Policy object used for reads. */
+  private Policy readPolicy;
+
+  /** Creates a mapping that starts out with default-constructed write and read policies. */
+  public AerospikeMapping() {
+    this.writePolicy = new WritePolicy();
+    this.readPolicy = new Policy();
+  }
+
+  /** @return the namespace, or {@code null} if none has been set */
+  public String getNamespace() {
+    return this.namespace;
+  }
+
+  /** @param namespace the namespace to store */
+  public void setNamespace(String namespace) {
+    this.namespace = namespace;
+  }
+
+  /** @return the set name, or {@code null} if none has been set */
+  public String getSet() {
+    return this.set;
+  }
+
+  /** @param set the set name to store */
+  public void setSet(String set) {
+    this.set = set;
+  }
+
+  /** @return the current write policy */
+  public WritePolicy getWritePolicy() {
+    return this.writePolicy;
+  }
+
+  /** @param writePolicy the write policy that replaces the current one */
+  public void setWritePolicy(WritePolicy writePolicy) {
+    this.writePolicy = writePolicy;
+  }
+
+  /** @return the current read policy */
+  public Policy getReadPolicy() {
+    return this.readPolicy;
+  }
+
+  /** @param readPolicy the read policy that replaces the current one */
+  public void setReadPolicy(Policy readPolicy) {
+    this.readPolicy = readPolicy;
+  }
+}
http://git-wip-us.apache.org/repos/asf/gora/blob/1ad1cc9c/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeMappingBuilder.java
----------------------------------------------------------------------
diff --git a/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeMappingBuilder.java b/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeMappingBuilder.java
new file mode 100644
index 0000000..4e2b997
--- /dev/null
+++ b/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeMappingBuilder.java
@@ -0,0 +1,278 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.aerospike.store;
+
+import com.aerospike.client.policy.GenerationPolicy;
+import com.aerospike.client.policy.Policy;
+import com.aerospike.client.policy.RecordExistsAction;
+import com.aerospike.client.policy.WritePolicy;
+import org.jdom.Document;
+import org.jdom.Element;
+import org.jdom.input.SAXBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.naming.ConfigurationException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import java.util.Locale;
+
+/**
+ * Builds an {@link AerospikeMapping} from the gora-aerospike mapping file: the
+ * read/write policies plus the namespace and set of the requested class pair.
+ */
+public class AerospikeMappingBuilder {
+
+  public static final Logger LOG = LoggerFactory.getLogger(AerospikeMappingBuilder.class);
+
+  private AerospikeMapping aerospikeMapping;
+
+  /**
+   * Reads the mapping file and populates the mapping.
+   *
+   * @param mappingFile     name of the mapping file, resolved through the class loader
+   * @param keyClass        key class that the mapping file must declare
+   * @param persistentClass persistent class that the mapping file must declare
+   * @throws IOException if the file cannot be found or parsed, or declares no matching class
+   */
+  public AerospikeMappingBuilder(String mappingFile, Class<?> keyClass, Class<?> persistentClass) throws IOException {
+    this.aerospikeMapping = new AerospikeMapping();
+    this.readMappingFile(mappingFile, keyClass, persistentClass);
+  }
+
+  /**
+   * @return the mapping populated by the constructor
+   */
+  public AerospikeMapping getAerospikeMapping() {
+    return this.aerospikeMapping;
+  }
+
+  /**
+   * Loads the mapping file and applies its policy and class elements.
+   *
+   * @throws IOException on any failure; non-I/O failures are attached as the cause
+   */
+  private void readMappingFile(String fileName, Class<?> keyClass, Class<?> persistentClass) throws IOException {
+    // try-with-resources closes the stream even when parsing fails; a null resource is skipped.
+    try (InputStream inputStream = getClass().getClassLoader().getResourceAsStream(fileName)) {
+      if (inputStream == null) {
+        throw new IOException("Mapping file '" + fileName + "' could not be found!");
+      }
+      Document document = new SAXBuilder().build(inputStream);
+      Element root = document.getRootElement();
+      readPolicies(root);
+      readClassMapping(root, keyClass, persistentClass);
+    } catch (IOException e) {
+      // Already an IOException: log once and rethrow without wrapping it again.
+      LOG.error(e.getMessage(), e);
+      throw e;
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+      throw new IOException(e);
+    }
+  }
+
+  /** Applies every {@code policy} element under the root to the mapping. */
+  private void readPolicies(Element root) {
+    List<Element> policyElements = root.getChildren("policy");
+    for (Element policyElement : policyElements) {
+      // Constant-first equals: an element without a name attribute is skipped, not an NPE.
+      String policy = policyElement.getAttributeValue("name");
+      if ("write".equals(policy)) {
+        aerospikeMapping.setWritePolicy(readWritePolicy(policyElement));
+      } else if ("read".equals(policy)) {
+        aerospikeMapping.setReadPolicy(readReadPolicy(policyElement));
+      }
+    }
+  }
+
+  /** Builds a write policy, overriding only the settings the element specifies. */
+  private WritePolicy readWritePolicy(Element policyElement) {
+    WritePolicy writePolicy = new WritePolicy();
+    String gen = upperCase(policyElement.getAttributeValue("gen"));
+    if (gen != null) {
+      writePolicy.generationPolicy = getGenerationPolicyMapping(gen);
+    }
+    String exists = upperCase(policyElement.getAttributeValue("exists"));
+    if (exists != null) {
+      writePolicy.recordExistsAction = getRecordExistsAction(exists);
+    }
+    String key = upperCase(policyElement.getAttributeValue("key"));
+    if (key != null) {
+      writePolicy.sendKey = getKeyUsagePolicy(key);
+    }
+    String retry = upperCase(policyElement.getAttributeValue("retry"));
+    if (retry != null) {
+      writePolicy.retryOnTimeout = getRetryOnTimeoutPolicy(retry);
+    }
+    String timeout = policyElement.getAttributeValue("timeout");
+    if (timeout != null) {
+      writePolicy.timeout = getTimeoutValue(timeout);
+    }
+    return writePolicy;
+  }
+
+  /** Builds a read policy, overriding only the settings the element specifies. */
+  private Policy readReadPolicy(Element policyElement) {
+    Policy readPolicy = new Policy();
+    String key = upperCase(policyElement.getAttributeValue("key"));
+    if (key != null) {
+      readPolicy.sendKey = getKeyUsagePolicy(key);
+    }
+    String timeout = policyElement.getAttributeValue("timeout");
+    if (timeout != null) {
+      readPolicy.timeout = getTimeoutValue(timeout);
+    }
+    return readPolicy;
+  }
+
+  /**
+   * Upper-cases an attribute value with {@link Locale#ROOT} so the result matches the
+   * ASCII keywords regardless of the JVM default locale; {@code null} stays {@code null}.
+   */
+  private static String upperCase(String value) {
+    return value == null ? null : value.toUpperCase(Locale.ROOT);
+  }
+
+  /**
+   * Finds the {@code class} element for the given key/persistent class pair and copies
+   * its namespace and optional set into the mapping.
+   *
+   * @throws ConfigurationException if no element matches or the match has no namespace
+   */
+  private void readClassMapping(Element root, Class<?> keyClass, Class<?> persistentClass)
+          throws ConfigurationException {
+    // Class mappings are declared in "class" elements, not in the "policy" elements.
+    List<Element> classElements = root.getChildren("class");
+
+    boolean persistentAndKeyClassMatches = false;
+    for (Element classElement : classElements) {
+      // Attribute-side null checks: an element missing either attribute simply does not match.
+      String mappedKeyClass = classElement.getAttributeValue("keyClass");
+      String mappedName = classElement.getAttributeValue("name");
+      if (mappedKeyClass == null || !mappedKeyClass.equals(keyClass.getCanonicalName())
+              || mappedName == null || !mappedName.equals(persistentClass.getCanonicalName())) {
+        continue;
+      }
+      persistentAndKeyClassMatches = true;
+
+      String nameSpace = classElement.getAttributeValue("namespace");
+      if (nameSpace == null || nameSpace.isEmpty()) {
+        throw new ConfigurationException("Gora-aerospike-mapping does not include the relevant namespace for the "
+                + "class");
+      }
+      aerospikeMapping.setNamespace(nameSpace);
+
+      String set = classElement.getAttributeValue("set");
+      if (set != null && !set.isEmpty()) {
+        //ToDo : check for schema set name
+        aerospikeMapping.setSet(set);
+      }
+    }
+    if (!persistentAndKeyClassMatches) {
+      throw new ConfigurationException("Gora-aerospike-mapping does not include the name and keyClass in the "
+              + "databean");
+    }
+  }
+
+  /** Maps IGNORE/EQ/GT to a generation policy; anything else falls back to NONE with a warning. */
+  private GenerationPolicy getGenerationPolicyMapping(String genPolicy) {
+    if (genPolicy == null) {
+      return GenerationPolicy.NONE;
+    }
+    switch (genPolicy) {
+      case "IGNORE":
+        return GenerationPolicy.NONE;
+      case "EQ":
+        return GenerationPolicy.EXPECT_GEN_EQUAL;
+      case "GT":
+        return GenerationPolicy.EXPECT_GEN_GT;
+      default:
+        LOG.warn("Invalid generation policy provided, using the default generation policy");
+        return GenerationPolicy.NONE;
+    }
+  }
+
+  /** Maps the record-exists keyword to its action; unknown values fall back to UPDATE with a warning. */
+  private RecordExistsAction getRecordExistsAction(String existsPolicy) {
+    if (existsPolicy == null) {
+      return RecordExistsAction.UPDATE;
+    }
+    switch (existsPolicy) {
+      case "UPDATE":
+        return RecordExistsAction.UPDATE;
+      case "UPDATE_ONLY":
+        return RecordExistsAction.UPDATE_ONLY;
+      case "REPLACE":
+        return RecordExistsAction.REPLACE;
+      case "REPLACE_ONLY":
+        return RecordExistsAction.REPLACE_ONLY;
+      case "CREATE_ONLY":
+        return RecordExistsAction.CREATE_ONLY;
+      default:
+        LOG.warn("Invalid record exists action provided, using the default record exists action");
+        return RecordExistsAction.UPDATE;
+    }
+  }
+
+  /** Returns true for SEND, false for DIGEST; unknown values fall back to false with a warning. */
+  private boolean getKeyUsagePolicy(String keyPolicy) {
+    if (keyPolicy == null) {
+      return false;
+    }
+    switch (keyPolicy) {
+      case "DIGEST":
+        return false;
+      case "SEND":
+        return true;
+      default:
+        LOG.warn("Invalid key action policy provided, using the default key action policy");
+        return false;
+    }
+  }
+
+  /** Returns true for ONCE, false for NONE; unknown values fall back to false with a warning. */
+  private boolean getRetryOnTimeoutPolicy(String retry) {
+    if (retry == null) {
+      return false;
+    }
+    switch (retry) {
+      case "NONE":
+        return false;
+      case "ONCE":
+        return true;
+      default:
+        LOG.warn("Invalid key retry policy provided, using the default retry policy");
+        return false;
+    }
+  }
+
+  /** Parses the timeout attribute; a missing or non-numeric value yields 0 (with a warning if invalid). */
+  private int getTimeoutValue(String timeout) {
+    if (timeout == null) {
+      return 0;
+    }
+    try {
+      return Integer.parseInt(timeout);
+    } catch (NumberFormatException e) {
+      LOG.warn("Invalid timeout value provided, using the default timeout value");
+      return 0;
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/gora/blob/1ad1cc9c/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeParameters.java
----------------------------------------------------------------------
diff --git a/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeParameters.java b/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeParameters.java
new file mode 100644
index 0000000..539e67d
--- /dev/null
+++ b/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeParameters.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.aerospike.store;
+
+import com.aerospike.client.AerospikeClient;
+import com.aerospike.client.Info;
+import com.aerospike.client.cluster.Node;
+import com.aerospike.client.policy.Policy;
+import com.aerospike.client.policy.WritePolicy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+
+/**
+ * Connection settings for the Aerospike data store: host and port read from the
+ * Gora properties, optional credentials, the mapping and the server's single-bin flag.
+ */
+public class AerospikeParameters {
+  private String host;
+  private int port;
+  private String user;
+  private String password;
+  private AerospikeMapping aerospikeMapping;
+  private boolean singleBin;
+
+  // Property names
+  private static final String AS_SERVER_IP = "server.ip";
+  private static final String AS_SERVER_PORT = "server.port";
+
+  // Default property values
+  private static final String DEFAULT_SERVER_IP = "localhost";
+  private static final String DEFAULT_SERVER_PORT = "3000";
+
+  public static final Logger LOG = LoggerFactory.getLogger(AerospikeParameters.class);
+
+  /**
+   * Reads the server host and port from the given properties, falling back to
+   * {@code localhost} and {@code 3000}.
+   *
+   * @param aerospikeMapping mapping to associate with these parameters
+   * @param properties       properties holding {@code server.ip} / {@code server.port}
+   * @throws NumberFormatException if the configured port is not an integer
+   */
+  public AerospikeParameters(AerospikeMapping aerospikeMapping, Properties properties) throws NumberFormatException {
+    this.aerospikeMapping = aerospikeMapping;
+    this.host = properties.getProperty(AS_SERVER_IP, DEFAULT_SERVER_IP);
+    try {
+      this.port = Integer.parseInt(properties.getProperty(AS_SERVER_PORT, DEFAULT_SERVER_PORT));
+    } catch (NumberFormatException e) {
+      LOG.error(e.getMessage(), e);
+      throw e;
+    }
+  }
+
+  public String getHost() {
+    return host;
+  }
+
+  public void setHost(String host) {
+    this.host = host;
+  }
+
+  public int getPort() {
+    return port;
+  }
+
+  public void setPort(int port) {
+    this.port = port;
+  }
+
+  public String getUser() {
+    return user;
+  }
+
+  public void setUser(String user) {
+    this.user = user;
+  }
+
+  public String getPassword() {
+    return password;
+  }
+
+  public void setPassword(String password) {
+    this.password = password;
+  }
+
+  public AerospikeMapping getAerospikeMapping() {
+    return aerospikeMapping;
+  }
+
+  public void setAerospikeMapping(AerospikeMapping aerospikeMapping) {
+    this.aerospikeMapping = aerospikeMapping;
+  }
+
+  public boolean isSingleBin() {
+    return singleBin;
+  }
+
+  public void setSingleBin(boolean singleBin) {
+    this.singleBin = singleBin;
+  }
+
+  /**
+   * Queries the first cluster node for the mapped namespace's configuration and
+   * records whether it is a single-bin namespace.
+   *
+   * @param client client used to reach the cluster
+   * @throws Exception if no node is available or the namespace info cannot be retrieved
+   */
+  public void setServerSpecificParameters(AerospikeClient client) throws Exception {
+    Node[] nodes = client.getNodes();
+    if (nodes == null || nodes.length == 0) {
+      throw new Exception("No cluster node available to get namespace info");
+    }
+    String namespaceFilter = "namespace/" + aerospikeMapping.getNamespace();
+    String namespaceTokens = Info.request(null, nodes[0], namespaceFilter);
+
+    if (namespaceTokens == null) {
+      throw new Exception("Failed to get namespace info");
+    }
+
+    singleBin = parseBoolean(namespaceTokens, "single-bin");
+  }
+
+  /**
+   * Extracts the boolean value of {@code name=value} from a ';'-separated token string.
+   *
+   * @return the parsed value, or {@code false} when the name is absent
+   */
+  private static boolean parseBoolean(String namespaceTokens, String name) {
+    String search = name + '=';
+    int begin = namespaceTokens.indexOf(search);
+
+    if (begin < 0) {
+      return false;
+    }
+
+    begin += search.length();
+    int end = namespaceTokens.indexOf(';', begin);
+
+    if (end < 0) {
+      end = namespaceTokens.length();
+    }
+
+    String value = namespaceTokens.substring(begin, end);
+    return Boolean.parseBoolean(value);
+  }
+
+  // ToDo : check
+  public String getBinName(String name) {
+    return singleBin ? "" : name;
+  }
+}
http://git-wip-us.apache.org/repos/asf/gora/blob/1ad1cc9c/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeStore.java
----------------------------------------------------------------------
diff --git a/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeStore.java b/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeStore.java
index 8c31773..6ed1a76 100644
--- a/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeStore.java
+++ b/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeStore.java
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -21,6 +21,7 @@ import java.util.List;
 import java.util.Properties;
 
 import com.aerospike.client.*;
+import com.aerospike.client.policy.ClientPolicy;
 import org.apache.gora.persistency.impl.PersistentBase;
 import org.apache.gora.query.PartitionQuery;
 import org.apache.gora.query.Query;
@@ -29,23 +30,40 @@ import org.apache.gora.store.DataStoreFactory;
 import org.apache.gora.store.impl.DataStoreBase;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 /**
  * Implementation of a Aerospike data store to be used by gora.
  *
- * @param <K>
- *          class to be used for the key
- * @param <T>
- *          class to be persisted within the store
+ * @param <K> class to be used for the key
+ * @param <T> class to be persisted within the store
  */
-public class AerospikeStore<K,T extends PersistentBase> extends DataStoreBase<K,T> {
+public class AerospikeStore<K, T extends PersistentBase> extends DataStoreBase<K, T> {
 
   public static final Logger LOG = LoggerFactory.getLogger(AerospikeStore.class);
 
+  private static final String PARSE_MAPPING_FILE_KEY = "gora.aerospike.mapping.file";
+  private static final String DEFAULT_MAPPING_FILE = "gora-aerospike-mapping.xml";
 
+  private AerospikeClient aerospikeClient;
+  private AerospikeParameters aerospikeParameters;
 
   @Override
   public void initialize(Class<K> keyClass, Class<T> persistentClass, Properties properties) {
-      super.initialize(keyClass, persistentClass, properties);
+    super.initialize(keyClass, persistentClass, properties);
+
+    try {
+      AerospikeMappingBuilder aerospikeMappingBuilder = new AerospikeMappingBuilder(getConf().get
+              (PARSE_MAPPING_FILE_KEY, DEFAULT_MAPPING_FILE), keyClass,
+              persistentClass);
+      aerospikeParameters = new AerospikeParameters(aerospikeMappingBuilder.getAerospikeMapping(), properties);
+      ClientPolicy policy = new ClientPolicy();
+      policy.writePolicyDefault = aerospikeParameters.getAerospikeMapping().getWritePolicy();
+      policy.readPolicyDefault = aerospikeParameters.getAerospikeMapping().getReadPolicy();
+      aerospikeClient = new AerospikeClient(policy, aerospikeParameters.getHost(), aerospikeParameters.getPort());
+      aerospikeParameters.setServerSpecificParameters(aerospikeClient);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
   }
 
   @Override
@@ -68,11 +86,12 @@ public class AerospikeStore<K,T extends PersistentBase> extends DataStoreBase<K,
 
   @Override
   public T get(K key, String[] fields) {
-      return null;
+    return null;
   }
 
   @Override
-  public void put(K key, T val) {
+  public void put(K key, T value) {
+
   }
 
   @Override
@@ -81,22 +100,22 @@ public class AerospikeStore<K,T extends PersistentBase> extends DataStoreBase<K,
   }
 
   @Override
-  public long deleteByQuery(Query<K,T> query) {
+  public long deleteByQuery(Query<K, T> query) {
     return 0;
   }
 
   @Override
-  public Result<K,T> execute(Query<K,T> query) {
+  public Result<K, T> execute(Query<K, T> query) {
     return null;
   }
 
   @Override
-  public Query<K,T> newQuery() {
+  public Query<K, T> newQuery() {
     return null;
   }
 
   @Override
-  public List<PartitionQuery<K,T>> getPartitions(Query<K,T> query) throws IOException {
+  public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query) throws IOException {
     return null;
   }