Merge remote-tracking branch 'remotes/community/ignite-1.7.4' into ignite-1.8.2
# Conflicts: # modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java # modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/DataSource.java # modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java # modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java # modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java # modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java # modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcRequestHandler.java # modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java # modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java # modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java # modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java # modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/64247b92 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/64247b92 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/64247b92 Branch: refs/heads/master Commit: 64247b9228451e46abb8029e09c7fc6ed4e16d2d Parents: 147277d 8dd4ada Author: sboikov <[email protected]> Authored: Mon Dec 19 15:54:39 2016 +0300 Committer: sboikov <[email protected]> Committed: Mon Dec 19 15:54:39 2016 +0300 ---------------------------------------------------------------------- .../store/cassandra/CassandraCacheStore.java | 9 +- .../store/cassandra/datasource/DataSource.java | 9 + .../rest/RestProcessorMultiStartSelfTest.java | 48 +- 
.../java/org/apache/ignite/IgniteServices.java | 16 + .../apache/ignite/IgniteSystemProperties.java | 6 + .../rendezvous/RendezvousAffinityFunction.java | 80 ++- .../ignite/cache/store/CacheStoreAdapter.java | 6 + .../cache/store/jdbc/CacheJdbcPojoStore.java | 19 +- .../store/jdbc/JdbcTypesDefaultTransformer.java | 112 ++-- .../apache/ignite/internal/IgniteKernal.java | 28 +- .../ignite/internal/IgniteServicesImpl.java | 9 +- .../internal/binary/BinaryClassDescriptor.java | 12 +- .../ignite/internal/binary/BinaryUtils.java | 10 +- .../binary/builder/BinaryObjectBuilderImpl.java | 11 +- .../discovery/GridDiscoveryManager.java | 118 +--- .../affinity/GridAffinityProcessor.java | 2 +- .../processors/cache/CacheLockCandidates.java | 42 ++ .../cache/CacheLockCandidatesList.java | 71 +++ .../cache/CacheStoreBalancingWrapper.java | 6 + .../processors/cache/GridCacheAdapter.java | 8 +- .../processors/cache/GridCacheEntryEx.java | 3 +- .../cache/GridCacheLoaderWriterStore.java | 6 + .../processors/cache/GridCacheMapEntry.java | 117 +++- .../processors/cache/GridCacheMvcc.java | 376 +++++++---- .../processors/cache/GridCacheMvccCallback.java | 4 +- .../cache/GridCacheMvccCandidate.java | 80 +-- .../processors/cache/GridCacheMvccManager.java | 19 +- .../GridCachePartitionExchangeManager.java | 157 ++--- .../processors/cache/GridCachePreloader.java | 11 +- .../cache/GridCachePreloaderAdapter.java | 5 +- .../processors/cache/GridCacheProcessor.java | 10 +- .../processors/cache/GridCacheUtils.java | 17 - .../binary/CacheObjectBinaryProcessorImpl.java | 3 +- .../CacheDataStructuresManager.java | 6 +- .../distributed/GridDistributedCacheEntry.java | 303 +++------ .../dht/GridClientPartitionTopology.java | 120 ++-- .../distributed/dht/GridDhtCacheEntry.java | 32 +- .../distributed/dht/GridDhtLockFuture.java | 34 +- .../dht/GridDhtPartitionTopology.java | 28 +- .../dht/GridDhtPartitionTopologyImpl.java | 284 +++++---- .../dht/GridDhtTransactionalCacheAdapter.java | 1 - 
.../distributed/dht/GridDhtTxPrepareFuture.java | 5 +- .../colocated/GridDhtColocatedLockFuture.java | 8 +- .../dht/preloader/GridDhtPartitionDemander.java | 230 ++++--- .../dht/preloader/GridDhtPartitionFullMap.java | 18 +- .../GridDhtPartitionsExchangeFuture.java | 56 +- .../dht/preloader/GridDhtPreloader.java | 9 +- .../distributed/near/GridNearCacheEntry.java | 44 +- .../distributed/near/GridNearLockFuture.java | 3 +- .../near/GridNearTransactionalCache.java | 5 +- .../cache/local/GridLocalCacheEntry.java | 173 ++---- .../cache/local/GridLocalLockFuture.java | 2 +- .../cache/query/GridCacheQueryManager.java | 22 +- .../cache/transactions/IgniteTxHandler.java | 2 +- .../cache/transactions/IgniteTxManager.java | 5 +- .../closure/GridClosureProcessor.java | 31 +- .../internal/processors/job/GridJobWorker.java | 76 ++- .../processors/odbc/OdbcRequestHandler.java | 14 +- .../platform/PlatformContextImpl.java | 2 +- .../dotnet/PlatformDotNetCacheStore.java | 11 + .../platform/services/PlatformServices.java | 2 +- .../platform/utils/PlatformUtils.java | 28 + .../processors/rest/GridRestProcessor.java | 15 + .../service/GridServiceProcessor.java | 15 +- .../processors/service/GridServiceProxy.java | 18 +- .../processors/task/GridTaskWorker.java | 7 + .../internal/visor/query/VisorQueryJob.java | 2 +- .../ignite/marshaller/jdk/JdkMarshaller.java | 4 +- .../optimized/OptimizedMarshaller.java | 8 +- .../ignite/spi/discovery/tcp/ServerImpl.java | 41 +- .../tcp/internal/TcpDiscoveryStatistics.java | 4 + .../resources/META-INF/classnames.properties | 86 ++- .../AbstractAffinityFunctionSelfTest.java | 2 +- .../jdbc/JdbcTypesDefaultTransformerTest.java | 283 +++++++++ .../IgniteComputeTopologyExceptionTest.java | 5 +- .../binary/BinaryMarshallerSelfTest.java | 66 ++ .../GridDiscoveryManagerAliveCacheSelfTest.java | 2 +- .../CacheSerializableTransactionsTest.java | 604 +++++++++++++++++- .../cache/GridCacheMvccFlagsTest.java | 8 +- 
.../cache/GridCacheMvccPartitionedSelfTest.java | 334 ++++++++-- .../processors/cache/GridCacheMvccSelfTest.java | 212 +++---- .../GridCachePartitionedAffinitySpreadTest.java | 7 +- .../processors/cache/GridCacheTestEntryEx.java | 77 +-- ...heapCacheMetricsForClusterGroupSelfTest.java | 141 +++++ .../cache/OffheapCacheOnClientsTest.java | 143 +++++ .../distributed/dht/GridCacheDhtTestUtils.java | 232 ------- .../GridCacheRebalancingSyncSelfTest.java | 2 + .../CacheOffHeapAndSwapMetricsSelfTest.java | 621 ------------------- ...LocalCacheOffHeapAndSwapMetricsSelfTest.java | 621 +++++++++++++++++++ .../closure/GridClosureSerializationTest.java | 177 ++++++ ...gniteServiceProxyTimeoutInitializedTest.java | 284 +++++++++ .../loadtests/hashmap/GridHashMapLoadTest.java | 7 +- .../ignite/testsuites/IgniteBasicTestSuite.java | 2 + .../IgniteCacheMetricsSelfTestSuite.java | 6 +- .../ignite/testsuites/IgniteCacheTestSuite.java | 2 + .../testsuites/IgniteCacheTestSuite2.java | 2 + .../testsuites/IgniteKernalSelfTestSuite.java | 2 + .../resources/META-INF/classnames.properties | 114 ++++ .../processors/query/h2/IgniteH2Indexing.java | 77 ++- .../h2/twostep/GridReduceQueryExecutor.java | 14 +- ...niteCachePartitionedFieldsQuerySelfTest.java | 25 + 101 files changed, 4783 insertions(+), 2473 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java ---------------------------------------------------------------------- diff --cc modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java index 9058837,0000000..b4bed0d mode 100644,000000..100644 --- a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java +++ 
b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java @@@ -1,519 -1,0 +1,522 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.store.cassandra; + +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.Row; +import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.HashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import javax.cache.Cache; +import javax.cache.integration.CacheLoaderException; +import javax.cache.integration.CacheWriterException; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cache.store.CacheStore; +import org.apache.ignite.cache.store.CacheStoreSession; +import org.apache.ignite.cache.store.cassandra.datasource.DataSource; +import org.apache.ignite.cache.store.cassandra.persistence.KeyValuePersistenceSettings; +import org.apache.ignite.cache.store.cassandra.persistence.PersistenceController; 
+import org.apache.ignite.cache.store.cassandra.session.CassandraSession; +import org.apache.ignite.cache.store.cassandra.session.ExecutionAssistant; +import org.apache.ignite.cache.store.cassandra.session.GenericBatchExecutionAssistant; +import org.apache.ignite.cache.store.cassandra.session.LoadCacheCustomQueryWorker; - import org.apache.ignite.cache.store.cassandra.session.transaction.DeleteMutation; - import org.apache.ignite.cache.store.cassandra.session.transaction.Mutation; - import org.apache.ignite.cache.store.cassandra.session.transaction.WriteMutation; ++import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiInClosure; +import org.apache.ignite.logger.NullLogger; +import org.apache.ignite.resources.CacheStoreSessionResource; +import org.apache.ignite.resources.LoggerResource; + +/** + * Implementation of {@link CacheStore} backed by Cassandra database. + * + * @param <K> Ignite cache key type. + * @param <V> Ignite cache value type. + */ +public class CassandraCacheStore<K, V> implements CacheStore<K, V> { + /** Buffer to store mutations performed withing transaction. */ + private static final String TRANSACTION_BUFFER = "CASSANDRA_TRANSACTION_BUFFER"; + + /** Auto-injected store session. */ + @SuppressWarnings("unused") + @CacheStoreSessionResource + private CacheStoreSession storeSes; + + /** Auto-injected logger instance. */ + @SuppressWarnings("unused") + @LoggerResource + private IgniteLogger log; + + /** Cassandra data source. */ + private DataSource dataSrc; + + /** Max workers thread count. These threads are responsible for load cache. */ + private int maxPoolSize = Runtime.getRuntime().availableProcessors(); + + /** Controller component responsible for serialization logic. */ + private final PersistenceController controller; + + /** + * Store constructor. + * + * @param dataSrc Data source. 
+ * @param settings Persistence settings for Ignite key and value objects. + * @param maxPoolSize Max workers thread count. + */ + public CassandraCacheStore(DataSource dataSrc, KeyValuePersistenceSettings settings, int maxPoolSize) { + this.dataSrc = dataSrc; + this.controller = new PersistenceController(settings); + this.maxPoolSize = maxPoolSize; + } + + /** {@inheritDoc} */ + @Override public void loadCache(IgniteBiInClosure<K, V> clo, Object... args) throws CacheLoaderException { + if (clo == null) + return; + + if (args == null || args.length == 0) + args = new String[] {"select * from " + controller.getPersistenceSettings().getKeyspace() + "." + cassandraTable() + ";"}; + + ExecutorService pool = null; + + Collection<Future<?>> futs = new ArrayList<>(args.length); + + try { + pool = Executors.newFixedThreadPool(maxPoolSize); + + CassandraSession ses = getCassandraSession(); + + for (Object obj : args) { + if (obj == null || !(obj instanceof String) || !((String)obj).trim().toLowerCase().startsWith("select")) + continue; + + futs.add(pool.submit(new LoadCacheCustomQueryWorker<>(ses, (String) obj, controller, log, clo))); + } + + for (Future<?> fut : futs) + U.get(fut); + + if (log != null && log.isDebugEnabled() && storeSes != null) + log.debug("Cache loaded from db: " + storeSes.cacheName()); + } + catch (IgniteCheckedException e) { + if (storeSes != null) + throw new CacheLoaderException("Failed to load Ignite cache: " + storeSes.cacheName(), e.getCause()); + else + throw new CacheLoaderException("Failed to load cache", e.getCause()); + } + finally { + U.shutdownNow(getClass(), pool, log); + } + } + + /** {@inheritDoc} */ + @Override public void sessionEnd(boolean commit) throws CacheWriterException { + if (!storeSes.isWithinTransaction()) + return; + + List<Mutation> mutations = mutations(); + if (mutations == null || mutations.isEmpty()) + return; + + CassandraSession ses = getCassandraSession(); + + try { + ses.execute(mutations); + } + finally { + 
mutations.clear(); + U.closeQuiet(ses); + } + } + + /** {@inheritDoc} */ + @SuppressWarnings({"unchecked"}) + @Override public V load(final K key) throws CacheLoaderException { + if (key == null) + return null; + + CassandraSession ses = getCassandraSession(); + + try { + return ses.execute(new ExecutionAssistant<V>() { + /** {@inheritDoc} */ + @Override public boolean tableExistenceRequired() { + return false; + } + + /** {@inheritDoc} */ + @Override public String getTable() { + return cassandraTable(); + } + + /** {@inheritDoc} */ + @Override public String getStatement() { + return controller.getLoadStatement(cassandraTable(), false); + } + + /** {@inheritDoc} */ + @Override public BoundStatement bindStatement(PreparedStatement statement) { + return controller.bindKey(statement, key); + } + + /** {@inheritDoc} */ + @Override public KeyValuePersistenceSettings getPersistenceSettings() { + return controller.getPersistenceSettings(); + } + + /** {@inheritDoc} */ + @Override public String operationName() { + return "READ"; + } + + /** {@inheritDoc} */ + @Override public V process(Row row) { + return row == null ? null : (V)controller.buildValueObject(row); + } + }); + } + finally { + U.closeQuiet(ses); + } + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public Map<K, V> loadAll(Iterable<? 
extends K> keys) throws CacheLoaderException { + if (keys == null || !keys.iterator().hasNext()) + return new HashMap<>(); + + CassandraSession ses = getCassandraSession(); + + try { + return ses.execute(new GenericBatchExecutionAssistant<Map<K, V>, K>() { + private Map<K, V> data = new HashMap<>(); + + /** {@inheritDoc} */ + @Override public String getTable() { + return cassandraTable(); + } + + /** {@inheritDoc} */ + @Override public String getStatement() { + return controller.getLoadStatement(cassandraTable(), true); + } + + /** {@inheritDoc} */ + @Override public BoundStatement bindStatement(PreparedStatement statement, K key) { + return controller.bindKey(statement, key); + } + + /** {@inheritDoc} */ + @Override public KeyValuePersistenceSettings getPersistenceSettings() { + return controller.getPersistenceSettings(); + } + + /** {@inheritDoc} */ + @Override public String operationName() { + return "BULK_READ"; + } + + /** {@inheritDoc} */ + @Override public Map<K, V> processedData() { + return data; + } + + /** {@inheritDoc} */ + @Override protected void process(Row row) { + data.put((K)controller.buildKeyObject(row), (V)controller.buildValueObject(row)); + } + }, keys); + } + finally { + U.closeQuiet(ses); + } + } + + /** {@inheritDoc} */ + @Override public void write(final Cache.Entry<? extends K, ? 
extends V> entry) throws CacheWriterException { + if (entry == null || entry.getKey() == null) + return; + + if (storeSes.isWithinTransaction()) { + accumulate(new WriteMutation(entry, cassandraTable(), controller)); + return; + } + + CassandraSession ses = getCassandraSession(); + + try { + ses.execute(new ExecutionAssistant<Void>() { + /** {@inheritDoc} */ + @Override public boolean tableExistenceRequired() { + return true; + } + + /** {@inheritDoc} */ + @Override public String getTable() { + return cassandraTable(); + } + + /** {@inheritDoc} */ + @Override public String getStatement() { + return controller.getWriteStatement(cassandraTable()); + } + + /** {@inheritDoc} */ + @Override public BoundStatement bindStatement(PreparedStatement statement) { + return controller.bindKeyValue(statement, entry.getKey(), entry.getValue()); + } + + /** {@inheritDoc} */ + @Override public KeyValuePersistenceSettings getPersistenceSettings() { + return controller.getPersistenceSettings(); + } + + /** {@inheritDoc} */ + @Override public String operationName() { + return "WRITE"; + } + + /** {@inheritDoc} */ + @Override public Void process(Row row) { + return null; + } + }); + } + finally { + U.closeQuiet(ses); + } + } + + /** {@inheritDoc} */ + @Override public void writeAll(Collection<Cache.Entry<? extends K, ? extends V>> entries) throws CacheWriterException { + if (entries == null || entries.isEmpty()) + return; + + if (storeSes.isWithinTransaction()) { + for (Cache.Entry<?, ?> entry : entries) + accumulate(new WriteMutation(entry, cassandraTable(), controller)); + + return; + } + + CassandraSession ses = getCassandraSession(); + + try { + ses.execute(new GenericBatchExecutionAssistant<Void, Cache.Entry<? extends K, ? 
extends V>>() { + /** {@inheritDoc} */ + @Override public String getTable() { + return cassandraTable(); + } + + /** {@inheritDoc} */ + @Override public String getStatement() { + return controller.getWriteStatement(cassandraTable()); + } + + /** {@inheritDoc} */ + @Override public BoundStatement bindStatement(PreparedStatement statement, + Cache.Entry<? extends K, ? extends V> entry) { + return controller.bindKeyValue(statement, entry.getKey(), entry.getValue()); + } + + /** {@inheritDoc} */ + @Override public KeyValuePersistenceSettings getPersistenceSettings() { + return controller.getPersistenceSettings(); + } + + /** {@inheritDoc} */ + @Override public String operationName() { + return "BULK_WRITE"; + } + + /** {@inheritDoc} */ + @Override public boolean tableExistenceRequired() { + return true; + } + }, entries); + } + finally { + U.closeQuiet(ses); + } + } + + /** {@inheritDoc} */ + @Override public void delete(final Object key) throws CacheWriterException { + if (key == null) + return; + + if (storeSes.isWithinTransaction()) { + accumulate(new DeleteMutation(key, cassandraTable(), controller)); + return; + } + + CassandraSession ses = getCassandraSession(); + + try { + ses.execute(new ExecutionAssistant<Void>() { + /** {@inheritDoc} */ + @Override public boolean tableExistenceRequired() { + return false; + } + + /** {@inheritDoc} */ + @Override public String getTable() { + return cassandraTable(); + } + + /** {@inheritDoc} */ + @Override public String getStatement() { + return controller.getDeleteStatement(cassandraTable()); + } + + /** {@inheritDoc} */ + @Override public BoundStatement bindStatement(PreparedStatement statement) { + return controller.bindKey(statement, key); + } + + + /** {@inheritDoc} */ + @Override public KeyValuePersistenceSettings getPersistenceSettings() { + return controller.getPersistenceSettings(); + } + + /** {@inheritDoc} */ + @Override public String operationName() { + return "DELETE"; + } + + /** {@inheritDoc} */ + @Override 
public Void process(Row row) { + return null; + } + }); + } + finally { + U.closeQuiet(ses); + } + } + + /** {@inheritDoc} */ + @Override public void deleteAll(Collection<?> keys) throws CacheWriterException { + if (keys == null || keys.isEmpty()) + return; + + if (storeSes.isWithinTransaction()) { + for (Object key : keys) + accumulate(new DeleteMutation(key, cassandraTable(), controller)); + + return; + } + + CassandraSession ses = getCassandraSession(); + + try { + ses.execute(new GenericBatchExecutionAssistant<Void, Object>() { + /** {@inheritDoc} */ + @Override public String getTable() { + return cassandraTable(); + } + + /** {@inheritDoc} */ + @Override public String getStatement() { + return controller.getDeleteStatement(cassandraTable()); + } + + /** {@inheritDoc} */ + @Override public BoundStatement bindStatement(PreparedStatement statement, Object key) { + return controller.bindKey(statement, key); + } + + /** {@inheritDoc} */ + @Override public KeyValuePersistenceSettings getPersistenceSettings() { + return controller.getPersistenceSettings(); + } + + /** {@inheritDoc} */ + @Override public String operationName() { + return "BULK_DELETE"; + } + }, keys); + } + finally { + U.closeQuiet(ses); + } + } + + /** + * Gets Cassandra session wrapper or creates new if it doesn't exist. + * This wrapper hides all the low-level Cassandra interaction details by providing only high-level methods. + * + * @return Cassandra session wrapper. + */ + private CassandraSession getCassandraSession() { + return dataSrc.session(log != null ? log : new NullLogger()); + } + + /** + * Returns table name to use for all Cassandra based operations (READ/WRITE/DELETE). + * + * @return Table name. + */ + private String cassandraTable() { + return controller.getPersistenceSettings().getTable() != null ? + controller.getPersistenceSettings().getTable() : storeSes.cacheName().trim().toLowerCase(); + } + + /** + * Accumulates mutation in the transaction buffer. 
+ * + * @param mutation Mutation operation. + */ + private void accumulate(Mutation mutation) { + //noinspection unchecked + List<Mutation> mutations = (List<Mutation>)storeSes.properties().get(TRANSACTION_BUFFER); + + if (mutations == null) { + mutations = new LinkedList<>(); + storeSes.properties().put(TRANSACTION_BUFFER, mutations); + } + + mutations.add(mutation); + } + + /** + * Returns all the mutations performed withing transaction. + * + * @return Mutations + */ + private List<Mutation> mutations() { + //noinspection unchecked + return (List<Mutation>)storeSes.properties().get(TRANSACTION_BUFFER); + } ++ ++ /** {@inheritDoc} */ ++ @Override public String toString() { ++ return S.toString(CassandraCacheStore.class, this); ++ } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/DataSource.java ---------------------------------------------------------------------- diff --cc modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/DataSource.java index f582aac,0000000..1ba3c7d mode 100644,000000..100644 --- a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/DataSource.java +++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/DataSource.java @@@ -1,647 -1,0 +1,656 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.store.cassandra.datasource; + +import com.datastax.driver.core.AuthProvider; +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.NettyOptions; +import com.datastax.driver.core.PoolingOptions; +import com.datastax.driver.core.ProtocolOptions; +import com.datastax.driver.core.ProtocolVersion; +import com.datastax.driver.core.SSLOptions; +import com.datastax.driver.core.SocketOptions; +import com.datastax.driver.core.policies.AddressTranslator; +import com.datastax.driver.core.policies.LoadBalancingPolicy; +import com.datastax.driver.core.policies.ReconnectionPolicy; +import com.datastax.driver.core.policies.RetryPolicy; +import com.datastax.driver.core.policies.SpeculativeExecutionPolicy; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.io.Serializable; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.LinkedList; +import java.util.List; +import java.util.UUID; + +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cache.store.cassandra.session.CassandraSession; +import org.apache.ignite.cache.store.cassandra.session.CassandraSessionImpl; +import org.apache.ignite.internal.util.typedef.internal.U; ++import org.apache.ignite.internal.util.tostring.GridToStringExclude; ++import org.apache.ignite.internal.util.typedef.internal.S; + +/** + * Data source abstraction to 
specify configuration of the Cassandra session to be used. + */ +public class DataSource implements Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** + * Null object, used as a replacement for those Cassandra connection options which + * don't support serialization (RetryPolicy, LoadBalancingPolicy and etc). + */ + private static final UUID NULL_OBJECT = UUID.fromString("45ffae47-3193-5910-84a2-048fe65735d9"); + + /** Number of rows to immediately fetch in CQL statement execution. */ + private Integer fetchSize; + + /** Consistency level for READ operations. */ + private ConsistencyLevel readConsistency; + + /** Consistency level for WRITE operations. */ + private ConsistencyLevel writeConsistency; + + /** Username to use for authentication. */ ++ @GridToStringExclude + private String user; + + /** Password to use for authentication. */ ++ @GridToStringExclude + private String pwd; + + /** Port to use for Cassandra connection. */ + private Integer port; + + /** List of contact points to connect to Cassandra cluster. */ + private List<InetAddress> contactPoints; + + /** List of contact points with ports to connect to Cassandra cluster. */ + private List<InetSocketAddress> contactPointsWithPorts; + + /** Maximum time to wait for schema agreement before returning from a DDL query. */ + private Integer maxSchemaAgreementWaitSeconds; + + /** The native protocol version to use. */ + private Integer protoVer; + + /** Compression to use for the transport. */ + private String compression; + + /** Use SSL for communications with Cassandra. */ + private Boolean useSSL; + + /** Enables metrics collection. */ + private Boolean collectMetrix; + + /** Enables JMX reporting of the metrics. */ + private Boolean jmxReporting; + + /** Credentials to use for authentication. */ + private Credentials creds; + + /** Load balancing policy to use. */ + private LoadBalancingPolicy loadBalancingPlc; + + /** Reconnection policy to use. 
*/ + private ReconnectionPolicy reconnectionPlc; + + /** Retry policy to use. */ + private RetryPolicy retryPlc; + + /** Address translator to use. */ + private AddressTranslator addrTranslator; + + /** Speculative execution policy to use. */ + private SpeculativeExecutionPolicy speculativeExecutionPlc; + + /** Authentication provider to use. */ + private AuthProvider authProvider; + + /** SSL options to use. */ + private SSLOptions sslOptions; + + /** Connection pooling options to use. */ + private PoolingOptions poolingOptions; + + /** Socket options to use. */ + private SocketOptions sockOptions; + + /** Netty options to use for connection. */ + private NettyOptions nettyOptions; + + /** Cassandra session wrapper instance. */ + private volatile CassandraSession ses; + + /** + * Sets user name to use for authentication. + * + * @param user user name + */ + @SuppressWarnings("UnusedDeclaration") + public void setUser(String user) { + this.user = user; + + invalidate(); + } + + /** + * Sets password to use for authentication. + * + * @param pwd password + */ + @SuppressWarnings("UnusedDeclaration") + public void setPassword(String pwd) { + this.pwd = pwd; + + invalidate(); + } + + /** + * Sets port to use for Cassandra connection. + * + * @param port port + */ + @SuppressWarnings("UnusedDeclaration") + public void setPort(int port) { + this.port = port; + + invalidate(); + } + + /** + * Sets list of contact points to connect to Cassandra cluster. + * + * @param points contact points + */ + public void setContactPoints(String... 
points) { + if (points == null || points.length == 0) + return; + + for (String point : points) { + if (point.contains(":")) { + if (contactPointsWithPorts == null) + contactPointsWithPorts = new LinkedList<>(); + + String[] chunks = point.split(":"); + + try { + contactPointsWithPorts.add(InetSocketAddress.createUnresolved(chunks[0].trim(), Integer.parseInt(chunks[1].trim()))); + } + catch (Throwable e) { + throw new IllegalArgumentException("Incorrect contact point '" + point + "' specified for Cassandra cache storage", e); + } + } + else { + if (contactPoints == null) + contactPoints = new LinkedList<>(); + + try { + contactPoints.add(InetAddress.getByName(point)); + } + catch (Throwable e) { + throw new IllegalArgumentException("Incorrect contact point '" + point + "' specified for Cassandra cache storage", e); + } + } + } + + invalidate(); + } + + /** Sets maximum time to wait for schema agreement before returning from a DDL query. */ + @SuppressWarnings("UnusedDeclaration") + public void setMaxSchemaAgreementWaitSeconds(int seconds) { + maxSchemaAgreementWaitSeconds = seconds; + + invalidate(); + } + + /** + * Sets the native protocol version to use. + * + * @param ver version number + */ + @SuppressWarnings("UnusedDeclaration") + public void setProtocolVersion(int ver) { + protoVer = ver; + + invalidate(); + } + + /** + * Sets compression algorithm to use for the transport. + * + * @param compression Compression algorithm. + */ + @SuppressWarnings("UnusedDeclaration") + public void setCompression(String compression) { + this.compression = compression == null || compression.trim().isEmpty() ? null : compression.trim(); + + try { + if (this.compression != null) + ProtocolOptions.Compression.valueOf(this.compression); + } + catch (Throwable e) { + throw new IgniteException("Incorrect compression '" + compression + "' specified for Cassandra connection", e); + } + + invalidate(); + } + + /** + * Enables SSL for communications with Cassandra. 
+ * + * @param use Flag to enable/disable SSL. + */ + @SuppressWarnings("UnusedDeclaration") + public void setUseSSL(boolean use) { + useSSL = use; + + invalidate(); + } + + /** + * Enables metrics collection. + * + * @param collect Flag to enable/disable metrics collection. + */ + @SuppressWarnings("UnusedDeclaration") + public void setCollectMetrix(boolean collect) { + collectMetrix = collect; + + invalidate(); + } + + /** + * Enables JMX reporting of the metrics. + * + * @param enableReporting Flag to enable/disable JMX reporting. + */ + @SuppressWarnings("UnusedDeclaration") + public void setJmxReporting(boolean enableReporting) { + jmxReporting = enableReporting; + + invalidate(); + } + + /** + * Sets number of rows to immediately fetch in CQL statement execution. + * + * @param size Number of rows to fetch. + */ + @SuppressWarnings("UnusedDeclaration") + public void setFetchSize(int size) { + fetchSize = size; + + invalidate(); + } + + /** + * Set consistency level for READ operations. + * + * @param level Consistency level. + */ + public void setReadConsistency(String level) { + readConsistency = parseConsistencyLevel(level); + + invalidate(); + } + + /** + * Set consistency level for WRITE operations. + * + * @param level Consistency level. + */ + public void setWriteConsistency(String level) { + writeConsistency = parseConsistencyLevel(level); + + invalidate(); + } + + /** + * Sets credentials to use for authentication. + * + * @param creds Credentials. + */ + public void setCredentials(Credentials creds) { + this.creds = creds; + + invalidate(); + } + + /** + * Sets load balancing policy. + * + * @param plc Load balancing policy. + */ + public void setLoadBalancingPolicy(LoadBalancingPolicy plc) { + loadBalancingPlc = plc; + + invalidate(); + } + + /** + * Sets reconnection policy. + * + * @param plc Reconnection policy. 
+ */ + @SuppressWarnings("UnusedDeclaration") + public void setReconnectionPolicy(ReconnectionPolicy plc) { + reconnectionPlc = plc; + + invalidate(); + } + + /** + * Sets retry policy. + * + * @param plc Retry policy. + */ + @SuppressWarnings("UnusedDeclaration") + public void setRetryPolicy(RetryPolicy plc) { + retryPlc = plc; + + invalidate(); + } + + /** + * Sets address translator. + * + * @param translator Address translator. + */ + @SuppressWarnings("UnusedDeclaration") + public void setAddressTranslator(AddressTranslator translator) { + addrTranslator = translator; + + invalidate(); + } + + /** + * Sets speculative execution policy. + * + * @param plc Speculative execution policy. + */ + @SuppressWarnings("UnusedDeclaration") + public void setSpeculativeExecutionPolicy(SpeculativeExecutionPolicy plc) { + speculativeExecutionPlc = plc; + + invalidate(); + } + + /** + * Sets authentication provider. + * + * @param provider Authentication provider. + */ + @SuppressWarnings("UnusedDeclaration") + public void setAuthProvider(AuthProvider provider) { + authProvider = provider; + + invalidate(); + } + + /** + * Sets SSL options. + * + * @param options SSL options. + */ + @SuppressWarnings("UnusedDeclaration") + public void setSslOptions(SSLOptions options) { + sslOptions = options; + + invalidate(); + } + + /** + * Sets pooling options. + * + * @param options pooling options to use. + */ + @SuppressWarnings("UnusedDeclaration") + public void setPoolingOptions(PoolingOptions options) { + poolingOptions = options; + + invalidate(); + } + + /** + * Sets socket options to use. + * + * @param options Socket options. + */ + @SuppressWarnings("UnusedDeclaration") + public void setSocketOptions(SocketOptions options) { + sockOptions = options; + + invalidate(); + } + + /** + * Sets netty options to use. + * + * @param options netty options. 
+ */ + @SuppressWarnings("UnusedDeclaration") + public void setNettyOptions(NettyOptions options) { + nettyOptions = options; + + invalidate(); + } + + /** + * Creates Cassandra session wrapper if it wasn't created yet and returns it. + * The wrapper is cached in {@code ses}; later calls return the cached instance while it is non-null. + * + * @param log Logger handed to the created session wrapper. + * @return Cassandra session wrapper. + * @throws IgniteException If the configured compression name does not match a {@code ProtocolOptions.Compression} constant. + */ + @SuppressWarnings("deprecation") + public synchronized CassandraSession session(IgniteLogger log) { + if (ses != null) + return ses; + + Cluster.Builder builder = Cluster.builder(); + + if (user != null) + builder = builder.withCredentials(user, pwd); + + if (port != null) + builder = builder.withPort(port); + + if (contactPoints != null) + builder = builder.addContactPoints(contactPoints); + + if (contactPointsWithPorts != null) + builder = builder.addContactPointsWithPorts(contactPointsWithPorts); + + if (maxSchemaAgreementWaitSeconds != null) + builder = builder.withMaxSchemaAgreementWaitSeconds(maxSchemaAgreementWaitSeconds); + + if (protoVer != null) + builder = builder.withProtocolVersion(ProtocolVersion.fromInt(protoVer)); + + if (compression != null) { + try { + /* Enum.valueOf() is case-sensitive and the driver's Compression constants are upper-case, so normalize to upper-case (lower-casing made every configured value fail). */ + builder = builder.withCompression(ProtocolOptions.Compression.valueOf(compression.trim().toUpperCase())); + } + catch (IllegalArgumentException e) { + throw new IgniteException("Incorrect compression option '" + compression + "' specified for Cassandra connection", e); + } + } + + if (useSSL != null && useSSL) + builder = builder.withSSL(); + + if (sslOptions != null) + builder = builder.withSSL(sslOptions); + + if (collectMetrix != null && !collectMetrix) + builder = builder.withoutMetrics(); + + if (jmxReporting != null && !jmxReporting) + builder = builder.withoutJMXReporting(); + + if (creds != null) + builder = builder.withCredentials(creds.getUser(), creds.getPassword()); + + if (loadBalancingPlc != null) + builder = builder.withLoadBalancingPolicy(loadBalancingPlc); + + if (reconnectionPlc != null) + builder = builder.withReconnectionPolicy(reconnectionPlc); + + if (retryPlc != null) + builder
= builder.withRetryPolicy(retryPlc); + + if (addrTranslator != null) + builder = builder.withAddressTranslator(addrTranslator); + + if (speculativeExecutionPlc != null) + builder = builder.withSpeculativeExecutionPolicy(speculativeExecutionPlc); + + if (authProvider != null) + builder = builder.withAuthProvider(authProvider); + + if (poolingOptions != null) + builder = builder.withPoolingOptions(poolingOptions); + + if (sockOptions != null) + builder = builder.withSocketOptions(sockOptions); + + if (nettyOptions != null) + builder = builder.withNettyOptions(nettyOptions); + + return ses = new CassandraSessionImpl(builder, fetchSize, readConsistency, writeConsistency, log); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(fetchSize); + out.writeObject(readConsistency); + out.writeObject(writeConsistency); + U.writeString(out, user); + U.writeString(out, pwd); + out.writeObject(port); + out.writeObject(contactPoints); + out.writeObject(contactPointsWithPorts); + out.writeObject(maxSchemaAgreementWaitSeconds); + out.writeObject(protoVer); + U.writeString(out, compression); + out.writeObject(useSSL); + out.writeObject(collectMetrix); + out.writeObject(jmxReporting); + out.writeObject(creds); + writeObject(out, loadBalancingPlc); + writeObject(out, reconnectionPlc); + writeObject(out, addrTranslator); + writeObject(out, speculativeExecutionPlc); + writeObject(out, authProvider); + writeObject(out, sslOptions); + writeObject(out, poolingOptions); + writeObject(out, sockOptions); + writeObject(out, nettyOptions); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + fetchSize = (Integer)in.readObject(); + readConsistency = (ConsistencyLevel)in.readObject(); + writeConsistency = (ConsistencyLevel)in.readObject(); + user = U.readString(in); + pwd = U.readString(in); + port =
(Integer)in.readObject(); + contactPoints = (List<InetAddress>)in.readObject(); + contactPointsWithPorts = (List<InetSocketAddress>)in.readObject(); + maxSchemaAgreementWaitSeconds = (Integer)in.readObject(); + protoVer = (Integer)in.readObject(); + compression = U.readString(in); + useSSL = (Boolean)in.readObject(); + collectMetrix = (Boolean)in.readObject(); + jmxReporting = (Boolean)in.readObject(); + creds = (Credentials)in.readObject(); + loadBalancingPlc = (LoadBalancingPolicy)readObject(in); + reconnectionPlc = (ReconnectionPolicy)readObject(in); + addrTranslator = (AddressTranslator)readObject(in); + speculativeExecutionPlc = (SpeculativeExecutionPolicy)readObject(in); + authProvider = (AuthProvider)readObject(in); + sslOptions = (SSLOptions)readObject(in); + poolingOptions = (PoolingOptions)readObject(in); + sockOptions = (SocketOptions)readObject(in); + nettyOptions = (NettyOptions)readObject(in); + } + + /** + * Helper method used to serialize class members + * @param out the stream to write the object to + * @param obj the object to be written + * @throws IOException Includes any I/O exceptions that may occur + */ + private void writeObject(ObjectOutput out, Object obj) throws IOException { + out.writeObject(obj == null || !(obj instanceof Serializable) ? NULL_OBJECT : obj); + } + + /** + * Helper method used to deserialize class members + * @param in the stream to read data from in order to restore the object + * @throws IOException Includes any I/O exceptions that may occur + * @throws ClassNotFoundException If the class for an object being restored cannot be found + * @return deserialized object + */ + private Object readObject(ObjectInput in) throws IOException, ClassNotFoundException { + Object obj = in.readObject(); + return NULL_OBJECT.equals(obj) ? null : obj; + } + + /** + * Parses consistency level provided as string. + * + * @param level consistency level string. + * + * @return consistency level. 
+ */ + private ConsistencyLevel parseConsistencyLevel(String level) { + if (level == null) + return null; + + try { + return ConsistencyLevel.valueOf(level.trim().toUpperCase()); + } + catch (Throwable e) { + throw new IgniteException("Incorrect consistency level '" + level + "' specified for Cassandra connection", e); + } + } + + /** + * Invalidates session. + */ + private synchronized void invalidate() { + ses = null; + } ++ ++ /** {@inheritDoc} */ ++ @Override public String toString() { ++ return S.toString(DataSource.class, this); ++ } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/main/java/org/apache/ignite/IgniteServices.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/main/java/org/apache/ignite/internal/IgniteServicesImpl.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryClassDescriptor.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryUtils.java 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/BinaryObjectBuilderImpl.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 5f0b8a0,4d59d50..8187e8f --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@@ -3076,8 -3233,8 +3076,8 @@@ public abstract class GridCacheAdapter< } /** {@inheritDoc} */ - @Override public CacheMetrics clusterMetrics() { + @Override public final CacheMetrics clusterMetrics() { - return clusterMetrics(ctx.grid().cluster().forCacheNodes(ctx.name())); + return clusterMetrics(ctx.grid().cluster().forDataNodes(ctx.name())); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcRequestHandler.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java ---------------------------------------------------------------------- diff --cc 
modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java index aeb3ef4,3690f35..d26242d --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java @@@ -1587,9 -1590,9 +1590,9 @@@ public class GridServiceProcessor exten else topVer = new AffinityTopologyVersion(((DiscoveryEvent)evt).topologyVersion(), 0); - depExe.submit(new BusyRunnable() { + depExe.execute(new BusyRunnable() { @Override public void run0() { - ClusterNode oldest = CU.oldestAliveCacheServerNode(cache.context().shared(), topVer); + ClusterNode oldest = ctx.discovery().oldestAliveCacheServerNode(topVer); if (oldest != null && oldest.isLocal()) { final Collection<GridServiceDeployment> retries = new ConcurrentLinkedQueue<>(); http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/test/java/org/apache/ignite/cache/affinity/AbstractAffinityFunctionSelfTest.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/cache/affinity/AbstractAffinityFunctionSelfTest.java index ee5b65c,43017db..8f8d78a --- a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AbstractAffinityFunctionSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AbstractAffinityFunctionSelfTest.java @@@ -104,24 -104,9 +104,25 @@@ public abstract class AbstractAffinityF } /** + * @param backups Number of backups. * @throws Exception If failed. 
*/ + public void testNullKeyForPartitionCalculation() throws Exception { + AffinityFunction aff = affinityFunction(); + + try { + aff.partition(null); + + fail("Should throw IllegalArgumentException due to NULL affinity key."); + } catch (IllegalArgumentException e) { + /* Assert the message check; a bare contains() call discards its result and verifies nothing. */ assertTrue(e.getMessage().contains("Null key is passed for a partition calculation. " + + "Make sure that an affinity key that is used is initialized properly.")); + } + } + + /** + * @throws Exception If failed. + */ protected void checkNodeRemoved(int backups) throws Exception { checkNodeRemoved(backups, 1, 1); } http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryMarshallerSelfTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java index 554bb3d,deec72a..1e73e79 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java @@@ -39,10 -39,7 +39,11 @@@ import org.apache.ignite.cache.store.jd import org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreTest; import org.apache.ignite.cache.store.jdbc.GridCacheJdbcBlobStoreMultithreadedSelfTest; import org.apache.ignite.cache.store.jdbc.GridCacheJdbcBlobStoreSelfTest; +import org.apache.ignite.internal.managers.communication.IgniteCommunicationBalanceMultipleConnectionsTest; +import
org.apache.ignite.internal.managers.communication.IgniteCommunicationBalanceTest; +import org.apache.ignite.internal.managers.communication.IgniteIoTestMessagesTest; +import org.apache.ignite.internal.managers.communication.IgniteVariousConnectionNumberTest; + import org.apache.ignite.cache.store.jdbc.JdbcTypesDefaultTransformerTest; import org.apache.ignite.internal.processors.cache.CacheAffinityCallSelfTest; import org.apache.ignite.internal.processors.cache.CacheDeferredDeleteSanitySelfTest; import org.apache.ignite.internal.processors.cache.CacheEntryProcessorCopySelfTest; http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java index 2d06f3a,350b715..b28619c --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java @@@ -66,7 -66,7 +66,8 @@@ import org.apache.ignite.internal.proce import org.apache.ignite.internal.processors.service.IgniteServiceDeploymentClassLoadingDefaultMarshallerTest; import org.apache.ignite.internal.processors.service.IgniteServiceDeploymentClassLoadingJdkMarshallerTest; import org.apache.ignite.internal.processors.service.IgniteServiceDeploymentClassLoadingOptimizedMarshallerTest; +import org.apache.ignite.internal.processors.service.IgniteServiceDynamicCachesSelfTest; + import org.apache.ignite.internal.processors.service.IgniteServiceProxyTimeoutInitializedTest; import org.apache.ignite.internal.processors.service.IgniteServiceReassignmentTest; 
import org.apache.ignite.internal.processors.service.ServicePredicateAccessCacheTest; import org.apache.ignite.internal.util.GridStartupWithUndefinedIgniteHomeSelfTest; @@@ -142,7 -142,7 +143,8 @@@ public class IgniteKernalSelfTestSuite suite.addTestSuite(GridServiceProxyNodeStopSelfTest.class); suite.addTestSuite(GridServiceProxyClientReconnectSelfTest.class); suite.addTestSuite(IgniteServiceReassignmentTest.class); + suite.addTestSuite(IgniteServiceProxyTimeoutInitializedTest.class); + suite.addTestSuite(IgniteServiceDynamicCachesSelfTest.class); suite.addTestSuite(IgniteServiceDeploymentClassLoadingDefaultMarshallerTest.class); suite.addTestSuite(IgniteServiceDeploymentClassLoadingOptimizedMarshallerTest.class); http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java ---------------------------------------------------------------------- diff --cc modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index 5df44db,362ddd8..c541185 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@@ -794,40 -774,29 +794,45 @@@ public class IgniteH2Indexing implement throws IgniteCheckedException { final Connection conn = connectionForSpace(spaceName); - initLocalQueryContext(conn, enforceJoinOrder, filters); + setupConnection(conn, false, enforceJoinOrder); - Prepared p = null; + final PreparedStatement stmt = preparedStatementWithParams(conn, qry, params, true); - try { - final PreparedStatement stmt = preparedStatementWithParams(conn, qry, params, true); ++ Prepared p = GridSqlQueryParser.prepared((JdbcPreparedStatement)stmt); + - p = GridSqlQueryParser.prepared((JdbcPreparedStatement) stmt); ++ if (!p.isQuery()) { ++ GridH2QueryContext.clearThreadLocal(); + - if 
(!p.isQuery()) { - GridH2QueryContext.clearThreadLocal(); ++ SqlFieldsQuery fldsQry = new SqlFieldsQuery(qry); + - SqlFieldsQuery fldsQry = new SqlFieldsQuery(qry); ++ if (params != null) ++ fldsQry.setArgs(params.toArray()); + - if (params != null) - fldsQry.setArgs(params.toArray()); ++ fldsQry.setEnforceJoinOrder(enforceJoinOrder); ++ fldsQry.setTimeout(timeout, TimeUnit.MILLISECONDS); + - fldsQry.setEnforceJoinOrder(enforceJoinOrder); - fldsQry.setTimeout(timeout, TimeUnit.MILLISECONDS); ++ return dmlProc.updateLocalSqlFields(spaceName, stmt, fldsQry, filters, cancel); ++ } + - return dmlProc.updateLocalSqlFields(spaceName, stmt, fldsQry, filters, cancel); - } + List<GridQueryFieldMetadata> meta; - List<GridQueryFieldMetadata> meta; + try { + meta = meta(stmt.getMetaData()); + } + catch (SQLException e) { + throw new IgniteCheckedException("Cannot prepare query metadata", e); + } - try { - meta = meta(stmt.getMetaData()); - } - catch (SQLException e) { - throw new IgniteCheckedException("Cannot prepare query metadata", e); - } + final GridH2QueryContext ctx = new GridH2QueryContext(nodeId, nodeId, 0, LOCAL) + .filter(filters).distributedJoins(false); - return new GridQueryFieldsResultAdapter(meta, null) { - @Override public GridCloseableIterator<List<?>> iterator() throws IgniteCheckedException{ + return new GridQueryFieldsResultAdapter(meta, null) { + @Override public GridCloseableIterator<List<?>> iterator() throws IgniteCheckedException { + assert GridH2QueryContext.get() == null; + + GridH2QueryContext.set(ctx); + + try { ResultSet rs = executeSqlQueryWithTimer(spaceName, stmt, conn, qry, params, timeout, cancel); return new FieldsIterator(rs); http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java ----------------------------------------------------------------------
