http://git-wip-us.apache.org/repos/asf/gora/blob/962d7a6a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/bean/KeySpace.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/bean/KeySpace.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/bean/KeySpace.java index 6f8284d..7deb49a 100644 --- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/bean/KeySpace.java +++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/bean/KeySpace.java @@ -25,43 +25,30 @@ import java.util.Map; */ public class KeySpace { - public enum PlacementStrategy { - SimpleStrategy, - NetworkTopologyStrategy, - } - private String name; - private PlacementStrategy placementStrategy; - private boolean durableWritesEnabled; - private int replicationFactor; - private Map<String, Integer> dataCenters; public String getName() { return name; } - public boolean isDurableWritesEnabled() { - return durableWritesEnabled; - } - - public PlacementStrategy getPlacementStrategy() { - return placementStrategy; + public void setName(String name) { + this.name = name; } - public int getReplicationFactor() { - return replicationFactor; + public boolean isDurableWritesEnabled() { + return durableWritesEnabled; } - public Map<String, Integer> getDataCenters() { - return dataCenters; + public void setDurableWritesEnabled(boolean durableWritesEnabled) { + this.durableWritesEnabled = durableWritesEnabled; } - public void addDataCenter(String key, Integer value) { - this.dataCenters.put(key, value); + public PlacementStrategy getPlacementStrategy() { + return placementStrategy; } public void setPlacementStrategy(PlacementStrategy placementStrategy) { @@ -71,15 +58,24 @@ public class KeySpace { } } + public int getReplicationFactor() { + return replicationFactor; + } + public void setReplicationFactor(int replicationFactor) { this.replicationFactor = replicationFactor; } - public void 
setName(String name) { - this.name = name; + public Map<String, Integer> getDataCenters() { + return dataCenters; } - public void setDurableWritesEnabled(boolean durableWritesEnabled) { - this.durableWritesEnabled = durableWritesEnabled; + public void addDataCenter(String key, Integer value) { + this.dataCenters.put(key, value); + } + + public enum PlacementStrategy { + SimpleStrategy, + NetworkTopologyStrategy, } }
http://git-wip-us.apache.org/repos/asf/gora/blob/962d7a6a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/bean/PartitionKeyField.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/bean/PartitionKeyField.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/bean/PartitionKeyField.java index a0d9c4c..3aa4e36 100644 --- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/bean/PartitionKeyField.java +++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/bean/PartitionKeyField.java @@ -35,7 +35,7 @@ public class PartitionKeyField extends Field { public void setComposite(boolean composite) { isComposite = composite; - if(isComposite && fields == null) { + if (isComposite && fields == null) { fields = new ArrayList<>(); } } http://git-wip-us.apache.org/repos/asf/gora/blob/962d7a6a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/package-info.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/package-info.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/package-info.java index 5247ecc..3ad9186 100644 --- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/package-info.java +++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/package-info.java @@ -5,15 +5,16 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
* See the License for the specific language governing permissions and * limitations under the License. */ + /** * This package contains Casandra datastore related all classes. */ http://git-wip-us.apache.org/repos/asf/gora/blob/962d7a6a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/persistent/CassandraNativePersistent.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/persistent/CassandraNativePersistent.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/persistent/CassandraNativePersistent.java index bd17dcd..9d6e103 100644 --- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/persistent/CassandraNativePersistent.java +++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/persistent/CassandraNativePersistent.java @@ -21,7 +21,6 @@ import com.datastax.driver.mapping.annotations.Transient; import org.apache.avro.Schema; import org.apache.gora.persistency.Persistent; import org.apache.gora.persistency.Tombstone; -import org.apache.gora.persistency.impl.PersistentBase; import java.util.List; @@ -61,12 +60,6 @@ public abstract class CassandraNativePersistent implements Persistent { @Transient @Override - public void setDirty(String field) { - - } - - @Transient - @Override public void clearDirty(int fieldIndex) { } @@ -103,6 +96,12 @@ public abstract class CassandraNativePersistent implements Persistent { @Transient @Override + public void setDirty(String field) { + + } + + @Transient + @Override public void clearDirty() { } http://git-wip-us.apache.org/repos/asf/gora/blob/962d7a6a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java index 
d6ba99c..1479686 100644 --- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java +++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java @@ -70,7 +70,7 @@ public class CassandraQuery<K, T extends Persistent> extends QueryWSBase<K, T> { @Override public String[] getFields() { - if(updateFields.size() == 0) { + if (updateFields.size() == 0) { return super.getFields(); } else { String[] updateFieldsArray = new String[updateFields.size()]; http://git-wip-us.apache.org/repos/asf/gora/blob/962d7a6a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java index 7ab3726..c3b2e59 100644 --- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java +++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
@@ -19,16 +19,13 @@ package org.apache.gora.cassandra.query; import org.apache.gora.persistency.Persistent; -import org.apache.gora.persistency.impl.PersistentBase; import org.apache.gora.query.Query; import org.apache.gora.query.impl.ResultBase; import org.apache.gora.store.DataStore; import java.io.IOException; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; /** * CassandraResult specific implementation of the {@link org.apache.gora.query.Result} @@ -45,7 +42,6 @@ public class CassandraResultSet<K, T extends Persistent> extends ResultBase<K, T private int position = 0; /** - * * @param dataStore * @param query */ @@ -54,34 +50,37 @@ public class CassandraResultSet<K, T extends Persistent> extends ResultBase<K, T } /** - *{@inheritDoc} + * {@inheritDoc} + * * @return * @throws IOException */ @Override protected boolean nextInner() throws IOException { - if(offset < size) { + if (offset < size) { persistent = persistentObject.get(position); key = persistentKey.get(position); - position ++; + position++; return true; } return false; } /** - *{@inheritDoc} + * {@inheritDoc} + * * @return * @throws IOException * @throws InterruptedException */ @Override public float getProgress() throws IOException, InterruptedException { - return ((float)position)/size; + return ((float) position) / size; } /** - *{@inheritDoc} + * {@inheritDoc} + * * @return */ @Override @@ -91,6 +90,7 @@ public class CassandraResultSet<K, T extends Persistent> extends ResultBase<K, T /** * {@inheritDoc} + * * @return */ @Override @@ -99,7 +99,6 @@ public class CassandraResultSet<K, T extends Persistent> extends ResultBase<K, T } /** - * * @param key * @param token */ http://git-wip-us.apache.org/repos/asf/gora/blob/962d7a6a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/package-info.java ---------------------------------------------------------------------- diff --git 
a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/package-info.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/package-info.java index 49faefa..275c8d9 100644 --- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/package-info.java +++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/package-info.java @@ -5,15 +5,16 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ + /** * This package contains all the Cassandra store query representation class as well as Result set representing class * when query is executed over the Cassandra dataStore. 
http://git-wip-us.apache.org/repos/asf/gora/blob/962d7a6a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroCassandraUtils.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroCassandraUtils.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroCassandraUtils.java index 5383949..7baa1b1 100644 --- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroCassandraUtils.java +++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroCassandraUtils.java @@ -24,6 +24,7 @@ import org.apache.avro.util.Utf8; import org.apache.gora.cassandra.bean.CassandraKey; import org.apache.gora.cassandra.bean.Field; import org.apache.gora.cassandra.store.CassandraMapping; +import org.apache.gora.hbase.util.HBaseByteInterface; import org.apache.gora.persistency.Persistent; import org.apache.gora.persistency.impl.DirtyListWrapper; import org.apache.gora.persistency.impl.DirtyMapWrapper; @@ -31,6 +32,7 @@ import org.apache.gora.persistency.impl.PersistentBase; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; @@ -52,32 +54,26 @@ class AvroCassandraUtils { static void processKeys(CassandraMapping cassandraMapping, Object key, List<String> keys, List<Object> values) { CassandraKey cassandraKey = cassandraMapping.getCassandraKey(); - if (cassandraMapping.isPartitionKeyDefined()) { - if (cassandraKey != null) { - if (key instanceof PersistentBase) { - PersistentBase keyBase = (PersistentBase) key; - for (Schema.Field field : keyBase.getSchema().getFields()) { - if (cassandraMapping.getFieldFromFieldName(field.name()) != null) { - keys.add(field.name()); - Object value = keyBase.get(field.pos()); - value = getFieldValueFromAvroBean(field.schema(), 
field.schema().getType(), value); - values.add(value); - } else { - LOG.debug("Ignoring field {}, Since field couldn't find in the {} mapping", new Object[]{field.name(), cassandraMapping.getPersistentClass()}); - } + if (cassandraKey != null) { + if (key instanceof PersistentBase) { + PersistentBase keyBase = (PersistentBase) key; + for (Schema.Field field : keyBase.getSchema().getFields()) { + Field mappedField = cassandraKey.getFieldFromFieldName(field.name()); + if (mappedField != null) { + keys.add(field.name()); + Object value = keyBase.get(field.pos()); + value = getFieldValueFromAvroBean(field.schema(), field.schema().getType(), value, mappedField); + values.add(value); + } else { + LOG.debug("Ignoring field {}, Since field couldn't find in the {} mapping", new Object[]{field.name(), cassandraMapping.getPersistentClass()}); } - } else { - LOG.error("Key bean isn't extended by {} .", new Object[]{cassandraMapping.getKeyClass(), PersistentBase.class}); } } else { - for (Field field : cassandraMapping.getInlinedDefinedPartitionKeys()) { - keys.add(field.getFieldName()); - values.add(key); - } + LOG.error("Key bean isn't extended by {} .", new Object[]{cassandraMapping.getKeyClass(), PersistentBase.class}); } } else { - keys.add(cassandraMapping.getDefaultCassandraKey().getFieldName()); - values.add(key.toString()); + keys.add(cassandraMapping.getInlinedDefinedPartitionKey().getFieldName()); + values.add(key); } } @@ -91,21 +87,33 @@ class AvroCassandraUtils { * @param fieldValue the field value. 
* @return field value */ - static Object getFieldValueFromAvroBean(Schema fieldSchema, Schema.Type type, Object fieldValue) { + static Object getFieldValueFromAvroBean(Schema fieldSchema, Schema.Type type, Object fieldValue, Field field) { switch (type) { + // Record can be persist with two ways, udt and bytes case RECORD: PersistentBase persistent = (PersistentBase) fieldValue; - PersistentBase newRecord = (PersistentBase) SpecificData.get().newRecord(persistent, persistent.getSchema()); - for (Schema.Field member : fieldSchema.getFields()) { - if (member.pos() == 0 || !persistent.isDirty()) { - continue; + if (field.getType().contains("frozen")) { + PersistentBase newRecord = (PersistentBase) SpecificData.get().newRecord(persistent, persistent.getSchema()); + for (Schema.Field member : fieldSchema.getFields()) { + if (member.pos() == 0 || !persistent.isDirty()) { + continue; + } + Schema memberSchema = member.schema(); + Schema.Type memberType = memberSchema.getType(); + Object memberValue = persistent.get(member.pos()); + newRecord.put(member.pos(), getFieldValueFromAvroBean(memberSchema, memberType, memberValue, field)); + } + fieldValue = newRecord; + } else if (field.getType().contains("blob")) { + try { + byte[] serializedBytes = HBaseByteInterface.toBytes(fieldValue, fieldSchema); + fieldValue = ByteBuffer.wrap(serializedBytes); + } catch (IOException e) { + LOG.error("Error occurred when serializing {} field. 
{}", new Object[]{field.getFieldName(), e.getMessage()}); } - Schema memberSchema = member.schema(); - Schema.Type memberType = memberSchema.getType(); - Object memberValue = persistent.get(member.pos()); - newRecord.put(member.pos(), getFieldValueFromAvroBean(memberSchema, memberType, memberValue)); + } else { + throw new RuntimeException(""); } - fieldValue = newRecord; break; case MAP: Schema valueSchema = fieldSchema.getValueType(); @@ -114,7 +122,7 @@ class AvroCassandraUtils { for (Map.Entry<CharSequence, ?> e : ((Map<CharSequence, ?>) fieldValue).entrySet()) { String mapKey = e.getKey().toString(); Object mapValue = e.getValue(); - mapValue = getFieldValueFromAvroBean(valueSchema, valuetype, mapValue); + mapValue = getFieldValueFromAvroBean(valueSchema, valuetype, mapValue, field); map.put(mapKey, mapValue); } fieldValue = map; @@ -124,7 +132,7 @@ class AvroCassandraUtils { valuetype = valueSchema.getType(); ArrayList<Object> list = new ArrayList<>(); for (Object item : (Collection<?>) fieldValue) { - Object value = getFieldValueFromAvroBean(valueSchema, valuetype, item); + Object value = getFieldValueFromAvroBean(valueSchema, valuetype, item, field); list.add(value); } fieldValue = list; @@ -136,7 +144,7 @@ class AvroCassandraUtils { int schemaPos = getUnionSchema(fieldValue, fieldSchema); Schema unionSchema = fieldSchema.getTypes().get(schemaPos); Schema.Type unionType = unionSchema.getType(); - fieldValue = getFieldValueFromAvroBean(unionSchema, unionType, fieldValue); + fieldValue = getFieldValueFromAvroBean(unionSchema, unionType, fieldValue, field); } break; case STRING: @@ -154,7 +162,7 @@ class AvroCassandraUtils { * If no data type can be inferred then we return a default value * of position 0. * - * @param pValue Object + * @param pValue Object * @param pUnionSchema avro Schema * @return the unionSchemaPosition. 
*/ @@ -183,74 +191,84 @@ class AvroCassandraUtils { return unionSchemaPos; else if (pValue instanceof Persistent && schemaType.equals(Schema.Type.RECORD)) return unionSchemaPos; + else if (pValue != null && ByteBuffer.class.isAssignableFrom(pValue.getClass()) && schemaType.equals(Schema.Type.STRING)) + return unionSchemaPos; + else if (pValue != null && ByteBuffer.class.isAssignableFrom(pValue.getClass()) && schemaType.equals(Schema.Type.INT)) + return unionSchemaPos; + else if (pValue != null && ByteBuffer.class.isAssignableFrom(pValue.getClass()) && schemaType.equals(Schema.Type.LONG)) + return unionSchemaPos; + else if (pValue != null && ByteBuffer.class.isAssignableFrom(pValue.getClass()) && schemaType.equals(Schema.Type.DOUBLE)) + return unionSchemaPos; + else if (pValue != null && ByteBuffer.class.isAssignableFrom(pValue.getClass()) && schemaType.equals(Schema.Type.FLOAT)) + return unionSchemaPos; + else if (pValue != null && ByteBuffer.class.isAssignableFrom(pValue.getClass()) && schemaType.equals(Schema.Type.BOOLEAN)) + return unionSchemaPos; + else if (pValue != null && ByteBuffer.class.isAssignableFrom(pValue.getClass()) && schemaType.equals(Schema.Type.MAP)) + return unionSchemaPos; + else if (pValue != null && ByteBuffer.class.isAssignableFrom(pValue.getClass()) && schemaType.equals(Schema.Type.ARRAY)) + return unionSchemaPos; + else if (pValue != null && ByteBuffer.class.isAssignableFrom(pValue.getClass()) && schemaType.equals(Schema.Type.ENUM)) + return unionSchemaPos; + else if (pValue != null && ByteBuffer.class.isAssignableFrom(pValue.getClass()) && schemaType.equals(Schema.Type.FIXED)) + return unionSchemaPos; + else if (pValue != null && ByteBuffer.class.isAssignableFrom(pValue.getClass()) && schemaType.equals(Schema.Type.RECORD)) + return unionSchemaPos; unionSchemaPos++; } // if we weren't able to determine which data type it is, then we return the default return DEFAULT_UNION_SCHEMA; } - static String encodeFieldKey(final String key) { - if 
(key == null) { - return null; - } - return key.replace(".", "\u00B7") - .replace(":", "\u00FF") - .replace(";", "\u00FE") - .replace(" ", "\u00FD") - .replace("%", "\u00FC") - .replace("=", "\u00FB"); - } - - static String decodeFieldKey(final String key) { - if (key == null) { - return null; - } - return key.replace("\u00B7", ".") - .replace("\u00FF", ":") - .replace("\u00FE", ";") - .replace("\u00FD", " ") - .replace("\u00FC", "%") - .replace("\u00FB", "="); - } - static Object getAvroFieldValue(Object value, Schema schema) { Object result; switch (schema.getType()) { case MAP: Map<String, Object> rawMap = (Map<String, Object>) value; - Map<Utf8, Object> deserializableMap = new HashMap<>(); + Map<Utf8, Object> utf8ObjectHashMap = new HashMap<>(); if (rawMap == null) { - result = new DirtyMapWrapper(deserializableMap); + result = new DirtyMapWrapper(utf8ObjectHashMap); break; } for (Map.Entry<?, ?> e : rawMap.entrySet()) { Schema innerSchema = schema.getValueType(); Object obj = getAvroFieldValue(e.getValue(), innerSchema); if (e.getKey().getClass().getSimpleName().equalsIgnoreCase("Utf8")) { - deserializableMap.put((Utf8) e.getKey(), obj); + utf8ObjectHashMap.put((Utf8) e.getKey(), obj); } else { - deserializableMap.put(new Utf8((String) e.getKey()), obj); + utf8ObjectHashMap.put(new Utf8((String) e.getKey()), obj); } } - result = new DirtyMapWrapper<>(deserializableMap); + result = new DirtyMapWrapper<>(utf8ObjectHashMap); break; case ARRAY: List<Object> rawList = (List<Object>) value; - List<Object> deserializableList = new ArrayList<>(); + List<Object> objectArrayList = new ArrayList<>(); if (rawList == null) { - return new DirtyListWrapper(deserializableList); + return new DirtyListWrapper(objectArrayList); } for (Object item : rawList) { Object obj = getAvroFieldValue(item, schema.getElementType()); - deserializableList.add(obj); + objectArrayList.add(obj); } - result = new DirtyListWrapper<>(deserializableList); + result = new 
DirtyListWrapper<>(objectArrayList); break; case RECORD: - result = (PersistentBase) value; + if (value != null && ByteBuffer.class.isAssignableFrom(value.getClass())) { + ByteBuffer buffer = (ByteBuffer) value; + byte[] arr = new byte[buffer.remaining()]; + buffer.get(arr); + try { + result = (PersistentBase) HBaseByteInterface.fromBytes(schema, arr); + } catch (IOException e) { + LOG.error(""); + result = null; + } + } else { + result = (PersistentBase) value; + } break; case UNION: http://git-wip-us.apache.org/repos/asf/gora/blob/962d7a6a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java index 4498caf..57d03f1 100644 --- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java +++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java @@ -22,6 +22,8 @@ import com.datastax.driver.core.DataType; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Row; import org.apache.avro.Schema; +import org.apache.commons.lang.ArrayUtils; +import org.apache.gora.cassandra.bean.CassandraKey; import org.apache.gora.cassandra.bean.Field; import org.apache.gora.cassandra.query.CassandraResultSet; import org.apache.gora.cassandra.store.CassandraClient; @@ -50,7 +52,7 @@ class AvroSerializer<K, T extends PersistentBase> extends CassandraSerializer { private DataStore<K, T> cassandraDataStore; AvroSerializer(CassandraClient cassandraClient, DataStore<K, T> dataStore, CassandraMapping mapping) { - super(cassandraClient, dataStore.getKeyClass(), dataStore.getKeyClass(), mapping); + super(cassandraClient, dataStore.getKeyClass(), dataStore.getPersistentClass(), mapping); 
this.cassandraDataStore = dataStore; } @@ -70,7 +72,7 @@ class AvroSerializer<K, T extends PersistentBase> extends CassandraSerializer { if (iterator.hasNext()) { obj = cassandraDataStore.newPersistent(); Row row = iterator.next(); - populateValuesToPersistent(row, definitions, obj); + populateValuesToPersistent(row, definitions, obj, fields); } return obj; } @@ -85,13 +87,14 @@ class AvroSerializer<K, T extends PersistentBase> extends CassandraSerializer { AvroCassandraUtils.processKeys(mapping, key, fields, values); for (Schema.Field f : persistentBase.getSchema().getFields()) { String fieldName = f.name(); - if (mapping.getFieldFromFieldName(fieldName) == null) { + Field field = mapping.getFieldFromFieldName(fieldName); + if (field == null) { LOG.debug("Ignoring {} adding field, {} field can't find in {} mapping", new Object[]{fieldName, fieldName, persistentClass}); continue; } - if (persistent.isDirty(f.pos()) || mapping.getInlinedDefinedPartitionKeys().contains(mapping.getFieldFromFieldName(fieldName))) { + if (persistent.isDirty(f.pos()) || mapping.getInlinedDefinedPartitionKey().equals(mapping.getFieldFromFieldName(fieldName))) { Object value = persistentBase.get(f.pos()); - value = AvroCassandraUtils.getFieldValueFromAvroBean(f.schema(), f.schema().getType(), value); + value = AvroCassandraUtils.getFieldValueFromAvroBean(f.schema(), f.schema().getType(), value, field); values.add(value); fields.add(fieldName); } @@ -120,7 +123,7 @@ class AvroSerializer<K, T extends PersistentBase> extends CassandraSerializer { if (iterator.hasNext()) { obj = cassandraDataStore.newPersistent(); Row row = iterator.next(); - populateValuesToPersistent(row, definitions, obj); + populateValuesToPersistent(row, definitions, obj, mapping.getFieldNames()); } return obj; } @@ -128,104 +131,129 @@ class AvroSerializer<K, T extends PersistentBase> extends CassandraSerializer { /** * This method wraps result set data in to DataEntry and creates a list of DataEntry. 
**/ - private void populateValuesToPersistent(Row row, ColumnDefinitions columnDefinitions, PersistentBase base) { - + private void populateValuesToPersistent(Row row, ColumnDefinitions columnDefinitions, PersistentBase base, String[] fields) { Object paramValue; - for (Schema.Field avrofield : base.getSchema().getFields()) { - - Field field = mapping.getFieldFromFieldName(avrofield.name()); + for (String fieldName : fields) { + Schema.Field avroField = base.getSchema().getField(fieldName); + Field field = mapping.getFieldFromFieldName(fieldName); //to ignore unspecified fields in the mapping - if (field == null) { + if (field == null || avroField == null) { continue; } - Schema fieldSchema = avrofield.schema(); + Schema fieldSchema = avroField.schema(); String columnName = field.getColumnName(); - DataType columnType = columnDefinitions.getType(columnName); - - switch (columnType.getName()) { - case ASCII: - paramValue = row.getString(columnName); - break; - case BIGINT: - paramValue = row.isNull(columnName) ? null : row.getLong(columnName); - break; - case BLOB: - paramValue = row.isNull(columnName) ? null : row.getBytes(columnName); - break; - case BOOLEAN: - paramValue = row.isNull(columnName) ? null : row.getBool(columnName); - break; - case COUNTER: - paramValue = row.isNull(columnName) ? null : row.getLong(columnName); - break; - case DECIMAL: - paramValue = row.isNull(columnName) ? null : row.getDecimal(columnName); - break; - case DOUBLE: - paramValue = row.isNull(columnName) ? null : row.getDouble(columnName); - break; - case FLOAT: - paramValue = row.isNull(columnName) ? null : row.getFloat(columnName); - break; - case INET: - paramValue = row.isNull(columnName) ? null : row.getInet(columnName).toString(); - break; - case INT: - paramValue = row.isNull(columnName) ? null : row.getInt(columnName); - break; - case TEXT: - paramValue = row.getString(columnName); - break; - case TIMESTAMP: - paramValue = row.isNull(columnName) ? 
null : row.getDate(columnName); - break; - case UUID: - paramValue = row.isNull(columnName) ? null : row.getUUID(columnName); - break; - case VARCHAR: - paramValue = row.getString(columnName); - break; - case VARINT: - paramValue = row.isNull(columnName) ? null : row.getVarint(columnName); - break; - case TIMEUUID: - paramValue = row.isNull(columnName) ? null : row.getUUID(columnName); - break; - case LIST: - String dataType = field.getType(); - dataType = dataType.substring(dataType.indexOf("<") + 1, dataType.indexOf(">")); - paramValue = row.isNull(columnName) ? null : row.getList(columnName, getRelevantClassForCassandraDataType(dataType)); - break; - case SET: - dataType = field.getType(); - dataType = dataType.substring(dataType.indexOf("<") + 1, dataType.indexOf(">")); - paramValue = row.isNull(columnName) ? null : row.getList(columnName, getRelevantClassForCassandraDataType(dataType)); - break; - case MAP: - dataType = field.getType(); - dataType = dataType.substring(dataType.indexOf("<") + 1, dataType.indexOf(">")); - dataType = dataType.split(",")[1]; - // Avro supports only String for keys - paramValue = row.isNull(columnName) ? null : row.getMap(columnName, String.class, getRelevantClassForCassandraDataType(dataType)); - break; - case UDT: - paramValue = row.isNull(columnName) ? null : row.getUDTValue(columnName).toString(); - break; - case TUPLE: - paramValue = row.isNull(columnName) ? null : row.getTupleValue(columnName).toString(); - break; - case CUSTOM: - paramValue = row.isNull(columnName) ? 
null : row.getBytes(columnName); - break; - default: - paramValue = row.getString(columnName); - break; - } + paramValue = getValue(row, columnDefinitions, columnName); Object value = AvroCassandraUtils.getAvroFieldValue(paramValue, fieldSchema); - base.put(avrofield.pos(), value); + base.put(avroField.pos(), value); + } + } + + private Object getValue(Row row, ColumnDefinitions columnDefinitions, String columnName) { + Object paramValue; + Field field = mapping.getFieldFromColumnName(columnName); + DataType columnType = columnDefinitions.getType(columnName); + switch (columnType.getName()) { + case ASCII: + paramValue = row.getString(columnName); + break; + case BIGINT: + paramValue = row.isNull(columnName) ? null : row.getLong(columnName); + break; + case BLOB: + paramValue = row.isNull(columnName) ? null : row.getBytes(columnName); + break; + case BOOLEAN: + paramValue = row.isNull(columnName) ? null : row.getBool(columnName); + break; + case COUNTER: + paramValue = row.isNull(columnName) ? null : row.getLong(columnName); + break; + case DECIMAL: + paramValue = row.isNull(columnName) ? null : row.getDecimal(columnName); + break; + case DOUBLE: + paramValue = row.isNull(columnName) ? null : row.getDouble(columnName); + break; + case FLOAT: + paramValue = row.isNull(columnName) ? null : row.getFloat(columnName); + break; + case INET: + paramValue = row.isNull(columnName) ? null : row.getInet(columnName).toString(); + break; + case INT: + paramValue = row.isNull(columnName) ? null : row.getInt(columnName); + break; + case TEXT: + paramValue = row.getString(columnName); + break; + case TIMESTAMP: + paramValue = row.isNull(columnName) ? null : row.getDate(columnName); + break; + case UUID: + paramValue = row.isNull(columnName) ? null : row.getUUID(columnName); + break; + case VARCHAR: + paramValue = row.getString(columnName); + break; + case VARINT: + paramValue = row.isNull(columnName) ? 
null : row.getVarint(columnName); + break; + case TIMEUUID: + paramValue = row.isNull(columnName) ? null : row.getUUID(columnName); + break; + case LIST: + String dataType = field.getType(); + dataType = dataType.substring(dataType.indexOf("<") + 1, dataType.indexOf(">")); + paramValue = row.isNull(columnName) ? null : row.getList(columnName, getRelevantClassForCassandraDataType(dataType)); + break; + case SET: + dataType = field.getType(); + dataType = dataType.substring(dataType.indexOf("<") + 1, dataType.indexOf(">")); + paramValue = row.isNull(columnName) ? null : row.getList(columnName, getRelevantClassForCassandraDataType(dataType)); + break; + case MAP: + dataType = field.getType(); + dataType = dataType.substring(dataType.indexOf("<") + 1, dataType.indexOf(">")); + dataType = dataType.split(",")[1]; + // Avro supports only String for keys + paramValue = row.isNull(columnName) ? null : row.getMap(columnName, String.class, getRelevantClassForCassandraDataType(dataType)); + break; + case UDT: + paramValue = row.isNull(columnName) ? null : row.getUDTValue(columnName); + break; + case TUPLE: + paramValue = row.isNull(columnName) ? null : row.getTupleValue(columnName).toString(); + break; + case CUSTOM: + paramValue = row.isNull(columnName) ? null : row.getBytes(columnName); + break; + default: + paramValue = row.getString(columnName); + break; } + return paramValue; } +/* public Collection<Object> getFieldValues(Object o) { + UDTValue udtValue = (UDTValue) o; + UserType type = udtValue.getType(); + + Collection<Object> values = new ArrayList<Object>(type.size()); + + *//* for (UserType.Field field : type) { + udtValue. + ByteBuffer bytes = udtValue.getBytesUnsafe(field.getName()); + DataType value = field.getType(); + for(DataType type1 : value.getTypeArguments()) { + type1. 
+ } + values.add(value); + }*//* + + return values; + }*/ + + private Class getRelevantClassForCassandraDataType(String dataType) { switch (dataType) { //// TODO: 7/25/17 support all the datatypes @@ -254,8 +282,14 @@ class AvroSerializer<K, T extends PersistentBase> extends CassandraSerializer { @Override public Result execute(DataStore dataStore, Query query) { List<Object> objectArrayList = new ArrayList<>(); + String[] fields = query.getFields(); + if (fields != null) { + fields = (String[]) ArrayUtils.addAll(fields, mapping.getAllKeys()); + } else { + fields = mapping.getAllFieldsIncludingKeys(); + } CassandraResultSet<K, T> cassandraResult = new CassandraResultSet<>(dataStore, query); - String cqlQuery = CassandraQueryFactory.getExecuteQuery(mapping, query, objectArrayList); + String cqlQuery = CassandraQueryFactory.getExecuteQuery(mapping, query, objectArrayList, fields); ResultSet results; if (objectArrayList.size() == 0) { results = client.getSession().execute(cqlQuery); @@ -266,12 +300,18 @@ class AvroSerializer<K, T extends PersistentBase> extends CassandraSerializer { ColumnDefinitions definitions = results.getColumnDefinitions(); T obj; K keyObject; + CassandraKey cassandraKey = mapping.getCassandraKey(); while (iterator.hasNext()) { Row row = iterator.next(); obj = cassandraDataStore.newPersistent(); keyObject = cassandraDataStore.newKey(); - populateValuesToPersistent(row, definitions, obj); - populateValuesToPersistent(row, definitions, (PersistentBase) keyObject); + populateValuesToPersistent(row, definitions, obj, fields); + if (cassandraKey != null) { + populateValuesToPersistent(row, definitions, (PersistentBase) keyObject, cassandraKey.getFieldNames()); + } else { + Field key = mapping.getInlinedDefinedPartitionKey(); + keyObject = (K) getValue(row, definitions, key.getColumnName()); + } cassandraResult.addResultElement(keyObject, obj); } return cassandraResult; 
http://git-wip-us.apache.org/repos/asf/gora/blob/962d7a6a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java index 4362a04..10c8f68 100644 --- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java +++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java @@ -36,8 +36,8 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.Locale; import java.util.Map; +import java.util.Set; /** * This class is used create Cassandra Queries. @@ -97,29 +97,17 @@ class CassandraQueryFactory { stringBuffer.append("CREATE TABLE IF NOT EXISTS ").append(mapping.getKeySpace().getName()).append(".").append(mapping.getCoreName()).append(" ("); boolean isCommaNeeded = false; CassandraKey cassandraKey = mapping.getCassandraKey(); - // appending Cassandra key columns into db schema - for (Field field : mapping.getFieldList()) { - if (isCommaNeeded) { - stringBuffer.append(", "); - } - stringBuffer.append(field.getColumnName()).append(" ").append(field.getType()); - boolean isStaticColumn = Boolean.parseBoolean(field.getProperty("static")); - boolean isPrimaryKey = Boolean.parseBoolean(field.getProperty("primarykey")); - if (isStaticColumn) { - stringBuffer.append(" STATIC"); - } - if (isPrimaryKey) { - stringBuffer.append(" PRIMARY KEY "); - } - isCommaNeeded = true; - } + // appending Cassandra Persistent columns into db schema + processFieldsForCreateTableQuery(mapping.getFieldList(), isCommaNeeded, stringBuffer); if (cassandraKey != null) { - List<PartitionKeyField> pkey = 
cassandraKey.getPartitionKeyFields(); - if (pkey != null) { + isCommaNeeded = true; + processFieldsForCreateTableQuery(cassandraKey.getFieldList(), isCommaNeeded, stringBuffer); + List<PartitionKeyField> partitionKeys = cassandraKey.getPartitionKeyFields(); + if (partitionKeys != null) { stringBuffer.append(", PRIMARY KEY ("); boolean isCommaNeededToApply = false; - for (PartitionKeyField keyField : pkey) { + for (PartitionKeyField keyField : partitionKeys) { if (isCommaNeededToApply) { stringBuffer.append(","); } @@ -141,11 +129,6 @@ class CassandraQueryFactory { } stringBuffer.append(")"); } - } else { - if (!stringBuffer.toString().toLowerCase(Locale.ENGLISH).contains("primary key")) { - Field field = mapping.getDefaultCassandraKey(); - stringBuffer.append(", ").append(field.getFieldName()).append(" ").append(field.getType()).append(" PRIMARY KEY "); - } } stringBuffer.append(")"); @@ -191,6 +174,24 @@ class CassandraQueryFactory { return stringBuffer.toString(); } + private static void processFieldsForCreateTableQuery(List<Field> fields, boolean isCommaNeeded, StringBuilder stringBuilder) { + for (Field field : fields) { + if (isCommaNeeded) { + stringBuilder.append(", "); + } + stringBuilder.append(field.getColumnName()).append(" ").append(field.getType()); + boolean isStaticColumn = Boolean.parseBoolean(field.getProperty("static")); + boolean isPrimaryKey = Boolean.parseBoolean(field.getProperty("primarykey")); + if (isStaticColumn) { + stringBuilder.append(" STATIC"); + } + if (isPrimaryKey) { + stringBuilder.append(" PRIMARY KEY "); + } + isCommaNeeded = true; + } + } + /** * This method returns the CQL query to drop table. * refer : http://docs.datastax.com/en/cql/3.1/cql/cql_reference/drop_table_r.html @@ -221,7 +222,7 @@ class CassandraQueryFactory { * @return CQL query */ static String getTruncateTableQuery(CassandraMapping mapping) { - return "TRUNCATE TABLE " + mapping.getKeySpace().getName() + "." 
+ mapping.getCoreName(); + return QueryBuilder.truncate(mapping.getKeySpace().getName(), mapping.getCoreName()).getQueryString(); } /** @@ -371,9 +372,7 @@ class CassandraQueryFactory { * @param objects object list * @return CQL Query */ - static String getExecuteQuery(CassandraMapping mapping, Query cassandraQuery, List<Object> objects) { - String[] fields = cassandraQuery.getFields(); - fields = fields != null ? fields : mapping.getFieldNames(); + static String getExecuteQuery(CassandraMapping mapping, Query cassandraQuery, List<Object> objects, String[] fields) { Object startKey = cassandraQuery.getStartKey(); Object endKey = cassandraQuery.getEndKey(); Object key = cassandraQuery.getKey(); @@ -470,14 +469,17 @@ class CassandraQueryFactory { ArrayList<String> columnNames = new ArrayList<>(); for (String field : fields) { Field fieldBean = mapping.getFieldFromFieldName(field); + CassandraKey cassandraKey = mapping.getCassandraKey(); + Field keyBean = null; + if (cassandraKey != null) { + keyBean = cassandraKey.getFieldFromFieldName(field); + } if (fieldBean != null) { columnNames.add(fieldBean.getColumnName()); + } else if (keyBean != null) { + columnNames.add(keyBean.getColumnName()); } else { - if (mapping.getDefaultCassandraKey().getFieldName().equals(field)) { - columnNames.add(field); - } else { - LOG.warn("{} field is ignored, couldn't find relavant field in the persistent mapping", field); - } + LOG.warn("{} field is ignored, couldn't find relevant field in the persistent mapping", field); } } return columnNames.toArray(new String[0]); @@ -504,7 +506,7 @@ class CassandraQueryFactory { */ static String getDeleteByQuery(CassandraMapping mapping, Query cassandraQuery, List<Object> objects) { String[] columns = null; - if (!Arrays.equals(cassandraQuery.getFields(), mapping.getFieldNames())) { + if (cassandraQuery.getFields() != null) { columns = getColumnNames(mapping, Arrays.asList(cassandraQuery.getFields())); } Object startKey = 
cassandraQuery.getStartKey(); @@ -549,17 +551,17 @@ class CassandraQueryFactory { String[] columnKeys = getColumnNames(mapping, cassandraKeys); for (int i = 0; i < cassandraKeys.size(); i++) { if (isWhereNeeded) { - query = delete.where(QueryBuilder.gte(columnKeys[i], "?")); + query = delete.where(QueryBuilder.gte(QueryBuilder.token(columnKeys[i]), QueryBuilder.token("?"))); objects.add(cassandraValues.get(i)); isWhereNeeded = false; } else { - query = query.and(QueryBuilder.gte(columnKeys[i], "?")); + query = query.and(QueryBuilder.gte(QueryBuilder.token(columnKeys[i]), QueryBuilder.token("?"))); objects.add(cassandraValues.get(i)); } } } else { primaryKey = getPKey(mapping.getFieldList()); - query = delete.where(QueryBuilder.gte(primaryKey, "?")); + query = delete.where(QueryBuilder.gte(QueryBuilder.token(primaryKey), QueryBuilder.token("?"))); objects.add(startKey); isWhereNeeded = false; } @@ -572,20 +574,20 @@ class CassandraQueryFactory { String[] columnKeys = getColumnNames(mapping, cassandraKeys); for (int i = 0; i < cassandraKeys.size(); i++) { if (isWhereNeeded) { - query = delete.where(QueryBuilder.lte(columnKeys[i], "?")); + query = delete.where(QueryBuilder.lte(QueryBuilder.token(columnKeys[i]), QueryBuilder.token("?"))); objects.add(cassandraValues.get(i)); isWhereNeeded = false; } else { - query = query.and(QueryBuilder.lte(columnKeys[i], "?")); + query = query.and(QueryBuilder.lte(QueryBuilder.token(columnKeys[i]), QueryBuilder.token("?"))); objects.add(cassandraValues.get(i)); } } } else { primaryKey = primaryKey != null ? 
primaryKey : getPKey(mapping.getFieldList()); if (isWhereNeeded) { - query = delete.where(QueryBuilder.lte(primaryKey, "?")); + query = delete.where(QueryBuilder.lte(QueryBuilder.token(primaryKey), QueryBuilder.token("?"))); } else { - query = query.and(QueryBuilder.lte(primaryKey, "?")); + query = query.and(QueryBuilder.lte(QueryBuilder.token(primaryKey), QueryBuilder.token("?"))); } objects.add(endKey); } @@ -611,7 +613,7 @@ class CassandraQueryFactory { Update.Assignments updateAssignments = null; if (cassandraQuery instanceof CassandraQuery) { String[] columnNames = getColumnNames(mapping, Arrays.asList(cassandraQuery.getFields())); - if(CassandraNativePersistent.class.isAssignableFrom(mapping.getPersistentClass())) { + if (CassandraNativePersistent.class.isAssignableFrom(mapping.getPersistentClass())) { for (String column : columnNames) { updateAssignments = update.with(QueryBuilder.set(column, "?")); objects.add(((CassandraQuery) cassandraQuery).getUpdateFieldValue(mapping.getFieldFromColumnName(column).getFieldName())); @@ -619,12 +621,12 @@ class CassandraQueryFactory { } else { for (String column : columnNames) { updateAssignments = update.with(QueryBuilder.set(column, "?")); - String field = mapping.getFieldFromColumnName(column).getFieldName(); - Object value = ((CassandraQuery) cassandraQuery).getUpdateFieldValue(field); + Field field = mapping.getFieldFromColumnName(column); + Object value = ((CassandraQuery) cassandraQuery).getUpdateFieldValue(field.getFieldName()); try { Schema schema = (Schema) mapping.getPersistentClass().getField("SCHEMA$").get(null); - Schema schemaField = schema.getField(field).schema(); - objects.add(AvroCassandraUtils.getFieldValueFromAvroBean(schemaField, schemaField.getType(), value)); + Schema schemaField = schema.getField(field.getFieldName()).schema(); + objects.add(AvroCassandraUtils.getFieldValueFromAvroBean(schemaField, schemaField.getType(), value, field)); } catch (IllegalAccessException | NoSuchFieldException e) { 
throw new RuntimeException("SCHEMA$ field can't accessible, Please recompile the Avro schema with goracompiler."); } catch (NullPointerException e) { @@ -717,4 +719,111 @@ class CassandraQueryFactory { return query.getQueryString(); } + /** + * This method returns create Type CQL query to create user define types. + * refer : http://docs.datastax.com/en/cql/3.1/cql/cql_reference/cqlRefcreateType.html + * + * @param fieldSchema avroSchema {@link Schema} + * @param mapping Cassandra mapping {@link CassandraMapping} + * @return CQL Query + */ + static String getCreateUDTType(Schema fieldSchema, CassandraMapping mapping, Set<String> udtQueryStack) { + StringBuilder stringBuffer = new StringBuilder(); + if (fieldSchema.getType().equals(Schema.Type.UNION)) { + for (Schema fieldTypeSchema : fieldSchema.getTypes()) { + if (fieldTypeSchema.getType().equals(Schema.Type.RECORD)) { + fieldSchema = fieldTypeSchema; + break; + } + } + } + stringBuffer.append("CREATE TYPE IF NOT EXISTS ").append(mapping.getKeySpace().getName()).append(".").append(fieldSchema.getName()).append(" ("); + processRecord(fieldSchema, stringBuffer, mapping, udtQueryStack); + stringBuffer.append(")"); + return stringBuffer.toString(); + } + + private static void processRecord(Schema recordSchema, StringBuilder stringBuilder, CassandraMapping mapping, Set<String> udtQueryStack) { + boolean isCommaNeeded = false; + for (Schema.Field field : recordSchema.getFields()) { + if (isCommaNeeded) { + stringBuilder.append(", "); + } + String fieldName = field.name(); + stringBuilder.append(fieldName).append(" "); + try { + populateFieldsToQuery(field.schema(), stringBuilder, mapping, udtQueryStack); + isCommaNeeded = true; + } catch (Exception e) { + int i = stringBuilder.indexOf(fieldName); + if (i != -1) { + stringBuilder.delete(i, i + fieldName.length()); + isCommaNeeded = false; + } + } + } + } + + private static void populateFieldsToQuery(Schema schema, StringBuilder builder, CassandraMapping mapping, 
Set<String> udtQueryStack) throws Exception { + switch (schema.getType()) { + case INT: + builder.append("int"); + break; + case MAP: + builder.append("map<text,"); + populateFieldsToQuery(schema.getValueType(), builder, mapping, udtQueryStack); + builder.append(">"); + break; + case ARRAY: + builder.append("list<"); + populateFieldsToQuery(schema.getElementType(), builder, mapping, udtQueryStack); + builder.append(">"); + break; + case LONG: + builder.append("bigint"); + break; + case FLOAT: + builder.append("float"); + break; + case DOUBLE: + builder.append("double"); + break; + case BOOLEAN: + builder.append("boolean"); + break; + case BYTES: + builder.append("blob"); + break; + case RECORD: + builder.append("frozen<").append(schema.getName()).append(">"); + String query = getCreateUDTType(schema, mapping, udtQueryStack); + udtQueryStack.add(query); + break; + case STRING: + case FIXED: + case ENUM: + builder.append("text"); + break; + case UNION: + for (Schema unionElementSchema : schema.getTypes()) { + if (unionElementSchema.getType().equals(Schema.Type.RECORD)) { + String recordName = unionElementSchema.getName(); + if (!builder.toString().contains(recordName)) { + builder.append("frozen<").append(recordName).append(">"); + query = getCreateUDTType(unionElementSchema, mapping, udtQueryStack); + udtQueryStack.add(query); + } else { + LOG.warn("Same Field Type can't be mapped recursively. 
This is not supported with Cassandra UDT types, Please use byte dataType for recursive mapping."); + throw new Exception("Same Field Type has mapped recursively"); + } + break; + } else if (!unionElementSchema.getType().equals(Schema.Type.NULL)) { + populateFieldsToQuery(unionElementSchema, builder, mapping, udtQueryStack); + break; + } + } + break; + } + } + } http://git-wip-us.apache.org/repos/asf/gora/blob/962d7a6a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java index ac4da42..17e0568 100644 --- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java +++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java @@ -20,6 +20,7 @@ package org.apache.gora.cassandra.serializers; import com.datastax.driver.core.KeyspaceMetadata; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.TableMetadata; +import org.apache.avro.Schema; import org.apache.gora.cassandra.bean.Field; import org.apache.gora.cassandra.store.CassandraClient; import org.apache.gora.cassandra.store.CassandraMapping; @@ -32,22 +33,22 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.LinkedHashSet; import java.util.List; import java.util.Locale; +import java.util.Set; /** * This is the abstract Cassandra Serializer class. 
*/ public abstract class CassandraSerializer<K, T extends Persistent> { - CassandraClient client; - + private static final Logger LOG = LoggerFactory.getLogger(CassandraStore.class); protected Class<K> keyClass; protected Class<T> persistentClass; protected CassandraMapping mapping; - - private static final Logger LOG = LoggerFactory.getLogger(CassandraStore.class); + CassandraClient client; CassandraSerializer(CassandraClient cc, Class<K> keyClass, Class<T> persistantClass, CassandraMapping mapping) { this.keyClass = keyClass; @@ -56,9 +57,35 @@ public abstract class CassandraSerializer<K, T extends Persistent> { this.mapping = mapping; } + /** + * This method returns the Cassandra Serializer according the Cassandra serializer property. + * + * @param cc Cassandra Client + * @param type Serialization type + * @param dataStore Cassandra DataStore + * @param mapping Cassandra Mapping + * @param <K> key class + * @param <T> persistent class + * @return Serializer + */ + public static <K, T extends Persistent> CassandraSerializer getSerializer(CassandraClient cc, String type, final DataStore<K, T> dataStore, CassandraMapping mapping) { + CassandraStore.SerializerType serType = type.isEmpty() ? 
CassandraStore.SerializerType.NATIVE : CassandraStore.SerializerType.valueOf(type.toUpperCase(Locale.ENGLISH)); + CassandraSerializer serializer; + switch (serType) { + case AVRO: + serializer = new AvroSerializer(cc, dataStore, mapping); + break; + case NATIVE: + default: + serializer = new NativeSerializer(cc, dataStore.getKeyClass(), dataStore.getPersistentClass(), mapping); + } + return serializer; + } + public void createSchema() { LOG.debug("creating Cassandra keyspace {}", mapping.getKeySpace().getName()); this.client.getSession().execute(CassandraQueryFactory.getCreateKeySpaceQuery(mapping)); + processUDTSchemas(); //TODO complete functionality LOG.debug("creating Cassandra column family / table {}", mapping.getCoreName()); this.client.getSession().execute(CassandraQueryFactory.getCreateTableQuery(mapping)); } @@ -89,29 +116,30 @@ public abstract class CassandraSerializer<K, T extends Persistent> { } } - /** - * This method returns the Cassandra Serializer according the Cassandra serializer property. - * - * @param cc Cassandra Client - * @param type Serialization type - * @param dataStore Cassandra DataStore - * @param mapping Cassandra Mapping - * @param <K> key class - * @param <T> persistent class - * @return Serializer - */ - public static <K, T extends Persistent> CassandraSerializer getSerializer(CassandraClient cc, String type, final DataStore<K,T> dataStore, CassandraMapping mapping) { - CassandraStore.SerializerType serType = type.isEmpty() ? 
CassandraStore.SerializerType.NATIVE : CassandraStore.SerializerType.valueOf(type.toUpperCase(Locale.ENGLISH)); - CassandraSerializer serializer; - switch (serType) { - case AVRO: - serializer = new AvroSerializer(cc, dataStore, mapping); - break; - case NATIVE: - default: - serializer = new NativeSerializer(cc, dataStore.getKeyClass(), dataStore.getPersistentClass(), mapping); + private void processUDTSchemas() { + Set<String> schemaStack = new LinkedHashSet<>(); + for (Field field : mapping.getFieldList()) { + if (field.getType().contains("frozen")) { + try { + Schema schema = (Schema) mapping.getPersistentClass().getField("SCHEMA$").get(null); + Schema schemaField = schema.getField(field.getFieldName()).schema(); + String cqlQuery = CassandraQueryFactory.getCreateUDTType(schemaField, mapping, schemaStack); + schemaStack.add(cqlQuery); + } catch (IllegalAccessException | NoSuchFieldException e) { + throw new RuntimeException("SCHEMA$ field can't accessible, Please recompile the Avro schema with goracompiler."); + } catch (NullPointerException e) { + throw new RuntimeException(field + " field couldn't find in the class " + mapping.getPersistentClass() + "."); + } + } + } + createUserDefineTypes(schemaStack); + + } + + private void createUserDefineTypes(Set<String> queries) { + for (String cqlQuery : queries) { + this.client.getSession().execute(cqlQuery); } - return serializer; } protected String[] getFields() { @@ -146,14 +174,22 @@ public abstract class CassandraSerializer<K, T extends Persistent> { public long deleteByQuery(Query query) { List<Object> objectArrayList = new ArrayList<>(); - String cqlQuery = CassandraQueryFactory.getDeleteByQuery(mapping, query, objectArrayList); - ResultSet results; - if (objectArrayList.size() == 0) { - results = client.getSession().execute(cqlQuery); + if (query.getKey() == null && query.getEndKey() == null && query.getStartKey() == null) { + if (query.getFields() == null) { + 
client.getSession().execute(CassandraQueryFactory.getTruncateTableQuery(mapping)); + } else { + LOG.error("Delete by Query is not supported for the Queries which didn't specify Query keys with fields."); + } } else { - results = client.getSession().execute(cqlQuery, objectArrayList.toArray()); + String cqlQuery = CassandraQueryFactory.getDeleteByQuery(mapping, query, objectArrayList); + ResultSet results; + if (objectArrayList.size() == 0) { + results = client.getSession().execute(cqlQuery); + } else { + results = client.getSession().execute(cqlQuery, objectArrayList.toArray()); + } + LOG.debug("Delete by Query was applied : " + results.wasApplied()); } - LOG.debug("Delete by Query was applied : " + results.wasApplied()); LOG.info("Delete By Query method doesn't return the deleted element count."); return 0; } http://git-wip-us.apache.org/repos/asf/gora/blob/962d7a6a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java index 4695498..f8bb066 100644 --- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java +++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java @@ -21,6 +21,7 @@ import com.datastax.driver.core.ResultSet; import com.datastax.driver.mapping.Mapper; import com.datastax.driver.mapping.MappingManager; import com.datastax.driver.mapping.Result; +import org.apache.commons.lang.ArrayUtils; import org.apache.gora.cassandra.bean.Field; import org.apache.gora.cassandra.persistent.CassandraNativePersistent; import org.apache.gora.cassandra.query.CassandraResultSet; @@ -46,16 +47,23 @@ class NativeSerializer<K, T extends CassandraNativePersistent> extends Cassandra private 
Mapper<T> mapper; + NativeSerializer(CassandraClient cassandraClient, Class<K> keyClass, Class<T> persistentClass, CassandraMapping mapping) { + super(cassandraClient, keyClass, persistentClass, mapping); + this.createSchema(); + MappingManager mappingManager = new MappingManager(cassandraClient.getSession()); + mapper = mappingManager.mapper(persistentClass); + } + @Override public void put(Object key, Persistent value) { - LOG.debug("Object is saved with key : {} and value : {}",key,value); + LOG.debug("Object is saved with key : {} and value : {}", key, value); mapper.save((T) value); } @Override public T get(Object key) { T object = mapper.get(key); - if(object != null) { + if (object != null) { LOG.debug("Object is found for key : {}", key); } else { LOG.debug("Object is not found for key : {}", key); @@ -72,7 +80,7 @@ class NativeSerializer<K, T extends CassandraNativePersistent> extends Cassandra @Override public Persistent get(Object key, String[] fields) { - if(fields == null) { + if (fields == null) { fields = getFields(); } String cqlQuery = CassandraQueryFactory.getSelectObjectWithFieldsQuery(mapping, fields); @@ -83,15 +91,21 @@ class NativeSerializer<K, T extends CassandraNativePersistent> extends Cassandra LOG.debug("Object is found for key : {}", key); return objectList.get(0); } - LOG.debug("Object is not found for key : {}" , key); + LOG.debug("Object is not found for key : {}", key); return null; } @Override public org.apache.gora.query.Result execute(DataStore dataStore, Query query) { List<Object> objectArrayList = new ArrayList<>(); + String[] fields = query.getFields(); + if (fields != null) { + fields = (String[]) ArrayUtils.addAll(fields, mapping.getAllKeys()); + } else { + fields = mapping.getAllFieldsIncludingKeys(); + } CassandraResultSet<K, T> cassandraResult = new CassandraResultSet<>(dataStore, query); - String cqlQuery = CassandraQueryFactory.getExecuteQuery(mapping, query, objectArrayList); + String cqlQuery = 
CassandraQueryFactory.getExecuteQuery(mapping, query, objectArrayList, fields); ResultSet results; if (objectArrayList.size() == 0) { results = client.getSession().execute(cqlQuery); @@ -108,13 +122,6 @@ class NativeSerializer<K, T extends CassandraNativePersistent> extends Cassandra return cassandraResult; } - NativeSerializer(CassandraClient cassandraClient, Class<K> keyClass, Class<T> persistentClass, CassandraMapping mapping) { - super(cassandraClient, keyClass, persistentClass, mapping); - this.createSchema(); - MappingManager mappingManager = new MappingManager(cassandraClient.getSession()); - mapper = mappingManager.mapper(persistentClass); - } - private K getKey(T object) { String keyField = null; for (Field field : mapping.getFieldList()) { http://git-wip-us.apache.org/repos/asf/gora/blob/962d7a6a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/package-info.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/package-info.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/package-info.java index 5d22d94..ce1e3e7 100644 --- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/package-info.java +++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/package-info.java @@ -5,15 +5,16 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
* See the License for the specific language governing permissions and * limitations under the License. */ + /** * This package contains Cassandra store related util classes for serializer. */ http://git-wip-us.apache.org/repos/asf/gora/blob/962d7a6a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java index 196f6a3..c973fe4 100644 --- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java +++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.gora.cassandra.store; import com.datastax.driver.core.Cluster; @@ -53,6 +70,8 @@ public class CassandraClient { private Cluster cluster; + private Session session; + private CassandraMapping mapping; public Session getSession() { return session; @@ -62,11 +81,6 @@ public class CassandraClient { return cluster; } - private Session session; - - private CassandraMapping mapping; - - void initialize(Properties properties, CassandraMapping mapping) throws Exception { Cluster.Builder builder = Cluster.builder(); List<String> codecs = readCustomCodec(properties); http://git-wip-us.apache.org/repos/asf/gora/blob/962d7a6a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java index 5699355..ac46a30 100644 --- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java +++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java @@ -31,54 +31,80 @@ import java.util.Map; */ public class CassandraMapping { + private static final String PRIMARY_KEY = "primarykey"; private CassandraKey cassandraKey; - private Map<String, String> tableProperties; - private Class keyClass; - private Class persistentClass; - private KeySpace keySpace; - private List<Field> fieldList; - - private List<Field> inlinedDefinedPartitionKeys; - - private static final String PRIMARY_KEY = "primarykey"; - + private Field inlinedDefinedPartitionKey; private String coreName; + /** + * Constructor of the class + */ + CassandraMapping() { + this.fieldList = new ArrayList<>(); + this.tableProperties = new HashMap<>(); + } + + /** + * This method returns the KeySpace in the mapping file, + * @return Key space {@link KeySpace} + */ public 
KeySpace getKeySpace() { return keySpace; } - public void setKeySpace(KeySpace keySpace) { + /** + * This method set the KeySpace in the Cassandra mapping. + * @param keySpace Key space {@link KeySpace} + */ + void setKeySpace(KeySpace keySpace) { this.keySpace = keySpace; } + /** + * Thi method returns only the fields which belongs only for the Persistent Object. + * @return List of Fields + */ public List<Field> getFieldList() { return fieldList; } + /** + * This method returns the Persistent Object's Field from the mapping, according to the FieldName. + * @param fieldName Field Name + * @return Field {@link Field} + */ public Field getFieldFromFieldName(String fieldName) { for (Field field1 : fieldList) { - if (field1.getFieldName().equals(fieldName)) { + if (field1.getFieldName().equalsIgnoreCase(fieldName)) { return field1; } } return null; } + /** + * This method returns the Persistent Object's Field from the mapping, according to the ColumnName. + * @param columnName Column Name + * @return Field {@link Field} + */ public Field getFieldFromColumnName(String columnName) { for (Field field1 : fieldList) { - if (field1.getColumnName().equals(columnName)) { + if (field1.getColumnName().equalsIgnoreCase(columnName)) { return field1; } } return null; } + /** + * This method returns the Field Names + * @return array of Field Names + */ public String[] getFieldNames() { List<String> fieldNames = new ArrayList<>(fieldList.size()); for (Field field : fieldList) { @@ -88,60 +114,79 @@ public class CassandraMapping { return fieldNames.toArray(fieldNameArray); } + /** + * This method returns + * @return + */ + public String[] getAllFieldsIncludingKeys() { + List<String> fieldNames = new ArrayList<>(fieldList.size()); + for (Field field : fieldList) { + fieldNames.add(field.getFieldName()); + } + if (cassandraKey != null) { + for (Field field : cassandraKey.getFieldList()) { + fieldNames.add(field.getFieldName()); + } + } + String[] fieldNameArray = new 
String[fieldNames.size()]; + return fieldNames.toArray(fieldNameArray); + } + + /** + * This method returns the Field Names of the Cassandra key if one is defined, otherwise the Field Name of the inlined defined partition key. + * @return array of key Field Names + */ + public String[] getAllKeys() { + List<String> fieldNames = new ArrayList<>(); + Field keyField = getInlinedDefinedPartitionKey(); + if (cassandraKey != null) { + for (Field field : cassandraKey.getFieldList()) { + fieldNames.add(field.getFieldName()); + } + } else { + fieldNames.add(keyField.getFieldName()); + } + String[] fieldNameArray = new String[fieldNames.size()]; + return fieldNames.toArray(fieldNameArray); + } + public CassandraKey getCassandraKey() { return cassandraKey; } void setCassandraKey(CassandraKey cassandraKey) { this.cassandraKey = cassandraKey; - this.fieldList.addAll(cassandraKey.getFieldList()); } - CassandraMapping() { - this.fieldList = new ArrayList<>(); - this.tableProperties = new HashMap<>(); + public String getCoreName() { + return coreName; } - public void setCoreName(String coreName) { + void setCoreName(String coreName) { this.coreName = coreName; } - public String getCoreName() { - return coreName; - } - - public void addCassandraField(Field field) { + void addCassandraField(Field field) { this.fieldList.add(field); } - public void addProperty(String key, String value) { - this.tableProperties.put(key,value); + void addProperty(String key, String value) { + this.tableProperties.put(key, value); } public String getProperty(String key) { return this.tableProperties.get(key); } - public Field getDefaultCassandraKey() { + private Field getDefaultCassandraKey() { Field field = new Field(); field.setFieldName("defaultId"); field.setColumnName("defaultId"); - field.setType("text"); + field.setType("varchar"); + field.addProperty("primarykey", "true"); return field; } - public boolean isPartitionKeyDefined() { - if (cassandraKey == null) { - for (Field field : fieldList) { - if (Boolean.parseBoolean(field.getProperty(PRIMARY_KEY))) { - return true; - } - } - return false; - } - return true; - } - public Class getKeyClass() { return 
keyClass; } @@ -154,21 +199,38 @@ public class CassandraMapping { return persistentClass; } - public void setPersistentClass(Class persistentClass) { + void setPersistentClass(Class persistentClass) { this.persistentClass = persistentClass; } - public List<Field> getInlinedDefinedPartitionKeys() { - if(inlinedDefinedPartitionKeys != null) { - return inlinedDefinedPartitionKeys; + /** + * This method returns the inlined defined Partition Key. + * If there isn't any inlined defined partition key, + * this method returns the default predefined partition key "defaultId". + * + * @return Partition Key {@link Field} + */ + public Field getInlinedDefinedPartitionKey() { + if (inlinedDefinedPartitionKey != null) { + return inlinedDefinedPartitionKey; } else { - inlinedDefinedPartitionKeys = new ArrayList<>(); for (Field field : fieldList) { if (Boolean.parseBoolean(field.getProperty(PRIMARY_KEY))) { - inlinedDefinedPartitionKeys.add(field); + inlinedDefinedPartitionKey = field; + break; } } - return inlinedDefinedPartitionKeys; + if (inlinedDefinedPartitionKey == null) { + return getDefaultCassandraKey(); + } + return inlinedDefinedPartitionKey; + } + } + + void finalized() { + Field field = getInlinedDefinedPartitionKey(); + if (!fieldList.contains(field) && cassandraKey == null) { + fieldList.add(field); } } } http://git-wip-us.apache.org/repos/asf/gora/blob/962d7a6a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMappingBuilder.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMappingBuilder.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMappingBuilder.java index c501cc5..f151458 100644 --- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMappingBuilder.java +++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMappingBuilder.java @@ -1,3 +1,20 @@ +/* + 
* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.gora.cassandra.store; import org.apache.gora.cassandra.bean.CassandraKey; @@ -203,6 +220,7 @@ class CassandraMappingBuilder<K, T extends Persistent> { } catch (Exception ex) { throw new IOException(ex); } + cassandraMapping.finalized(); return cassandraMapping; } @@ -219,6 +237,10 @@ class CassandraMappingBuilder<K, T extends Persistent> { fieldKey.setColumnName(attributeValue); break; case "type": + // replace UDT into frozen + if (attributeValue.contains("udt(")) { + attributeValue = attributeValue.replace("udt(", "frozen("); + } fieldKey.setType(attributeValue.replace("(", "<").replace(")", ">")); break; default: @@ -229,8 +251,8 @@ class CassandraMappingBuilder<K, T extends Persistent> { } private int getReplicationFactor(Element element) { - String value = element.getAttributeValue("replication_factor"); - if(value == null) { + String value = element.getAttributeValue("replication_factor"); + if (value == null) { return 1; } else { return Integer.parseInt(value); http://git-wip-us.apache.org/repos/asf/gora/blob/962d7a6a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java 
---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java index 74d3862..c481610 100644 --- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java +++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java @@ -17,11 +17,13 @@ package org.apache.gora.cassandra.store; +import org.apache.avro.data.RecordBuilder; import org.apache.gora.cassandra.persistent.CassandraNativePersistent; import org.apache.gora.cassandra.query.CassandraQuery; import org.apache.gora.cassandra.serializers.CassandraSerializer; import org.apache.gora.persistency.BeanFactory; import org.apache.gora.persistency.Persistent; +import org.apache.gora.persistency.impl.PersistentBase; import org.apache.gora.query.PartitionQuery; import org.apache.gora.query.Query; import org.apache.gora.query.Result; @@ -58,16 +60,6 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T> private CassandraSerializer cassandraSerializer; - public enum SerializerType { - AVRO("AVRO"), NATIVE("NATIVE"); - String val; - - SerializerType(String v) { - this.val = v; - } - } - - public CassandraStore() { super(); } @@ -96,6 +88,12 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T> } } + @SuppressWarnings("all") + @Override + public Class<T> getPersistentClass() { + return (Class<T>) this.persistentClass; + } + /** * {@inheritDoc} * <p> @@ -110,12 +108,6 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T> this.persistentClass = persistentClass; } - @SuppressWarnings("all") - @Override - public Class<T> getPersistentClass() { - return (Class<T>) this.persistentClass; - } - @Override public String getSchemaName() { return mapping.getCoreName(); @@ -148,7 +140,6 @@ public class 
CassandraStore<K, T extends Persistent> implements DataStore<K, T> if (beanFactory != null) { return beanFactory.newKey(); } else { - LOG.warn("beanFactory is hasn't been initialized. It's recommended to initialize beanFactory."); return keyClass.newInstance(); } } catch (Exception ex) { @@ -162,8 +153,10 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T> try { if (beanFactory != null) { return this.beanFactory.newPersistent(); + } else if (PersistentBase.class.isAssignableFrom(persistentClass)) { + RecordBuilder builder = (RecordBuilder) persistentClass.getMethod("newBuilder").invoke(null); + return (T) RecordBuilder.class.getMethod("build").invoke(builder); } else { - LOG.warn("beanFactory is hasn't been initialized. It's recommended to initialize beanFactory."); return persistentClass.newInstance(); } } catch (Exception ex) { @@ -171,10 +164,6 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T> } } - @Override - public void setBeanFactory(BeanFactory<K, T> beanFactory) { - this.beanFactory = beanFactory; - } @Override public BeanFactory<K, T> getBeanFactory() { @@ -182,6 +171,11 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T> } @Override + public void setBeanFactory(BeanFactory<K, T> beanFactory) { + this.beanFactory = beanFactory; + } + + @Override public void close() { this.cassandraSerializer.close(); } @@ -229,7 +223,6 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T> @Override public Query<K, T> newQuery() { Query<K, T> query = new CassandraQuery(this); - query.setFields(mapping.getFieldNames()); return query; } @@ -262,4 +255,13 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T> return cassandraSerializer.schemaExists(); } + public enum SerializerType { + AVRO("AVRO"), NATIVE("NATIVE"); + String val; + + SerializerType(String v) { + this.val = v; + } + } + } \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/gora/blob/962d7a6a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStoreParameters.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStoreParameters.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStoreParameters.java index 95e1c0f..bb758f6 100644 --- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStoreParameters.java +++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStoreParameters.java @@ -28,13 +28,13 @@ public class CassandraStoreParameters { */ public static final String CASSANDRA_SERVERS = "gora.cassandrastore.cassandraServers"; /** - *Property pointing to the Cassandra keyspace. + * Property pointing to the Cassandra keyspace. * string */ public static final String KEYSPACE = "gora.cassandrastore.keyspace"; /** - * Property pointing to the port to use to connect to the Cassandra hosts. - * integer + * Property pointing to the port to use to connect to the Cassandra hosts. + * integer */ public static final String PORT = "gora.cassandrastore.port"; @@ -97,7 +97,7 @@ public class CassandraStoreParameters { * Property pointing to set local host new connection threshold. * integer */ - public static final String LOCAL_NEW_CONNECTION_THRESHOLD= "gora.cassandrastore.localNewConnectionThreshold"; + public static final String LOCAL_NEW_CONNECTION_THRESHOLD = "gora.cassandrastore.localNewConnectionThreshold"; /** * Property pointing to set remote host new connection threshold. 
* integer http://git-wip-us.apache.org/repos/asf/gora/blob/962d7a6a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/package-info.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/package-info.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/package-info.java index e6d0176..2cd9003 100644 --- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/package-info.java +++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/package-info.java @@ -5,15 +5,16 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ + /** * This package contains all the Cassandra store related classes. */
