Add tests for Avro
Project: http://git-wip-us.apache.org/repos/asf/gora/repo Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/a9a3ad49 Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/a9a3ad49 Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/a9a3ad49 Branch: refs/heads/master Commit: a9a3ad4922a45a76c8c6b1b76c011d72736e0c30 Parents: 962d7a6 Author: madhawa-gunasekara <madha...@wso2.com> Authored: Fri Aug 4 21:35:14 2017 +0530 Committer: madhawa-gunasekara <madha...@wso2.com> Committed: Thu Aug 10 23:45:56 2017 +0530 ---------------------------------------------------------------------- gora-cassandra-cql/pom.xml | 5 +- .../nativeSerialization/ComplexTypes.java | 85 ++- .../generated/nativeSerialization/Customer.java | 81 +++ .../generated/nativeSerialization/Document.java | 137 +++++ .../generated/nativeSerialization/User.java | 86 ++- .../org/apache/gora/cassandra/bean/Field.java | 1 + .../persistent/CassandraNativePersistent.java | 108 ---- .../cassandra/query/CassandraResultSet.java | 6 +- .../serializers/AvroCassandraUtils.java | 20 +- .../cassandra/serializers/AvroSerializer.java | 126 +++-- .../serializers/CassandraQueryFactory.java | 526 +++++++++---------- .../serializers/CassandraSerializer.java | 87 +-- .../cassandra/serializers/NativeSerializer.java | 10 +- .../gora/cassandra/store/CassandraMapping.java | 8 +- .../store/CassandraMappingBuilder.java | 3 + .../gora/cassandra/store/CassandraStore.java | 26 +- .../conf/avroUDT/gora-cassandra-mapping.xml | 50 ++ .../src/test/conf/gora.properties | 4 +- .../conf/nativeUDT/gora-cassandra-mapping.xml | 33 ++ .../gora/cassandra/GoraCassandraTestDriver.java | 8 +- .../store/TestAvroSerializationWithUDT.java | 91 ++++ .../cassandra/store/TestCassandraStore.java | 97 +++- .../TestCassandraStoreWithCassandraKey.java | 68 +++ .../store/TestNativeSerializationWithUDT.java | 91 ++++ pom.xml | 4 +- 25 files changed, 1248 insertions(+), 513 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/gora/blob/a9a3ad49/gora-cassandra-cql/pom.xml ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/pom.xml b/gora-cassandra-cql/pom.xml index ed98a18..b78db24 100644 --- a/gora-cassandra-cql/pom.xml +++ b/gora-cassandra-cql/pom.xml @@ -167,11 +167,10 @@ </exclusions> </dependency> - <dependency> +<!-- <dependency> <groupId>io.netty</groupId> <artifactId>netty-transport-native-epoll</artifactId> - <version>4.0.37.Final</version> - </dependency> + </dependency>--> <dependency> <groupId>org.apache.gora</groupId> http://git-wip-us.apache.org/repos/asf/gora/blob/a9a3ad49/gora-cassandra-cql/src/examples/java/org/apache/gora/cassandra/example/generated/nativeSerialization/ComplexTypes.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/examples/java/org/apache/gora/cassandra/example/generated/nativeSerialization/ComplexTypes.java b/gora-cassandra-cql/src/examples/java/org/apache/gora/cassandra/example/generated/nativeSerialization/ComplexTypes.java index 797ea62..c161ef9 100644 --- a/gora-cassandra-cql/src/examples/java/org/apache/gora/cassandra/example/generated/nativeSerialization/ComplexTypes.java +++ b/gora-cassandra-cql/src/examples/java/org/apache/gora/cassandra/example/generated/nativeSerialization/ComplexTypes.java @@ -3,7 +3,10 @@ package org.apache.gora.cassandra.example.generated.nativeSerialization; import com.datastax.driver.mapping.annotations.Column; import com.datastax.driver.mapping.annotations.PartitionKey; import com.datastax.driver.mapping.annotations.Table; -import org.apache.gora.cassandra.persistent.CassandraNativePersistent; +import com.datastax.driver.mapping.annotations.Transient; +import org.apache.avro.Schema; +import org.apache.gora.persistency.Persistent; +import org.apache.gora.persistency.Tombstone; import java.util.List; 
import java.util.Map; @@ -18,7 +21,7 @@ import java.util.UUID; writeConsistency = "QUORUM", caseSensitiveKeyspace = false, caseSensitiveTable = true) -public class ComplexTypes extends CassandraNativePersistent { +public class ComplexTypes implements Persistent { @Column private List<String> listDataType; @@ -98,4 +101,82 @@ public class ComplexTypes extends CassandraNativePersistent { public void setId(String id) { this.id = id; } + + @Transient + @Override + public void clear() { + + } + + @Transient + @Override + public boolean isDirty(int fieldIndex) { + return false; + } + + @Transient + @Override + public boolean isDirty(String field) { + return false; + } + + @Transient + @Override + public void setDirty() { + + } + + @Transient + @Override + public void setDirty(int fieldIndex) { + + } + + @Transient + @Override + public void clearDirty(int fieldIndex) { + + } + + @Transient + @Override + public void clearDirty(String field) { + + } + + @Transient + @Override + public Tombstone getTombstone() { + return null; + } + + @Transient + @Override + public List<Schema.Field> getUnmanagedFields() { + return null; + } + + @Transient + @Override + public Persistent newInstance() { + return new ComplexTypes(); + } + + @Transient + @Override + public boolean isDirty() { + return false; + } + + @Transient + @Override + public void setDirty(String field) { + + } + + @Transient + @Override + public void clearDirty() { + + } } http://git-wip-us.apache.org/repos/asf/gora/blob/a9a3ad49/gora-cassandra-cql/src/examples/java/org/apache/gora/cassandra/example/generated/nativeSerialization/Customer.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/examples/java/org/apache/gora/cassandra/example/generated/nativeSerialization/Customer.java b/gora-cassandra-cql/src/examples/java/org/apache/gora/cassandra/example/generated/nativeSerialization/Customer.java new file mode 100644 index 0000000..1b2de10 --- /dev/null +++ 
b/gora-cassandra-cql/src/examples/java/org/apache/gora/cassandra/example/generated/nativeSerialization/Customer.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gora.cassandra.example.generated.nativeSerialization; + +import com.datastax.driver.mapping.annotations.Field; +import com.datastax.driver.mapping.annotations.UDT; + +import java.util.List; +import java.util.Map; +import java.util.UUID; + +@UDT(name = "customer", keyspace = "nativeTestKeySpace") +public class Customer { + + public Customer() { + + } + @Field(name = "id") + private String id; + + @Field(name = "name") + private String name; + + @Field + private UUID age; + + @Field(name = "coupon_code") + private String couponCode; + + @Field(name = "address") + private String address; + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getCouponCode() { + return couponCode; + } + + public void setCouponCode(String couponCode) { + this.couponCode = couponCode; + } + + public String getAddress() { + return address; + } + + public 
void setAddress(String address) { + this.address = address; + } + +} http://git-wip-us.apache.org/repos/asf/gora/blob/a9a3ad49/gora-cassandra-cql/src/examples/java/org/apache/gora/cassandra/example/generated/nativeSerialization/Document.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/examples/java/org/apache/gora/cassandra/example/generated/nativeSerialization/Document.java b/gora-cassandra-cql/src/examples/java/org/apache/gora/cassandra/example/generated/nativeSerialization/Document.java new file mode 100644 index 0000000..f841a4e --- /dev/null +++ b/gora-cassandra-cql/src/examples/java/org/apache/gora/cassandra/example/generated/nativeSerialization/Document.java @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gora.cassandra.example.generated.nativeSerialization; + +import com.datastax.driver.mapping.annotations.Column; +import com.datastax.driver.mapping.annotations.PartitionKey; +import com.datastax.driver.mapping.annotations.Table; +import com.datastax.driver.mapping.annotations.Transient; +import org.apache.avro.Schema; +import org.apache.gora.persistency.Persistent; +import org.apache.gora.persistency.Tombstone; + +import java.util.List; + +@Table(keyspace = "nativeTestKeySpace", name = "documents", + readConsistency = "QUORUM", + writeConsistency = "QUORUM", + caseSensitiveKeyspace = false, + caseSensitiveTable = false) +public class Document implements Persistent { + @Column + Customer customer; + @PartitionKey + @Column + String defaultId; + + public String getDefaultId() { + return defaultId; + } + + public void setDefaultId(String defaultId) { + this.defaultId = defaultId; + } + + public Customer getCustomer() { + return customer; + } + + public void setCustomer(Customer customer) { + this.customer = customer; + } + + @Transient + @Override + public void clear() { + + } + + @Transient + @Override + public boolean isDirty(int fieldIndex) { + return false; + } + + @Transient + @Override + public boolean isDirty(String field) { + return false; + } + + @Transient + @Override + public void setDirty() { + + } + + @Transient + @Override + public void setDirty(int fieldIndex) { + + } + + @Transient + @Override + public void clearDirty(int fieldIndex) { + + } + + @Transient + @Override + public void clearDirty(String field) { + + } + + @Transient + @Override + public Tombstone getTombstone() { + return null; + } + + @Transient + @Override + public List<Schema.Field> getUnmanagedFields() { + return null; + } + + @Transient + @Override + public Persistent newInstance() { + return new Document(); + } + + @Transient + @Override + public boolean isDirty() { + return false; + } + + @Transient + @Override + public void setDirty(String field) { + + } 
+ + @Transient + @Override + public void clearDirty() { + + } + +} http://git-wip-us.apache.org/repos/asf/gora/blob/a9a3ad49/gora-cassandra-cql/src/examples/java/org/apache/gora/cassandra/example/generated/nativeSerialization/User.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/examples/java/org/apache/gora/cassandra/example/generated/nativeSerialization/User.java b/gora-cassandra-cql/src/examples/java/org/apache/gora/cassandra/example/generated/nativeSerialization/User.java index c8d7a78..08abcd1 100644 --- a/gora-cassandra-cql/src/examples/java/org/apache/gora/cassandra/example/generated/nativeSerialization/User.java +++ b/gora-cassandra-cql/src/examples/java/org/apache/gora/cassandra/example/generated/nativeSerialization/User.java @@ -20,9 +20,13 @@ package org.apache.gora.cassandra.example.generated.nativeSerialization; import com.datastax.driver.mapping.annotations.Column; import com.datastax.driver.mapping.annotations.PartitionKey; import com.datastax.driver.mapping.annotations.Table; -import org.apache.gora.cassandra.persistent.CassandraNativePersistent; +import com.datastax.driver.mapping.annotations.Transient; +import org.apache.avro.Schema; +import org.apache.gora.persistency.Persistent; +import org.apache.gora.persistency.Tombstone; import java.util.Date; +import java.util.List; import java.util.UUID; /** @@ -33,7 +37,7 @@ import java.util.UUID; writeConsistency = "QUORUM", caseSensitiveKeyspace = false, caseSensitiveTable = false) -public class User extends CassandraNativePersistent { +public class User implements Persistent { @PartitionKey @Column(name = "user_id") private UUID userId; @@ -75,4 +79,82 @@ public class User extends CassandraNativePersistent { public void setDateOfBirth(Date dateOfBirth) { this.dateOfBirth = dateOfBirth; } + + @Transient + @Override + public void clear() { + + } + + @Transient + @Override + public boolean isDirty(int fieldIndex) { + return false; + } + + 
@Transient + @Override + public boolean isDirty(String field) { + return false; + } + + @Transient + @Override + public void setDirty() { + + } + + @Transient + @Override + public void setDirty(int fieldIndex) { + + } + + @Transient + @Override + public void clearDirty(int fieldIndex) { + + } + + @Transient + @Override + public void clearDirty(String field) { + + } + + @Transient + @Override + public Tombstone getTombstone() { + return null; + } + + @Transient + @Override + public List<Schema.Field> getUnmanagedFields() { + return null; + } + + @Transient + @Override + public Persistent newInstance() { + return new User(); + } + + @Transient + @Override + public boolean isDirty() { + return false; + } + + @Transient + @Override + public void setDirty(String field) { + + } + + @Transient + @Override + public void clearDirty() { + + } } http://git-wip-us.apache.org/repos/asf/gora/blob/a9a3ad49/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/bean/Field.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/bean/Field.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/bean/Field.java index 3bbda6d..237601d 100644 --- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/bean/Field.java +++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/bean/Field.java @@ -30,6 +30,7 @@ public class Field { private String columnName; private String type; + private Map<String, String> properties; public Field() { http://git-wip-us.apache.org/repos/asf/gora/blob/a9a3ad49/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/persistent/CassandraNativePersistent.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/persistent/CassandraNativePersistent.java 
b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/persistent/CassandraNativePersistent.java deleted file mode 100644 index 9d6e103..0000000 --- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/persistent/CassandraNativePersistent.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gora.cassandra.persistent; - -import com.datastax.driver.mapping.annotations.Transient; -import org.apache.avro.Schema; -import org.apache.gora.persistency.Persistent; -import org.apache.gora.persistency.Tombstone; - -import java.util.List; - -/** - * This class should be used with Native Cassandra Serialization. 
- */ -public abstract class CassandraNativePersistent implements Persistent { - @Transient - @Override - public void clear() { - - } - - @Transient - @Override - public boolean isDirty(int fieldIndex) { - return false; - } - - @Transient - @Override - public boolean isDirty(String field) { - return false; - } - - @Transient - @Override - public void setDirty() { - - } - - @Transient - @Override - public void setDirty(int fieldIndex) { - - } - - @Transient - @Override - public void clearDirty(int fieldIndex) { - - } - - @Transient - @Override - public void clearDirty(String field) { - - } - - @Transient - @Override - public Tombstone getTombstone() { - return null; - } - - @Transient - @Override - public List<Schema.Field> getUnmanagedFields() { - return null; - } - - @Transient - @Override - public Persistent newInstance() { - return null; - } - - @Transient - @Override - public boolean isDirty() { - return false; - } - - @Transient - @Override - public void setDirty(String field) { - - } - - @Transient - @Override - public void clearDirty() { - - } -} http://git-wip-us.apache.org/repos/asf/gora/blob/a9a3ad49/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java index c3b2e59..7ad106d 100644 --- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java +++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java @@ -99,8 +99,10 @@ public class CassandraResultSet<K, T extends Persistent> extends ResultBase<K, T } /** - * @param key - * @param token + * This method adds Result Element into result lists, So when user retrieves values from the Result these objects will be passed. 
+ * + * @param key key + * @param token persistent Object */ public void addResultElement(K key, T token) { this.persistentKey.add(key); http://git-wip-us.apache.org/repos/asf/gora/blob/a9a3ad49/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroCassandraUtils.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroCassandraUtils.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroCassandraUtils.java index 7baa1b1..252cf7b 100644 --- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroCassandraUtils.java +++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroCassandraUtils.java @@ -19,7 +19,6 @@ package org.apache.gora.cassandra.serializers; import org.apache.avro.Schema; -import org.apache.avro.specific.SpecificData; import org.apache.avro.util.Utf8; import org.apache.gora.cassandra.bean.CassandraKey; import org.apache.gora.cassandra.bean.Field; @@ -91,20 +90,7 @@ class AvroCassandraUtils { switch (type) { // Record can be persist with two ways, udt and bytes case RECORD: - PersistentBase persistent = (PersistentBase) fieldValue; - if (field.getType().contains("frozen")) { - PersistentBase newRecord = (PersistentBase) SpecificData.get().newRecord(persistent, persistent.getSchema()); - for (Schema.Field member : fieldSchema.getFields()) { - if (member.pos() == 0 || !persistent.isDirty()) { - continue; - } - Schema memberSchema = member.schema(); - Schema.Type memberType = memberSchema.getType(); - Object memberValue = persistent.get(member.pos()); - newRecord.put(member.pos(), getFieldValueFromAvroBean(memberSchema, memberType, memberValue, field)); - } - fieldValue = newRecord; - } else if (field.getType().contains("blob")) { + if (field.getType().contains("blob")) { try { byte[] serializedBytes = HBaseByteInterface.toBytes(fieldValue, fieldSchema); 
fieldValue = ByteBuffer.wrap(serializedBytes); @@ -112,13 +98,13 @@ class AvroCassandraUtils { LOG.error("Error occurred when serializing {} field. {}", new Object[]{field.getFieldName(), e.getMessage()}); } } else { - throw new RuntimeException(""); + throw new RuntimeException("Unsupported Data Type for Record, Currently Supported Data Types are blob and UDT for Records"); } break; case MAP: Schema valueSchema = fieldSchema.getValueType(); Schema.Type valuetype = valueSchema.getType(); - HashMap<String, Object> map = new HashMap<>(); + Map<String, Object> map = new HashMap<>(); for (Map.Entry<CharSequence, ?> e : ((Map<CharSequence, ?>) fieldValue).entrySet()) { String mapKey = e.getKey().toString(); Object mapValue = e.getValue(); http://git-wip-us.apache.org/repos/asf/gora/blob/a9a3ad49/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java index 57d03f1..58b57dc 100644 --- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java +++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java @@ -17,11 +17,15 @@ package org.apache.gora.cassandra.serializers; +import com.datastax.driver.core.AbstractGettableData; import com.datastax.driver.core.ColumnDefinitions; import com.datastax.driver.core.DataType; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Row; +import com.datastax.driver.core.UDTValue; +import com.datastax.driver.core.UserType; import org.apache.avro.Schema; +import org.apache.avro.specific.SpecificData; import org.apache.commons.lang.ArrayUtils; import org.apache.gora.cassandra.bean.CassandraKey; import org.apache.gora.cassandra.bean.Field; @@ -36,10 
+40,16 @@ import org.apache.gora.store.DataStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.net.InetAddress; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Date; import java.util.Iterator; import java.util.List; +import java.util.Map; +import java.util.UUID; /** * This class contains the operations relates to Avro Serialization. @@ -51,8 +61,8 @@ class AvroSerializer<K, T extends PersistentBase> extends CassandraSerializer { private DataStore<K, T> cassandraDataStore; - AvroSerializer(CassandraClient cassandraClient, DataStore<K, T> dataStore, CassandraMapping mapping) { - super(cassandraClient, dataStore.getKeyClass(), dataStore.getPersistentClass(), mapping); + AvroSerializer(CassandraClient cassandraClient, DataStore<K, T> dataStore, CassandraMapping mapping, Schema schema) { + super(cassandraClient, dataStore.getKeyClass(), dataStore.getPersistentClass(), mapping, schema); this.cassandraDataStore = dataStore; } @@ -71,7 +81,7 @@ class AvroSerializer<K, T extends PersistentBase> extends CassandraSerializer { T obj = null; if (iterator.hasNext()) { obj = cassandraDataStore.newPersistent(); - Row row = iterator.next(); + AbstractGettableData row = (AbstractGettableData) iterator.next(); populateValuesToPersistent(row, definitions, obj, fields); } return obj; @@ -94,7 +104,35 @@ class AvroSerializer<K, T extends PersistentBase> extends CassandraSerializer { } if (persistent.isDirty(f.pos()) || mapping.getInlinedDefinedPartitionKey().equals(mapping.getFieldFromFieldName(fieldName))) { Object value = persistentBase.get(f.pos()); - value = AvroCassandraUtils.getFieldValueFromAvroBean(f.schema(), f.schema().getType(), value, field); + String fieldType = field.getType(); + if (fieldType.contains("frozen")) { + fieldType = fieldType.substring(fieldType.indexOf("<") + 1, fieldType.indexOf(">")); + UserType userType = 
client.getSession().getCluster().getMetadata().getKeyspace(mapping.getKeySpace().getName()).getUserType(fieldType); + UDTValue udtValue = userType.newValue(); + Schema udtSchema = f.schema(); + if (udtSchema.getType().equals(Schema.Type.UNION)) { + for (Schema schema : udtSchema.getTypes()) { + if (schema.getType().equals(Schema.Type.RECORD)) { + udtSchema = schema; + break; + } + } + } + PersistentBase udtObjectBase = (PersistentBase) value; + for (Schema.Field udtField : udtSchema.getFields()) { + Object udtFieldValue = AvroCassandraUtils.getFieldValueFromAvroBean(udtField.schema(), udtField.schema().getType(), udtObjectBase.get(udtField.name()), field); + if (udtField.schema().getType().equals(Schema.Type.MAP)) { + udtValue.setMap(udtField.name(), (Map) udtFieldValue); + } else if (udtField.schema().getType().equals(Schema.Type.ARRAY)) { + udtValue.setList(udtField.name(), (List) udtFieldValue); + } else { + udtValue.set(udtField.name(), udtFieldValue, (Class) udtFieldValue.getClass()); + } + } + value = udtValue; + } else { + value = AvroCassandraUtils.getFieldValueFromAvroBean(f.schema(), f.schema().getType(), value, field); + } values.add(value); fields.add(fieldName); } @@ -122,7 +160,7 @@ class AvroSerializer<K, T extends PersistentBase> extends CassandraSerializer { T obj = null; if (iterator.hasNext()) { obj = cassandraDataStore.newPersistent(); - Row row = iterator.next(); + AbstractGettableData row = (AbstractGettableData) iterator.next(); populateValuesToPersistent(row, definitions, obj, mapping.getFieldNames()); } return obj; @@ -131,7 +169,7 @@ class AvroSerializer<K, T extends PersistentBase> extends CassandraSerializer { /** * This method wraps result set data in to DataEntry and creates a list of DataEntry. 
**/ - private void populateValuesToPersistent(Row row, ColumnDefinitions columnDefinitions, PersistentBase base, String[] fields) { + private void populateValuesToPersistent(AbstractGettableData row, ColumnDefinitions columnDefinitions, PersistentBase base, String[] fields) { Object paramValue; for (String fieldName : fields) { Schema.Field avroField = base.getSchema().getField(fieldName); @@ -142,16 +180,15 @@ class AvroSerializer<K, T extends PersistentBase> extends CassandraSerializer { } Schema fieldSchema = avroField.schema(); String columnName = field.getColumnName(); - paramValue = getValue(row, columnDefinitions, columnName); + paramValue = getValue(row, columnDefinitions.getType(columnName), columnName, fieldSchema); Object value = AvroCassandraUtils.getAvroFieldValue(paramValue, fieldSchema); base.put(avroField.pos(), value); } } - private Object getValue(Row row, ColumnDefinitions columnDefinitions, String columnName) { + private Object getValue(AbstractGettableData row, DataType columnType, String columnName, Schema schema) { Object paramValue; - Field field = mapping.getFieldFromColumnName(columnName); - DataType columnType = columnDefinitions.getType(columnName); + String dataType; switch (columnType.getName()) { case ASCII: paramValue = row.getString(columnName); @@ -202,24 +239,33 @@ class AvroSerializer<K, T extends PersistentBase> extends CassandraSerializer { paramValue = row.isNull(columnName) ? null : row.getUUID(columnName); break; case LIST: - String dataType = field.getType(); - dataType = dataType.substring(dataType.indexOf("<") + 1, dataType.indexOf(">")); + dataType = columnType.getTypeArguments().get(0).toString(); paramValue = row.isNull(columnName) ? 
null : row.getList(columnName, getRelevantClassForCassandraDataType(dataType)); break; case SET: - dataType = field.getType(); - dataType = dataType.substring(dataType.indexOf("<") + 1, dataType.indexOf(">")); + dataType = columnType.getTypeArguments().get(0).toString(); paramValue = row.isNull(columnName) ? null : row.getList(columnName, getRelevantClassForCassandraDataType(dataType)); break; case MAP: - dataType = field.getType(); - dataType = dataType.substring(dataType.indexOf("<") + 1, dataType.indexOf(">")); - dataType = dataType.split(",")[1]; + dataType = columnType.getTypeArguments().get(1).toString(); // Avro supports only String for keys paramValue = row.isNull(columnName) ? null : row.getMap(columnName, String.class, getRelevantClassForCassandraDataType(dataType)); break; case UDT: paramValue = row.isNull(columnName) ? null : row.getUDTValue(columnName); + if (paramValue != null) { + try { + PersistentBase udtObject = (PersistentBase) SpecificData.newInstance(Class.forName(schema.getFullName()), schema); + for (Schema.Field f : udtObject.getSchema().getFields()) { + DataType dType = ((UDTValue) paramValue).getType().getFieldType(f.name()); + Object fieldValue = getValue((UDTValue) paramValue, dType, f.name(), f.schema()); + udtObject.put(f.pos(), fieldValue); + } + paramValue = udtObject; + } catch (ClassNotFoundException e) { + throw new RuntimeException("Error occurred while populating data to " + schema.getFullName() + " : " + e.getMessage()); + } + } break; case TUPLE: paramValue = row.isNull(columnName) ? null : row.getTupleValue(columnName).toString(); @@ -234,35 +280,35 @@ class AvroSerializer<K, T extends PersistentBase> extends CassandraSerializer { return paramValue; } -/* public Collection<Object> getFieldValues(Object o) { - UDTValue udtValue = (UDTValue) o; - UserType type = udtValue.getType(); - - Collection<Object> values = new ArrayList<Object>(type.size()); - - *//* for (UserType.Field field : type) { - udtValue. 
- ByteBuffer bytes = udtValue.getBytesUnsafe(field.getName()); - DataType value = field.getType(); - for(DataType type1 : value.getTypeArguments()) { - type1. - } - values.add(value); - }*//* - - return values; - }*/ - - private Class getRelevantClassForCassandraDataType(String dataType) { switch (dataType) { - //// TODO: 7/25/17 support all the datatypes case "ascii": case "text": case "varchar": return String.class; case "blob": return ByteBuffer.class; + case "int": + return Integer.class; + case "double": + return Double.class; + case "bigint": + case "counter": + return Long.class; + case "decimal": + return BigDecimal.class; + case "float": + return Float.class; + case "boolean": + return Boolean.class; + case "inet": + return InetAddress.class; + case "varint": + return BigInteger.class; + case "uuid": + return UUID.class; + case "timestamp": + return Date.class; default: throw new RuntimeException("Invalid Cassandra DataType"); } @@ -302,7 +348,7 @@ class AvroSerializer<K, T extends PersistentBase> extends CassandraSerializer { K keyObject; CassandraKey cassandraKey = mapping.getCassandraKey(); while (iterator.hasNext()) { - Row row = iterator.next(); + AbstractGettableData row = (AbstractGettableData) iterator.next(); obj = cassandraDataStore.newPersistent(); keyObject = cassandraDataStore.newKey(); populateValuesToPersistent(row, definitions, obj, fields); @@ -310,7 +356,7 @@ class AvroSerializer<K, T extends PersistentBase> extends CassandraSerializer { populateValuesToPersistent(row, definitions, (PersistentBase) keyObject, cassandraKey.getFieldNames()); } else { Field key = mapping.getInlinedDefinedPartitionKey(); - keyObject = (K) getValue(row, definitions, key.getColumnName()); + keyObject = (K) getValue(row, definitions.getType(key.getColumnName()), key.getColumnName(), null); } cassandraResult.addResultElement(keyObject, obj); } 
http://git-wip-us.apache.org/repos/asf/gora/blob/a9a3ad49/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java index 10c8f68..184955c 100644 --- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java +++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java @@ -16,28 +16,31 @@ */ package org.apache.gora.cassandra.serializers; +import com.datastax.driver.core.querybuilder.BuiltStatement; import com.datastax.driver.core.querybuilder.Delete; import com.datastax.driver.core.querybuilder.QueryBuilder; import com.datastax.driver.core.querybuilder.Select; import com.datastax.driver.core.querybuilder.Update; +import com.datastax.driver.mapping.annotations.UDT; import org.apache.avro.Schema; import org.apache.gora.cassandra.bean.CassandraKey; import org.apache.gora.cassandra.bean.ClusterKeyField; import org.apache.gora.cassandra.bean.Field; import org.apache.gora.cassandra.bean.KeySpace; import org.apache.gora.cassandra.bean.PartitionKeyField; -import org.apache.gora.cassandra.persistent.CassandraNativePersistent; import org.apache.gora.cassandra.query.CassandraQuery; import org.apache.gora.cassandra.store.CassandraMapping; +import org.apache.gora.cassandra.store.CassandraStore; import org.apache.gora.query.Query; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; -import java.util.Set; /** * This class is used create Cassandra Queries. 
@@ -95,14 +98,12 @@ class CassandraQueryFactory { static String getCreateTableQuery(CassandraMapping mapping) { StringBuilder stringBuffer = new StringBuilder(); stringBuffer.append("CREATE TABLE IF NOT EXISTS ").append(mapping.getKeySpace().getName()).append(".").append(mapping.getCoreName()).append(" ("); - boolean isCommaNeeded = false; CassandraKey cassandraKey = mapping.getCassandraKey(); // appending Cassandra Persistent columns into db schema - processFieldsForCreateTableQuery(mapping.getFieldList(), isCommaNeeded, stringBuffer); + processFieldsForCreateTableQuery(mapping.getFieldList(), false, stringBuffer); if (cassandraKey != null) { - isCommaNeeded = true; - processFieldsForCreateTableQuery(cassandraKey.getFieldList(), isCommaNeeded, stringBuffer); + processFieldsForCreateTableQuery(cassandraKey.getFieldList(), true, stringBuffer); List<PartitionKeyField> partitionKeys = cassandraKey.getPartitionKeyFields(); if (partitionKeys != null) { stringBuffer.append(", PRIMARY KEY ("); @@ -253,17 +254,29 @@ class CassandraQueryFactory { String[] objects = new String[fields.size()]; Arrays.fill(objects, "?"); Delete delete = QueryBuilder.delete().from(mapping.getKeySpace().getName(), mapping.getCoreName()); - Delete.Where query = null; + return processKeys(columnNames, delete); + } + + private static String processKeys(String[] columnNames, BuiltStatement delete) { + BuiltStatement query = null; boolean isWhereNeeded = true; for (String columnName : columnNames) { if (isWhereNeeded) { - query = delete.where(QueryBuilder.eq(columnName, "?")); + if (delete instanceof Delete) { + query = ((Delete) delete).where(QueryBuilder.eq(columnName, "?")); + } else { + query = ((Select) delete).where(QueryBuilder.eq(columnName, "?")); + } isWhereNeeded = false; } else { - query = query.and(QueryBuilder.eq(columnName, "?")); + if (delete instanceof Delete) { + query = ((Delete.Where) query).and(QueryBuilder.eq(columnName, "?")); + } else { + query = ((Select.Where) 
query).and(QueryBuilder.eq(columnName, "?")); + } } } - return query.getQueryString(); + return query != null ? query.getQueryString() : null; } /** @@ -280,17 +293,7 @@ class CassandraQueryFactory { select.allowFiltering(); } String[] columnNames = getColumnNames(mapping, keyFields); - Select.Where query = null; - boolean isWhereNeeded = true; - for (String columnName : columnNames) { - if (isWhereNeeded) { - query = select.where(QueryBuilder.eq(columnName, "?")); - isWhereNeeded = false; - } else { - query = query.and(QueryBuilder.eq(columnName, "?")); - } - } - return query.getQueryString(); + return processKeys(columnNames, select); } /** @@ -309,17 +312,7 @@ class CassandraQueryFactory { select.allowFiltering(); } String[] columnNames = getColumnNames(mapping, keyFields); - Select.Where query = null; - boolean isWhereNeeded = true; - for (String columnName : columnNames) { - if (isWhereNeeded) { - query = select.where(QueryBuilder.eq(columnName, "?")); - isWhereNeeded = false; - } else { - query = query.and(QueryBuilder.eq(columnName, "?")); - } - } - return query.getQueryString(); + return processKeys(columnNames, select); } /** @@ -373,10 +366,6 @@ class CassandraQueryFactory { * @return CQL Query */ static String getExecuteQuery(CassandraMapping mapping, Query cassandraQuery, List<Object> objects, String[] fields) { - Object startKey = cassandraQuery.getStartKey(); - Object endKey = cassandraQuery.getEndKey(); - Object key = cassandraQuery.getKey(); - String primaryKey = null; long limit = cassandraQuery.getLimit(); Select select = QueryBuilder.select(getColumnNames(mapping, Arrays.asList(fields))).from(mapping.getKeySpace().getName(), mapping.getCoreName()); if (limit > 0) { @@ -385,7 +374,15 @@ class CassandraQueryFactory { if (Boolean.parseBoolean(mapping.getProperty("allowFiltering"))) { select.allowFiltering(); } - Select.Where query = null; + return processQuery(cassandraQuery, select, mapping, objects); + } + + private static String 
processQuery(Query cassandraQuery, BuiltStatement select, CassandraMapping mapping, List<Object> objects) { + String primaryKey = null; + BuiltStatement query = null; + Object startKey = cassandraQuery.getStartKey(); + Object endKey = cassandraQuery.getEndKey(); + Object key = cassandraQuery.getKey(); boolean isWhereNeeded = true; if (key != null) { if (mapping.getCassandraKey() != null) { @@ -395,17 +392,35 @@ class CassandraQueryFactory { String[] columnKeys = getColumnNames(mapping, cassandraKeys); for (int i = 0; i < cassandraKeys.size(); i++) { if (isWhereNeeded) { - query = select.where(QueryBuilder.eq(columnKeys[i], "?")); + if (select instanceof Select) { + query = ((Select) select).where(QueryBuilder.eq(columnKeys[i], "?")); + } else if (select instanceof Delete) { + query = ((Delete) select).where(QueryBuilder.eq(columnKeys[i], "?")); + } else { + query = ((Update.Assignments) select).where(QueryBuilder.eq(columnKeys[i], "?")); + } objects.add(cassandraValues.get(i)); isWhereNeeded = false; } else { - query = query.and(QueryBuilder.eq(columnKeys[i], "?")); + if (select instanceof Select) { + query = ((Select.Where) query).and(QueryBuilder.eq(columnKeys[i], "?")); + } else if (select instanceof Delete) { + query = ((Delete.Where) query).and(QueryBuilder.eq(columnKeys[i], "?")); + } else { + query = ((Update.Where) query).and(QueryBuilder.eq(columnKeys[i], "?")); + } objects.add(cassandraValues.get(i)); } } } else { primaryKey = getPKey(mapping.getFieldList()); - query = select.where(QueryBuilder.eq(primaryKey, "?")); + if (select instanceof Select) { + query = ((Select) select).where(QueryBuilder.eq(primaryKey, "?")); + } else if (select instanceof Delete) { + query = ((Delete) select).where(QueryBuilder.eq(primaryKey, "?")); + } else { + query = ((Update.Assignments) select).where(QueryBuilder.eq(primaryKey, "?")); + } objects.add(key); } } else { @@ -417,17 +432,44 @@ class CassandraQueryFactory { String[] columnKeys = getColumnNames(mapping, 
cassandraKeys); for (int i = 0; i < cassandraKeys.size(); i++) { if (isWhereNeeded) { - query = select.where(QueryBuilder.gte(columnKeys[i], "?")); + if (select instanceof Select) { + query = ((Select) select).where(QueryBuilder.gte(columnKeys[i], "?")); + } else if (select instanceof Delete) { + /* + According to the JIRA https://issues.apache.org/jira/browse/CASSANDRA-7651 this has been fixed, but It seems this not fixed yet. + */ + throw new RuntimeException("Delete by Query is not suppoted for Key Ranges."); + } else { + query = ((Update.Assignments) select).where(QueryBuilder.gte(columnKeys[i], "?")); + } objects.add(cassandraValues.get(i)); isWhereNeeded = false; } else { - query = query.and(QueryBuilder.gte(columnKeys[i], "?")); + if (select instanceof Select) { + query = ((Select.Where) query).and(QueryBuilder.gte(columnKeys[i], "?")); + } else if (select instanceof Delete) { + /* + According to the JIRA https://issues.apache.org/jira/browse/CASSANDRA-7651 this has been fixed, but It seems this not fixed yet. + */ + throw new RuntimeException("Delete by Query is not suppoted for Key Ranges."); + } else { + query = ((Update.Where) query).and(QueryBuilder.gte(columnKeys[i], "?")); + } objects.add(cassandraValues.get(i)); } } } else { primaryKey = getPKey(mapping.getFieldList()); - query = select.where(QueryBuilder.gte(primaryKey, "?")); + if (select instanceof Select) { + query = ((Select) select).where(QueryBuilder.gte(primaryKey, "?")); + } else if (select instanceof Delete) { + /* + According to the JIRA https://issues.apache.org/jira/browse/CASSANDRA-7651 this has been fixed, but It seems this not fixed yet. 
+ */ + throw new RuntimeException("Delete by Query is not suppoted for Key Ranges."); + } else { + query = ((Update.Assignments) select).where(QueryBuilder.gte(primaryKey, "?")); + } objects.add(startKey); isWhereNeeded = false; } @@ -440,20 +482,56 @@ class CassandraQueryFactory { String[] columnKeys = getColumnNames(mapping, cassandraKeys); for (int i = 0; i < cassandraKeys.size(); i++) { if (isWhereNeeded) { - query = select.where(QueryBuilder.lte(columnKeys[i], "?")); + if (select instanceof Select) { + query = ((Select) select).where(QueryBuilder.lte(columnKeys[i], "?")); + } else if (select instanceof Delete) { + /* + According to the JIRA https://issues.apache.org/jira/browse/CASSANDRA-7651 this has been fixed, but It seems this not fixed yet. + */ + throw new RuntimeException("Delete by Query is not suppoted for Key Ranges."); + } else { + query = ((Update.Assignments) select).where(QueryBuilder.lte(columnKeys[i], "?")); + } objects.add(cassandraValues.get(i)); isWhereNeeded = false; } else { - query = query.and(QueryBuilder.lte(columnKeys[i], "?")); + if (select instanceof Select) { + query = ((Select.Where) query).and(QueryBuilder.lte(columnKeys[i], "?")); + } else if (select instanceof Delete) { + /* + According to the JIRA https://issues.apache.org/jira/browse/CASSANDRA-7651 this has been fixed, but It seems this not fixed yet. + */ + throw new RuntimeException("Delete by Query is not suppoted for Key Ranges."); + } else { + query = ((Update.Where) query).and(QueryBuilder.lte(columnKeys[i], "?")); + } objects.add(cassandraValues.get(i)); } } } else { primaryKey = primaryKey != null ? 
primaryKey : getPKey(mapping.getFieldList()); if (isWhereNeeded) { - query = select.where(QueryBuilder.lte(primaryKey, "?")); + if (select instanceof Select) { + query = ((Select) select).where(QueryBuilder.lte(primaryKey, "?")); + } else if (select instanceof Delete) { + /* + According to the JIRA https://issues.apache.org/jira/browse/CASSANDRA-7651 this has been fixed, but It seems this not fixed yet. + */ + throw new RuntimeException("Delete by Query is not suppoted for Key Ranges."); + } else { + query = ((Update.Assignments) select).where(QueryBuilder.lte(primaryKey, "?")); + } } else { - query = query.and(QueryBuilder.lte(primaryKey, "?")); + if (select instanceof Select) { + query = ((Select.Where) query).and(QueryBuilder.lte(primaryKey, "?")); + } else if (select instanceof Delete) { + /* + According to the JIRA https://issues.apache.org/jira/browse/CASSANDRA-7651 this has been fixed, but It seems this not fixed yet. + */ + throw new RuntimeException("Delete by Query is not suppoted for Key Ranges."); + } else { + query = ((Update.Where) query).and(QueryBuilder.lte(primaryKey, "?")); + } } objects.add(endKey); } @@ -462,7 +540,7 @@ class CassandraQueryFactory { if (startKey == null && endKey == null && key == null) { return select.getQueryString(); } - return query.getQueryString(); + return query != null ? 
query.getQueryString() : null; } private static String[] getColumnNames(CassandraMapping mapping, List<String> fields) { @@ -509,94 +587,13 @@ class CassandraQueryFactory { if (cassandraQuery.getFields() != null) { columns = getColumnNames(mapping, Arrays.asList(cassandraQuery.getFields())); } - Object startKey = cassandraQuery.getStartKey(); - Object endKey = cassandraQuery.getEndKey(); - Object key = cassandraQuery.getKey(); - String primaryKey = null; Delete delete; if (columns != null) { delete = QueryBuilder.delete(columns).from(mapping.getKeySpace().getName(), mapping.getCoreName()); } else { delete = QueryBuilder.delete().from(mapping.getKeySpace().getName(), mapping.getCoreName()); } - Delete.Where query = null; - boolean isWhereNeeded = true; - if (key != null) { - if (mapping.getCassandraKey() != null) { - ArrayList<String> cassandraKeys = new ArrayList<>(); - ArrayList<Object> cassandraValues = new ArrayList<>(); - AvroCassandraUtils.processKeys(mapping, key, cassandraKeys, cassandraValues); - String[] columnKeys = getColumnNames(mapping, cassandraKeys); - for (int i = 0; i < cassandraKeys.size(); i++) { - if (isWhereNeeded) { - query = delete.where(QueryBuilder.eq(columnKeys[i], "?")); - objects.add(cassandraValues.get(i)); - isWhereNeeded = false; - } else { - query = query.and(QueryBuilder.eq(columnKeys[i], "?")); - objects.add(cassandraValues.get(i)); - } - } - } else { - primaryKey = getPKey(mapping.getFieldList()); - query = delete.where(QueryBuilder.eq(primaryKey, "?")); - objects.add(key); - } - } else { - if (startKey != null) { - if (mapping.getCassandraKey() != null) { - ArrayList<String> cassandraKeys = new ArrayList<>(); - ArrayList<Object> cassandraValues = new ArrayList<>(); - AvroCassandraUtils.processKeys(mapping, startKey, cassandraKeys, cassandraValues); - String[] columnKeys = getColumnNames(mapping, cassandraKeys); - for (int i = 0; i < cassandraKeys.size(); i++) { - if (isWhereNeeded) { - query = 
delete.where(QueryBuilder.gte(QueryBuilder.token(columnKeys[i]), QueryBuilder.token("?"))); - objects.add(cassandraValues.get(i)); - isWhereNeeded = false; - } else { - query = query.and(QueryBuilder.gte(QueryBuilder.token(columnKeys[i]), QueryBuilder.token("?"))); - objects.add(cassandraValues.get(i)); - } - } - } else { - primaryKey = getPKey(mapping.getFieldList()); - query = delete.where(QueryBuilder.gte(QueryBuilder.token(primaryKey), QueryBuilder.token("?"))); - objects.add(startKey); - isWhereNeeded = false; - } - } - if (endKey != null) { - if (mapping.getCassandraKey() != null) { - ArrayList<String> cassandraKeys = new ArrayList<>(); - ArrayList<Object> cassandraValues = new ArrayList<>(); - AvroCassandraUtils.processKeys(mapping, endKey, cassandraKeys, cassandraValues); - String[] columnKeys = getColumnNames(mapping, cassandraKeys); - for (int i = 0; i < cassandraKeys.size(); i++) { - if (isWhereNeeded) { - query = delete.where(QueryBuilder.lte(QueryBuilder.token(columnKeys[i]), QueryBuilder.token("?"))); - objects.add(cassandraValues.get(i)); - isWhereNeeded = false; - } else { - query = query.and(QueryBuilder.lte(QueryBuilder.token(columnKeys[i]), QueryBuilder.token("?"))); - objects.add(cassandraValues.get(i)); - } - } - } else { - primaryKey = primaryKey != null ? 
primaryKey : getPKey(mapping.getFieldList()); - if (isWhereNeeded) { - query = delete.where(QueryBuilder.lte(QueryBuilder.token(primaryKey), QueryBuilder.token("?"))); - } else { - query = query.and(QueryBuilder.lte(QueryBuilder.token(primaryKey), QueryBuilder.token("?"))); - } - objects.add(endKey); - } - } - } - if (startKey == null && endKey == null && key == null) { - return delete.getQueryString(); - } - return query.getQueryString(); + return processQuery(cassandraQuery, delete, mapping, objects); } /** @@ -608,12 +605,12 @@ class CassandraQueryFactory { * @param objects field Objects list * @return CQL Query */ - static String getUpdateByQuery(CassandraMapping mapping, Query cassandraQuery, List<Object> objects) { + static String getUpdateByQuery(CassandraMapping mapping, Query cassandraQuery, List<Object> objects, Schema schema) { Update update = QueryBuilder.update(mapping.getKeySpace().getName(), mapping.getCoreName()); Update.Assignments updateAssignments = null; if (cassandraQuery instanceof CassandraQuery) { String[] columnNames = getColumnNames(mapping, Arrays.asList(cassandraQuery.getFields())); - if (CassandraNativePersistent.class.isAssignableFrom(mapping.getPersistentClass())) { + if (((CassandraStore) cassandraQuery.getDataStore()).getSerializationType().equalsIgnoreCase("NATIVE")) { for (String column : columnNames) { updateAssignments = update.with(QueryBuilder.set(column, "?")); objects.add(((CassandraQuery) cassandraQuery).getUpdateFieldValue(mapping.getFieldFromColumnName(column).getFieldName())); @@ -624,159 +621,33 @@ class CassandraQueryFactory { Field field = mapping.getFieldFromColumnName(column); Object value = ((CassandraQuery) cassandraQuery).getUpdateFieldValue(field.getFieldName()); try { - Schema schema = (Schema) mapping.getPersistentClass().getField("SCHEMA$").get(null); Schema schemaField = schema.getField(field.getFieldName()).schema(); objects.add(AvroCassandraUtils.getFieldValueFromAvroBean(schemaField, 
schemaField.getType(), value, field)); - } catch (IllegalAccessException | NoSuchFieldException e) { - throw new RuntimeException("SCHEMA$ field can't accessible, Please recompile the Avro schema with goracompiler."); } catch (NullPointerException e) { throw new RuntimeException(field + " field couldn't find in the class " + mapping.getPersistentClass() + "."); } } } - } - String primaryKey = null; - Update.Where query = null; - Object startKey = cassandraQuery.getStartKey(); - Object endKey = cassandraQuery.getEndKey(); - Object key = cassandraQuery.getKey(); - boolean isWhereNeeded = true; - if (key != null) { - if (mapping.getCassandraKey() != null) { - ArrayList<String> cassandraKeys = new ArrayList<>(); - ArrayList<Object> cassandraValues = new ArrayList<>(); - AvroCassandraUtils.processKeys(mapping, key, cassandraKeys, cassandraValues); - String[] columnKeys = getColumnNames(mapping, cassandraKeys); - for (int i = 0; i < cassandraKeys.size(); i++) { - if (isWhereNeeded) { - query = updateAssignments.where(QueryBuilder.eq(columnKeys[i], "?")); - objects.add(cassandraValues.get(i)); - isWhereNeeded = false; - } else { - query = query.and(QueryBuilder.eq(columnKeys[i], "?")); - objects.add(cassandraValues.get(i)); - } - } - } else { - primaryKey = getPKey(mapping.getFieldList()); - query = updateAssignments.where(QueryBuilder.eq(primaryKey, "?")); - objects.add(key); - } } else { - if (startKey != null) { - if (mapping.getCassandraKey() != null) { - ArrayList<String> cassandraKeys = new ArrayList<>(); - ArrayList<Object> cassandraValues = new ArrayList<>(); - AvroCassandraUtils.processKeys(mapping, startKey, cassandraKeys, cassandraValues); - String[] columnKeys = getColumnNames(mapping, cassandraKeys); - for (int i = 0; i < cassandraKeys.size(); i++) { - if (isWhereNeeded) { - query = updateAssignments.where(QueryBuilder.gte(columnKeys[i], "?")); - objects.add(cassandraValues.get(i)); - isWhereNeeded = false; - } else { - query = 
query.and(QueryBuilder.gte(columnKeys[i], "?")); - objects.add(cassandraValues.get(i)); - } - } - } else { - primaryKey = getPKey(mapping.getFieldList()); - query = updateAssignments.where(QueryBuilder.gte(primaryKey, "?")); - objects.add(startKey); - isWhereNeeded = false; - } - } - if (endKey != null) { - if (mapping.getCassandraKey() != null) { - ArrayList<String> cassandraKeys = new ArrayList<>(); - ArrayList<Object> cassandraValues = new ArrayList<>(); - AvroCassandraUtils.processKeys(mapping, endKey, cassandraKeys, cassandraValues); - String[] columnKeys = getColumnNames(mapping, cassandraKeys); - for (int i = 0; i < cassandraKeys.size(); i++) { - if (isWhereNeeded) { - query = updateAssignments.where(QueryBuilder.lte(columnKeys[i], "?")); - objects.add(cassandraValues.get(i)); - isWhereNeeded = false; - } else { - query = query.and(QueryBuilder.lte(columnKeys[i], "?")); - objects.add(cassandraValues.get(i)); - } - } - } else { - primaryKey = primaryKey != null ? primaryKey : getPKey(mapping.getFieldList()); - if (isWhereNeeded) { - query = updateAssignments.where(QueryBuilder.lte(primaryKey, "?")); - } else { - query = query.and(QueryBuilder.lte(primaryKey, "?")); - } - objects.add(endKey); - } - } - } - if (startKey == null && endKey == null && key == null) { - return updateAssignments.getQueryString(); + throw new RuntimeException("Please use Cassandra Query object to invoke, UpdateByQuery method."); } - return query.getQueryString(); - } - /** - * This method returns create Type CQL query to create user define types. 
- * refer : http://docs.datastax.com/en/cql/3.1/cql/cql_reference/cqlRefcreateType.html - * - * @param fieldSchema avroSchema {@link Schema} - * @param mapping Cassandra mapping {@link CassandraMapping} - * @return CQL Query - */ - static String getCreateUDTType(Schema fieldSchema, CassandraMapping mapping, Set<String> udtQueryStack) { - StringBuilder stringBuffer = new StringBuilder(); - if (fieldSchema.getType().equals(Schema.Type.UNION)) { - for (Schema fieldTypeSchema : fieldSchema.getTypes()) { - if (fieldTypeSchema.getType().equals(Schema.Type.RECORD)) { - fieldSchema = fieldTypeSchema; - break; - } - } - } - stringBuffer.append("CREATE TYPE IF NOT EXISTS ").append(mapping.getKeySpace().getName()).append(".").append(fieldSchema.getName()).append(" ("); - processRecord(fieldSchema, stringBuffer, mapping, udtQueryStack); - stringBuffer.append(")"); - return stringBuffer.toString(); + return processQuery(cassandraQuery, updateAssignments, mapping, objects); } - private static void processRecord(Schema recordSchema, StringBuilder stringBuilder, CassandraMapping mapping, Set<String> udtQueryStack) { - boolean isCommaNeeded = false; - for (Schema.Field field : recordSchema.getFields()) { - if (isCommaNeeded) { - stringBuilder.append(", "); - } - String fieldName = field.name(); - stringBuilder.append(fieldName).append(" "); - try { - populateFieldsToQuery(field.schema(), stringBuilder, mapping, udtQueryStack); - isCommaNeeded = true; - } catch (Exception e) { - int i = stringBuilder.indexOf(fieldName); - if (i != -1) { - stringBuilder.delete(i, i + fieldName.length()); - isCommaNeeded = false; - } - } - } - } - - private static void populateFieldsToQuery(Schema schema, StringBuilder builder, CassandraMapping mapping, Set<String> udtQueryStack) throws Exception { + private static void populateFieldsToQuery(Schema schema, StringBuilder builder) throws Exception { switch (schema.getType()) { case INT: builder.append("int"); break; case MAP: 
builder.append("map<text,"); - populateFieldsToQuery(schema.getValueType(), builder, mapping, udtQueryStack); + populateFieldsToQuery(schema.getValueType(), builder); builder.append(">"); break; case ARRAY: builder.append("list<"); - populateFieldsToQuery(schema.getElementType(), builder, mapping, udtQueryStack); + populateFieldsToQuery(schema.getElementType(), builder); builder.append(">"); break; case LONG: @@ -796,8 +667,6 @@ class CassandraQueryFactory { break; case RECORD: builder.append("frozen<").append(schema.getName()).append(">"); - String query = getCreateUDTType(schema, mapping, udtQueryStack); - udtQueryStack.add(query); break; case STRING: case FIXED: @@ -810,15 +679,13 @@ class CassandraQueryFactory { String recordName = unionElementSchema.getName(); if (!builder.toString().contains(recordName)) { builder.append("frozen<").append(recordName).append(">"); - query = getCreateUDTType(unionElementSchema, mapping, udtQueryStack); - udtQueryStack.add(query); } else { LOG.warn("Same Field Type can't be mapped recursively. 
This is not supported with Cassandra UDT types, Please use byte dataType for recursive mapping."); throw new Exception("Same Field Type has mapped recursively"); } break; } else if (!unionElementSchema.getType().equals(Schema.Type.NULL)) { - populateFieldsToQuery(unionElementSchema, builder, mapping, udtQueryStack); + populateFieldsToQuery(unionElementSchema, builder); break; } } @@ -826,4 +693,127 @@ class CassandraQueryFactory { } } + static void processRecord(Schema recordSchema, StringBuilder stringBuilder) { + boolean isCommaNeeded = false; + for (Schema.Field field : recordSchema.getFields()) { + if (isCommaNeeded) { + stringBuilder.append(", "); + } + String fieldName = field.name(); + stringBuilder.append(fieldName).append(" "); + try { + populateFieldsToQuery(field.schema(), stringBuilder); + isCommaNeeded = true; + } catch (Exception e) { + int i = stringBuilder.indexOf(fieldName); + if (i != -1) { + stringBuilder.delete(i, i + fieldName.length()); + isCommaNeeded = false; + } + } + } + } + + static String getCreateUDTTypeForNative(CassandraMapping mapping, Class persistentClass, String udtType, String fieldName) throws NoSuchFieldException { + StringBuilder stringBuffer = new StringBuilder(); + Class udtClass = persistentClass.getDeclaredField(fieldName).getType(); + UDT annotation = (UDT) udtClass.getAnnotation(UDT.class); + if (annotation != null) { + stringBuffer.append("CREATE TYPE IF NOT EXISTS ").append(mapping.getKeySpace().getName()).append(".").append(udtType).append(" ("); + boolean isCommaNeeded = false; + for (java.lang.reflect.Field udtField : udtClass.getDeclaredFields()) { + com.datastax.driver.mapping.annotations.Field fieldAnnotation = udtField.getDeclaredAnnotation(com.datastax.driver.mapping.annotations.Field.class); + if (fieldAnnotation != null) { + if (isCommaNeeded) { + stringBuffer.append(", "); + } + if(!fieldAnnotation.name().isEmpty()) { + stringBuffer.append(fieldAnnotation.name()).append(" "); + } else { + 
stringBuffer.append(udtField.getName()).append(" "); + } + stringBuffer.append(dataType(udtField, null)); + isCommaNeeded = true; + } + } + stringBuffer.append(")"); + } else { + throw new RuntimeException(""); + } + return stringBuffer.toString(); + } + + static String getCreateUDTTypeForAvro(CassandraMapping mapping, String udtType, Schema fieldSchema) { + StringBuilder stringBuffer = new StringBuilder(); + stringBuffer.append("CREATE TYPE IF NOT EXISTS ").append(mapping.getKeySpace().getName()).append(".").append(udtType).append(" ("); + CassandraQueryFactory.processRecord(fieldSchema, stringBuffer); + stringBuffer.append(")"); + return stringBuffer.toString(); + } + + private static String dataType(java.lang.reflect.Field field, Type fieldType) { + String type; + if (field != null) { + type = field.getType().getName(); + } else { + type = fieldType.getTypeName(); + } + String dataType; + String s = type; + if (s.equals("java.lang.String") || s.equals("java.lang.CharSequence")) { + dataType = "text"; + } else if (s.equals("int") || s.equals("java.lang.Integer")) { + dataType = "int"; + } else if (s.equals("double") || s.equals("java.lang.Double")) { + dataType = "double"; + } else if (s.equals("float") || s.equals("java.lang.Float")) { + dataType = "float"; + } else if (s.equals("boolean") || s.equals("java.lang.Boolean")) { + dataType = "boolean"; + } else if (s.equals("java.util.UUID")) { + dataType = "uuid"; + } else if (s.equals("java.lang.Long")) { + dataType = "bigint"; + }else if (s.equals("java.math.BigDecimal")) { + dataType = "decimal"; + }else if (s.equals("java.net.InetAddress")) { + dataType = "inet"; + }else if (s.equals("java.math.BigInteger")) { + dataType = "varint"; + } else if (s.equals("java.nio.ByteBuffer")) { + dataType = "blob"; + }else if (s.contains("Map")) { + ParameterizedType mapType; + if (field != null) { + mapType = (ParameterizedType) field.getGenericType(); + } else { + mapType = (ParameterizedType) fieldType; + } + Type value1 = 
mapType.getActualTypeArguments()[0]; + Type value2 = mapType.getActualTypeArguments()[1]; + dataType = "map<" +dataType(null, value1) + "," + dataType(null, value2) + ">"; + } else if (s.contains("List")) { + ParameterizedType listType; + if (field != null) { + listType = (ParameterizedType) field.getGenericType(); + } else { + listType = (ParameterizedType) fieldType; + } + Type value = listType.getActualTypeArguments()[0]; + dataType = "list<" + dataType(null, value) + ">"; + } else if (s.contains("Set")) { + ParameterizedType setType; + if (field != null) { + setType = (ParameterizedType) field.getGenericType(); + } else { + setType = (ParameterizedType) fieldType; + } + Type value = setType.getActualTypeArguments()[0]; + dataType = "set<" + dataType(null, value) + ">"; + } else { + throw new RuntimeException("Unsupported Cassandra DataType"); + } + return dataType; + } + } http://git-wip-us.apache.org/repos/asf/gora/blob/a9a3ad49/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java index 17e0568..651dfdb 100644 --- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java +++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java @@ -26,6 +26,7 @@ import org.apache.gora.cassandra.store.CassandraClient; import org.apache.gora.cassandra.store.CassandraMapping; import org.apache.gora.cassandra.store.CassandraStore; import org.apache.gora.persistency.Persistent; +import org.apache.gora.persistency.impl.PersistentBase; import org.apache.gora.query.Query; import org.apache.gora.query.Result; import org.apache.gora.store.DataStore; @@ -33,10 +34,10 @@ import 
org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; -import java.util.LinkedHashSet; +import java.util.HashMap; import java.util.List; import java.util.Locale; -import java.util.Set; +import java.util.Map; /** * This is the abstract Cassandra Serializer class. @@ -47,14 +48,23 @@ public abstract class CassandraSerializer<K, T extends Persistent> { protected Class<T> persistentClass; + private Map<String, String> userDefineTypeMaps; + protected CassandraMapping mapping; + private Schema persistentSchema; CassandraClient client; - CassandraSerializer(CassandraClient cc, Class<K> keyClass, Class<T> persistantClass, CassandraMapping mapping) { + CassandraSerializer(CassandraClient cc, Class<K> keyClass, Class<T> persistantClass, CassandraMapping mapping, Schema schema) { this.keyClass = keyClass; this.persistentClass = persistantClass; this.client = cc; this.mapping = mapping; + persistentSchema = schema; + try { + analyzePersistent(); + } catch (Exception e) { + throw new RuntimeException("Error occurred while analyzing the persistent class, :" + e.getMessage()); + } } /** @@ -68,24 +78,59 @@ public abstract class CassandraSerializer<K, T extends Persistent> { * @param <T> persistent class * @return Serializer */ - public static <K, T extends Persistent> CassandraSerializer getSerializer(CassandraClient cc, String type, final DataStore<K, T> dataStore, CassandraMapping mapping) { + public static <K, T extends Persistent> CassandraSerializer getSerializer(CassandraClient cc, String type, final DataStore<K, T> dataStore, CassandraMapping mapping, Schema schema) { CassandraStore.SerializerType serType = type.isEmpty() ? 
CassandraStore.SerializerType.NATIVE : CassandraStore.SerializerType.valueOf(type.toUpperCase(Locale.ENGLISH)); CassandraSerializer serializer; switch (serType) { case AVRO: - serializer = new AvroSerializer(cc, dataStore, mapping); + serializer = new AvroSerializer(cc, dataStore, mapping, schema); break; case NATIVE: default: - serializer = new NativeSerializer(cc, dataStore.getKeyClass(), dataStore.getPersistentClass(), mapping); + serializer = new NativeSerializer(cc, dataStore.getKeyClass(), dataStore.getPersistentClass(), mapping, schema); } return serializer; } + private void analyzePersistent() throws Exception { + userDefineTypeMaps = new HashMap<>(); + for (Field field : mapping.getFieldList()) { + String fieldType = field.getType(); + if (fieldType.contains("frozen")) { + String udtType = fieldType.substring(fieldType.indexOf("<") + 1, fieldType.indexOf(">")); + if (this instanceof AvroSerializer) { + if (PersistentBase.class.isAssignableFrom(persistentClass)) { + Schema fieldSchema = persistentSchema.getField(field.getFieldName()).schema(); + if (fieldSchema.getType().equals(Schema.Type.UNION)) { + for (Schema currentSchema : fieldSchema.getTypes()) { + if (currentSchema.getType().equals(Schema.Type.RECORD)) { + fieldSchema = currentSchema; + break; + } + } + } + String createQuery = CassandraQueryFactory.getCreateUDTTypeForAvro(mapping, udtType, fieldSchema); + userDefineTypeMaps.put(udtType, createQuery); + } else { + throw new RuntimeException("Unsupported Class for User Define Types, Please use PersistentBase class. 
field : " + udtType); + } + } else { + String createQuery = CassandraQueryFactory.getCreateUDTTypeForNative(mapping, persistentClass, udtType, field.getFieldName()); + userDefineTypeMaps.put(udtType, createQuery); + } + } + } + + } + + public void createSchema() { LOG.debug("creating Cassandra keyspace {}", mapping.getKeySpace().getName()); this.client.getSession().execute(CassandraQueryFactory.getCreateKeySpaceQuery(mapping)); - processUDTSchemas(); //TODO complete functionality + for (Map.Entry udtType : userDefineTypeMaps.entrySet()) { + LOG.debug("creating Cassandra User Define Type {}", udtType.getKey()); + this.client.getSession().execute((String) udtType.getValue()); + } LOG.debug("creating Cassandra column family / table {}", mapping.getCoreName()); this.client.getSession().execute(CassandraQueryFactory.getCreateTableQuery(mapping)); } @@ -116,32 +161,6 @@ public abstract class CassandraSerializer<K, T extends Persistent> { } } - private void processUDTSchemas() { - Set<String> schemaStack = new LinkedHashSet<>(); - for (Field field : mapping.getFieldList()) { - if (field.getType().contains("frozen")) { - try { - Schema schema = (Schema) mapping.getPersistentClass().getField("SCHEMA$").get(null); - Schema schemaField = schema.getField(field.getFieldName()).schema(); - String cqlQuery = CassandraQueryFactory.getCreateUDTType(schemaField, mapping, schemaStack); - schemaStack.add(cqlQuery); - } catch (IllegalAccessException | NoSuchFieldException e) { - throw new RuntimeException("SCHEMA$ field can't accessible, Please recompile the Avro schema with goracompiler."); - } catch (NullPointerException e) { - throw new RuntimeException(field + " field couldn't find in the class " + mapping.getPersistentClass() + "."); - } - } - } - createUserDefineTypes(schemaStack); - - } - - private void createUserDefineTypes(Set<String> queries) { - for (String cqlQuery : queries) { - this.client.getSession().execute(cqlQuery); - } - } - protected String[] getFields() { 
List<String> fields = new ArrayList<>(); for (Field field : mapping.getFieldList()) { @@ -162,7 +181,7 @@ public abstract class CassandraSerializer<K, T extends Persistent> { public boolean updateByQuery(Query query) { List<Object> objectArrayList = new ArrayList<>(); - String cqlQuery = CassandraQueryFactory.getUpdateByQuery(mapping, query, objectArrayList); + String cqlQuery = CassandraQueryFactory.getUpdateByQuery(mapping, query, objectArrayList, persistentSchema); ResultSet results; if (objectArrayList.size() == 0) { results = client.getSession().execute(cqlQuery); http://git-wip-us.apache.org/repos/asf/gora/blob/a9a3ad49/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java index f8bb066..abff409 100644 --- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java +++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java @@ -21,9 +21,9 @@ import com.datastax.driver.core.ResultSet; import com.datastax.driver.mapping.Mapper; import com.datastax.driver.mapping.MappingManager; import com.datastax.driver.mapping.Result; +import org.apache.avro.Schema; import org.apache.commons.lang.ArrayUtils; import org.apache.gora.cassandra.bean.Field; -import org.apache.gora.cassandra.persistent.CassandraNativePersistent; import org.apache.gora.cassandra.query.CassandraResultSet; import org.apache.gora.cassandra.store.CassandraClient; import org.apache.gora.cassandra.store.CassandraMapping; @@ -41,14 +41,14 @@ import java.util.List; /** * This Class contains the operation relates to Native Serialization. 
*/ -class NativeSerializer<K, T extends CassandraNativePersistent> extends CassandraSerializer { +class NativeSerializer<K, T extends Persistent> extends CassandraSerializer { private static final Logger LOG = LoggerFactory.getLogger(NativeSerializer.class); private Mapper<T> mapper; - NativeSerializer(CassandraClient cassandraClient, Class<K> keyClass, Class<T> persistentClass, CassandraMapping mapping) { - super(cassandraClient, keyClass, persistentClass, mapping); + NativeSerializer(CassandraClient cassandraClient, Class<K> keyClass, Class<T> persistentClass, CassandraMapping mapping, Schema schema) { + super(cassandraClient, keyClass, persistentClass, mapping, schema); this.createSchema(); MappingManager mappingManager = new MappingManager(cassandraClient.getSession()); mapper = mappingManager.mapper(persistentClass); @@ -131,7 +131,7 @@ class NativeSerializer<K, T extends CassandraNativePersistent> extends Cassandra break; } } - K key = null; + K key; Method keyMethod = null; try { for (Method method : this.persistentClass.getMethods()) { http://git-wip-us.apache.org/repos/asf/gora/blob/a9a3ad49/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java index ac46a30..807a99d 100644 --- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java +++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java @@ -51,6 +51,7 @@ public class CassandraMapping { /** * This method returns the KeySpace in the mapping file, + * * @return Key space {@link KeySpace} */ public KeySpace getKeySpace() { @@ -59,6 +60,7 @@ public class CassandraMapping { /** * This method set the KeySpace in the Cassandra mapping. 
+ * * @param keySpace Key space {@link KeySpace} */ void setKeySpace(KeySpace keySpace) { @@ -67,6 +69,7 @@ public class CassandraMapping { /** * Thi method returns only the fields which belongs only for the Persistent Object. + * * @return List of Fields */ public List<Field> getFieldList() { @@ -75,6 +78,7 @@ public class CassandraMapping { /** * This method returns the Persistent Object's Field from the mapping, according to the FieldName. + * * @param fieldName Field Name * @return Field {@link Field} */ @@ -89,6 +93,7 @@ public class CassandraMapping { /** * This method returns the Persistent Object's Field from the mapping, according to the ColumnName. + * * @param columnName Column Name * @return Field {@link Field} */ @@ -103,6 +108,7 @@ public class CassandraMapping { /** * This method returns the Field Names + * * @return array of Field Names */ public String[] getFieldNames() { @@ -116,6 +122,7 @@ public class CassandraMapping { /** * This method returns + * * @return */ public String[] getAllFieldsIncludingKeys() { @@ -133,7 +140,6 @@ public class CassandraMapping { } /** - * * @return */ public String[] getAllKeys() { http://git-wip-us.apache.org/repos/asf/gora/blob/a9a3ad49/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMappingBuilder.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMappingBuilder.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMappingBuilder.java index f151458..1d787de 100644 --- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMappingBuilder.java +++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMappingBuilder.java @@ -242,6 +242,9 @@ class CassandraMappingBuilder<K, T extends Persistent> { attributeValue = attributeValue.replace("udt(", "frozen("); } fieldKey.setType(attributeValue.replace("(", 
"<").replace(")", ">")); + if (fieldKey.getType().equalsIgnoreCase("udt")) { + throw new RuntimeException("Invalid udt type, Please enter dataType for udt with a unique name for particular user define data type, like udt(metadata)."); + } break; default: fieldKey.addProperty(attributeName, attributeValue); http://git-wip-us.apache.org/repos/asf/gora/blob/a9a3ad49/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java index c481610..5e31fcc 100644 --- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java +++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java @@ -17,12 +17,12 @@ package org.apache.gora.cassandra.store; -import org.apache.avro.data.RecordBuilder; -import org.apache.gora.cassandra.persistent.CassandraNativePersistent; +import org.apache.avro.Schema; import org.apache.gora.cassandra.query.CassandraQuery; import org.apache.gora.cassandra.serializers.CassandraSerializer; import org.apache.gora.persistency.BeanFactory; import org.apache.gora.persistency.Persistent; +import org.apache.gora.persistency.impl.BeanFactoryImpl; import org.apache.gora.persistency.impl.PersistentBase; import org.apache.gora.query.PartitionQuery; import org.apache.gora.query.Query; @@ -54,16 +54,23 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T> private Class<K> keyClass; + private Schema persistentSchema; + private Class<T> persistentClass; private CassandraMapping mapping; private CassandraSerializer cassandraSerializer; + private String serializationType; public CassandraStore() { super(); } + public String getSerializationType() { + return serializationType; + } + /** * In initializing the 
cassandra datastore, read the mapping file, creates the basic connection to cassandra cluster, * according to the gora properties @@ -77,12 +84,21 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T> try { this.keyClass = keyClass; this.persistentClass = persistentClass; + if (this.beanFactory == null) { + this.beanFactory = new BeanFactoryImpl<>(keyClass, persistentClass); + } + if (PersistentBase.class.isAssignableFrom(persistentClass)) { + persistentSchema = ((PersistentBase) this.beanFactory.getCachedPersistent()).getSchema(); + } else { + persistentSchema = null; + } String mappingFile = DataStoreFactory.getMappingFile(properties, this, DEFAULT_MAPPING_FILE); + serializationType = properties.getProperty(CassandraStoreParameters.CASSANDRA_SERIALIZATION_TYPE); CassandraMappingBuilder mappingBuilder = new CassandraMappingBuilder(this); mapping = mappingBuilder.readMapping(mappingFile); CassandraClient cassandraClient = new CassandraClient(); cassandraClient.initialize(properties, mapping); - cassandraSerializer = CassandraSerializer.getSerializer(cassandraClient, properties.getProperty(CassandraStoreParameters.CASSANDRA_SERIALIZATION_TYPE), this, mapping); + cassandraSerializer = CassandraSerializer.getSerializer(cassandraClient, serializationType, this, mapping, persistentSchema); } catch (Exception e) { throw new RuntimeException("Error while initializing Cassandra store: " + e.getMessage(), e); } @@ -100,7 +116,6 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T> * This is a setter method to set the class of persistent objects. 
* * @param persistentClass class of persistent objects - * {@link CassandraNativePersistent} * {@link org.apache.gora.persistency.Persistent} */ @Override @@ -153,9 +168,6 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T> try { if (beanFactory != null) { return this.beanFactory.newPersistent(); - } else if (PersistentBase.class.isAssignableFrom(persistentClass)) { - RecordBuilder builder = (RecordBuilder) persistentClass.getMethod("newBuilder").invoke(null); - return (T) RecordBuilder.class.getMethod("build").invoke(builder); } else { return persistentClass.newInstance(); } http://git-wip-us.apache.org/repos/asf/gora/blob/a9a3ad49/gora-cassandra-cql/src/test/conf/avroUDT/gora-cassandra-mapping.xml ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/test/conf/avroUDT/gora-cassandra-mapping.xml b/gora-cassandra-cql/src/test/conf/avroUDT/gora-cassandra-mapping.xml new file mode 100644 index 0000000..9dc3c0b --- /dev/null +++ b/gora-cassandra-cql/src/test/conf/avroUDT/gora-cassandra-mapping.xml @@ -0,0 +1,50 @@ +<?xml version="1.0" encoding="UTF-8"?> + +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> + +<gora-otd> + + <keyspace name="avroKeySpace" durableWrite="false"> + <placementStrategy name="SimpleStrategy" replication_factor="1"/> + </keyspace> + + <class name="org.apache.gora.examples.generated.WebPage" keyClass="java.lang.String" table="WebPage" + allowFiltering="true" + keyspace="avroKeySpace"> + <field name="url" column="url" type="ascii"/> + <field name="content" column="content" type="blob"/> + <field name="parsedContent" column="parsedContent" type="list(ascii)"/> + <field name="outlinks" column="outlinks" type="map(text,text)"/> + <field name="headers" column="headers" type="map(text,text)"/> + <field name="byteData" column="byteData" type="map(text,blob)"/> + <field name="metadata" column="metadata" type="udt(metadata)"/> + <field name="stringData" column="stringData" type="map(text,ascii)"/> + </class> + + <class name="org.apache.gora.examples.generated.Employee" keyClass="java.lang.String" keyspace="avroKeySpace" + allowFiltering="true" + table="Employee" compactStorage="true"> + <field name="name" column="name" type="text"/> + <field name="dateOfBirth" column="dob" type="bigint"/> + <field name="ssn" column="ssn" type="text"/> + <field name="salary" column="salary" type="int"/> + <field name="boss" column="boss" type="blob"/> + <field name="webpage" column="webpage" type="blob"/> + </class> + +</gora-otd>