Repository: ignite Updated Branches: refs/heads/master f74d51cbf -> 561d2cf04
Fixed "IGNITE-4205 CassandraCacheStore should start IgniteThread threads in loadCache() method" Signed-off-by: nikolay_tikhonov <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/561d2cf0 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/561d2cf0 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/561d2cf0 Branch: refs/heads/master Commit: 561d2cf048be59524acfbe2ac064d8b633b99c37 Parents: f74d51c Author: Konstantin Dudkov <[email protected]> Authored: Mon May 22 14:30:30 2017 +0300 Committer: nikolay_tikhonov <[email protected]> Committed: Mon May 22 14:30:30 2017 +0300 ---------------------------------------------------------------------- .../store/cassandra/CassandraCacheStore.java | 15 +++- .../ignite/tests/IgnitePersistentStoreTest.java | 62 +++++++++++++- .../persistence/loadall_blob/ignite-config.xml | 90 ++++++++++++++++++++ .../loadall_blob/persistence-settings.xml | 29 +++++++ .../store/jdbc/CacheAbstractJdbcStore.java | 6 +- 5 files changed, 198 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/561d2cf0/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java index 98c8b40..b438946 100644 --- a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java +++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java @@ -23,16 +23,17 @@ import com.datastax.driver.core.Row; import com.datastax.driver.core.Statement; import 
java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.HashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import javax.cache.Cache; import javax.cache.integration.CacheLoaderException; import javax.cache.integration.CacheWriterException; +import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.store.CacheStore; @@ -52,7 +53,9 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.logger.NullLogger; import org.apache.ignite.resources.CacheStoreSessionResource; +import org.apache.ignite.resources.IgniteInstanceResource; import org.apache.ignite.resources.LoggerResource; +import org.apache.ignite.thread.IgniteThreadFactory; /** * Implementation of {@link CacheStore} backed by Cassandra database. @@ -64,6 +67,14 @@ public class CassandraCacheStore<K, V> implements CacheStore<K, V> { /** Buffer to store mutations performed withing transaction. */ private static final String TRANSACTION_BUFFER = "CASSANDRA_TRANSACTION_BUFFER"; + /** Thread name. */ + private static final String CACHE_LOADER_THREAD_NAME = "cassandra-cache-loader"; + + /** Auto-injected ignite instance. */ + @SuppressWarnings("unused") + @IgniteInstanceResource + private Ignite ignite; + /** Auto-injected store session. 
*/ @SuppressWarnings("unused") @CacheStoreSessionResource @@ -109,7 +120,7 @@ public class CassandraCacheStore<K, V> implements CacheStore<K, V> { Collection<Future<?>> futs = new ArrayList<>(args.length); try { - pool = Executors.newFixedThreadPool(maxPoolSize); + pool = Executors.newFixedThreadPool(maxPoolSize, new IgniteThreadFactory(ignite.name(), CACHE_LOADER_THREAD_NAME)); CassandraSession ses = getCassandraSession(); http://git-wip-us.apache.org/repos/asf/ignite/blob/561d2cf0/modules/cassandra/store/src/test/java/org/apache/ignite/tests/IgnitePersistentStoreTest.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/IgnitePersistentStoreTest.java b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/IgnitePersistentStoreTest.java index c8c7139..feccb24 100644 --- a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/IgnitePersistentStoreTest.java +++ b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/IgnitePersistentStoreTest.java @@ -25,9 +25,11 @@ import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteTransactions; import org.apache.ignite.Ignition; +import org.apache.ignite.binary.BinaryObject; import org.apache.ignite.cache.CachePeekMode; import org.apache.ignite.cache.store.CacheStore; import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.internal.binary.BinaryMarshaller; import org.apache.ignite.internal.processors.cache.CacheEntryImpl; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.tests.pojos.Person; @@ -42,9 +44,9 @@ import org.apache.ignite.transactions.TransactionConcurrency; import org.apache.ignite.transactions.TransactionIsolation; import org.apache.log4j.Logger; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; -import org.junit.Assert; import 
org.springframework.core.io.ClassPathResource; /** @@ -247,6 +249,34 @@ public class IgnitePersistentStoreTest { /** */ @Test + public void blobBinaryLoadCacheTest() { + Ignition.stopAll(true); + + try (Ignite ignite = Ignition.start("org/apache/ignite/tests/persistence/loadall_blob/ignite-config.xml")) { + IgniteCache<Long, PojoPerson> personCache = ignite.getOrCreateCache("cache2"); + + assert ignite.configuration().getMarshaller() instanceof BinaryMarshaller; + + personCache.put(1L, new PojoPerson(1, "name")); + + assert personCache.withKeepBinary().get(1L) instanceof BinaryObject; + } + + Ignition.stopAll(true); + + try (Ignite ignite = Ignition.start("org/apache/ignite/tests/persistence/loadall_blob/ignite-config.xml")) { + IgniteCache<Long, PojoPerson> personCache = ignite.getOrCreateCache("cache2"); + + personCache.loadCache(null, null); + + PojoPerson person = personCache.get(1L); + + LOGGER.info("loadCache tests passed"); + } + } + + /** */ + @Test public void pojoStrategyTest() { Ignition.stopAll(true); @@ -673,4 +703,34 @@ public class IgnitePersistentStoreTest { " concurrency and " + isolation + " isolation level"); LOGGER.info("-----------------------------------------------------------------------------------"); } + + /** */ + public static class PojoPerson { + /** */ + private int id; + + /** */ + private String name; + + /** */ + public PojoPerson() { + // No-op. 
+ } + + /** */ + public PojoPerson(int id, String name) { + this.id = id; + this.name = name; + } + + /** */ + public int getId() { + return id; + } + + /** */ + public String getName() { + return name; + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/561d2cf0/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/loadall_blob/ignite-config.xml ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/loadall_blob/ignite-config.xml b/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/loadall_blob/ignite-config.xml new file mode 100644 index 0000000..115e263 --- /dev/null +++ b/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/loadall_blob/ignite-config.xml @@ -0,0 +1,90 @@ +<?xml version="1.0" encoding="UTF-8"?> + +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. 
+ --> + +<beans xmlns="http://www.springframework.org/schema/beans" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation=" + http://www.springframework.org/schema/beans + http://www.springframework.org/schema/beans/spring-beans.xsd"> + + <!-- Cassandra connection settings --> + <import resource="classpath:org/apache/ignite/tests/cassandra/connection-settings.xml"/> + + <!-- Persistence settings for 'cache2' --> + <bean id="cache2_persistence_settings" + class="org.apache.ignite.cache.store.cassandra.persistence.KeyValuePersistenceSettings"> + <constructor-arg type="org.springframework.core.io.Resource" + value="classpath:org/apache/ignite/tests/persistence/loadall_blob/persistence-settings.xml"/> + </bean> + + <!-- Ignite configuration --> + <bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration"> + + <property name="marshaller"> + <bean class="org.apache.ignite.internal.binary.BinaryMarshaller"/> + </property> + + <property name="binaryConfiguration"> + <bean class="org.apache.ignite.configuration.BinaryConfiguration"> + <property name="compactFooter" value="false"/> + </bean> + </property> + + <property name="cacheConfiguration"> + <list> + <!-- Configuring persistence for "cache2" cache --> + <bean class="org.apache.ignite.configuration.CacheConfiguration"> + <property name="name" value="cache2"/> + <property name="readThrough" value="true"/> + <property name="writeThrough" value="true"/> + <property name="storeKeepBinary" value="true"/> + <property name="cacheStoreFactory"> + <bean class="org.apache.ignite.cache.store.cassandra.CassandraCacheStoreFactory"> + <property name="dataSourceBean" value="cassandraAdminDataSource"/> + <property name="persistenceSettingsBean" value="cache2_persistence_settings"/> + </bean> + </property> + </bean> + </list> + </property> + + <!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. 
--> + <property name="discoverySpi"> + <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi"> + <property name="ipFinder"> + <!-- + Ignite provides several options for automatic discovery that can be used + instead of static IP based discovery. For information on all options refer + to our documentation: http://apacheignite.readme.io/docs/cluster-config + --> + <!-- Uncomment static IP finder to enable static-based discovery of initial nodes. --> + <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder"> + <!--<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">--> + <property name="addresses"> + <list> + <!-- In distributed environment, replace with actual host IP address. --> + <value>127.0.0.1:47500..47509</value> + </list> + </property> + </bean> + </property> + </bean> + </property> + </bean> +</beans> http://git-wip-us.apache.org/repos/asf/ignite/blob/561d2cf0/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/loadall_blob/persistence-settings.xml ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/loadall_blob/persistence-settings.xml b/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/loadall_blob/persistence-settings.xml new file mode 100644 index 0000000..e872201 --- /dev/null +++ b/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/loadall_blob/persistence-settings.xml @@ -0,0 +1,29 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<persistence keyspace="test1" table="blob_test3"> + <!-- By default Java standard serialization is going to be used --> + <keyPersistence class="java.lang.Long" + strategy="BLOB" + column="key"/> + + <!-- Java standard serialization explicitly specified to be used --> + <valuePersistence class="org.apache.ignite.tests.pojos.Person" + strategy="BLOB" + serializer="org.apache.ignite.cache.store.cassandra.serializer.JavaSerializer" + column="value"/> +</persistence> http://git-wip-us.apache.org/repos/asf/ignite/blob/561d2cf0/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java index 46e9022..b1ec38d 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java @@ -69,6 +69,7 @@ import org.apache.ignite.lifecycle.LifecycleAware; import org.apache.ignite.resources.CacheStoreSessionResource; import org.apache.ignite.resources.IgniteInstanceResource; import org.apache.ignite.resources.LoggerResource; +import org.apache.ignite.thread.IgniteThreadFactory; import org.apache.ignite.transactions.Transaction; import org.jetbrains.annotations.Nullable; @@ -121,6 +122,9 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>, /** 
Connection attribute property name. */ protected static final String ATTR_CONN_PROP = "JDBC_STORE_CONNECTION"; + /** Thread name. */ + private static final String CACHE_LOADER_THREAD_NAME = "jdbc-cache-loader"; + /** Built in Java types names. */ protected static final Collection<String> BUILT_IN_TYPES = new HashSet<>(); @@ -680,7 +684,7 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>, String cacheName = session().cacheName(); try { - pool = Executors.newFixedThreadPool(maxPoolSize); + pool = Executors.newFixedThreadPool(maxPoolSize, new IgniteThreadFactory(ignite.name(), CACHE_LOADER_THREAD_NAME)); Collection<Future<?>> futs = new ArrayList<>();
