Refactored the code
Project: http://git-wip-us.apache.org/repos/asf/gora/repo Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/5e383ef9 Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/5e383ef9 Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/5e383ef9 Branch: refs/heads/master Commit: 5e383ef974541804382a639b79e3efbf6b03d2b2 Parents: 89109b8 Author: madhawa <[email protected]> Authored: Wed Jun 28 23:45:18 2017 +0530 Committer: madhawa <[email protected]> Committed: Wed Jun 28 23:48:54 2017 +0530 ---------------------------------------------------------------------- gora-cassandra-cql/pom.xml | 10 - gora-cassandra-cql/src/examples/java/.gitignore | 15 - .../generated/nativeSerialization/User.java | 66 ++ .../persistent/CassandraNativePersistent.java | 109 ++++ .../cassandra/serializers/AvroSerializer.java | 51 ++ .../serializers/CassandraNativePersistent.java | 91 --- .../serializers/CassandraQueryFactory.java | 248 ++++++++ .../serializers/CassandraSerializer.java | 105 +++ .../cassandra/serializers/NativeSerializer.java | 61 ++ .../gora/cassandra/store/CassandraClient.java | 368 +++++++++++ .../store/CassandraMappingBuilder.java | 234 +++++++ .../cassandra/store/CassandraQueryFactory.java | 224 ------- .../gora/cassandra/store/CassandraStore.java | 636 ++----------------- .../store/CassandraStoreParameters.java | 2 +- .../test/conf/avro/gora-cassandra-mapping.xml | 73 +++ .../gora-cassandra-mapping.xml | 2 +- gora-cassandra-cql/src/test/java/.gitignore | 15 - ...stCassandraStoreWithNativeSerialization.java | 25 +- .../nativeSerialization/DateAsStringCodec.java | 17 + .../test/nativeSerialization/User.java | 66 -- 20 files changed, 1404 insertions(+), 1014 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/gora/blob/5e383ef9/gora-cassandra-cql/pom.xml ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/pom.xml b/gora-cassandra-cql/pom.xml index d56e07a..13e5a1a 100644 --- a/gora-cassandra-cql/pom.xml +++ b/gora-cassandra-cql/pom.xml @@ -110,16 +110,6 @@ </build> <dependencies> - - <!-- OSX JDK 7 --> - <!-- should be removed once fixed in Cassandra --> - <dependency> - <groupId>org.xerial.snappy</groupId> - <artifactId>snappy-java</artifactId> - <version>1.0.5-M3</version> - <scope>test</scope> - </dependency> - <!-- Gora Internal Dependencies --> <dependency> <groupId>org.apache.gora</groupId> http://git-wip-us.apache.org/repos/asf/gora/blob/5e383ef9/gora-cassandra-cql/src/examples/java/.gitignore ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/examples/java/.gitignore b/gora-cassandra-cql/src/examples/java/.gitignore deleted file mode 100644 index 09697dc..0000000 --- a/gora-cassandra-cql/src/examples/java/.gitignore +++ /dev/null @@ -1,15 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - http://git-wip-us.apache.org/repos/asf/gora/blob/5e383ef9/gora-cassandra-cql/src/examples/java/org/apache/gora/cassandra/example/generated/nativeSerialization/User.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/examples/java/org/apache/gora/cassandra/example/generated/nativeSerialization/User.java b/gora-cassandra-cql/src/examples/java/org/apache/gora/cassandra/example/generated/nativeSerialization/User.java new file mode 100644 index 0000000..1e810a0 --- /dev/null +++ b/gora-cassandra-cql/src/examples/java/org/apache/gora/cassandra/example/generated/nativeSerialization/User.java @@ -0,0 +1,66 @@ +package org.apache.gora.cassandra.example.generated.nativeSerialization; + +import com.datastax.driver.mapping.annotations.Column; +import com.datastax.driver.mapping.annotations.PartitionKey; +import com.datastax.driver.mapping.annotations.Table; +import com.datastax.driver.mapping.annotations.Transient; +import org.apache.gora.cassandra.persistent.CassandraNativePersistent; + +import java.util.Date; +import java.util.UUID; + +/** + * Created by madhawa on 6/23/17. + */ + +@Table(keyspace = "nativeTestKeySpace", name = "users", + readConsistency = "QUORUM", + writeConsistency = "QUORUM", + caseSensitiveKeyspace = false, + caseSensitiveTable = false) +public class User extends CassandraNativePersistent { + @PartitionKey + @Column(name = "user_id") + private UUID userId; + @Column(name = "name") + private String name; + @Column(name = "dob") + private Date dateOfBirth; + + @Transient + private boolean dirty; + + public User() { + + } + + public User(UUID userId, String name, Date dateOfBirth) { + this.userId = userId; + this.name = name; + this.dateOfBirth = dateOfBirth; + } + + public void setUserId(UUID userId) { + this.userId = userId; + } + + public void setName(String name) { + this.name = name; + } + + public void setDateOfBirth(Date dateOfBirth) { + this.dateOfBirth = dateOfBirth; + } + + public UUID getUserId() { + return userId; + } + + public String getName() { + return name; + } + + public Date getDateOfBirth() { + return dateOfBirth; + } +} http://git-wip-us.apache.org/repos/asf/gora/blob/5e383ef9/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/persistent/CassandraNativePersistent.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/persistent/CassandraNativePersistent.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/persistent/CassandraNativePersistent.java new file mode 100644 index 0000000..bd17dcd --- /dev/null +++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/persistent/CassandraNativePersistent.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gora.cassandra.persistent; + +import com.datastax.driver.mapping.annotations.Transient; +import org.apache.avro.Schema; +import org.apache.gora.persistency.Persistent; +import org.apache.gora.persistency.Tombstone; +import org.apache.gora.persistency.impl.PersistentBase; + +import java.util.List; + +/** + * This class should be used with Native Cassandra Serialization. + */ +public abstract class CassandraNativePersistent implements Persistent { + @Transient + @Override + public void clear() { + + } + + @Transient + @Override + public boolean isDirty(int fieldIndex) { + return false; + } + + @Transient + @Override + public boolean isDirty(String field) { + return false; + } + + @Transient + @Override + public void setDirty() { + + } + + @Transient + @Override + public void setDirty(int fieldIndex) { + + } + + @Transient + @Override + public void setDirty(String field) { + + } + + @Transient + @Override + public void clearDirty(int fieldIndex) { + + } + + @Transient + @Override + public void clearDirty(String field) { + + } + + @Transient + @Override + public Tombstone getTombstone() { + return null; + } + + @Transient + @Override + public List<Schema.Field> getUnmanagedFields() { + return null; + } + + @Transient + @Override + public Persistent newInstance() { + return null; + } + + @Transient + @Override + public boolean isDirty() { + return false; + } + + @Transient + @Override + public void clearDirty() { + + } +} http://git-wip-us.apache.org/repos/asf/gora/blob/5e383ef9/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java new file mode 100644 index 0000000..bb9d99c --- /dev/null +++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gora.cassandra.serializers; + +import org.apache.gora.cassandra.store.CassandraClient; +import org.apache.gora.cassandra.store.CassandraMapping; +import org.apache.gora.persistency.impl.PersistentBase; + +/** + * Created by madhawa on 6/26/17. + */ +public class AvroSerializer<K, T extends PersistentBase> extends CassandraSerializer { + + + public AvroSerializer(CassandraClient cassandraClient, Class<K> keyClass, Class<T> persistentClass, CassandraMapping mapping) { + super(cassandraClient, keyClass, persistentClass, mapping); + } + + + @Override + public PersistentBase get(Object key) { + return null; + } + + @Override + public void put(Object key, Object value) { + + } + + @Override + public boolean delete(Object key) { + return false; + } + + +} http://git-wip-us.apache.org/repos/asf/gora/blob/5e383ef9/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraNativePersistent.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraNativePersistent.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraNativePersistent.java deleted file mode 100644 index c493433..0000000 --- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraNativePersistent.java +++ /dev/null @@ -1,91 +0,0 @@ -package org.apache.gora.cassandra.serializers; - -import com.datastax.driver.mapping.annotations.Transient; -import org.apache.avro.Schema; -import org.apache.gora.persistency.Persistent; -import org.apache.gora.persistency.Tombstone; - -import java.util.List; - -/** - * This class should be used with Native Cassandra Serialization. - */ -public abstract class CassandraNativePersistent implements Persistent { - @Transient - @Override - public void clear() { - - } - - @Transient - @Override - public boolean isDirty(int fieldIndex) { - return false; - } - - @Transient - @Override - public boolean isDirty(String field) { - return false; - } - - @Transient - @Override - public void setDirty() { - - } - - @Transient - @Override - public void setDirty(int fieldIndex) { - - } - - @Transient - @Override - public void setDirty(String field) { - - } - - @Transient - @Override - public void clearDirty(int fieldIndex) { - - } - - @Transient - @Override - public void clearDirty(String field) { - - } - - @Transient - @Override - public Tombstone getTombstone() { - return null; - } - - @Transient - @Override - public List<Schema.Field> getUnmanagedFields() { - return null; - } - - @Transient - @Override - public Persistent newInstance() { - return null; - } - - @Transient - @Override - public boolean isDirty() { - return false; - } - - @Transient - @Override - public void clearDirty() { - - } -} http://git-wip-us.apache.org/repos/asf/gora/blob/5e383ef9/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java new file mode 100644 index 0000000..84f5ccb --- /dev/null +++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gora.cassandra.serializers; + +import org.apache.gora.cassandra.bean.CassandraKey; +import org.apache.gora.cassandra.bean.ClusterKeyField; +import org.apache.gora.cassandra.bean.Field; +import org.apache.gora.cassandra.bean.KeySpace; +import org.apache.gora.cassandra.bean.PartitionKeyField; +import org.apache.gora.cassandra.store.CassandraMapping; +import org.apache.gora.persistency.Persistent; + +import java.util.List; +import java.util.Map; + +/** + * This class is used create Cassandra Queries. + */ +class CassandraQueryFactory { + + /** + * This method returns the CQL query to create key space. + * refer : http://docs.datastax.com/en/cql/3.1/cql/cql_reference/create_keyspace_r.html + * + * @param mapping Cassandra Mapping + * @return CQL Query + */ + static String getCreateKeySpaceQuery(CassandraMapping mapping) { + KeySpace keySpace = mapping.getKeySpace(); + StringBuilder stringBuffer = new StringBuilder(); + stringBuffer.append("CREATE KEYSPACE IF NOT EXISTS ").append(keySpace.getName()).append(" WITH REPLICATION = { 'class' : "); + KeySpace.PlacementStrategy placementStrategy = keySpace.getPlacementStrategy(); + stringBuffer.append("'").append(placementStrategy).append("'").append(", ").append("'"); + switch (placementStrategy) { + case SimpleStrategy: + stringBuffer.append("replication_factor").append("'").append(" : ").append(keySpace.getReplicationFactor()).append(" }"); + break; + case NetworkTopologyStrategy: + boolean isCommaNeeded = false; + for (Map.Entry<String, Integer> entry : keySpace.getDataCenters().entrySet()) { + if (isCommaNeeded) { + stringBuffer.append(", '"); + } + stringBuffer.append(entry.getKey()).append("'").append(" : ").append(entry.getValue()); + isCommaNeeded = true; + } + stringBuffer.append(" }"); + break; + } + + if (keySpace.isDurableWritesEnabled()) { + stringBuffer.append(" AND DURABLE_WRITES = ").append(keySpace.isDurableWritesEnabled()); + } + return stringBuffer.toString(); + } + + /** + * This method returns the CQL query to table. + * refer : http://docs.datastax.com/en/cql/3.1/cql/cql_reference/create_table_r.html + * <p> + * Trick : To have a consistency of the order of the columns, first we append partition keys, second cluster keys and finally other columns. + * It's very much needed to follow the same order in other CRUD operations as well. + * + * @param mapping Cassandra mapping + * @return CQL + */ + static String getCreateTableQuery(CassandraMapping mapping) { + StringBuilder stringBuffer = new StringBuilder(); + stringBuffer.append("CREATE TABLE IF NOT EXISTS ").append(mapping.getKeySpace().getName()).append(".").append(mapping.getCoreName()).append(" ("); + boolean isCommaNeeded = false; + CassandraKey cassandraKey = mapping.getCassandraKey(); + // appending Cassandra key columns into db schema + if (cassandraKey != null) { + for (PartitionKeyField partitionKeyField : cassandraKey.getPartitionKeyFields()) { + if (partitionKeyField.isComposite()) { + for (Field compositeField : partitionKeyField.getFields()) { + stringBuffer = processFields(stringBuffer, compositeField, isCommaNeeded); + } + + } else { + stringBuffer = processFields(stringBuffer, partitionKeyField, isCommaNeeded); + } + isCommaNeeded = true; + } + for (ClusterKeyField clusterKeyField : cassandraKey.getClusterKeyFields()) { + stringBuffer = processFields(stringBuffer, clusterKeyField, isCommaNeeded); + } + } + // appending Other columns + for (Field field : mapping.getFieldList()) { + if (isCommaNeeded) { + stringBuffer.append(", "); + } + stringBuffer.append(field.getColumnName()).append(" ").append(field.getType()); + boolean isStaticColumn = Boolean.parseBoolean(field.getProperty("static")); + boolean isPrimaryKey = Boolean.parseBoolean(field.getProperty("primarykey")); + if (isStaticColumn) { + stringBuffer.append(" STATIC"); + } + if (isPrimaryKey) { + stringBuffer.append(" PRIMARY KEY "); + } + isCommaNeeded = true; + } + + if (cassandraKey != null) { + List<PartitionKeyField> pkey = cassandraKey.getPartitionKeyFields(); + if (pkey != null) { + stringBuffer.append(", PRIMARY KEY ("); + boolean isCommaNeededToApply = false; + for (PartitionKeyField keyField : pkey) { + if (isCommaNeededToApply) { + stringBuffer.append(","); + } + if (keyField.isComposite()) { + stringBuffer.append("("); + boolean isCommaNeededHere = false; + for (Field field : keyField.getFields()) { + if (isCommaNeededHere) { + stringBuffer.append(", "); + } + stringBuffer.append(field.getColumnName()); + isCommaNeededHere = true; + } + stringBuffer.append(")"); + } else { + stringBuffer.append(keyField.getColumnName()); + } + isCommaNeededToApply = true; + } + stringBuffer.append(")"); + } + } + + stringBuffer.append(")"); + boolean isWithNeeded = true; + if (Boolean.parseBoolean(mapping.getProperty("compactStorage"))) { + stringBuffer.append(" WITH COMPACT STORAGE "); + isWithNeeded = false; + } + + String id = mapping.getProperty("id"); + if (id != null) { + if (isWithNeeded) { + stringBuffer.append(" WITH "); + } else { + stringBuffer.append(" AND "); + } + stringBuffer.append("ID = '").append(id).append("'"); + isWithNeeded = false; + } + if (cassandraKey != null) { + List<ClusterKeyField> clusterKeyFields = cassandraKey.getClusterKeyFields(); + if (clusterKeyFields != null) { + if (isWithNeeded) { + stringBuffer.append(" WITH "); + } else { + stringBuffer.append(" AND "); + } + stringBuffer.append(" CLUSTERING ORDER BY ("); + boolean isCommaNeededToApply = false; + for (ClusterKeyField keyField : clusterKeyFields) { + if (isCommaNeededToApply) { + stringBuffer.append(", "); + } + stringBuffer.append(keyField.getColumnName()).append(" "); + if (keyField.getOrder() != null) { + stringBuffer.append(keyField.getOrder()); + } + isCommaNeededToApply = true; + } + stringBuffer.append(")"); + } + } + return stringBuffer.toString(); + } + + private static StringBuilder processFields(StringBuilder stringBuilder, Field field, boolean isCommaNeeded) { + if (isCommaNeeded) { + stringBuilder.append(", "); + } + stringBuilder.append(field.getColumnName()).append(" ").append(field.getType()); + boolean isStaticColumn = Boolean.parseBoolean(field.getProperty("static")); + if (isStaticColumn) { + stringBuilder.append(" STATIC"); + } + return stringBuilder; + } + + /** + * This method returns the CQL query to drop table. + * refer : http://docs.datastax.com/en/cql/3.1/cql/cql_reference/drop_table_r.html + * + * @param mapping Cassandra Mapping + * @return CQL query + */ + static String getDropTableQuery(CassandraMapping mapping) { + return "DROP TABLE IF EXISTS " + mapping.getKeySpace().getName() + "." + mapping.getCoreName(); + } + + /** + * This method returns the CQL query to drop key space. + * refer : http://docs.datastax.com/en/cql/3.1/cql/cql_reference/drop_keyspace_r.html + * + * @param mapping Cassandra Mapping + * @return CQL query + */ + static String getDropKeySpaceQuery(CassandraMapping mapping) { + return "DROP KEYSPACE IF EXISTS " + mapping.getKeySpace().getName(); + } + + /** + * This method returns the CQL query to truncate (removes all the data) in the table. + * refer : http://docs.datastax.com/en/cql/3.1/cql/cql_reference/truncate_r.html + * + * @param mapping Cassandra Mapping + * @return CQL query + */ + static String getTruncateTableQuery(CassandraMapping mapping) { + return "TRUNCATE TABLE " + mapping.getKeySpace().getName() + "." + mapping.getCoreName(); + } + + /** + * + * refer : http://docs.datastax.com/en/cql/3.1/cql/cql_reference/insert_r.html + * @return + */ + static String getInsertDataQuery(CassandraMapping mapping, Object obj) { +// ( (Persistent) obj).getS + StringBuilder stringBuffer = new StringBuilder(); +// o +return null; + } +} http://git-wip-us.apache.org/repos/asf/gora/blob/5e383ef9/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java new file mode 100644 index 0000000..09272ce --- /dev/null +++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gora.cassandra.serializers; + +import com.datastax.driver.core.KeyspaceMetadata; +import com.datastax.driver.core.TableMetadata; +import org.apache.gora.cassandra.store.CassandraClient; +import org.apache.gora.cassandra.store.CassandraMapping; +import org.apache.gora.cassandra.store.CassandraStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Locale; + +/** + * Created by madhawa on 6/26/17. + */ +public abstract class CassandraSerializer<K, T> { + CassandraClient client; + + private Class<K> keyClass; + + private Class<T> persistentClass; + + private CassandraMapping mapping; + + private static final Logger LOG = LoggerFactory.getLogger(CassandraStore.class); + + CassandraSerializer(CassandraClient cc, Class<K> keyClass, Class<T> persistantClass, CassandraMapping mapping) { + this.keyClass = keyClass; + this.persistentClass = persistantClass; + this.client = cc; + this.mapping = mapping; + } + + public void createSchema() { + LOG.debug("creating Cassandra keyspace {}", mapping.getKeySpace().getName()); + this.client.getSession().execute(CassandraQueryFactory.getCreateKeySpaceQuery(mapping)); + LOG.debug("creating Cassandra column family / table {}", mapping.getCoreName()); + this.client.getSession().execute(CassandraQueryFactory.getCreateTableQuery(mapping)); + } + + public void deleteSchema() { + LOG.debug("dropping Cassandra table {}", mapping.getCoreName()); + this.client.getSession().execute(CassandraQueryFactory.getDropTableQuery(mapping)); + LOG.debug("dropping Cassandra keyspace {}", mapping.getKeySpace().getName()); + this.client.getSession().execute(CassandraQueryFactory.getDropKeySpaceQuery(mapping)); + } + + public void close() { + this.client.close(); + } + + public void truncateSchema() { + LOG.debug("truncating Cassandra table {}", mapping.getCoreName()); + this.client.getSession().execute(CassandraQueryFactory.getTruncateTableQuery(mapping)); + } + + public boolean schemaExists() { + KeyspaceMetadata keyspace = this.client.getCluster().getMetadata().getKeyspace(mapping.getKeySpace().getName()); + if (keyspace != null) { + TableMetadata table = keyspace.getTable(mapping.getCoreName()); + return table != null; + } else { + return false; + } + } + + public static <K, T> CassandraSerializer getSerializer(CassandraClient cc, String type, final Class<K> keyClass, final Class<T> persistentClass, CassandraMapping mapping) { + CassandraStore.SerializerType serType = type.isEmpty() ? CassandraStore.SerializerType.NATIVE : CassandraStore.SerializerType.valueOf(type.toUpperCase(Locale.ENGLISH)); + CassandraSerializer ser; + switch (serType) { + case AVRO: + ser = new AvroSerializer(cc,keyClass, persistentClass, mapping); + break; + case NATIVE: + default: + ser = new NativeSerializer(cc, keyClass, persistentClass, mapping); + + } + return ser; + } + + public abstract void put(K key, T value); + + public abstract T get(K key); + + public abstract boolean delete(K key); + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/gora/blob/5e383ef9/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java new file mode 100644 index 0000000..0c30cba --- /dev/null +++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gora.cassandra.serializers; + +import com.datastax.driver.mapping.Mapper; +import com.datastax.driver.mapping.MappingManager; +import org.apache.gora.cassandra.persistent.CassandraNativePersistent; +import org.apache.gora.cassandra.store.CassandraClient; +import org.apache.gora.cassandra.store.CassandraMapping; + +/** + * Created by madhawa on 6/26/17. + */ +public class NativeSerializer<K, T extends CassandraNativePersistent> extends CassandraSerializer { + + private Mapper<T> mapper; + + + + + @Override + public void put(Object key, Object value) { + mapper.save((T)value); + } + + @Override + public T get(Object key) { + return mapper.get(key); + } + + @Override + public boolean delete(Object key) { + mapper.delete(key); + return true; + } + + + + public NativeSerializer(CassandraClient cassandraClient, Class<K> keyClass, Class<T> persistentClass, CassandraMapping mapping) { + super(cassandraClient, keyClass, persistentClass, mapping); + this.createSchema(); + MappingManager mappingManager = new MappingManager(cassandraClient.getSession()); + mapper = mappingManager.mapper(persistentClass); + + } +} http://git-wip-us.apache.org/repos/asf/gora/blob/5e383ef9/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java new file mode 100644 index 0000000..847343e --- /dev/null +++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java @@ -0,0 +1,368 @@ +package org.apache.gora.cassandra.store; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.HostDistance; +import com.datastax.driver.core.PoolingOptions; +import com.datastax.driver.core.ProtocolOptions; +import com.datastax.driver.core.ProtocolVersion; +import com.datastax.driver.core.QueryOptions; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.SocketOptions; +import com.datastax.driver.core.TypeCodec; +import com.datastax.driver.core.policies.ConstantReconnectionPolicy; +import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy; +import com.datastax.driver.core.policies.DefaultRetryPolicy; +import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy; +import com.datastax.driver.core.policies.ExponentialReconnectionPolicy; +import com.datastax.driver.core.policies.FallthroughRetryPolicy; +import com.datastax.driver.core.policies.LatencyAwarePolicy; +import com.datastax.driver.core.policies.LoggingRetryPolicy; +import com.datastax.driver.core.policies.RoundRobinPolicy; +import com.datastax.driver.core.policies.TokenAwarePolicy; +import org.jdom.Document; +import org.jdom.Element; +import org.jdom.JDOMException; +import org.jdom.input.SAXBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +/** + * Created by madhawa on 6/28/17. + */ +public class CassandraClient { + + public static final Logger LOG = LoggerFactory.getLogger(CassandraClient.class); + + + private Cluster cluster; + + public Session getSession() { + return session; + } + + public Cluster getCluster() { + return cluster; + } + + private Session session; + + + public void initialize(Properties properties) throws Exception { + Cluster.Builder builder = Cluster.builder(); + List<String> codecs = readCustomCodec(properties); + builder = populateSettings(builder, properties); + this.cluster = builder.build(); + if (codecs != null) { + registerCustomCodecs(codecs); + } + this.session = this.cluster.connect(); + } + + + private Cluster.Builder populateSettings(Cluster.Builder builder, Properties properties) { + String serversParam = properties.getProperty(CassandraStoreParameters.CASSANDRA_SERVERS); + String[] servers = serversParam.split(","); + for (String server : servers) { + builder = builder.addContactPoint(server); + } + String portProp = properties.getProperty(CassandraStoreParameters.PORT); + if (portProp != null) { + builder = builder.withPort(Integer.parseInt(portProp)); + } + String clusterNameProp = properties.getProperty(CassandraStoreParameters.CLUSTER_NAME); + if (clusterNameProp != null) { + builder = builder.withClusterName(clusterNameProp); + } + String compressionProp = properties.getProperty(CassandraStoreParameters.COMPRESSION); + if (compressionProp != null) { + builder = builder.withCompression(ProtocolOptions.Compression.valueOf(compressionProp)); + } + builder = this.populateCredentials(properties, builder); + builder = this.populateLoadBalancingProp(properties, builder); + String enableJMXProp = properties.getProperty(CassandraStoreParameters.ENABLE_JMX_REPORTING); + if (!Boolean.parseBoolean(enableJMXProp)) { + builder = builder.withoutJMXReporting(); + } + String enableMetricsProp = properties.getProperty(CassandraStoreParameters.ENABLE_METRICS); + if (!Boolean.parseBoolean(enableMetricsProp)) { + builder = builder.withoutMetrics(); + } + builder = this.populatePoolingSettings(properties, builder); + String versionProp = properties.getProperty(CassandraStoreParameters.PROTOCOL_VERSION); + if (versionProp != null) { + builder = builder.withProtocolVersion(ProtocolVersion.fromInt(Integer.parseInt(versionProp))); + } + builder = this.populateQueryOptions(properties, builder); + builder = this.populateReconnectPolicy(properties, builder); + builder = this.populateRetrytPolicy(properties, builder); + builder = this.populateSocketOptions(properties, builder); + String enableSSLProp = properties.getProperty(CassandraStoreParameters.ENABLE_SSL); + if (enableSSLProp != null) { + if (Boolean.parseBoolean(enableSSLProp)) { + builder = builder.withSSL(); + } + } + return builder; + } + + + private Cluster.Builder populateLoadBalancingProp(Properties properties, Cluster.Builder builder) { + String loadBalancingProp = properties.getProperty(CassandraStoreParameters.LOAD_BALANCING_POLICY); + if (loadBalancingProp != null) { + switch (loadBalancingProp) { + case "LatencyAwareRoundRobinPolicy": + builder = builder.withLoadBalancingPolicy(LatencyAwarePolicy.builder(new RoundRobinPolicy()).build()); + break; + case "RoundRobinPolicy": + builder = builder.withLoadBalancingPolicy(new RoundRobinPolicy()); + break; + case "DCAwareRoundRobinPolicy": { + String dataCenter = properties.getProperty(CassandraStoreParameters.DATA_CENTER); + boolean allowRemoteDCsForLocalConsistencyLevel = Boolean.parseBoolean( + properties.getProperty(CassandraStoreParameters.ALLOW_REMOTE_DCS_FOR_LOCAL_CONSISTENCY_LEVEL)); + if (dataCenter != null && !dataCenter.isEmpty()) { + if (allowRemoteDCsForLocalConsistencyLevel) { + builder = builder.withLoadBalancingPolicy( + DCAwareRoundRobinPolicy.builder().withLocalDc(dataCenter) + .allowRemoteDCsForLocalConsistencyLevel().build()); + } else { + builder = builder.withLoadBalancingPolicy( + DCAwareRoundRobinPolicy.builder().withLocalDc(dataCenter).build()); + } + } else { + if (allowRemoteDCsForLocalConsistencyLevel) { + builder = builder.withLoadBalancingPolicy( + (DCAwareRoundRobinPolicy.builder().allowRemoteDCsForLocalConsistencyLevel().build())); + } else { + builder = builder.withLoadBalancingPolicy((DCAwareRoundRobinPolicy.builder().build())); + } + } + break; + } + case "TokenAwareRoundRobinPolicy": + builder = builder.withLoadBalancingPolicy(new TokenAwarePolicy(new RoundRobinPolicy())); + break; + case "TokenAwareDCAwareRoundRobinPolicy": { + String dataCenter = properties.getProperty(CassandraStoreParameters.DATA_CENTER); + boolean allowRemoteDCsForLocalConsistencyLevel = Boolean.parseBoolean( + properties.getProperty(CassandraStoreParameters.ALLOW_REMOTE_DCS_FOR_LOCAL_CONSISTENCY_LEVEL)); + if (dataCenter != null && !dataCenter.isEmpty()) { + if (allowRemoteDCsForLocalConsistencyLevel) { + builder = builder.withLoadBalancingPolicy(new TokenAwarePolicy( + DCAwareRoundRobinPolicy.builder().withLocalDc(dataCenter) + .allowRemoteDCsForLocalConsistencyLevel().build())); + } else { + builder = builder.withLoadBalancingPolicy(new TokenAwarePolicy( + DCAwareRoundRobinPolicy.builder().withLocalDc(dataCenter).build())); + } + } else { + if (allowRemoteDCsForLocalConsistencyLevel) { + builder = builder.withLoadBalancingPolicy(new TokenAwarePolicy( + DCAwareRoundRobinPolicy.builder().allowRemoteDCsForLocalConsistencyLevel().build())); + } else { + builder = builder.withLoadBalancingPolicy( + new TokenAwarePolicy(DCAwareRoundRobinPolicy.builder().build())); + } + } + break; + } + default: + LOG.error("Unsupported Cassandra load balancing policy: {} ", loadBalancingProp); + break; + } + } + return builder; + } + + private Cluster.Builder populateCredentials(Properties properties, Cluster.Builder builder) { + String usernameProp = properties.getProperty(CassandraStoreParameters.USERNAME); + String passwordProp = properties.getProperty(CassandraStoreParameters.PASSWORD); + if (usernameProp != null) { + builder = builder.withCredentials(usernameProp, passwordProp); + } + return builder; + } + + private Cluster.Builder populatePoolingSettings(Properties properties, Cluster.Builder builder) { + String localCoreConnectionsPerHost = properties.getProperty(CassandraStoreParameters.LOCAL_CORE_CONNECTIONS_PER_HOST); + String remoteCoreConnectionsPerHost = properties.getProperty(CassandraStoreParameters.REMOTE_CORE_CONNECTIONS_PER_HOST); + String localMaxConnectionsPerHost = properties.getProperty(CassandraStoreParameters.LOCAL_MAX_CONNECTIONS_PER_HOST); + String remoteMaxConnectionsPerHost = properties.getProperty(CassandraStoreParameters.REMOTE_MAX_CONNECTIONS_PER_HOST); + String localNewConnectionThreshold = properties.getProperty(CassandraStoreParameters.LOCAL_NEW_CONNECTION_THRESHOLD); + String remoteNewConnectionThreshold = properties.getProperty(CassandraStoreParameters.REMOTE_NEW_CONNECTION_THRESHOLD); + String localMaxRequestsPerConnection = properties.getProperty(CassandraStoreParameters.LOCAL_MAX_REQUESTS_PER_CONNECTION); + String remoteMaxRequestsPerConnection = properties.getProperty(CassandraStoreParameters.REMOTE_MAX_REQUESTS_PER_CONNECTION); + PoolingOptions options = new PoolingOptions(); + if (localCoreConnectionsPerHost != null) { + options.setCoreConnectionsPerHost(HostDistance.LOCAL, Integer.parseInt(localCoreConnectionsPerHost)); + } + if (remoteCoreConnectionsPerHost != null) { + options.setCoreConnectionsPerHost(HostDistance.REMOTE, Integer.parseInt(remoteCoreConnectionsPerHost)); + } + if (localMaxConnectionsPerHost != null) { + options.setMaxConnectionsPerHost(HostDistance.LOCAL, Integer.parseInt(localMaxConnectionsPerHost)); + } + if (remoteMaxConnectionsPerHost != null) { + options.setMaxConnectionsPerHost(HostDistance.REMOTE, Integer.parseInt(remoteMaxConnectionsPerHost)); + } + if (localNewConnectionThreshold != null) { + options.setNewConnectionThreshold(HostDistance.LOCAL, Integer.parseInt(localNewConnectionThreshold)); + } + if (remoteNewConnectionThreshold != null) { + options.setNewConnectionThreshold(HostDistance.REMOTE, Integer.parseInt(remoteNewConnectionThreshold)); + } + if (localMaxRequestsPerConnection != null) { + options.setMaxRequestsPerConnection(HostDistance.LOCAL, Integer.parseInt(localMaxRequestsPerConnection)); + } + if (remoteMaxRequestsPerConnection != null) { + options.setMaxRequestsPerConnection(HostDistance.REMOTE, Integer.parseInt(remoteMaxRequestsPerConnection)); + } + builder = builder.withPoolingOptions(options); + return builder; + } + + private Cluster.Builder populateQueryOptions(Properties properties, Cluster.Builder builder) { + String consistencyLevelProp = properties.getProperty(CassandraStoreParameters.CONSISTENCY_LEVEL); + String serialConsistencyLevelProp = properties.getProperty(CassandraStoreParameters.SERIAL_CONSISTENCY_LEVEL); + String fetchSize = properties.getProperty(CassandraStoreParameters.FETCH_SIZE); + QueryOptions options = new QueryOptions(); + if (consistencyLevelProp != null) { + options.setConsistencyLevel(ConsistencyLevel.valueOf(consistencyLevelProp)); + } + if (serialConsistencyLevelProp != null) { + options.setSerialConsistencyLevel(ConsistencyLevel.valueOf(serialConsistencyLevelProp)); + } + if (fetchSize != null) { + options.setFetchSize(Integer.parseInt(fetchSize)); + } + return builder.withQueryOptions(options); + } + + private Cluster.Builder populateReconnectPolicy(Properties properties, Cluster.Builder builder) { + String reconnectionPolicy = properties.getProperty(CassandraStoreParameters.RECONNECTION_POLICY); + if (reconnectionPolicy != null) { + switch (reconnectionPolicy) { + case "ConstantReconnectionPolicy": { + String constantReconnectionPolicyDelay = properties.getProperty(CassandraStoreParameters.CONSTANT_RECONNECTION_POLICY_DELAY); + ConstantReconnectionPolicy policy = new ConstantReconnectionPolicy(Long.parseLong(constantReconnectionPolicyDelay)); + builder = builder.withReconnectionPolicy(policy); + break; + } + case "ExponentialReconnectionPolicy": { + String exponentialReconnectionPolicyBaseDelay = properties.getProperty(CassandraStoreParameters.EXPONENTIAL_RECONNECTION_POLICY_BASE_DELAY); + String exponentialReconnectionPolicyMaxDelay = properties.getProperty(CassandraStoreParameters.EXPONENTIAL_RECONNECTION_POLICY_MAX_DELAY); + + ExponentialReconnectionPolicy policy = new ExponentialReconnectionPolicy(Long.parseLong(exponentialReconnectionPolicyBaseDelay), + Long.parseLong(exponentialReconnectionPolicyMaxDelay)); + builder = builder.withReconnectionPolicy(policy); + break; + } + default: + LOG.error("Unsupported reconnection policy : {} ", reconnectionPolicy); + } + } + return builder; + } + + private Cluster.Builder populateRetrytPolicy(Properties properties, Cluster.Builder builder) { + String retryPolicy = properties.getProperty(CassandraStoreParameters.RETRY_POLICY); + if (retryPolicy != null) { + switch (retryPolicy) { + case "DefaultRetryPolicy": + builder = builder.withRetryPolicy(DefaultRetryPolicy.INSTANCE); + break; + case "DowngradingConsistencyRetryPolicy": + builder = builder.withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE); + break; + case "FallthroughRetryPolicy": + builder = builder.withRetryPolicy(FallthroughRetryPolicy.INSTANCE); + break; + case "LoggingDefaultRetryPolicy": + builder = builder.withRetryPolicy(new LoggingRetryPolicy(DefaultRetryPolicy.INSTANCE)); + break; + case "LoggingDowngradingConsistencyRetryPolicy": + builder = builder.withRetryPolicy(new LoggingRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE)); + break; + case "LoggingFallthroughRetryPolicy": + builder = builder.withRetryPolicy(new LoggingRetryPolicy(FallthroughRetryPolicy.INSTANCE)); + break; + default: + LOG.error("Unsupported retry policy : {} ", retryPolicy); + break; + } + } + return builder; + } + + private Cluster.Builder populateSocketOptions(Properties properties, Cluster.Builder builder) { + String connectionTimeoutMillisProp = properties.getProperty(CassandraStoreParameters.CONNECTION_TIMEOUT_MILLIS); + String keepAliveProp = properties.getProperty(CassandraStoreParameters.KEEP_ALIVE); + String readTimeoutMillisProp = properties.getProperty(CassandraStoreParameters.READ_TIMEOUT_MILLIS); + String receiveBufferSizeProp = properties.getProperty(CassandraStoreParameters.RECEIVER_BUFFER_SIZE); + String reuseAddress = properties.getProperty(CassandraStoreParameters.REUSE_ADDRESS); + String sendBufferSize = properties.getProperty(CassandraStoreParameters.SEND_BUFFER_SIZE); + String soLinger = properties.getProperty(CassandraStoreParameters.SO_LINGER); + String tcpNoDelay = properties.getProperty(CassandraStoreParameters.TCP_NODELAY); + SocketOptions options = new SocketOptions(); + if (connectionTimeoutMillisProp != null) { + options.setConnectTimeoutMillis(Integer.parseInt(connectionTimeoutMillisProp)); + } + if (keepAliveProp != null) { + options.setKeepAlive(Boolean.parseBoolean(keepAliveProp)); + } + if (readTimeoutMillisProp != null) { + options.setReadTimeoutMillis(Integer.parseInt(readTimeoutMillisProp)); + } + if (receiveBufferSizeProp != null) { + options.setReceiveBufferSize(Integer.parseInt(receiveBufferSizeProp)); + } + if (reuseAddress != null) { + options.setReuseAddress(Boolean.parseBoolean(reuseAddress)); + } + if (sendBufferSize != null) { + options.setSendBufferSize(Integer.parseInt(sendBufferSize)); + } + if (soLinger != null) { + options.setSoLinger(Integer.parseInt(soLinger)); + } + if (tcpNoDelay != null) { + options.setTcpNoDelay(Boolean.parseBoolean(tcpNoDelay)); + } + return builder.withSocketOptions(options); + } + + + private List<String> readCustomCodec(Properties properties) throws JDOMException, IOException { + String filename = properties.getProperty(CassandraStoreParameters.CUSTOM_CODEC_FILE); + if (filename != null) { + List<String> codecs = new ArrayList<>(); + SAXBuilder builder = new SAXBuilder(); + Document doc = builder.build(getClass().getClassLoader().getResourceAsStream(filename)); + List<Element> codecElementList = doc.getRootElement().getChildren("codec"); + for (Element codec : codecElementList) { + codecs.add(codec.getValue()); + } + return codecs; + } + return null; + } + + + public void close() { + this.session.close(); + this.cluster.close(); + } + + private void registerCustomCodecs(List<String> codecs) throws Exception { + for (String codec : codecs) { + this.cluster.getConfiguration().getCodecRegistry().register((TypeCodec<?>) Class.forName(codec).newInstance()); + } + } + +} http://git-wip-us.apache.org/repos/asf/gora/blob/5e383ef9/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMappingBuilder.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMappingBuilder.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMappingBuilder.java new file mode 100644 index 0000000..bb8c6ad --- /dev/null +++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMappingBuilder.java @@ -0,0 +1,234 @@ +package org.apache.gora.cassandra.store; + +import org.apache.gora.cassandra.bean.CassandraKey; +import org.apache.gora.cassandra.bean.ClusterKeyField; +import org.apache.gora.cassandra.bean.Field; +import org.apache.gora.cassandra.bean.KeySpace; +import org.apache.gora.cassandra.bean.PartitionKeyField; +import org.apache.gora.persistency.Persistent; +import org.jdom.Attribute; +import org.jdom.Document; +import org.jdom.Element; +import org.jdom.input.SAXBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; +import java.util.Locale; + +/** + * Created by madhawa on 6/28/17. + */ +public class CassandraMappingBuilder<K, T extends Persistent> { + + private static final Logger LOG = LoggerFactory.getLogger(CassandraMappingBuilder.class); + + + private CassandraStore dataStore; + + + /** + * Constructor for builder to create the mapper. + * + * @param store + */ + public CassandraMappingBuilder(final CassandraStore<K, T> store) { + this.dataStore = store; + } + + /** + * In this method we reads the mapping file and creates the Cassandra Mapping. + * + * @param filename mapping file name + * @return @{@link CassandraMapping} + * @throws IOException + */ + @SuppressWarnings("all") + public CassandraMapping readMapping(String filename) throws IOException { + CassandraMapping map = new CassandraMapping(); + Class keyClass = dataStore.getKeyClass(); + Class persistentClass = dataStore. getPersistentClass(); + try { + SAXBuilder builder = new SAXBuilder(); + Document doc = builder.build(getClass().getClassLoader().getResourceAsStream(filename)); + + List<Element> keyspaces = doc.getRootElement().getChildren("keyspace"); + List<Element> classes = doc.getRootElement().getChildren("class"); + List<Element> keys = doc.getRootElement().getChildren("cassandraKey"); + + boolean classMatched = false; + for (Element classElement : classes) { + if (classElement.getAttributeValue("keyClass").equals( + keyClass.getCanonicalName()) + && classElement.getAttributeValue("name").equals( + persistentClass.getCanonicalName())) { + + classMatched = true; + String tableName = classElement.getAttributeValue("table"); + map.setCoreName(tableName); + + List classAttributes = classElement.getAttributes(); + for (Object anAttributeList : classAttributes) { + Attribute attribute = (Attribute) anAttributeList; + String attributeName = attribute.getName(); + String attributeValue = attribute.getValue(); + map.addProperty(attributeName, attributeValue); + } + + List<Element> fields = classElement.getChildren("field"); + + for (Element field : fields) { + Field cassandraField = new Field(); + + List fieldAttributes = field.getAttributes(); + processAttributes(fieldAttributes, cassandraField); + map.addCassandraField(cassandraField); + } + break; + } + LOG.warn("Check that 'keyClass' and 'name' parameters in gora-solr-mapping.xml " + + "match with intended values. A mapping mismatch has been found therefore " + + "no mapping has been initialized for class mapping at position " + + " {} in mapping file.", classes.indexOf(classElement)); + } + if (!classMatched) { + LOG.error("Check that 'keyClass' and 'name' parameters in {} no mapping has been initialized for {} class mapping", filename, persistentClass); + } + + String keyspaceName = map.getProperty("keyspace"); + if (keyspaceName != null) { + KeySpace keyspace; + for (Element keyspaceElement : keyspaces) { + if (keyspaceName.equals(keyspaceElement.getAttributeValue("name"))) { + keyspace = new KeySpace(); + List fieldAttributes = keyspaceElement.getAttributes(); + for (Object attributeObject : fieldAttributes) { + Attribute attribute = (Attribute) attributeObject; + String attributeName = attribute.getName(); + String attributeValue = attribute.getValue(); + switch (attributeName) { + case "name": + keyspace.setName(attributeValue); + break; + case "durableWrite": + keyspace.setDurableWritesEnabled(Boolean.parseBoolean(attributeValue)); + break; + default: + keyspace.addProperty(attributeName, attributeValue); + break; + } + } + Element placementStrategy = keyspaceElement.getChild("placementStrategy"); + switch (KeySpace.PlacementStrategy.valueOf(placementStrategy.getAttributeValue("name"))) { + case SimpleStrategy: + keyspace.setPlacementStrategy(KeySpace.PlacementStrategy.SimpleStrategy); + keyspace.setReplicationFactor(Integer.parseInt(placementStrategy.getAttributeValue("replication_factor"))); + break; + case NetworkTopologyStrategy: + List<Element> dataCenters = placementStrategy.getChildren("datacenter"); + keyspace.setPlacementStrategy(KeySpace.PlacementStrategy.NetworkTopologyStrategy); + for (Element dataCenter : dataCenters) { + String dataCenterName = dataCenter.getAttributeValue("name"); + Integer dataCenterReplicationFactor = Integer.valueOf(dataCenter.getAttributeValue("replication_factor")); + keyspace.addDataCenter(dataCenterName, dataCenterReplicationFactor); + } + break; + } + map.setKeySpace(keyspace); + break; + } + + } + + } + + for (Element key : keys) { + if (keyClass.getName().equals(key.getAttributeValue("name"))) { + CassandraKey cassandraKey = new CassandraKey(keyClass.getName()); + Element partitionKeys = key.getChild("partitionKey"); + Element clusterKeys = key.getChild("clusterKey"); + List<Element> partitionKeyFields = partitionKeys.getChildren("field"); + List<Element> partitionCompositeKeyFields = partitionKeys.getChildren("compositeKey"); + // process non composite partition keys + for (Element partitionKeyField : partitionKeyFields) { + PartitionKeyField fieldKey = new PartitionKeyField(); + List fieldAttributes = partitionKeyField.getAttributes(); + processAttributes(fieldAttributes, fieldKey); + cassandraKey.addPartitionKeyField(fieldKey); + } + // process composite partitions keys + for (Element partitionCompositeKeyField : partitionCompositeKeyFields) { + PartitionKeyField compositeFieldKey = new PartitionKeyField(); + compositeFieldKey.setComposite(true); + List<Element> compositeKeyFields = partitionCompositeKeyField.getChildren("field"); + for (Element partitionKeyField : compositeKeyFields) { + PartitionKeyField fieldKey = new PartitionKeyField(); + List fieldAttributes = partitionKeyField.getAttributes(); + processAttributes(fieldAttributes, fieldKey); + compositeFieldKey.addField(fieldKey); + } + cassandraKey.addPartitionKeyField(compositeFieldKey); + } + + //process cluster keys + List<Element> clusterKeyFields = clusterKeys.getChildren("field"); + for (Element clusterKeyField : clusterKeyFields) { + ClusterKeyField keyField = new ClusterKeyField(); + List fieldAttributes = clusterKeyField.getAttributes(); + for (Object anAttributeList : fieldAttributes) { + Attribute attribute = (Attribute) anAttributeList; + String attributeName = attribute.getName(); + String attributeValue = attribute.getValue(); + switch (attributeName) { + case "name": + keyField.setFieldName(attributeValue); + break; + case "column": + keyField.setColumnName(attributeValue); + break; + case "type": + keyField.setType(attributeValue); + break; + case "order": + keyField.setOrder(ClusterKeyField.Order.valueOf(attributeValue.toUpperCase(Locale.ENGLISH))); + break; + default: + keyField.addProperty(attributeName, attributeValue); + break; + } + } + cassandraKey.addClusterKeyField(keyField); + } + map.setCassandraKey(cassandraKey); + } + } + } catch (Exception ex) { + throw new IOException(ex); + } + return map; + } + + private void processAttributes(List<Element> attributes, Field fieldKey) { + for (Object anAttributeList : attributes) { + Attribute attribute = (Attribute) anAttributeList; + String attributeName = attribute.getName(); + String attributeValue = attribute.getValue(); + switch (attributeName) { + case "name": + fieldKey.setFieldName(attributeValue); + break; + case "column": + fieldKey.setColumnName(attributeValue); + break; + case "type": + fieldKey.setType(attributeValue); + break; + default: + fieldKey.addProperty(attributeName, attributeValue); + break; + } + } + } + +} http://git-wip-us.apache.org/repos/asf/gora/blob/5e383ef9/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraQueryFactory.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraQueryFactory.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraQueryFactory.java deleted file mode 100644 index fc90c5f..0000000 --- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraQueryFactory.java +++ /dev/null @@ -1,224 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gora.cassandra.store; - -import org.apache.gora.cassandra.bean.CassandraKey; -import org.apache.gora.cassandra.bean.ClusterKeyField; -import org.apache.gora.cassandra.bean.Field; -import org.apache.gora.cassandra.bean.KeySpace; -import org.apache.gora.cassandra.bean.PartitionKeyField; - -import java.util.List; -import java.util.Map; - -/** - * This class is used create Cassandra Queries. - */ -class CassandraQueryFactory { - - /** - * This method returns the CQL query to create key space. - * refer : http://docs.datastax.com/en/cql/3.1/cql/cql_reference/create_keyspace_r.html - * - * @param mapping Cassandra Mapping - * @return CQL Query - */ - static String getCreateKeySpaceQuery(CassandraMapping mapping) { - KeySpace keySpace = mapping.getKeySpace(); - StringBuilder stringBuffer = new StringBuilder(); - stringBuffer.append("CREATE KEYSPACE IF NOT EXISTS ").append(keySpace.getName()).append(" WITH REPLICATION = { 'class' : "); - KeySpace.PlacementStrategy placementStrategy = keySpace.getPlacementStrategy(); - stringBuffer.append("'").append(placementStrategy).append("'").append(", ").append("'"); - switch (placementStrategy) { - case SimpleStrategy: - stringBuffer.append("replication_factor").append("'").append(" : ").append(keySpace.getReplicationFactor()).append(" }"); - break; - case NetworkTopologyStrategy: - boolean isCommaNeeded = false; - for (Map.Entry<String, Integer> entry : keySpace.getDataCenters().entrySet()) { - if (isCommaNeeded) { - stringBuffer.append(", '"); - } - stringBuffer.append(entry.getKey()).append("'").append(" : ").append(entry.getValue()); - isCommaNeeded = true; - } - stringBuffer.append(" }"); - break; - } - - if (keySpace.isDurableWritesEnabled()) { - stringBuffer.append(" AND DURABLE_WRITES = ").append(keySpace.isDurableWritesEnabled()); - } - return stringBuffer.toString(); - } - - static String getCreateTableQuery(CassandraMapping mapping) { - StringBuilder stringBuffer = new StringBuilder(); - stringBuffer.append("CREATE TABLE IF NOT EXISTS ").append(mapping.getKeySpace().getName()).append(".").append(mapping.getCoreName()).append(" ("); - boolean isCommaNeeded = false; - CassandraKey cassandraKey = mapping.getCassandraKey(); - // appending Cassandra key columns into db schema - if (cassandraKey != null) { - for (PartitionKeyField partitionKeyField : cassandraKey.getPartitionKeyFields()) { - if (partitionKeyField.isComposite()) { - for (Field compositeField : partitionKeyField.getFields()) { - stringBuffer = processFields(stringBuffer, compositeField, isCommaNeeded); - } - - } else { - stringBuffer = processFields(stringBuffer, partitionKeyField, isCommaNeeded); - } - isCommaNeeded = true; - } - for (ClusterKeyField clusterKeyField : cassandraKey.getClusterKeyFields()) { - stringBuffer = processFields(stringBuffer, clusterKeyField, isCommaNeeded); - } - } - // appending Other columns - for (Field field : mapping.getFieldList()) { - if (isCommaNeeded) { - stringBuffer.append(", "); - } - stringBuffer.append(field.getColumnName()).append(" ").append(field.getType()); - boolean isStaticColumn = Boolean.parseBoolean(field.getProperty("static")); - boolean isPrimaryKey = Boolean.parseBoolean(field.getProperty("primarykey")); - if (isStaticColumn) { - stringBuffer.append(" STATIC"); - } - if (isPrimaryKey) { - stringBuffer.append(" PRIMARY KEY "); - } - isCommaNeeded = true; - } - - if (cassandraKey != null) { - List<PartitionKeyField> pkey = cassandraKey.getPartitionKeyFields(); - if (pkey != null) { - stringBuffer.append(", PRIMARY KEY ("); - boolean isCommaNeededToApply = false; - for (PartitionKeyField keyField : pkey) { - if (isCommaNeededToApply) { - stringBuffer.append(","); - } - if (keyField.isComposite()) { - stringBuffer.append("("); - boolean isCommaNeededHere = false; - for (Field field : keyField.getFields()) { - if (isCommaNeededHere) { - stringBuffer.append(", "); - } - stringBuffer.append(field.getColumnName()); - isCommaNeededHere = true; - } - stringBuffer.append(")"); - } else { - stringBuffer.append(keyField.getColumnName()); - } - isCommaNeededToApply = true; - } - stringBuffer.append(")"); - } - } - - stringBuffer.append(")"); - boolean isWithNeeded = true; - if (Boolean.parseBoolean(mapping.getProperty("compactStorage"))) { - stringBuffer.append(" WITH COMPACT STORAGE "); - isWithNeeded = false; - } - - String id = mapping.getProperty("id"); - if (id != null) { - if (isWithNeeded) { - stringBuffer.append(" WITH "); - } else { - stringBuffer.append(" AND "); - } - stringBuffer.append("ID = '").append(id).append("'"); - isWithNeeded = false; - } - if (cassandraKey != null) { - List<ClusterKeyField> clusterKeyFields = cassandraKey.getClusterKeyFields(); - if (clusterKeyFields != null) { - if (isWithNeeded) { - stringBuffer.append(" WITH "); - } else { - stringBuffer.append(" AND "); - } - stringBuffer.append(" CLUSTERING ORDER BY ("); - boolean isCommaNeededToApply = false; - for (ClusterKeyField keyField : clusterKeyFields) { - if (isCommaNeededToApply) { - stringBuffer.append(", "); - } - stringBuffer.append(keyField.getColumnName()).append(" "); - if (keyField.getOrder() != null) { - stringBuffer.append(keyField.getOrder()); - } - isCommaNeededToApply = true; - } - stringBuffer.append(")"); - } - } - return stringBuffer.toString(); - } - - private static StringBuilder processFields(StringBuilder stringBuilder, Field field, boolean isCommaNeeded) { - if (isCommaNeeded) { - stringBuilder.append(", "); - } - stringBuilder.append(field.getColumnName()).append(" ").append(field.getType()); - boolean isStaticColumn = Boolean.parseBoolean(field.getProperty("static")); - if (isStaticColumn) { - stringBuilder.append(" STATIC"); - } - return stringBuilder; - } - - /** - * This method returns the CQL query to drop table. - * refer : http://docs.datastax.com/en/cql/3.1/cql/cql_reference/drop_table_r.html - * - * @param mapping Cassandra Mapping - * @return CQL query - */ - static String getDropTableQuery(CassandraMapping mapping) { - return "DROP TABLE IF EXISTS " + mapping.getKeySpace().getName() + "." + mapping.getCoreName(); - } - - /** - * This method returns the CQL query to drop key space. - * refer : http://docs.datastax.com/en/cql/3.1/cql/cql_reference/drop_keyspace_r.html - * - * @param mapping Cassandra Mapping - * @return CQL query - */ - static String getDropKeySpaceQuery(CassandraMapping mapping) { - return "DROP KEYSPACE IF EXISTS " + mapping.getKeySpace().getName(); - } - - /** - * This method returns the CQL query to truncate (removes all the data) in the table. - * refer : http://docs.datastax.com/en/cql/3.1/cql/cql_reference/truncate_r.html - * - * @param mapping Cassandra Mapping - * @return CQL query - */ - static String getTruncateTableQuery(CassandraMapping mapping) { - return "TRUNCATE TABLE " + mapping.getKeySpace().getName() + "." + mapping.getCoreName(); - } -}
