[
https://issues.apache.org/jira/browse/GORA-502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16064169#comment-16064169
]
ASF GitHub Bot commented on GORA-502:
-------------------------------------
Github user kamaci commented on a diff in the pull request:
https://github.com/apache/gora/pull/111#discussion_r124167624
--- Diff:
gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeStore.java
---
@@ -0,0 +1,473 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.aerospike.store;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.Properties;
+
+import com.aerospike.client.Key;
+import com.aerospike.client.Value;
+import com.aerospike.client.Bin;
+import com.aerospike.client.Record;
+import com.aerospike.client.AerospikeClient;
+import com.aerospike.client.policy.ClientPolicy;
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.util.Utf8;
+import org.apache.gora.persistency.Persistent;
+import org.apache.gora.persistency.impl.DirtyListWrapper;
+import org.apache.gora.persistency.impl.DirtyMapWrapper;
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.apache.gora.query.PartitionQuery;
+import org.apache.gora.query.Query;
+import org.apache.gora.query.Result;
+import org.apache.gora.store.impl.DataStoreBase;
+import org.apache.gora.util.AvroUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of a Aerospike data store to be used by gora.
+ *
+ * @param <K> class to be used for the key
+ * @param <T> class to be persisted within the store
+ */
+public class AerospikeStore<K, T extends PersistentBase> extends
DataStoreBase<K, T> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(AerospikeStore.class);
+
+ private static final String PARSE_MAPPING_FILE_KEY =
"gora.aerospike.mapping.file";
+
+ private static final String DEFAULT_MAPPING_FILE =
"gora-aerospike-mapping.xml";
+
+ private AerospikeClient aerospikeClient;
+
+ private AerospikeParameters aerospikeParameters;
+
+ /**
+ * {@inheritDoc}
+ * In initializing the aerospike datastore, read the mapping file, sets
the basic
+ * aerospike specific parameters and creates the client with the user
defined policies
+ *
+ * @param keyClass key class
+ * @param persistentClass persistent class
+ * @param properties properties
+ */
+ @Override
+ public void initialize(Class<K> keyClass, Class<T> persistentClass,
Properties properties) {
+ super.initialize(keyClass, persistentClass, properties);
+
+ AerospikeMappingBuilder aerospikeMappingBuilder = new
AerospikeMappingBuilder();
+ aerospikeMappingBuilder
+ .readMappingFile(getConf().get(PARSE_MAPPING_FILE_KEY,
DEFAULT_MAPPING_FILE), keyClass,
+ persistentClass);
+ aerospikeParameters = new
AerospikeParameters(aerospikeMappingBuilder.getAerospikeMapping(),
+ properties);
+ ClientPolicy policy = new ClientPolicy();
+ policy.writePolicyDefault =
aerospikeParameters.getAerospikeMapping().getWritePolicy();
+ policy.readPolicyDefault =
aerospikeParameters.getAerospikeMapping().getReadPolicy();
+
+ aerospikeClient = new AerospikeClient(aerospikeParameters.getHost(),
+ aerospikeParameters.getPort());
+ aerospikeParameters.setServerSpecificParameters(aerospikeClient);
+
aerospikeParameters.validateServerBinConfiguration(persistentClass.getFields());
+ LOG.info("Aerospike Gora datastore initialized successfully.");
+ }
+
+ /**
+ * Aerospike, being a schemaless database does not support explicit
schema creation through the
+ * provided java client. When the records are added to the database, the
schema is created on
+ * the fly. Thus, schema related functionality is unavailable in
gora-aerospike module.
+ *
+ * @return null
+ */
+ @Override
+ public String getSchemaName() {
+ return null;
+ }
+
+ /**
+ * Aerospike, being a schemaless database does not support explicit
schema creation through the
+ * provided java client. When the records are added to the database, the
schema is created on
+ * the fly. Thus, schema creation functionality is unavailable in
gora-aerospike module.
+ */
+ @Override
+ public void createSchema() {
+ }
+
+ /**
+ * Aerospike, being a schemaless database does not support explicit
schema creation through the
+ * provided java client. When the records are added to the database, the
schema is created on
+ * the fly. Thus, schema deletion functionality is unavailable in
gora-aerospike module.
+ */
+ @Override
+ public void deleteSchema() {
+ }
+
+ /**
+ * Aerospike, being a schemaless database does not support explicit
schema creation through the
+ * provided java client. When the records are added to the database, the
schema is created on
+ * the fly. Thus, schema exists functionality is unavailable in
gora-aerospike module.
+ */
+ @Override
+ public boolean schemaExists() {
+ return true;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @param key the key of the object
+ * @param fields the fields required in the object. Pass null, to
retrieve all fields
+ * @return the Object corresponding to the key or null if it cannot be
found
+ */
+ @Override
+ public T get(K key, String[] fields) {
+
+ Key recordKey = getAerospikeKey(key);
+ fields = getFieldsToQuery(fields);
+
+ Record record = aerospikeClient
+
.get(aerospikeParameters.getAerospikeMapping().getReadPolicy(), recordKey,
fields);
+ if (record == null) {
+ return null;
+ }
+ return createPersistentInstance(record, fields);
+ }
+
+ /**
+ * Method to insert the persistent objects with the given key to the
aerospike database server.
+ * In writing the records, the policy defined in the mapping file is
used to decide on the
+ * behaviour of transaction handling.
+ *
+ * @param key key of the object
+ * @param persistent object to be persisted
+ */
+ @Override
+ public void put(K key, T persistent) {
+
+ Key recordKey = getAerospikeKey(key);
+
+ List<Field> fields = persistent.getSchema().getFields();
+ for (int i = 0; i < fields.size(); i++) {
+ Object persistentValue = persistent.get(i);
+ if (persistentValue != null) {
+ String mappingBinName =
aerospikeParameters.getAerospikeMapping().getBinMapping()
+ .get(fields.get(i).name());
+ if (mappingBinName == null) {
+ LOG.error(
+ "Aerospike mapping for field {}#{} not found. Wrong
gora-aerospike-mapping.xml?",
+ persistent.getClass().getName(), fields.get(i).name());
+ throw new RuntimeException(
+ "Aerospike mapping for field [" +
persistent.getClass().getName() + "#" + fields
+ .get(i).name() + "] not found. Wrong
gora-aerospike-mapping.xml?");
+ }
+ Bin bin = new Bin(mappingBinName,
+ getSerializableValue(persistentValue,
fields.get(i).schema()));
+ aerospikeClient
+
.put(aerospikeParameters.getAerospikeMapping().getWritePolicy(), recordKey,
bin);
+ }
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @param key the key of the object
+ * @return whether the object was successfully deleted
+ */
+ @Override
+ public boolean delete(K key) {
+ Key recordKey = getAerospikeKey(key);
+ return aerospikeClient
+
.delete(aerospikeParameters.getAerospikeMapping().getWritePolicy(), recordKey);
+ }
+
+ @Override
+ public long deleteByQuery(Query<K, T> query) {
+ return 0;
+ }
+
+ @Override
+ public Result<K, T> execute(Query<K, T> query) {
+ return null;
+ }
+
+ @Override
+ public Query<K, T> newQuery() {
+ return null;
+ }
+
+ @Override
+ public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query)
throws IOException {
+ return null;
+ }
+
+ @Override
+ public void flush() {
+ }
+
+ /**
+ * Method to close aerospike client connections to database server nodes
+ */
+ @Override
+ public void close() {
+ aerospikeClient.close();
+ LOG.info("Aerospike Gora datastore destroyed successfully.");
+ }
+
+ /**
+ * Method to get the aerospike key from the provided K
+ *
+ * @param key persistent key
+ * @return aerospike key for the record
+ */
+ public Key getAerospikeKey(K key) {
+ Value keyValue;
+ if (keyClass.getSimpleName().equalsIgnoreCase("string")) {
+ keyValue = Value.get(key.toString());
+ } else {
+ keyValue = Value.get(key);
+ }
+
+ return new
Key(aerospikeParameters.getAerospikeMapping().getNamespace(),
+ aerospikeParameters.getAerospikeMapping().getSet(), keyValue);
+ }
+
+ /**
+ * Method to get the value serializable in database from the Avro
persistent object
+ *
+ * @param object persistent object
+ * @param schema schema of the persistent object
+ * @return serializable value
+ */
+ private Value getSerializableValue(Object object, Schema schema) {
+
+ Value value = null;
+ switch (schema.getType()) {
+ case UNION:
+ if (object != null) {
+ int schemaPos = getUnionSchema(object, schema);
+ Schema unionSchema = schema.getTypes().get(schemaPos);
+ value = getSerializableValue(object, unionSchema);
+ }
+ break;
+ case STRING:
+ value = Value.get(object.toString());
+ break;
+ case BYTES:
+ value = Value.get(((ByteBuffer) object).array());
+ break;
+ case MAP:
+ Map<Object, Object> newMap = new HashMap<>();
+ Map<?, ?> fieldMap = (Map<?, ?>) object;
+ for (Object key : fieldMap.keySet()) {
--- End diff --
You can iterate over the `entrySet`.
> Implement Aerospike Datastore
> -----------------------------
>
> Key: GORA-502
> URL: https://issues.apache.org/jira/browse/GORA-502
> Project: Apache Gora
> Issue Type: New Feature
> Components: gora-aerospike, storage
> Affects Versions: 0.6.1
> Reporter: Cihad Guzel
> Assignee: Nishadi Kirielle
> Labels: gsoc2017
> Fix For: 0.8
>
>
> Aerospike is a NoSQL database solution for real-time operational
> applications, delivering predictable performance at scale, superior uptime,
> and high availability at the lowest TCO compared to first-generation NoSQL
> and relational databases. It could be nice to support Aerospike as a
> datastore at Gora. Aerospike uses Apache v2 license for Java client and uses
> aGPL license for Aerospike Server Community Edition.
> (http://www.aerospike.com/products/)
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)