IGNITE-3609 Utilize Cassandra logged batches for transactions. - Fixes #1111.
Signed-off-by: Alexey Kuznetsov <akuznet...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3b8aca64 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3b8aca64 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3b8aca64 Branch: refs/heads/master Commit: 3b8aca64b8ebe6ba21f5d02f50cf69ad46dbbc95 Parents: 00576d8 Author: Igor <irud...@gmail.com> Authored: Fri Sep 30 14:39:30 2016 +0700 Committer: Alexey Kuznetsov <akuznet...@apache.org> Committed: Fri Sep 30 14:39:30 2016 +0700 ---------------------------------------------------------------------- .../store/cassandra/CassandraCacheStore.java | 112 ++++-- .../store/cassandra/common/CassandraHelper.java | 29 +- .../store/cassandra/persistence/PojoField.java | 9 +- .../cassandra/persistence/PojoValueField.java | 2 - .../cassandra/session/CassandraSession.java | 10 + .../cassandra/session/CassandraSessionImpl.java | 113 +++++- .../session/transaction/BaseMutation.java | 68 ++++ .../session/transaction/DeleteMutation.java | 57 +++ .../cassandra/session/transaction/Mutation.java | 64 +++ .../session/transaction/WriteMutation.java | 60 +++ .../session/transaction/package-info.java | 21 + .../tests/CassandraDirectPersistenceTest.java | 396 ++++++++++++++++--- .../ignite/tests/CassandraLocalServer.java | 58 +++ .../apache/ignite/tests/DDLGeneratorTest.java | 35 +- .../ignite/tests/IgnitePersistentStoreTest.java | 265 +++++++++++++ .../org/apache/ignite/tests/pojos/Product.java | 123 ++++++ .../apache/ignite/tests/pojos/ProductOrder.java | 148 +++++++ .../ignite/tests/utils/CacheStoreHelper.java | 19 +- .../ignite/tests/utils/TestCacheSession.java | 12 +- .../ignite/tests/utils/TestTransaction.java | 133 +++++++ .../apache/ignite/tests/utils/TestsHelper.java | 299 +++++++++++++- .../tests/persistence/pojo/ignite-config.xml | 41 +- .../ignite/tests/persistence/pojo/order.xml | 21 + .../ignite/tests/persistence/pojo/product.xml | 21 
+ .../store/src/test/resources/tests.properties | 15 + 25 files changed, 2005 insertions(+), 126 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/3b8aca64/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java index 6aef0c4..aead39a 100644 --- a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java +++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java @@ -22,8 +22,10 @@ import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.Row; import java.util.ArrayList; import java.util.Collection; -import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; import java.util.Map; +import java.util.HashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -41,6 +43,9 @@ import org.apache.ignite.cache.store.cassandra.session.CassandraSession; import org.apache.ignite.cache.store.cassandra.session.ExecutionAssistant; import org.apache.ignite.cache.store.cassandra.session.GenericBatchExecutionAssistant; import org.apache.ignite.cache.store.cassandra.session.LoadCacheCustomQueryWorker; +import org.apache.ignite.cache.store.cassandra.session.transaction.DeleteMutation; +import org.apache.ignite.cache.store.cassandra.session.transaction.Mutation; +import org.apache.ignite.cache.store.cassandra.session.transaction.WriteMutation; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiInClosure; import 
org.apache.ignite.logger.NullLogger; @@ -54,14 +59,16 @@ import org.apache.ignite.resources.LoggerResource; * @param <V> Ignite cache value type. */ public class CassandraCacheStore<K, V> implements CacheStore<K, V> { - /** Connection attribute property name. */ - private static final String ATTR_CONN_PROP = "CASSANDRA_STORE_CONNECTION"; + /** Buffer to store mutations performed withing transaction. */ + private static final String TRANSACTION_BUFFER = "CASSANDRA_TRANSACTION_BUFFER"; /** Auto-injected store session. */ + @SuppressWarnings("unused") @CacheStoreSessionResource private CacheStoreSession storeSes; /** Auto-injected logger instance. */ + @SuppressWarnings("unused") @LoggerResource private IgniteLogger log; @@ -127,12 +134,22 @@ public class CassandraCacheStore<K, V> implements CacheStore<K, V> { /** {@inheritDoc} */ @Override public void sessionEnd(boolean commit) throws CacheWriterException { - if (storeSes == null || storeSes.transaction() == null) + if (!storeSes.isWithinTransaction()) return; - CassandraSession cassandraSes = (CassandraSession) storeSes.properties().remove(ATTR_CONN_PROP); + List<Mutation> mutations = mutations(); + if (mutations == null || mutations.isEmpty()) + return; - U.closeQuiet(cassandraSes); + CassandraSession ses = getCassandraSession(); + + try { + ses.execute(mutations); + } + finally { + mutations.clear(); + U.closeQuiet(ses); + } } /** {@inheritDoc} */ @@ -182,7 +199,7 @@ public class CassandraCacheStore<K, V> implements CacheStore<K, V> { }); } finally { - closeCassandraSession(ses); + U.closeQuiet(ses); } } @@ -235,7 +252,7 @@ public class CassandraCacheStore<K, V> implements CacheStore<K, V> { }, keys); } finally { - closeCassandraSession(ses); + U.closeQuiet(ses); } } @@ -244,6 +261,11 @@ public class CassandraCacheStore<K, V> implements CacheStore<K, V> { if (entry == null || entry.getKey() == null) return; + if (storeSes.isWithinTransaction()) { + accumulate(new WriteMutation(entry, cassandraTable(), 
controller)); + return; + } + CassandraSession ses = getCassandraSession(); try { @@ -285,7 +307,7 @@ public class CassandraCacheStore<K, V> implements CacheStore<K, V> { }); } finally { - closeCassandraSession(ses); + U.closeQuiet(ses); } } @@ -294,6 +316,13 @@ public class CassandraCacheStore<K, V> implements CacheStore<K, V> { if (entries == null || entries.isEmpty()) return; + if (storeSes.isWithinTransaction()) { + for (Cache.Entry<?, ?> entry : entries) + accumulate(new WriteMutation(entry, cassandraTable(), controller)); + + return; + } + CassandraSession ses = getCassandraSession(); try { @@ -331,7 +360,7 @@ public class CassandraCacheStore<K, V> implements CacheStore<K, V> { }, entries); } finally { - closeCassandraSession(ses); + U.closeQuiet(ses); } } @@ -340,6 +369,11 @@ public class CassandraCacheStore<K, V> implements CacheStore<K, V> { if (key == null) return; + if (storeSes.isWithinTransaction()) { + accumulate(new DeleteMutation(key, cassandraTable(), controller)); + return; + } + CassandraSession ses = getCassandraSession(); try { @@ -382,7 +416,7 @@ public class CassandraCacheStore<K, V> implements CacheStore<K, V> { }); } finally { - closeCassandraSession(ses); + U.closeQuiet(ses); } } @@ -391,6 +425,13 @@ public class CassandraCacheStore<K, V> implements CacheStore<K, V> { if (keys == null || keys.isEmpty()) return; + if (storeSes.isWithinTransaction()) { + for (Object key : keys) + accumulate(new DeleteMutation(key, cassandraTable(), controller)); + + return; + } + CassandraSession ses = getCassandraSession(); try { @@ -422,7 +463,7 @@ public class CassandraCacheStore<K, V> implements CacheStore<K, V> { }, keys); } finally { - closeCassandraSession(ses); + U.closeQuiet(ses); } } @@ -433,36 +474,43 @@ public class CassandraCacheStore<K, V> implements CacheStore<K, V> { * @return Cassandra session wrapper. 
*/ private CassandraSession getCassandraSession() { - if (storeSes == null || storeSes.transaction() == null) - return dataSrc.session(log != null ? log : new NullLogger()); - - CassandraSession ses = (CassandraSession) storeSes.properties().get(ATTR_CONN_PROP); - - if (ses == null) { - ses = dataSrc.session(log != null ? log : new NullLogger()); - storeSes.properties().put(ATTR_CONN_PROP, ses); - } + return dataSrc.session(log != null ? log : new NullLogger()); + } - return ses; + /** + * Returns table name to use for all Cassandra based operations (READ/WRITE/DELETE). + * + * @return Table name. + */ + private String cassandraTable() { + return controller.getPersistenceSettings().getTable() != null ? + controller.getPersistenceSettings().getTable() : storeSes.cacheName().trim().toLowerCase(); } /** - * Releases Cassandra related resources. + * Accumulates mutation in the transaction buffer. * - * @param ses Cassandra session wrapper. + * @param mutation Mutation operation. */ - private void closeCassandraSession(CassandraSession ses) { - if (ses != null && (storeSes == null || storeSes.transaction() == null)) - U.closeQuiet(ses); + private void accumulate(Mutation mutation) { + //noinspection unchecked + List<Mutation> mutations = (List<Mutation>)storeSes.properties().get(TRANSACTION_BUFFER); + + if (mutations == null) { + mutations = new LinkedList<>(); + storeSes.properties().put(TRANSACTION_BUFFER, mutations); + } + + mutations.add(mutation); } /** - * Returns table name to use for all Cassandra based operations (READ/WRITE/DELETE). + * Returns all the mutations performed withing transaction. * - * @return Table name. + * @return Mutations */ - private String cassandraTable() { - return controller.getPersistenceSettings().getTable() != null ? 
- controller.getPersistenceSettings().getTable() : storeSes.cacheName().toLowerCase(); + private List<Mutation> mutations() { + //noinspection unchecked + return (List<Mutation>)storeSes.properties().get(TRANSACTION_BUFFER); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/3b8aca64/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/common/CassandraHelper.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/common/CassandraHelper.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/common/CassandraHelper.java index 9066112..badd5df 100644 --- a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/common/CassandraHelper.java +++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/common/CassandraHelper.java @@ -20,9 +20,13 @@ package org.apache.ignite.cache.store.cassandra.common; import com.datastax.driver.core.Cluster; import com.datastax.driver.core.DataType; import com.datastax.driver.core.Session; +import com.datastax.driver.core.exceptions.DriverException; import com.datastax.driver.core.exceptions.InvalidQueryException; import com.datastax.driver.core.exceptions.NoHostAvailableException; import com.datastax.driver.core.exceptions.ReadTimeoutException; + +import java.net.InetSocketAddress; +import java.util.Map; import java.util.regex.Pattern; import org.apache.ignite.internal.util.typedef.internal.U; @@ -36,8 +40,15 @@ public class CassandraHelper { /** Cassandra error message if trying to create table inside nonexistent keyspace. */ private static final Pattern KEYSPACE_EXIST_ERROR2 = Pattern.compile("Cannot add table '[0-9a-zA-Z_]+' to non existing keyspace.*"); + /** Cassandra error message if trying to create table inside nonexistent keyspace. 
*/ + private static final Pattern KEYSPACE_EXIST_ERROR3 = Pattern.compile("Error preparing query, got ERROR INVALID: " + + "Keyspace [0-9a-zA-Z_]+ does not exist"); + + /** Cassandra error message if specified table doesn't exist. */ + private static final Pattern TABLE_EXIST_ERROR1 = Pattern.compile("unconfigured table [0-9a-zA-Z_]+"); + /** Cassandra error message if specified table doesn't exist. */ - private static final Pattern TABLE_EXIST_ERROR = Pattern.compile("unconfigured table [0-9a-zA-Z_]+"); + private static final String TABLE_EXIST_ERROR2 = "Error preparing query, got ERROR INVALID: unconfigured table"; /** Cassandra error message if trying to use prepared statement created from another session. */ private static final String PREP_STATEMENT_CLUSTER_INSTANCE_ERROR = "You may have used a PreparedStatement that " + @@ -85,11 +96,25 @@ public class CassandraHelper { public static boolean isTableAbsenceError(Throwable e) { while (e != null) { if (e instanceof InvalidQueryException && - (TABLE_EXIST_ERROR.matcher(e.getMessage()).matches() || + (TABLE_EXIST_ERROR1.matcher(e.getMessage()).matches() || KEYSPACE_EXIST_ERROR1.matcher(e.getMessage()).matches() || KEYSPACE_EXIST_ERROR2.matcher(e.getMessage()).matches())) return true; + if (e instanceof NoHostAvailableException && ((NoHostAvailableException) e).getErrors() != null) { + NoHostAvailableException ex = (NoHostAvailableException)e; + + for (Map.Entry<InetSocketAddress, Throwable> entry : ex.getErrors().entrySet()) { + //noinspection ThrowableResultOfMethodCallIgnored + Throwable error = entry.getValue(); + + if (error instanceof DriverException && + (error.getMessage().contains(TABLE_EXIST_ERROR2) || + KEYSPACE_EXIST_ERROR3.matcher(error.getMessage()).matches())) + return true; + } + } + e = e.getCause(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/3b8aca64/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoField.java 
---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoField.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoField.java index d708a34..78e75a9 100644 --- a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoField.java +++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoField.java @@ -85,11 +85,10 @@ public abstract class PojoField implements Serializable { public PojoField(PropertyDescriptor desc) { this.name = desc.getName(); - QuerySqlField sqlField = desc.getReadMethod() != null ? - desc.getReadMethod().getAnnotation(QuerySqlField.class) : - desc.getWriteMethod() == null ? - null : - desc.getWriteMethod().getAnnotation(QuerySqlField.class); + QuerySqlField sqlField = desc.getReadMethod() != null && + desc.getReadMethod().getAnnotation(QuerySqlField.class) != null ? + desc.getReadMethod().getAnnotation(QuerySqlField.class) : + desc.getWriteMethod() == null ? null : desc.getWriteMethod().getAnnotation(QuerySqlField.class); col = sqlField != null && sqlField.name() != null && !sqlField.name().trim().isEmpty() ? 
sqlField.name() : name.toLowerCase(); http://git-wip-us.apache.org/repos/asf/ignite/blob/3b8aca64/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoValueField.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoValueField.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoValueField.java index c3512c3..3e636c0 100644 --- a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoValueField.java +++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoValueField.java @@ -146,7 +146,5 @@ public class PojoValueField extends PojoField { * @param sqlField {@link QuerySqlField} annotation. */ protected void init(QuerySqlField sqlField) { - if (sqlField.index()) - isIndexed = true; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/3b8aca64/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSession.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSession.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSession.java index 506982f..b0e50ec 100644 --- a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSession.java +++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSession.java @@ -17,7 +17,10 @@ package org.apache.ignite.cache.store.cassandra.session; +import org.apache.ignite.cache.store.cassandra.session.transaction.Mutation; + import java.io.Closeable; +import java.util.List; /** * Wrapper around Cassandra driver session, to automatically handle: 
@@ -57,4 +60,11 @@ public interface CassandraSession extends Closeable { * @param assistant execution assistance to perform the main operation logic. */ public void execute(BatchLoaderAssistant assistant); + + /** + * Executes all the mutations performed withing Ignite transaction against Cassandra database. + * + * @param mutations Mutations. + */ + public void execute(List<Mutation> mutations); } http://git-wip-us.apache.org/repos/asf/ignite/blob/3b8aca64/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSessionImpl.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSessionImpl.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSessionImpl.java index d2c9e97..4857fa4 100644 --- a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSessionImpl.java +++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSessionImpl.java @@ -43,6 +43,7 @@ import org.apache.ignite.cache.store.cassandra.common.CassandraHelper; import org.apache.ignite.cache.store.cassandra.common.RandomSleeper; import org.apache.ignite.cache.store.cassandra.persistence.KeyValuePersistenceSettings; import org.apache.ignite.cache.store.cassandra.session.pool.SessionPool; +import org.apache.ignite.cache.store.cassandra.session.transaction.Mutation; import org.apache.ignite.internal.processors.cache.CacheEntryImpl; /** @@ -162,7 +163,8 @@ public class CassandraSessionImpl implements CassandraSession { throw new IgniteException(errorMsg, e); } - sleeper.sleep(); + if (!CassandraHelper.isTableAbsenceError(error)) + sleeper.sleep(); attempt++; } @@ -320,7 +322,8 @@ public class CassandraSessionImpl implements CassandraSession { handlePreparedStatementClusterError(prepStatEx); } - sleeper.sleep(); 
+ if (!CassandraHelper.isTableAbsenceError(error)) + sleeper.sleep(); attempt++; } @@ -402,6 +405,103 @@ public class CassandraSessionImpl implements CassandraSession { } /** {@inheritDoc} */ + @Override public void execute(List<Mutation> mutations) { + if (mutations == null || mutations.isEmpty()) + return; + + Throwable error = null; + String errorMsg = "Failed to apply " + mutations.size() + " mutations performed withing Ignite " + + "transaction into Cassandra"; + + int attempt = 0; + boolean tableExistenceRequired = false; + Map<String, PreparedStatement> statements = new HashMap<>(); + Map<String, KeyValuePersistenceSettings> tableSettings = new HashMap<>(); + RandomSleeper sleeper = newSleeper(); + + incrementSessionRefs(); + + try { + while (attempt < CQL_EXECUTION_ATTEMPTS_COUNT) { + error = null; + + if (attempt != 0) { + log.warning("Trying " + (attempt + 1) + " attempt to apply " + mutations.size() + " mutations " + + "performed withing Ignite transaction into Cassandra"); + } + + try { + BatchStatement batch = new BatchStatement(); + + // accumulating all the mutations into one Cassandra logged batch + for (Mutation mutation : mutations) { + String key = mutation.getTable() + mutation.getClass().getName(); + PreparedStatement st = statements.get(key); + + if (st == null) { + st = prepareStatement(mutation.getTable(), mutation.getStatement(), + mutation.getPersistenceSettings(), mutation.tableExistenceRequired()); + + if (st != null) + statements.put(key, st); + } + + if (st != null) + batch.add(mutation.bindStatement(st)); + + if (attempt == 0) { + if (mutation.tableExistenceRequired()) { + tableExistenceRequired = true; + + if (!tableSettings.containsKey(mutation.getTable())) + tableSettings.put(mutation.getTable(), mutation.getPersistenceSettings()); + } + } + } + + // committing logged batch into Cassandra + if (batch.size() > 0) + session().execute(tuneStatementExecutionOptions(batch)); + + return; + } catch (Throwable e) { + error = e; + + if 
(CassandraHelper.isTableAbsenceError(e)) { + if (tableExistenceRequired) { + for (Map.Entry<String, KeyValuePersistenceSettings> entry : tableSettings.entrySet()) + handleTableAbsenceError(entry.getKey(), entry.getValue()); + } + else + return; + } else if (CassandraHelper.isHostsAvailabilityError(e)) { + if (handleHostsAvailabilityError(e, attempt, errorMsg)) + statements.clear(); + } else if (CassandraHelper.isPreparedStatementClusterError(e)) { + handlePreparedStatementClusterError(e); + statements.clear(); + } else { + // For an error which we don't know how to handle, we will not try next attempts and terminate. + throw new IgniteException(errorMsg, e); + } + } + + if (!CassandraHelper.isTableAbsenceError(error)) + sleeper.sleep(); + + attempt++; + } + } catch (Throwable e) { + error = e; + } finally { + decrementSessionRefs(); + } + + log.error(errorMsg, error); + throw new IgniteException(errorMsg, error); + } + + /** {@inheritDoc} */ @Override public synchronized void close() throws IOException { if (decrementSessionRefs() == 0 && ses != null) { SessionPool.put(this, ses); @@ -517,7 +617,8 @@ public class CassandraSessionImpl implements CassandraSession { error = e; } - sleeper.sleep(); + if (!CassandraHelper.isTableAbsenceError(error)) + sleeper.sleep(); attempt++; } @@ -585,7 +686,7 @@ public class CassandraSessionImpl implements CassandraSession { log.info("-----------------------------------------------------------------------"); log.info("Creating Cassandra table '" + tableFullName + "'"); log.info("-----------------------------------------------------------------------\n\n" + - tableFullName + "\n"); + settings.getTableDDLStatement(table) + "\n"); log.info("-----------------------------------------------------------------------"); session().execute(settings.getTableDDLStatement(table)); log.info("Cassandra table '" + tableFullName + "' was successfully created"); @@ -634,10 +735,14 @@ public class CassandraSessionImpl implements CassandraSession { 
while (attempt < CQL_EXECUTION_ATTEMPTS_COUNT) { try { + log.info("-----------------------------------------------------------------------"); log.info("Creating indexes for Cassandra table '" + tableFullName + "'"); + log.info("-----------------------------------------------------------------------"); for (String statement : indexDDLStatements) { try { + log.info(statement); + log.info("-----------------------------------------------------------------------"); session().execute(statement); } catch (AlreadyExistsException ignored) { http://git-wip-us.apache.org/repos/asf/ignite/blob/3b8aca64/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/BaseMutation.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/BaseMutation.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/BaseMutation.java new file mode 100644 index 0000000..2625e87 --- /dev/null +++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/BaseMutation.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.store.cassandra.session.transaction; + +import org.apache.ignite.cache.store.cassandra.persistence.KeyValuePersistenceSettings; +import org.apache.ignite.cache.store.cassandra.persistence.PersistenceController; + +/** + * Base class to inherit from to implement specific mutations operation. + */ +public abstract class BaseMutation implements Mutation { + /** Cassandra table to use. */ + private final String table; + + /** Persistence controller to be utilized for mutation. */ + private final PersistenceController ctrl; + + /** + * Creates instance of mutation operation. + * + * @param table Cassandra table which should be used for the mutation. + * @param ctrl Persistence controller to use. + */ + public BaseMutation(String table, PersistenceController ctrl) { + if (table == null || table.trim().isEmpty()) + throw new IllegalArgumentException("Table name should be specified"); + + if (ctrl == null) + throw new IllegalArgumentException("Persistence controller should be specified"); + + this.table = table; + this.ctrl = ctrl; + } + + /** {@inheritDoc} */ + @Override public String getTable() { + return table; + } + + /** {@inheritDoc} */ + @Override public KeyValuePersistenceSettings getPersistenceSettings() { + return ctrl.getPersistenceSettings(); + } + + /** + * Service method to get persistence controller instance + * + * @return Persistence controller to use for the mutation + */ + protected PersistenceController controller() { + return ctrl; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/3b8aca64/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/DeleteMutation.java ---------------------------------------------------------------------- diff --git 
a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/DeleteMutation.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/DeleteMutation.java new file mode 100644 index 0000000..79c0bfe --- /dev/null +++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/DeleteMutation.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.store.cassandra.session.transaction; + +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.PreparedStatement; +import org.apache.ignite.cache.store.cassandra.persistence.PersistenceController; + +/** + * Mutation which deletes object from Cassandra. + */ +public class DeleteMutation extends BaseMutation { + /** Ignite cache key of the object which should be deleted. */ + private final Object key; + + /** + * Creates instance of delete mutation operation. + * + * @param key Ignite cache key of the object which should be deleted. + * @param table Cassandra table which should be used for the mutation. + * @param ctrl Persistence controller to use. 
+ */ + public DeleteMutation(Object key, String table, PersistenceController ctrl) { + super(table, ctrl); + this.key = key; + } + + /** {@inheritDoc} */ + @Override public boolean tableExistenceRequired() { + return false; + } + + /** {@inheritDoc} */ + @Override public String getStatement() { + return controller().getDeleteStatement(getTable()); + } + + /** {@inheritDoc} */ + @Override public BoundStatement bindStatement(PreparedStatement statement) { + return controller().bindKey(statement, key); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/3b8aca64/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/Mutation.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/Mutation.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/Mutation.java new file mode 100644 index 0000000..cb014f8 --- /dev/null +++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/Mutation.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.cache.store.cassandra.session.transaction; + +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.PreparedStatement; +import org.apache.ignite.cache.store.cassandra.persistence.KeyValuePersistenceSettings; + +/** + * Provides information about particular mutation operation performed withing transaction. + */ +public interface Mutation { + /** + * Cassandra table to use for an operation. + * + * @return Table name. + */ + public String getTable(); + + /** + * Indicates if Cassandra tables existence is required for this operation. + * + * @return {@code true} true if table existence required. + */ + public boolean tableExistenceRequired(); + + /** + * Returns Ignite cache key/value persistence settings. + * + * @return persistence settings. + */ + public KeyValuePersistenceSettings getPersistenceSettings(); + + /** + * Returns unbind CLQ statement for to be executed. + * + * @return Unbind CQL statement. + */ + public String getStatement(); + + /** + * Binds prepared statement to current Cassandra session. + * + * @param statement Statement. + * @param obj Parameters for statement binding. + * @return Bounded statement. 
+ */ + public BoundStatement bindStatement(PreparedStatement statement); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/3b8aca64/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/WriteMutation.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/WriteMutation.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/WriteMutation.java new file mode 100644 index 0000000..3c74378 --- /dev/null +++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/WriteMutation.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.store.cassandra.session.transaction; + +import javax.cache.Cache; + +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.PreparedStatement; + +import org.apache.ignite.cache.store.cassandra.persistence.PersistenceController; + +/** + * Mutation which writes(inserts) object into Cassandra. 
+ */ +public class WriteMutation extends BaseMutation { + /** Ignite cache entry to be inserted into Cassandra. */ + private final Cache.Entry entry; + + /** + * Creates instance of write mutation operation. + * + * @param entry Ignite cache entry to be inserted into Cassandra. + * @param table Cassandra table which should be used for the mutation. + * @param ctrl Persistence controller to use. + */ + public WriteMutation(Cache.Entry entry, String table, PersistenceController ctrl) { + super(table, ctrl); + this.entry = entry; + } + + /** {@inheritDoc} */ + @Override public boolean tableExistenceRequired() { + return true; + } + + /** {@inheritDoc} */ + @Override public String getStatement() { + return controller().getWriteStatement(getTable()); + } + + /** {@inheritDoc} */ + @Override public BoundStatement bindStatement(PreparedStatement statement) { + return controller().bindKeyValue(statement, entry.getKey(), entry.getValue()); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/3b8aca64/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/package-info.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/package-info.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/package-info.java new file mode 100644 index 0000000..7141845 --- /dev/null +++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Contains mutations implementation, to store changes made inside Ignite transaction + */ +package org.apache.ignite.cache.store.cassandra.session.transaction; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/3b8aca64/modules/cassandra/store/src/test/java/org/apache/ignite/tests/CassandraDirectPersistenceTest.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/CassandraDirectPersistenceTest.java b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/CassandraDirectPersistenceTest.java index 9974898..f9e9649 100644 --- a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/CassandraDirectPersistenceTest.java +++ b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/CassandraDirectPersistenceTest.java @@ -18,14 +18,21 @@ package org.apache.ignite.tests; import java.util.Collection; +import java.util.List; import java.util.Map; import org.apache.ignite.cache.store.CacheStore; import org.apache.ignite.internal.processors.cache.CacheEntryImpl; +import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.tests.pojos.Person; import org.apache.ignite.tests.pojos.PersonId; +import org.apache.ignite.tests.pojos.Product; +import org.apache.ignite.tests.pojos.ProductOrder; import 
org.apache.ignite.tests.utils.CacheStoreHelper; import org.apache.ignite.tests.utils.CassandraHelper; +import org.apache.ignite.tests.utils.TestCacheSession; +import org.apache.ignite.tests.utils.TestTransaction; import org.apache.ignite.tests.utils.TestsHelper; +import org.apache.ignite.transactions.Transaction; import org.apache.log4j.Logger; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -113,31 +120,31 @@ public class CassandraDirectPersistenceTest { LOGGER.info("Running PRIMITIVE strategy write tests"); - LOGGER.info("Running single operation write tests"); + LOGGER.info("Running single write operation tests"); store1.write(longEntries.iterator().next()); store2.write(strEntries.iterator().next()); - LOGGER.info("Single operation write tests passed"); + LOGGER.info("Single write operation tests passed"); - LOGGER.info("Running bulk operation write tests"); + LOGGER.info("Running bulk write operation tests"); store1.writeAll(longEntries); store2.writeAll(strEntries); - LOGGER.info("Bulk operation write tests passed"); + LOGGER.info("Bulk write operation tests passed"); LOGGER.info("PRIMITIVE strategy write tests passed"); LOGGER.info("Running PRIMITIVE strategy read tests"); - LOGGER.info("Running single operation read tests"); + LOGGER.info("Running single read operation tests"); LOGGER.info("Running real keys read tests"); Long longVal = (Long)store1.load(longEntries.iterator().next().getKey()); if (!longEntries.iterator().next().getValue().equals(longVal)) - throw new RuntimeException("Long values was incorrectly deserialized from Cassandra"); + throw new RuntimeException("Long values were incorrectly deserialized from Cassandra"); String strVal = (String)store2.load(strEntries.iterator().next().getKey()); if (!strEntries.iterator().next().getValue().equals(strVal)) - throw new RuntimeException("String values was incorrectly deserialized from Cassandra"); + throw new RuntimeException("String values were incorrectly deserialized from 
Cassandra"); LOGGER.info("Running fake keys read tests"); @@ -149,31 +156,31 @@ public class CassandraDirectPersistenceTest { if (strVal != null) throw new RuntimeException("String value with fake key '-1' was found in Cassandra"); - LOGGER.info("Single operation read tests passed"); + LOGGER.info("Single read operation tests passed"); - LOGGER.info("Running bulk operation read tests"); + LOGGER.info("Running bulk read operation tests"); LOGGER.info("Running real keys read tests"); Map longValues = store1.loadAll(TestsHelper.getKeys(longEntries)); if (!TestsHelper.checkCollectionsEqual(longValues, longEntries)) - throw new RuntimeException("Long values was incorrectly deserialized from Cassandra"); + throw new RuntimeException("Long values were incorrectly deserialized from Cassandra"); Map strValues = store2.loadAll(TestsHelper.getKeys(strEntries)); if (!TestsHelper.checkCollectionsEqual(strValues, strEntries)) - throw new RuntimeException("String values was incorrectly deserialized from Cassandra"); + throw new RuntimeException("String values were incorrectly deserialized from Cassandra"); LOGGER.info("Running fake keys read tests"); longValues = store1.loadAll(fakeLongKeys); if (!TestsHelper.checkCollectionsEqual(longValues, longEntries)) - throw new RuntimeException("Long values was incorrectly deserialized from Cassandra"); + throw new RuntimeException("Long values were incorrectly deserialized from Cassandra"); strValues = store2.loadAll(fakeStrKeys); if (!TestsHelper.checkCollectionsEqual(strValues, strEntries)) - throw new RuntimeException("String values was incorrectly deserialized from Cassandra"); + throw new RuntimeException("String values were incorrectly deserialized from Cassandra"); - LOGGER.info("Bulk operation read tests passed"); + LOGGER.info("Bulk read operation tests passed"); LOGGER.info("PRIMITIVE strategy read tests passed"); @@ -219,53 +226,53 @@ public class CassandraDirectPersistenceTest { LOGGER.info("Running BLOB strategy write 
tests"); - LOGGER.info("Running single operation write tests"); + LOGGER.info("Running single write operation tests"); store1.write(longEntries.iterator().next()); store2.write(personEntries.iterator().next()); store3.write(personEntries.iterator().next()); - LOGGER.info("Single operation write tests passed"); + LOGGER.info("Single write operation tests passed"); - LOGGER.info("Running bulk operation write tests"); + LOGGER.info("Running bulk write operation tests"); store1.writeAll(longEntries); store2.writeAll(personEntries); store3.writeAll(personEntries); - LOGGER.info("Bulk operation write tests passed"); + LOGGER.info("Bulk write operation tests passed"); LOGGER.info("BLOB strategy write tests passed"); LOGGER.info("Running BLOB strategy read tests"); - LOGGER.info("Running single operation read tests"); + LOGGER.info("Running single read operation tests"); Long longVal = (Long)store1.load(longEntries.iterator().next().getKey()); if (!longEntries.iterator().next().getValue().equals(longVal)) - throw new RuntimeException("Long values was incorrectly deserialized from Cassandra"); + throw new RuntimeException("Long values were incorrectly deserialized from Cassandra"); Person personVal = (Person)store2.load(personEntries.iterator().next().getKey()); if (!personEntries.iterator().next().getValue().equals(personVal)) - throw new RuntimeException("Person values was incorrectly deserialized from Cassandra"); + throw new RuntimeException("Person values were incorrectly deserialized from Cassandra"); personVal = (Person)store3.load(personEntries.iterator().next().getKey()); if (!personEntries.iterator().next().getValue().equals(personVal)) - throw new RuntimeException("Person values was incorrectly deserialized from Cassandra"); + throw new RuntimeException("Person values were incorrectly deserialized from Cassandra"); - LOGGER.info("Single operation read tests passed"); + LOGGER.info("Single read operation tests passed"); - LOGGER.info("Running bulk operation read 
tests"); + LOGGER.info("Running bulk read operation tests"); Map longValues = store1.loadAll(TestsHelper.getKeys(longEntries)); if (!TestsHelper.checkCollectionsEqual(longValues, longEntries)) - throw new RuntimeException("Long values was incorrectly deserialized from Cassandra"); + throw new RuntimeException("Long values were incorrectly deserialized from Cassandra"); Map personValues = store2.loadAll(TestsHelper.getKeys(personEntries)); if (!TestsHelper.checkPersonCollectionsEqual(personValues, personEntries, false)) - throw new RuntimeException("Person values was incorrectly deserialized from Cassandra"); + throw new RuntimeException("Person values were incorrectly deserialized from Cassandra"); personValues = store3.loadAll(TestsHelper.getKeys(personEntries)); if (!TestsHelper.checkPersonCollectionsEqual(personValues, personEntries, false)) - throw new RuntimeException("Person values was incorrectly deserialized from Cassandra"); + throw new RuntimeException("Person values were incorrectly deserialized from Cassandra"); - LOGGER.info("Bulk operation read tests passed"); + LOGGER.info("Bulk read operation tests passed"); LOGGER.info("BLOB strategy read tests passed"); @@ -303,69 +310,99 @@ public class CassandraDirectPersistenceTest { new ClassPathResource("org/apache/ignite/tests/persistence/pojo/persistence-settings-4.xml"), CassandraHelper.getAdminDataSrc()); + CacheStore productStore = CacheStoreHelper.createCacheStore("product", + new ClassPathResource("org/apache/ignite/tests/persistence/pojo/product.xml"), + CassandraHelper.getAdminDataSrc()); + + CacheStore orderStore = CacheStoreHelper.createCacheStore("order", + new ClassPathResource("org/apache/ignite/tests/persistence/pojo/order.xml"), + CassandraHelper.getAdminDataSrc()); + Collection<CacheEntryImpl<Long, Person>> entries1 = TestsHelper.generateLongsPersonsEntries(); Collection<CacheEntryImpl<PersonId, Person>> entries2 = TestsHelper.generatePersonIdsPersonsEntries(); 
Collection<CacheEntryImpl<PersonId, Person>> entries3 = TestsHelper.generatePersonIdsPersonsEntries(); + Collection<CacheEntryImpl<Long, Product>> productEntries = TestsHelper.generateProductEntries(); + Collection<CacheEntryImpl<Long, ProductOrder>> orderEntries = TestsHelper.generateOrderEntries(); LOGGER.info("Running POJO strategy write tests"); - LOGGER.info("Running single operation write tests"); + LOGGER.info("Running single write operation tests"); store1.write(entries1.iterator().next()); store2.write(entries2.iterator().next()); store3.write(entries3.iterator().next()); store4.write(entries3.iterator().next()); - LOGGER.info("Single operation write tests passed"); + productStore.write(productEntries.iterator().next()); + orderStore.write(orderEntries.iterator().next()); + LOGGER.info("Single write operation tests passed"); - LOGGER.info("Running bulk operation write tests"); + LOGGER.info("Running bulk write operation tests"); store1.writeAll(entries1); store2.writeAll(entries2); store3.writeAll(entries3); store4.writeAll(entries3); - LOGGER.info("Bulk operation write tests passed"); + productStore.writeAll(productEntries); + orderStore.writeAll(orderEntries); + LOGGER.info("Bulk write operation tests passed"); LOGGER.info("POJO strategy write tests passed"); LOGGER.info("Running POJO strategy read tests"); - LOGGER.info("Running single operation read tests"); + LOGGER.info("Running single read operation tests"); Person person = (Person)store1.load(entries1.iterator().next().getKey()); if (!entries1.iterator().next().getValue().equalsPrimitiveFields(person)) - throw new RuntimeException("Person values was incorrectly deserialized from Cassandra"); + throw new RuntimeException("Person values were incorrectly deserialized from Cassandra"); person = (Person)store2.load(entries2.iterator().next().getKey()); if (!entries2.iterator().next().getValue().equalsPrimitiveFields(person)) - throw new RuntimeException("Person values was incorrectly deserialized from 
Cassandra"); + throw new RuntimeException("Person values were incorrectly deserialized from Cassandra"); person = (Person)store3.load(entries3.iterator().next().getKey()); if (!entries3.iterator().next().getValue().equals(person)) - throw new RuntimeException("Person values was incorrectly deserialized from Cassandra"); + throw new RuntimeException("Person values were incorrectly deserialized from Cassandra"); person = (Person)store4.load(entries3.iterator().next().getKey()); if (!entries3.iterator().next().getValue().equals(person)) - throw new RuntimeException("Person values was incorrectly deserialized from Cassandra"); + throw new RuntimeException("Person values were incorrectly deserialized from Cassandra"); - LOGGER.info("Single operation read tests passed"); + Product product = (Product)productStore.load(productEntries.iterator().next().getKey()); + if (!productEntries.iterator().next().getValue().equals(product)) + throw new RuntimeException("Product values were incorrectly deserialized from Cassandra"); - LOGGER.info("Running bulk operation read tests"); + ProductOrder order = (ProductOrder)orderStore.load(orderEntries.iterator().next().getKey()); + if (!orderEntries.iterator().next().getValue().equals(order)) + throw new RuntimeException("Order values were incorrectly deserialized from Cassandra"); + + LOGGER.info("Single read operation tests passed"); + + LOGGER.info("Running bulk read operation tests"); Map persons = store1.loadAll(TestsHelper.getKeys(entries1)); if (!TestsHelper.checkPersonCollectionsEqual(persons, entries1, true)) - throw new RuntimeException("Person values was incorrectly deserialized from Cassandra"); + throw new RuntimeException("Person values were incorrectly deserialized from Cassandra"); persons = store2.loadAll(TestsHelper.getKeys(entries2)); if (!TestsHelper.checkPersonCollectionsEqual(persons, entries2, true)) - throw new RuntimeException("Person values was incorrectly deserialized from Cassandra"); + throw new 
RuntimeException("Person values were incorrectly deserialized from Cassandra"); persons = store3.loadAll(TestsHelper.getKeys(entries3)); if (!TestsHelper.checkPersonCollectionsEqual(persons, entries3, false)) - throw new RuntimeException("Person values was incorrectly deserialized from Cassandra"); + throw new RuntimeException("Person values were incorrectly deserialized from Cassandra"); persons = store4.loadAll(TestsHelper.getKeys(entries3)); if (!TestsHelper.checkPersonCollectionsEqual(persons, entries3, false)) - throw new RuntimeException("Person values was incorrectly deserialized from Cassandra"); + throw new RuntimeException("Person values were incorrectly deserialized from Cassandra"); + + Map products = productStore.loadAll(TestsHelper.getKeys(productEntries)); + if (!TestsHelper.checkProductCollectionsEqual(products, productEntries)) + throw new RuntimeException("Product values were incorrectly deserialized from Cassandra"); + + Map orders = orderStore.loadAll(TestsHelper.getKeys(orderEntries)); + if (!TestsHelper.checkOrderCollectionsEqual(orders, orderEntries)) + throw new RuntimeException("Order values were incorrectly deserialized from Cassandra"); - LOGGER.info("Bulk operation read tests passed"); + LOGGER.info("Bulk read operation tests passed"); LOGGER.info("POJO strategy read tests passed"); @@ -383,6 +420,277 @@ public class CassandraDirectPersistenceTest { store4.delete(entries3.iterator().next().getKey()); store4.deleteAll(TestsHelper.getKeys(entries3)); + productStore.delete(productEntries.iterator().next().getKey()); + productStore.deleteAll(TestsHelper.getKeys(productEntries)); + + orderStore.delete(orderEntries.iterator().next().getKey()); + orderStore.deleteAll(TestsHelper.getKeys(orderEntries)); + LOGGER.info("POJO strategy delete tests passed"); } + + /** */ + @Test + @SuppressWarnings("unchecked") + public void pojoStrategyTransactionTest() { + Map<Object, Object> sessionProps = U.newHashMap(1); + Transaction sessionTx = new 
TestTransaction(); + + CacheStore productStore = CacheStoreHelper.createCacheStore("product", + new ClassPathResource("org/apache/ignite/tests/persistence/pojo/product.xml"), + CassandraHelper.getAdminDataSrc(), new TestCacheSession("product", sessionTx, sessionProps)); + + CacheStore orderStore = CacheStoreHelper.createCacheStore("order", + new ClassPathResource("org/apache/ignite/tests/persistence/pojo/order.xml"), + CassandraHelper.getAdminDataSrc(), new TestCacheSession("order", sessionTx, sessionProps)); + + List<CacheEntryImpl<Long, Product>> productEntries = TestsHelper.generateProductEntries(); + Map<Long, List<CacheEntryImpl<Long, ProductOrder>>> ordersPerProduct = + TestsHelper.generateOrdersPerProductEntries(productEntries, 2); + + Collection<Long> productIds = TestsHelper.getProductIds(productEntries); + Collection<Long> orderIds = TestsHelper.getOrderIds(ordersPerProduct); + + LOGGER.info("Running POJO strategy transaction write tests"); + + LOGGER.info("Running single write operation tests"); + + CassandraHelper.dropTestKeyspaces(); + + Product product = productEntries.iterator().next().getValue(); + ProductOrder order = ordersPerProduct.get(product.getId()).iterator().next().getValue(); + + productStore.write(productEntries.iterator().next()); + orderStore.write(ordersPerProduct.get(product.getId()).iterator().next()); + + if (productStore.load(product.getId()) != null || orderStore.load(order.getId()) != null) { + throw new RuntimeException("Single write operation test failed. Transaction wasn't committed yet, but " + + "objects were already persisted into Cassandra"); + } + + Map<Long, Product> products = (Map<Long, Product>)productStore.loadAll(productIds); + Map<Long, ProductOrder> orders = (Map<Long, ProductOrder>)orderStore.loadAll(orderIds); + + if ((products != null && !products.isEmpty()) || (orders != null && !orders.isEmpty())) { + throw new RuntimeException("Single write operation test failed. 
Transaction wasn't committed yet, but " + + "objects were already persisted into Cassandra"); + } + + //noinspection deprecation + orderStore.sessionEnd(true); + //noinspection deprecation + productStore.sessionEnd(true); + + Product product1 = (Product)productStore.load(product.getId()); + ProductOrder order1 = (ProductOrder)orderStore.load(order.getId()); + + if (product1 == null || order1 == null) { + throw new RuntimeException("Single write operation test failed. Transaction was committed, but " + + "no objects were persisted into Cassandra"); + } + + if (!product.equals(product1) || !order.equals(order1)) { + throw new RuntimeException("Single write operation test failed. Transaction was committed, but " + + "objects were incorrectly persisted/loaded to/from Cassandra"); + } + + products = (Map<Long, Product>)productStore.loadAll(productIds); + orders = (Map<Long, ProductOrder>)orderStore.loadAll(orderIds); + + if (products == null || products.isEmpty() || orders == null || orders.isEmpty()) { + throw new RuntimeException("Single write operation test failed. Transaction was committed, but " + + "no objects were persisted into Cassandra"); + } + + if (products.size() > 1 || orders.size() > 1) { + throw new RuntimeException("Single write operation test failed. There were committed more objects " + + "into Cassandra than expected"); + } + + product1 = products.entrySet().iterator().next().getValue(); + order1 = orders.entrySet().iterator().next().getValue(); + + if (!product.equals(product1) || !order.equals(order1)) { + throw new RuntimeException("Single write operation test failed. 
Transaction was committed, but " + + "objects were incorrectly persisted/loaded to/from Cassandra"); + } + + LOGGER.info("Single write operation tests passed"); + + LOGGER.info("Running bulk write operation tests"); + + CassandraHelper.dropTestKeyspaces(); + sessionProps.clear(); + + productStore.writeAll(productEntries); + + for (Long productId : ordersPerProduct.keySet()) + orderStore.writeAll(ordersPerProduct.get(productId)); + + for (Long productId : productIds) { + if (productStore.load(productId) != null) { + throw new RuntimeException("Bulk write operation test failed. Transaction wasn't committed yet, but " + + "objects were already persisted into Cassandra"); + } + } + + for (Long orderId : orderIds) { + if (orderStore.load(orderId) != null) { + throw new RuntimeException("Bulk write operation test failed. Transaction wasn't committed yet, but " + + "objects were already persisted into Cassandra"); + } + } + + products = (Map<Long, Product>)productStore.loadAll(productIds); + orders = (Map<Long, ProductOrder>)orderStore.loadAll(orderIds); + + if ((products != null && !products.isEmpty()) || (orders != null && !orders.isEmpty())) { + throw new RuntimeException("Bulk write operation test failed. Transaction wasn't committed yet, but " + + "objects were already persisted into Cassandra"); + } + + //noinspection deprecation + productStore.sessionEnd(true); + //noinspection deprecation + orderStore.sessionEnd(true); + + for (CacheEntryImpl<Long, Product> entry : productEntries) { + product = (Product)productStore.load(entry.getKey()); + + if (!entry.getValue().equals(product)) { + throw new RuntimeException("Bulk write operation test failed. 
Transaction was committed, but " + + "not all objects were persisted into Cassandra"); + } + } + + for (Long productId : ordersPerProduct.keySet()) { + for (CacheEntryImpl<Long, ProductOrder> entry : ordersPerProduct.get(productId)) { + order = (ProductOrder)orderStore.load(entry.getKey()); + + if (!entry.getValue().equals(order)) { + throw new RuntimeException("Bulk write operation test failed. Transaction was committed, but " + + "not all objects were persisted into Cassandra"); + } + } + } + + products = (Map<Long, Product>)productStore.loadAll(productIds); + orders = (Map<Long, ProductOrder>)orderStore.loadAll(orderIds); + + if (products == null || products.isEmpty() || orders == null || orders.isEmpty()) { + throw new RuntimeException("Bulk write operation test failed. Transaction was committed, but " + + "no objects were persisted into Cassandra"); + } + + if (products.size() < productIds.size() || orders.size() < orderIds.size()) { + throw new RuntimeException("Bulk write operation test failed. There were committed less objects " + + "into Cassandra than expected"); + } + + if (products.size() > productIds.size() || orders.size() > orderIds.size()) { + throw new RuntimeException("Bulk write operation test failed. There were committed more objects " + + "into Cassandra than expected"); + } + + for (CacheEntryImpl<Long, Product> entry : productEntries) { + product = products.get(entry.getKey()); + + if (!entry.getValue().equals(product)) { + throw new RuntimeException("Bulk write operation test failed. Transaction was committed, but " + + "some objects were incorrectly persisted/loaded to/from Cassandra"); + } + } + + for (Long productId : ordersPerProduct.keySet()) { + for (CacheEntryImpl<Long, ProductOrder> entry : ordersPerProduct.get(productId)) { + order = orders.get(entry.getKey()); + + if (!entry.getValue().equals(order)) { + throw new RuntimeException("Bulk write operation test failed. 
Transaction was committed, but " + + "some objects were incorrectly persisted/loaded to/from Cassandra"); + } + } + } + + LOGGER.info("Bulk write operation tests passed"); + + LOGGER.info("POJO strategy transaction write tests passed"); + + LOGGER.info("Running POJO strategy transaction delete tests"); + + LOGGER.info("Running single delete tests"); + + sessionProps.clear(); + + Product deletedProduct = productEntries.remove(0).getValue(); + ProductOrder deletedOrder = ordersPerProduct.get(deletedProduct.getId()).remove(0).getValue(); + + productStore.delete(deletedProduct.getId()); + orderStore.delete(deletedOrder.getId()); + + if (productStore.load(deletedProduct.getId()) == null || orderStore.load(deletedOrder.getId()) == null) { + throw new RuntimeException("Single delete operation test failed. Transaction wasn't committed yet, but " + + "objects were already deleted from Cassandra"); + } + + products = (Map<Long, Product>)productStore.loadAll(productIds); + orders = (Map<Long, ProductOrder>)orderStore.loadAll(orderIds); + + if (products.size() != productIds.size() || orders.size() != orderIds.size()) { + throw new RuntimeException("Single delete operation test failed. Transaction wasn't committed yet, but " + + "objects were already deleted from Cassandra"); + } + + //noinspection deprecation + productStore.sessionEnd(true); + //noinspection deprecation + orderStore.sessionEnd(true); + + if (productStore.load(deletedProduct.getId()) != null || orderStore.load(deletedOrder.getId()) != null) { + throw new RuntimeException("Single delete operation test failed. Transaction was committed, but " + + "objects were not deleted from Cassandra"); + } + + products = (Map<Long, Product>)productStore.loadAll(productIds); + orders = (Map<Long, ProductOrder>)orderStore.loadAll(orderIds); + + if (products.get(deletedProduct.getId()) != null || orders.get(deletedOrder.getId()) != null) { + throw new RuntimeException("Single delete operation test failed. 
Transaction was committed, but " + + "objects were not deleted from Cassandra"); + } + + LOGGER.info("Single delete tests passed"); + + LOGGER.info("Running bulk delete tests"); + + sessionProps.clear(); + + productStore.deleteAll(productIds); + orderStore.deleteAll(orderIds); + + products = (Map<Long, Product>)productStore.loadAll(productIds); + orders = (Map<Long, ProductOrder>)orderStore.loadAll(orderIds); + + if (products == null || products.isEmpty() || orders == null || orders.isEmpty()) { + throw new RuntimeException("Bulk delete operation test failed. Transaction wasn't committed yet, but " + + "objects were already deleted from Cassandra"); + } + + //noinspection deprecation + orderStore.sessionEnd(true); + //noinspection deprecation + productStore.sessionEnd(true); + + products = (Map<Long, Product>)productStore.loadAll(productIds); + orders = (Map<Long, ProductOrder>)orderStore.loadAll(orderIds); + + if ((products != null && !products.isEmpty()) || (orders != null && !orders.isEmpty())) { + throw new RuntimeException("Bulk delete operation test failed. Transaction was committed, but " + + "objects were not deleted from Cassandra"); + } + + LOGGER.info("Bulk delete tests passed"); + + LOGGER.info("POJO strategy transaction delete tests passed"); + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/3b8aca64/modules/cassandra/store/src/test/java/org/apache/ignite/tests/CassandraLocalServer.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/CassandraLocalServer.java b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/CassandraLocalServer.java new file mode 100644 index 0000000..fc54e5b --- /dev/null +++ b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/CassandraLocalServer.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.tests; + +import org.apache.ignite.tests.utils.CassandraHelper; +import org.apache.log4j.Logger; + +/** + * Simple helper class to run Cassandra on localhost + */ +public class CassandraLocalServer { + /** */ + private static final Logger LOGGER = Logger.getLogger(CassandraLocalServer.class.getName()); + + /** */ + public static void main(String[] args) { + try { + CassandraHelper.startEmbeddedCassandra(LOGGER); + } + catch (Throwable e) { + throw new RuntimeException("Failed to start embedded Cassandra instance", e); + } + + LOGGER.info("Testing admin connection to Cassandra"); + CassandraHelper.testAdminConnection(); + + LOGGER.info("Testing regular connection to Cassandra"); + CassandraHelper.testRegularConnection(); + + LOGGER.info("Dropping all artifacts from previous tests execution session"); + CassandraHelper.dropTestKeyspaces(); + + while (true) { + try { + System.out.println("Cassandra server running"); + Thread.sleep(10000); + } + catch (Throwable e) { + throw new RuntimeException("Cassandra server terminated", e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/3b8aca64/modules/cassandra/store/src/test/java/org/apache/ignite/tests/DDLGeneratorTest.java 
---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/DDLGeneratorTest.java b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/DDLGeneratorTest.java index 43b6d3c..6465580 100644 --- a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/DDLGeneratorTest.java +++ b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/DDLGeneratorTest.java @@ -25,33 +25,30 @@ import org.junit.Test; * DDLGenerator test. */ public class DDLGeneratorTest { - private static final String URL1 = "org/apache/ignite/tests/persistence/primitive/persistence-settings-1.xml"; - private static final String URL2 = "org/apache/ignite/tests/persistence/pojo/persistence-settings-3.xml"; - private static final String URL3 = "org/apache/ignite/tests/persistence/pojo/persistence-settings-4.xml"; + private static final String[] RESOURCES = new String[] { + "org/apache/ignite/tests/persistence/primitive/persistence-settings-1.xml", + "org/apache/ignite/tests/persistence/pojo/persistence-settings-3.xml", + "org/apache/ignite/tests/persistence/pojo/persistence-settings-4.xml", + "org/apache/ignite/tests/persistence/pojo/product.xml", + "org/apache/ignite/tests/persistence/pojo/order.xml" + }; @Test @SuppressWarnings("unchecked") /** */ public void generatorTest() { - ClassLoader clsLdr = DDLGeneratorTest.class.getClassLoader(); - - URL url1 = clsLdr.getResource(URL1); - if (url1 == null) - throw new IllegalStateException("Failed to find resource: " + URL1); + String[] files = new String[RESOURCES.length]; - URL url2 = clsLdr.getResource(URL2); - if (url2 == null) - throw new IllegalStateException("Failed to find resource: " + URL2); + ClassLoader clsLdr = DDLGeneratorTest.class.getClassLoader(); - URL url3 = clsLdr.getResource(URL3); - if (url3 == null) - throw new IllegalStateException("Failed to find resource: " + URL3); + for (int i = 0; i < RESOURCES.length; i++) { + URL url = 
clsLdr.getResource(RESOURCES[i]); + if (url == null) + throw new IllegalStateException("Failed to find resource: " + RESOURCES[i]); - String file1 = url1.getFile(); - String file2 = url2.getFile(); - String file3 = url3.getFile(); + files[i] = url.getFile(); + } - DDLGenerator.main(new String[]{file1, file2, file3}); + DDLGenerator.main(files); } - }