[
https://issues.apache.org/jira/browse/GORA-502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16064142#comment-16064142
]
ASF GitHub Bot commented on GORA-502:
-------------------------------------
Github user kamaci commented on a diff in the pull request:
https://github.com/apache/gora/pull/111#discussion_r124165139
--- Diff:
gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeStore.java
---
@@ -0,0 +1,473 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.aerospike.store;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.Properties;
+
+import com.aerospike.client.Key;
+import com.aerospike.client.Value;
+import com.aerospike.client.Bin;
+import com.aerospike.client.Record;
+import com.aerospike.client.AerospikeClient;
+import com.aerospike.client.policy.ClientPolicy;
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.util.Utf8;
+import org.apache.gora.persistency.Persistent;
+import org.apache.gora.persistency.impl.DirtyListWrapper;
+import org.apache.gora.persistency.impl.DirtyMapWrapper;
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.apache.gora.query.PartitionQuery;
+import org.apache.gora.query.Query;
+import org.apache.gora.query.Result;
+import org.apache.gora.store.impl.DataStoreBase;
+import org.apache.gora.util.AvroUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of a Aerospike data store to be used by gora.
+ *
+ * @param <K> class to be used for the key
+ * @param <T> class to be persisted within the store
+ */
+public class AerospikeStore<K, T extends PersistentBase> extends
DataStoreBase<K, T> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(AerospikeStore.class);
+
+ private static final String PARSE_MAPPING_FILE_KEY =
"gora.aerospike.mapping.file";
+
+ private static final String DEFAULT_MAPPING_FILE =
"gora-aerospike-mapping.xml";
+
+ private AerospikeClient aerospikeClient;
+
+ private AerospikeParameters aerospikeParameters;
+
+ /**
+ * {@inheritDoc}
+ * In initializing the aerospike datastore, read the mapping file, sets
the basic
+ * aerospike specific parameters and creates the client with the user
defined policies
+ *
+ * @param keyClass key class
+ * @param persistentClass persistent class
+ * @param properties properties
+ */
+ @Override
+ public void initialize(Class<K> keyClass, Class<T> persistentClass,
Properties properties) {
+ super.initialize(keyClass, persistentClass, properties);
+
+ AerospikeMappingBuilder aerospikeMappingBuilder = new
AerospikeMappingBuilder();
+ aerospikeMappingBuilder
+ .readMappingFile(getConf().get(PARSE_MAPPING_FILE_KEY,
DEFAULT_MAPPING_FILE), keyClass,
+ persistentClass);
+ aerospikeParameters = new
AerospikeParameters(aerospikeMappingBuilder.getAerospikeMapping(),
+ properties);
+ ClientPolicy policy = new ClientPolicy();
+ policy.writePolicyDefault =
aerospikeParameters.getAerospikeMapping().getWritePolicy();
+ policy.readPolicyDefault =
aerospikeParameters.getAerospikeMapping().getReadPolicy();
+
+ aerospikeClient = new AerospikeClient(aerospikeParameters.getHost(),
+ aerospikeParameters.getPort());
+ aerospikeParameters.setServerSpecificParameters(aerospikeClient);
+
aerospikeParameters.validateServerBinConfiguration(persistentClass.getFields());
+ LOG.info("Aerospike Gora datastore initialized successfully.");
+ }
+
+ /**
+ * Aerospike, being a schemaless database does not support explicit
schema creation through the
+ * provided java client. When the records are added to the database, the
schema is created on
+ * the fly. Thus, schema related functionality is unavailable in
gora-aerospike module.
+ *
+ * @return null
+ */
+ @Override
+ public String getSchemaName() {
+ return null;
+ }
+
+ /**
+ * Aerospike, being a schemaless database does not support explicit
schema creation through the
+ * provided java client. When the records are added to the database, the
schema is created on
+ * the fly. Thus, schema creation functionality is unavailable in
gora-aerospike module.
+ */
+ @Override
+ public void createSchema() {
+ }
+
+ /**
+ * Aerospike, being a schemaless database does not support explicit
schema creation through the
+ * provided java client. When the records are added to the database, the
schema is created on
+ * the fly. Thus, schema deletion functionality is unavailable in
gora-aerospike module.
+ */
+ @Override
+ public void deleteSchema() {
+ }
+
+ /**
+ * Aerospike, being a schemaless database does not support explicit
schema creation through the
+ * provided java client. When the records are added to the database, the
schema is created on
+ * the fly. Thus, schema exists functionality is unavailable in
gora-aerospike module.
+ */
+ @Override
+ public boolean schemaExists() {
+ return true;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @param key the key of the object
+ * @param fields the fields required in the object. Pass null, to
retrieve all fields
+ * @return the Object corresponding to the key or null if it cannot be
found
+ */
+ @Override
+ public T get(K key, String[] fields) {
+
+ Key recordKey = getAerospikeKey(key);
+ fields = getFieldsToQuery(fields);
+
+ Record record = aerospikeClient
+
.get(aerospikeParameters.getAerospikeMapping().getReadPolicy(), recordKey,
fields);
+ if (record == null) {
+ return null;
+ }
+ return createPersistentInstance(record, fields);
+ }
+
+ /**
+ * Method to insert the persistent objects with the given key to the
aerospike database server.
+ * In writing the records, the policy defined in the mapping file is
used to decide on the
+ * behaviour of transaction handling.
+ *
+ * @param key key of the object
+ * @param persistent object to be persisted
+ */
+ @Override
+ public void put(K key, T persistent) {
+
+ Key recordKey = getAerospikeKey(key);
+
+ List<Field> fields = persistent.getSchema().getFields();
+ for (int i = 0; i < fields.size(); i++) {
+ Object persistentValue = persistent.get(i);
+ if (persistentValue != null) {
+ String mappingBinName =
aerospikeParameters.getAerospikeMapping().getBinMapping()
+ .get(fields.get(i).name());
+ if (mappingBinName == null) {
+ LOG.error(
+ "Aerospike mapping for field {}#{} not found. Wrong
gora-aerospike-mapping.xml?",
+ persistent.getClass().getName(), fields.get(i).name());
+ throw new RuntimeException(
+ "Aerospike mapping for field [" +
persistent.getClass().getName() + "#" + fields
+ .get(i).name() + "] not found. Wrong
gora-aerospike-mapping.xml?");
+ }
+ Bin bin = new Bin(mappingBinName,
+ getSerializableValue(persistentValue,
fields.get(i).schema()));
+ aerospikeClient
+
.put(aerospikeParameters.getAerospikeMapping().getWritePolicy(), recordKey,
bin);
+ }
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @param key the key of the object
+ * @return whether the object was successfully deleted
+ */
+ @Override
+ public boolean delete(K key) {
+ Key recordKey = getAerospikeKey(key);
+ return aerospikeClient
+
.delete(aerospikeParameters.getAerospikeMapping().getWritePolicy(), recordKey);
+ }
+
+ @Override
+ public long deleteByQuery(Query<K, T> query) {
+ return 0;
+ }
+
+ @Override
+ public Result<K, T> execute(Query<K, T> query) {
+ return null;
+ }
+
+ @Override
+ public Query<K, T> newQuery() {
+ return null;
+ }
+
+ @Override
+ public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query)
throws IOException {
+ return null;
+ }
+
+ @Override
+ public void flush() {
+ }
+
+ /**
+ * Method to close aerospike client connections to database server nodes
+ */
+ @Override
+ public void close() {
+ aerospikeClient.close();
+ LOG.info("Aerospike Gora datastore destroyed successfully.");
+ }
+
+ /**
+ * Method to get the aerospike key from the provided K
+ *
+ * @param key persistent key
+ * @return aerospike key for the record
+ */
+ public Key getAerospikeKey(K key) {
+ Value keyValue;
+ if (keyClass.getSimpleName().equalsIgnoreCase("string")) {
+ keyValue = Value.get(key.toString());
+ } else {
+ keyValue = Value.get(key);
+ }
+
+ return new
Key(aerospikeParameters.getAerospikeMapping().getNamespace(),
+ aerospikeParameters.getAerospikeMapping().getSet(), keyValue);
+ }
+
+ /**
+ * Method to get the value serializable in database from the Avro
persistent object
+ *
+ * @param object persistent object
+ * @param schema schema of the persistent object
+ * @return serializable value
+ */
+ private Value getSerializableValue(Object object, Schema schema) {
+
+ Value value = null;
+ switch (schema.getType()) {
+ case UNION:
+ if (object != null) {
+ int schemaPos = getUnionSchema(object, schema);
+ Schema unionSchema = schema.getTypes().get(schemaPos);
+ value = getSerializableValue(object, unionSchema);
+ }
+ break;
+ case STRING:
+ value = Value.get(object.toString());
+ break;
+ case BYTES:
+ value = Value.get(((ByteBuffer) object).array());
+ break;
+ case MAP:
+ Map<Object, Object> newMap = new HashMap<>();
+ Map<?, ?> fieldMap = (Map<?, ?>) object;
+ for (Object key : fieldMap.keySet()) {
+ newMap.put(key.toString(),
+ getSerializableValue(fieldMap.get(key),
schema.getValueType()));
+ }
+ value = Value.get(newMap);
+ break;
+ case ARRAY:
+ List<Object> objectList = new ArrayList<>();
+ for (Object obj : (List<Object>) object) {
+ objectList.add(getSerializableValue(obj,
schema.getElementType()));
+ }
+ value = Value.get(objectList);
+ break;
+ default:
+ value = Value.get(object);
+ break;
+ }
+ return value;
+ }
+
+ /**
+ * Method to create a persistent object given the retrieved record
+ * from Aerospike database
+ *
+ * @param record record retrieved from database
+ * @param fields fields
+ * @return persistent object created
+ */
+ private T createPersistentInstance(Record record, String[] fields) {
+
+ T persistent = newPersistent();
+ for (String field : fields) {
+ setPersistentField(field, record, persistent);
+ }
+ return persistent;
+ }
+
+ /**
+ * Method to set a field in the persistent object
+ *
+ * @param fieldName field name
+ * @param record record retrieved from database
+ * @param persistent persistent object for the field to be set
+ */
+ private void setPersistentField(String fieldName, Record record, T
persistent) {
+
+ String binName =
aerospikeParameters.getAerospikeMapping().getBinName(fieldName);
+ if (binName == null) {
+ LOG.error("Aerospike mapping for field {} not found. Wrong
gora-aerospike-mapping.xml",
+ fieldName);
+ throw new RuntimeException("Aerospike mapping for field [" +
fieldName + "] not found. "
+ + "Wrong gora-aerospike-mapping.xml?");
+ }
+ if (record.bins.get(fieldName) == null) {
+ return;
+ }
+ String binDataType =
record.bins.get(fieldName).getClass().getSimpleName();
+ Object binValue = record.bins.get(binName);
+
+ persistent.put(fieldName,
+ getDeserializedObject(binValue, binDataType,
fieldMap.get(fieldName).schema()));
+ }
+
+ /**
+ * Method to get Avro mapped persistent object from the record retrieved
from the database
+ *
+ * @param binValue value retrieved from database
+ * @param binDataType data type of the database value
+ * @param schema corresponding schema in the persistent class
+ * @return persistent object
+ */
+ private Object getDeserializedObject(Object binValue, String
binDataType, Schema schema) {
+
+ Object result;
+ switch (schema.getType()) {
+
+ case MAP:
+ Map<String, Object> rawMap = (Map<String, Object>) binValue;
+ Map<Utf8, Object> deserializableMap = new HashMap<>();
+ if (rawMap == null) {
+ result = new DirtyMapWrapper(deserializableMap);
+ break;
+ }
+ for (Map.Entry<?, ?> e : rawMap.entrySet()) {
+ Schema innerSchema = schema.getValueType();
+ Object obj = getDeserializedObject(e.getValue(),
e.getValue().getClass().getSimpleName(),
+ innerSchema);
+ if
(e.getKey().getClass().getSimpleName().equalsIgnoreCase("Utf8")) {
+ deserializableMap.put((Utf8) e.getKey(), obj);
+ } else {
+ deserializableMap.put(new Utf8((String) e.getKey()), obj);
+ }
+ }
+ result = new DirtyMapWrapper<>(deserializableMap);
+ break;
+
+ case ARRAY:
+ List<Object> rawList = (List<Object>) binValue;
+ List<Object> deserializableList = new ArrayList<>();
+ if (rawList == null) {
+ return new DirtyListWrapper(deserializableList);
+ }
+ for (Object item : rawList) {
+ Object obj = getDeserializedObject(item,
item.getClass().getSimpleName(),
+ schema.getElementType());
+ deserializableList.add(obj);
+ }
+ result = new DirtyListWrapper<>(deserializableList);
+ break;
+
+ case RECORD:
+ result = (PersistentBase) binValue;
+ break;
+
+ case UNION:
+ int index = getUnionSchema(binValue, schema);
+ Schema resolvedSchema = schema.getTypes().get(index);
+ result = getDeserializedObject(binValue, binDataType,
resolvedSchema);
+ break;
+
+ case ENUM:
+ result = AvroUtils.getEnumValue(schema, (String) binValue);
+ break;
+
+ case BYTES:
+ result = ByteBuffer.wrap((byte[]) binValue);
+ break;
+
+ case STRING:
+ if (binValue instanceof org.apache.avro.util.Utf8)
+ result = binValue;
+ else
+ result = new Utf8((String) binValue);
+ break;
+
+ case INT:
+ if (binDataType.equalsIgnoreCase("long")) {
+ result = Math.toIntExact((Long) binValue);
+ } else {
+ result = binValue;
+ }
+ break;
+
+ default:
+ result = binValue;
+ }
+ return result;
+ }
+
+ /**
+ * Method to retrieve the corresponding schema type index of a
particular object having UNION
+ * schema. As UNION type can have one or more types and at a given
instance, it holds an object
+ * of only one type of the defined types, this method is used to figure
out the corresponding
+ * instance's
+ * schema type index.
+ *
+ * @param instanceValue value that the object holds
+ * @param unionSchema union schema containing all of the data types
+ * @return the unionSchemaPosition corresponding schema position
+ */
+ private int getUnionSchema(Object instanceValue, Schema unionSchema) {
+ int unionSchemaPos = 0;
+ for (Schema currentSchema : unionSchema.getTypes()) {
+ Schema.Type schemaType = currentSchema.getType();
+ if (instanceValue instanceof CharSequence &&
schemaType.equals(Schema.Type.STRING))
+ return unionSchemaPos;
+ else if (instanceValue instanceof ByteBuffer &&
schemaType.equals(Schema.Type.BYTES))
--- End diff --
`else` after a `return` statement?
> Implement Aerospike Datastore
> -----------------------------
>
> Key: GORA-502
> URL: https://issues.apache.org/jira/browse/GORA-502
> Project: Apache Gora
> Issue Type: New Feature
> Components: gora-aerospike, storage
> Affects Versions: 0.6.1
> Reporter: Cihad Guzel
> Assignee: Nishadi Kirielle
> Labels: gsoc2017
> Fix For: 0.8
>
>
> Aerospike is a NoSQL database solution for real-time operational
> applications, delivering predictable performance at scale, superior uptime,
> and high availability at the lowest TCO compared to first-generation NoSQL
> and relational databases. It could be nice to support Aerospike as a
> datastore at Gora. Aerospike uses Apache v2 license for Java client and uses
> aGPL license for Aerospike Server Community Edition.
> (http://www.aerospike.com/products/)
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)