Fixed "IGNITE-4205 CassandraCacheStore should start IgniteThread threads in loadCache() method"
Signed-off-by: nikolay_tikhonov <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2a818d36 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2a818d36 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2a818d36 Branch: refs/heads/ignite-5232-1.7.2 Commit: 2a818d36395dd1af23acf444adf396b2e2edbede Parents: 43bcc15 Author: Konstantin Dudkov <[email protected]> Authored: Mon May 22 16:28:07 2017 +0300 Committer: nikolay_tikhonov <[email protected]> Committed: Mon May 22 16:28:07 2017 +0300 ---------------------------------------------------------------------- .../store/cassandra/CassandraCacheStore.java | 13 ++- .../ignite/tests/IgnitePersistentStoreTest.java | 62 ++++++++++++++ .../persistence/loadall_blob/ignite-config.xml | 90 ++++++++++++++++++++ .../loadall_blob/persistence-settings.xml | 29 +++++++ .../store/jdbc/CacheAbstractJdbcStore.java | 6 +- 5 files changed, 198 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/2a818d36/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java index 2e1d3ea..dabf1b0 100644 --- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java +++ b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java @@ -31,6 +31,7 @@ import java.util.concurrent.Future; import javax.cache.Cache; import javax.cache.integration.CacheLoaderException; import javax.cache.integration.CacheWriterException; +import org.apache.ignite.Ignite; import 
org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.store.CacheStore; @@ -47,7 +48,9 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.logger.NullLogger; import org.apache.ignite.resources.CacheStoreSessionResource; +import org.apache.ignite.resources.IgniteInstanceResource; import org.apache.ignite.resources.LoggerResource; +import org.apache.ignite.thread.IgniteThreadFactory; /** * Implementation of {@link CacheStore} backed by Cassandra database. @@ -59,6 +62,14 @@ public class CassandraCacheStore<K, V> implements CacheStore<K, V> { /** Connection attribute property name. */ private static final String ATTR_CONN_PROP = "CASSANDRA_STORE_CONNECTION"; + /** Thread name. */ + private static final String CACHE_LOADER_THREAD_NAME = "cassandra-cache-loader"; + + /** Auto-injected ignite instance. */ + @SuppressWarnings("unused") + @IgniteInstanceResource + private Ignite ignite; + /** Auto-injected store session. 
*/ @CacheStoreSessionResource private CacheStoreSession storeSes; @@ -99,7 +110,7 @@ public class CassandraCacheStore<K, V> implements CacheStore<K, V> { Collection<Future<?>> futs = new ArrayList<>(args.length); try { - pool = Executors.newFixedThreadPool(maxPoolSize); + pool = Executors.newFixedThreadPool(maxPoolSize, new IgniteThreadFactory(ignite.name(), CACHE_LOADER_THREAD_NAME)); CassandraSession ses = getCassandraSession(); http://git-wip-us.apache.org/repos/asf/ignite/blob/2a818d36/modules/cassandra/src/test/java/org/apache/ignite/tests/IgnitePersistentStoreTest.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/src/test/java/org/apache/ignite/tests/IgnitePersistentStoreTest.java b/modules/cassandra/src/test/java/org/apache/ignite/tests/IgnitePersistentStoreTest.java index 51d0885..75dff66 100644 --- a/modules/cassandra/src/test/java/org/apache/ignite/tests/IgnitePersistentStoreTest.java +++ b/modules/cassandra/src/test/java/org/apache/ignite/tests/IgnitePersistentStoreTest.java @@ -23,9 +23,11 @@ import java.util.Map; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.Ignition; +import org.apache.ignite.binary.BinaryObject; import org.apache.ignite.cache.CachePeekMode; import org.apache.ignite.cache.store.CacheStore; import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.internal.binary.BinaryMarshaller; import org.apache.ignite.internal.processors.cache.CacheEntryImpl; import org.apache.ignite.tests.pojos.Person; import org.apache.ignite.tests.pojos.PersonId; @@ -237,6 +239,35 @@ public class IgnitePersistentStoreTest { } } + + /** */ + @Test + public void blobBinaryLoadCacheTest() { + Ignition.stopAll(true); + + try (Ignite ignite = Ignition.start("org/apache/ignite/tests/persistence/loadall_blob/ignite-config.xml")) { + IgniteCache<Long, PojoPerson> personCache = ignite.getOrCreateCache("cache2"); + + assert 
ignite.configuration().getMarshaller() instanceof BinaryMarshaller; + + personCache.put(1L, new PojoPerson(1, "name")); + + assert personCache.withKeepBinary().get(1L) instanceof BinaryObject; + } + + Ignition.stopAll(true); + + try (Ignite ignite = Ignition.start("org/apache/ignite/tests/persistence/loadall_blob/ignite-config.xml")) { + IgniteCache<Long, PojoPerson> personCache = ignite.getOrCreateCache("cache2"); + + personCache.loadCache(null, null); + + PojoPerson person = personCache.get(1L); + + LOGGER.info("loadCache tests passed"); + } + } + /** */ @Test public void pojoStrategyTest() { @@ -377,4 +408,35 @@ public class IgnitePersistentStoreTest { LOGGER.info("loadCache test passed"); } + + + /** */ + public static class PojoPerson { + /** */ + private int id; + + /** */ + private String name; + + /** */ + public PojoPerson() { + // No-op. + } + + /** */ + public PojoPerson(int id, String name) { + this.id = id; + this.name = name; + } + + /** */ + public int getId() { + return id; + } + + /** */ + public String getName() { + return name; + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/2a818d36/modules/cassandra/src/test/resources/org/apache/ignite/tests/persistence/loadall_blob/ignite-config.xml ---------------------------------------------------------------------- diff --git a/modules/cassandra/src/test/resources/org/apache/ignite/tests/persistence/loadall_blob/ignite-config.xml b/modules/cassandra/src/test/resources/org/apache/ignite/tests/persistence/loadall_blob/ignite-config.xml new file mode 100644 index 0000000..115e263 --- /dev/null +++ b/modules/cassandra/src/test/resources/org/apache/ignite/tests/persistence/loadall_blob/ignite-config.xml @@ -0,0 +1,90 @@ +<?xml version="1.0" encoding="UTF-8"?> + +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. 
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<beans xmlns="http://www.springframework.org/schema/beans" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation=" + http://www.springframework.org/schema/beans + http://www.springframework.org/schema/beans/spring-beans.xsd"> + + <!-- Cassandra connection settings --> + <import resource="classpath:org/apache/ignite/tests/cassandra/connection-settings.xml"/> + + <!-- Persistence settings for 'cache2' --> + <bean id="cache2_persistence_settings" + class="org.apache.ignite.cache.store.cassandra.persistence.KeyValuePersistenceSettings"> + <constructor-arg type="org.springframework.core.io.Resource" + value="classpath:org/apache/ignite/tests/persistence/loadall_blob/persistence-settings.xml"/> + </bean> + + <!-- Ignite configuration --> + <bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration"> + + <property name="marshaller"> + <bean class="org.apache.ignite.internal.binary.BinaryMarshaller"/> + </property> + + <property name="binaryConfiguration"> + <bean class="org.apache.ignite.configuration.BinaryConfiguration"> + <property name="compactFooter" value="false"/> + </bean> + </property> + + <property name="cacheConfiguration"> + <list> + <!-- Configuring persistence for "cache2" cache --> + <bean class="org.apache.ignite.configuration.CacheConfiguration"> + <property name="name" value="cache2"/> + <property name="readThrough" 
value="true"/> + <property name="writeThrough" value="true"/> + <property name="storeKeepBinary" value="true"/> + <property name="cacheStoreFactory"> + <bean class="org.apache.ignite.cache.store.cassandra.CassandraCacheStoreFactory"> + <property name="dataSourceBean" value="cassandraAdminDataSource"/> + <property name="persistenceSettingsBean" value="cache2_persistence_settings"/> + </bean> + </property> + </bean> + </list> + </property> + + <!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. --> + <property name="discoverySpi"> + <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi"> + <property name="ipFinder"> + <!-- + Ignite provides several options for automatic discovery that can be used + instead of static IP based discovery. For information on all options refer + to our documentation: http://apacheignite.readme.io/docs/cluster-config + --> + <!-- Uncomment static IP finder to enable static-based discovery of initial nodes. --> + <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder"> + <!--<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">--> + <property name="addresses"> + <list> + <!-- In distributed environment, replace with actual host IP address. 
--> + <value>127.0.0.1:47500..47509</value> + </list> + </property> + </bean> + </property> + </bean> + </property> + </bean> +</beans> http://git-wip-us.apache.org/repos/asf/ignite/blob/2a818d36/modules/cassandra/src/test/resources/org/apache/ignite/tests/persistence/loadall_blob/persistence-settings.xml ---------------------------------------------------------------------- diff --git a/modules/cassandra/src/test/resources/org/apache/ignite/tests/persistence/loadall_blob/persistence-settings.xml b/modules/cassandra/src/test/resources/org/apache/ignite/tests/persistence/loadall_blob/persistence-settings.xml new file mode 100644 index 0000000..e872201 --- /dev/null +++ b/modules/cassandra/src/test/resources/org/apache/ignite/tests/persistence/loadall_blob/persistence-settings.xml @@ -0,0 +1,29 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> + +<persistence keyspace="test1" table="blob_test3"> + <!-- By default Java standard serialization is going to be used --> + <keyPersistence class="java.lang.Long" + strategy="BLOB" + column="key"/> + + <!-- Java serialization explicitly specified to be used --> + <valuePersistence class="org.apache.ignite.tests.pojos.Person" + strategy="BLOB" + serializer="org.apache.ignite.cache.store.cassandra.serializer.JavaSerializer" + column="value"/> +</persistence> http://git-wip-us.apache.org/repos/asf/ignite/blob/2a818d36/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java index e211fad..817b1a5 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java @@ -72,6 +72,7 @@ import org.apache.ignite.lifecycle.LifecycleAware; import org.apache.ignite.resources.CacheStoreSessionResource; import org.apache.ignite.resources.IgniteInstanceResource; import org.apache.ignite.resources.LoggerResource; +import org.apache.ignite.thread.IgniteThreadFactory; import org.apache.ignite.transactions.Transaction; import org.jetbrains.annotations.Nullable; @@ -121,6 +122,9 @@ import static org.apache.ignite.cache.store.jdbc.JdbcTypesTransformer.NUMERIC_TY * </pre> */ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>, LifecycleAware { + /** Thread name. */ + private static final String CACHE_LOADER_THREAD_NAME = "jdbc-cache-loader"; + /** Connection attribute property name. 
*/ protected static final String ATTR_CONN_PROP = "JDBC_STORE_CONNECTION"; @@ -730,7 +734,7 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>, String cacheName = session().cacheName(); try { - pool = Executors.newFixedThreadPool(maxPoolSize); + pool = Executors.newFixedThreadPool(maxPoolSize, new IgniteThreadFactory(ignite.name(), CACHE_LOADER_THREAD_NAME)); Collection<Future<?>> futs = new ArrayList<>();
