Merge branches 'ignite-471' and 'sprint-2' of https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-471
Conflicts:
modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java
modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerResponse.java
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/2f6f8cdf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/2f6f8cdf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/2f6f8cdf
Branch: refs/heads/ignite-471
Commit: 2f6f8cdf4bd3e182bb6e7153487ebb45185de603
Parents: f28b750 d8c0766
Author: Valentin Kulichenko <[email protected]>
Authored: Thu Mar 12 10:59:03 2015 -0700
Committer: Valentin Kulichenko <[email protected]>
Committed: Thu Mar 12 10:59:03 2015 -0700
----------------------------------------------------------------------
DEVNOTES.txt | 13 +-
LICENSE.txt | 2 +-
NOTICE.txt | 0
RELEASE_NOTES.txt | 0
assembly/dependencies-optional-fabric.xml | 2 +-
assembly/dependencies-optional-hadoop.xml | 2 +-
assembly/dependencies-schema-import.xml | 56 +
assembly/dependencies-schema-load.xml | 56 -
assembly/release-hadoop.xml | 16 +-
bin/ignite-schema-import.bat | 116 ++
bin/ignite-schema-import.sh | 78 +
bin/ignite-schema-load.bat | 116 --
bin/ignite-schema-load.sh | 78 -
config/hadoop/default-config.xml | 10 +-
docs/core-site.ignite.xml | 90 -
docs/hadoop_readme.md | 134 --
docs/hadoop_readme.pdf | Bin 82297 -> 0 bytes
docs/hive-site.ignite.xml | 37 -
docs/mapred-site.ignite.xml | 66 -
examples/config/example-cache.xml | 4 +-
examples/config/filesystem/example-igfs.xml | 13 +-
examples/config/store/example-database.script | 27 +
.../config/store/example-jdbc-pojo-store.xml | 142 ++
.../datagrid/CacheDataLoaderExample.java | 85 -
.../datagrid/CacheDataStreamerExample.java | 85 +
.../datagrid/CachePopularNumbersExample.java | 14 +-
.../store/CacheNodeWithStoreStartup.java | 33 +
.../datagrid/store/CacheStoreExample.java | 1 +
.../store/CacheStoreLoadDataExample.java | 10 +-
.../ignite/examples/datagrid/store/Person.java | 103 --
.../store/dummy/CacheDummyPersonStore.java | 2 +-
.../hibernate/CacheHibernatePersonStore.java | 2 +-
.../datagrid/store/hibernate/Person.hbm.xml | 2 +-
.../datagrid/store/hibernate/hibernate.cfg.xml | 3 -
.../store/jdbc/CacheJdbcPersonStore.java | 4 +-
.../store/jdbc/CacheJdbcPojoPersonStore.java | 96 ++
.../examples/datagrid/store/model/Person.java | 155 ++
.../datagrid/store/model/PersonKey.java | 97 ++
.../MemcacheRestExampleNodeStartup.java | 5 +-
.../ScalarCachePopularNumbersExample.scala | 49 +-
.../ignite/examples/CacheExamplesSelfTest.java | 6 +-
.../clients/src/test/resources/spring-cache.xml | 8 +-
.../ignite/codegen/MessageCodeGenerator.java | 1 +
modules/core/licenses/snaptree-bsd-license.txt | 35 +
modules/core/licenses/snaptree-license.txt | 35 -
modules/core/licenses/sun-bcl-license.txt | 50 -
.../src/main/java/org/apache/ignite/Ignite.java | 10 +-
.../java/org/apache/ignite/IgniteCache.java | 24 +
.../org/apache/ignite/IgniteDataLoader.java | 379 ----
.../org/apache/ignite/IgniteDataStreamer.java | 401 +++++
.../apache/ignite/cache/CachePreloadMode.java | 67 -
.../apache/ignite/cache/CacheRebalanceMode.java | 67 +
.../java/org/apache/ignite/cache/GridCache.java | 12 +-
.../store/jdbc/CacheAbstractJdbcStore.java | 33 +-
.../cache/store/jdbc/CacheJdbcPojoStore.java | 52 +-
.../apache/ignite/cluster/ClusterMetrics.java | 66 +-
.../configuration/CacheConfiguration.java | 231 ++-
.../configuration/FileSystemConfiguration.java | 55 +-
.../ignite/events/CachePreloadingEvent.java | 172 --
.../ignite/events/CacheRebalancingEvent.java | 172 ++
.../java/org/apache/ignite/events/Event.java | 2 +-
.../org/apache/ignite/events/EventType.java | 44 +-
.../igfs/IgfsIpcEndpointConfiguration.java | 241 +++
.../apache/ignite/igfs/IgfsIpcEndpointType.java | 29 +
.../ClusterLocalNodeMetricsMXBeanImpl.java | 5 +
.../ignite/internal/ClusterMetricsSnapshot.java | 26 +-
.../ignite/internal/GridKernalContext.java | 8 +-
.../ignite/internal/GridKernalContextImpl.java | 12 +-
.../org/apache/ignite/internal/GridTopic.java | 2 +-
.../apache/ignite/internal/IgniteKernal.java | 16 +-
.../org/apache/ignite/internal/IgnitionEx.java | 8 +-
.../communication/GridIoMessageFactory.java | 8 +-
.../processors/cache/GridCacheAdapter.java | 22 +-
.../processors/cache/GridCacheAttributes.java | 32 +-
.../processors/cache/GridCacheContext.java | 8 +-
.../processors/cache/GridCacheEventManager.java | 12 +-
.../cache/GridCacheEvictionManager.java | 6 +-
.../processors/cache/GridCacheMapEntry.java | 2 +-
.../GridCachePartitionExchangeManager.java | 6 +-
.../processors/cache/GridCacheProcessor.java | 74 +-
.../processors/cache/GridCacheUtils.java | 4 +-
.../processors/cache/IgniteCacheProxy.java | 7 +
.../GridDistributedCacheAdapter.java | 10 +-
.../distributed/dht/GridDhtLocalPartition.java | 4 +-
.../distributed/dht/GridDhtLockFuture.java | 4 +-
.../dht/GridDhtPartitionTopologyImpl.java | 14 +-
.../distributed/dht/GridDhtTxPrepareFuture.java | 10 +-
.../dht/preloader/GridDhtForceKeysFuture.java | 30 +-
.../preloader/GridDhtPartitionDemandPool.java | 78 +-
.../preloader/GridDhtPartitionSupplyPool.java | 22 +-
.../dht/preloader/GridDhtPreloader.java | 14 +-
.../transactions/IgniteTxLocalAdapter.java | 4 +-
.../version/GridCacheRawVersionedEntry.java | 4 +-
.../dataload/GridDataLoadCacheUpdaters.java | 206 ---
.../dataload/GridDataLoadRequest.java | 451 -----
.../dataload/GridDataLoadResponse.java | 166 --
.../dataload/GridDataLoadUpdateJob.java | 150 --
.../dataload/GridDataLoaderFuture.java | 62 -
.../dataload/GridDataLoaderProcessor.java | 308 ----
.../dataload/IgniteDataLoaderEntry.java | 170 --
.../dataload/IgniteDataLoaderImpl.java | 1392 ---------------
.../internal/processors/dataload/package.html | 24 -
.../datastreamer/DataStreamProcessor.java | 308 ++++
.../datastreamer/DataStreamerCacheUpdaters.java | 206 +++
.../datastreamer/DataStreamerEntry.java | 170 ++
.../datastreamer/DataStreamerFuture.java | 60 +
.../datastreamer/DataStreamerImpl.java | 1404 +++++++++++++++
.../datastreamer/DataStreamerRequest.java | 451 +++++
.../datastreamer/DataStreamerResponse.java | 166 ++
.../datastreamer/DataStreamerUpdateJob.java | 150 ++
.../processors/datastreamer/package.html | 24 +
.../dr/GridDrDataLoadCacheUpdater.java | 92 -
.../dr/IgniteDrDataStreamerCacheUpdater.java | 92 +
.../processors/igfs/IgfsDataManager.java | 25 +-
.../internal/processors/igfs/IgfsImpl.java | 2 +-
.../internal/processors/igfs/IgfsServer.java | 49 +-
.../processors/igfs/IgfsServerManager.java | 35 +-
.../processors/rest/GridRestCommand.java | 8 +-
.../processors/rest/GridRestProcessor.java | 2 -
.../processors/streamer/IgniteStreamerImpl.java | 4 +-
.../ignite/internal/util/IgniteUtils.java | 2 +-
.../util/ipc/IpcServerEndpointDeserializer.java | 66 -
.../visor/cache/VisorCacheCompactTask.java | 66 -
.../cache/VisorCachePreloadConfiguration.java | 16 +-
.../visor/node/VisorIgfsConfiguration.java | 5 +-
.../mxbean/ClusterLocalNodeMetricsMXBean.java | 4 +
.../spi/discovery/tcp/TcpDiscoverySpi.java | 11 +-
.../resources/META-INF/classnames.properties | 34 +-
.../resources/META-INF/licenses/apache-2.0.txt | 0
.../core/src/test/config/discovery-stress.xml | 2 +-
modules/core/src/test/config/example-cache.xml | 4 +-
modules/core/src/test/config/igfs-loopback.xml | 6 +-
modules/core/src/test/config/igfs-shmem.xml | 8 +-
.../config/load/dsi-49-server-production.xml | 4 +-
.../src/test/config/load/dsi-load-client.xml | 4 +-
.../src/test/config/load/dsi-load-server.xml | 4 +-
.../core/src/test/config/spring-cache-load.xml | 2 +-
.../core/src/test/config/spring-multicache.xml | 16 +-
.../test/config/websession/spring-cache-1.xml | 6 +-
.../test/config/websession/spring-cache-2.xml | 6 +-
.../test/config/websession/spring-cache-3.xml | 6 +-
.../ignite/internal/ClusterMetricsSelfTest.java | 2 +
.../GridDiscoveryManagerAliveCacheSelfTest.java | 4 +-
.../GridCacheAbstractFailoverSelfTest.java | 4 +-
.../GridCacheAbstractLocalStoreSelfTest.java | 8 +-
...acheAbstractUsersAffinityMapperSelfTest.java | 4 +-
.../cache/GridCacheBasicStoreAbstractTest.java | 4 +-
.../GridCacheConcurrentTxMultiNodeTest.java | 4 +-
...idCacheConfigurationConsistencySelfTest.java | 6 +-
...ridCacheConfigurationValidationSelfTest.java | 8 +-
.../cache/GridCacheDeploymentSelfTest.java | 4 +-
...idCacheGetAndTransformStoreAbstractTest.java | 4 +-
.../cache/GridCacheIncrementTransformTest.java | 4 +-
.../cache/GridCacheMultiUpdateLockSelfTest.java | 4 +-
.../GridCacheOrderedPreloadingSelfTest.java | 6 +-
.../cache/GridCacheP2PUndeploySelfTest.java | 8 +-
.../cache/GridCachePartitionedGetSelfTest.java | 4 +-
...hePartitionedProjectionAffinitySelfTest.java | 4 +-
.../GridCachePreloadingEvictionsSelfTest.java | 6 +-
.../GridCacheQueryInternalKeysSelfTest.java | 4 +-
.../cache/GridCacheSwapPreloadSelfTest.java | 4 +-
.../GridCacheValueBytesPreloadingSelfTest.java | 2 +-
...idCacheValueConsistencyAbstractSelfTest.java | 4 +-
.../IgniteCacheAbstractStopBusySelfTest.java | 2 +-
.../cache/IgniteCacheTxPreloadNoWriteTest.java | 111 ++
...tAllUpdateNonPreloadedPartitionSelfTest.java | 4 +-
...dCacheQueueMultiNodeConsistencySelfTest.java | 3 +-
.../IgniteCollectionAbstractTest.java | 4 +-
...GridCachePartitionedNodeRestartSelfTest.java | 4 +-
...idCachePartitionedNodeRestartTxSelfTest.java | 4 +-
...PartitionedQueueCreateMultiNodeSelfTest.java | 4 +-
.../GridCacheAbstractNodeRestartSelfTest.java | 6 +-
.../GridCacheAbstractPrimarySyncSelfTest.java | 4 +-
...acheEntrySetIterationPreloadingSelfTest.java | 2 +-
...heExpiredEntriesPreloadAbstractSelfTest.java | 6 +-
.../distributed/GridCacheLockAbstractTest.java | 4 +-
...dCacheMultithreadedFailoverAbstractTest.java | 4 +-
...dCachePartitionedAffinityFilterSelfTest.java | 4 +-
.../GridCachePreloadEventsAbstractSelfTest.java | 8 +-
.../GridCachePreloadLifecycleAbstractTest.java | 6 +-
...GridCachePreloadRestartAbstractSelfTest.java | 10 +-
...iteTxConsistencyRestartAbstractSelfTest.java | 6 +-
.../IgniteTxPreloadAbstractTest.java | 6 +-
.../dht/GridCacheAtomicNearCacheSelfTest.java | 4 +-
...dCacheColocatedTxSingleThreadedSelfTest.java | 4 +-
...GridCacheDhtEvictionNearReadersSelfTest.java | 4 +-
.../dht/GridCacheDhtEvictionSelfTest.java | 4 +-
.../dht/GridCacheDhtInternalEntrySelfTest.java | 4 +-
.../dht/GridCacheDhtMappingSelfTest.java | 4 +-
.../dht/GridCacheDhtPreloadBigDataSelfTest.java | 10 +-
.../dht/GridCacheDhtPreloadDelayedSelfTest.java | 26 +-
.../GridCacheDhtPreloadDisabledSelfTest.java | 8 +-
.../GridCacheDhtPreloadMessageCountTest.java | 4 +-
.../dht/GridCacheDhtPreloadPutGetSelfTest.java | 6 +-
.../dht/GridCacheDhtPreloadSelfTest.java | 28 +-
.../GridCacheDhtPreloadStartStopSelfTest.java | 12 +-
.../dht/GridCacheDhtPreloadUnloadSelfTest.java | 10 +-
...ePartitionedNearDisabledMetricsSelfTest.java | 4 +-
...idCachePartitionedPreloadEventsSelfTest.java | 8 +-
...dCachePartitionedTopologyChangeSelfTest.java | 4 +-
...ridCachePartitionedUnloadEventsSelfTest.java | 12 +-
...eAtomicInvalidPartitionHandlingSelfTest.java | 4 +-
...unctionExcludeNeighborsAbstractSelfTest.java | 4 +-
...GridCacheAtomicMultiNodeFullApiSelfTest.java | 4 +-
...idCacheAtomicPartitionedMetricsSelfTest.java | 4 +-
.../near/GridCacheNearEvictionSelfTest.java | 7 +-
.../near/GridCacheNearMultiGetSelfTest.java | 4 +-
.../near/GridCacheNearOnlyTopologySelfTest.java | 4 +-
.../GridCacheNearPartitionedClearSelfTest.java | 4 +-
.../near/GridCacheNearReadersSelfTest.java | 4 +-
.../near/GridCacheNearTxMultiNodeSelfTest.java | 4 +-
...AffinityExcludeNeighborsPerformanceTest.java | 4 +-
.../GridCachePartitionedAffinitySelfTest.java | 4 +-
.../GridCachePartitionedBasicOpSelfTest.java | 2 +-
.../near/GridCachePartitionedEventSelfTest.java | 4 +-
...idCachePartitionedHitsAndMissesSelfTest.java | 10 +-
.../GridCachePartitionedMetricsSelfTest.java | 4 +-
...achePartitionedMultiNodeCounterSelfTest.java | 4 +-
...achePartitionedMultiNodeFullApiSelfTest.java | 4 +-
.../GridCachePartitionedNodeRestartTest.java | 4 +-
...ePartitionedOptimisticTxNodeRestartTest.java | 4 +-
...achePartitionedPreloadLifecycleSelfTest.java | 4 +-
...hePartitionedQueryMultiThreadedSelfTest.java | 2 +-
.../GridCachePartitionedTxSalvageSelfTest.java | 2 +-
...achePartitionedTxSingleThreadedSelfTest.java | 4 +-
.../GridCacheReplicatedInvalidateSelfTest.java | 4 +-
.../GridCacheReplicatedNodeRestartSelfTest.java | 6 +-
.../GridCacheSyncReplicatedPreloadSelfTest.java | 6 +-
...CacheReplicatedPreloadLifecycleSelfTest.java | 4 +-
.../GridCacheReplicatedPreloadSelfTest.java | 16 +-
...eplicatedPreloadStartStopEventsSelfTest.java | 6 +-
.../GridCacheEvictionFilterSelfTest.java | 4 +-
.../GridCacheLruNearEvictionPolicySelfTest.java | 6 +-
...heNearOnlyLruNearEvictionPolicySelfTest.java | 6 +-
...ridCacheContinuousQueryAbstractSelfTest.java | 4 +-
...dCacheAbstractReduceFieldsQuerySelfTest.java | 4 +-
.../dataload/GridDataLoaderImplSelfTest.java | 208 ---
.../dataload/GridDataLoaderPerformanceTest.java | 197 ---
.../GridDataLoaderProcessorSelfTest.java | 971 -----------
.../DataStreamProcessorSelfTest.java | 970 +++++++++++
.../datastreamer/DataStreamerImplSelfTest.java | 205 +++
.../IgniteDataStreamerPerformanceTest.java | 197 +++
.../processors/igfs/IgfsAbstractSelfTest.java | 24 +-
...sCachePerBlockLruEvictionPolicySelfTest.java | 12 +-
.../processors/igfs/IgfsMetricsSelfTest.java | 12 +-
.../processors/igfs/IgfsModesSelfTest.java | 9 +-
...IpcEndpointRegistrationAbstractSelfTest.java | 21 +-
...dpointRegistrationOnLinuxAndMacSelfTest.java | 7 +-
...pcEndpointRegistrationOnWindowsSelfTest.java | 5 +-
.../processors/igfs/IgfsSizeSelfTest.java | 7 +-
.../IpcServerEndpointDeserializerSelfTest.java | 160 --
.../ipc/shmem/IpcSharedMemoryNodeStartup.java | 9 +-
.../loadtests/GridCacheMultiNodeLoadTest.java | 4 +-
.../capacity/spring-capacity-cache.xml | 4 +-
.../loadtests/colocation/GridTestMain.java | 4 +-
.../loadtests/colocation/spring-colocation.xml | 4 +-
.../GridCachePartitionedAtomicLongLoadTest.java | 2 +-
.../loadtests/discovery/GridGcTimeoutTest.java | 2 +-
.../mapper/GridContinuousMapperLoadTest1.java | 2 +-
.../mapper/GridContinuousMapperLoadTest2.java | 2 +-
.../GridP2PContinuousDeploymentSelfTest.java | 4 +-
.../tcp/GridCacheDhtLockBackupSelfTest.java | 4 +-
.../ignite/testframework/junits/IgniteMock.java | 4 +-
.../junits/common/GridCommonAbstractTest.java | 4 +-
.../ignite/testsuites/IgniteCacheTestSuite.java | 10 +-
.../ignite/testsuites/IgniteIgfsTestSuite.java | 2 -
.../webapp/META-INF/ignite-webapp-config.xml | 12 +-
modules/hadoop/config/core-site.ignite.xml | 90 +
modules/hadoop/config/hive-site.ignite.xml | 37 +
modules/hadoop/config/mapred-site.ignite.xml | 66 +
modules/hadoop/docs/hadoop_readme.md | 135 ++
modules/hadoop/docs/hadoop_readme.pdf | Bin 0 -> 82297 bytes
.../internal/processors/hadoop/HadoopSetup.java | 33 +-
.../hadoop/igfs/HadoopIgfsEndpoint.java | 5 +-
.../HadoopIgfs20FileSystemAbstractSelfTest.java | 4 +-
...Igfs20FileSystemLoopbackPrimarySelfTest.java | 14 +-
...oopIgfs20FileSystemShmemPrimarySelfTest.java | 14 +-
.../igfs/HadoopIgfsDualAbstractSelfTest.java | 25 +-
...oopSecondaryFileSystemConfigurationTest.java | 25 +-
.../apache/ignite/igfs/IgfsEventsTestSuite.java | 44 +-
.../igfs/IgfsNearOnlyMultiNodeSelfTest.java | 10 +-
.../IgniteHadoopFileSystemAbstractSelfTest.java | 16 +-
.../IgniteHadoopFileSystemClientSelfTest.java | 11 +-
...IgniteHadoopFileSystemHandshakeSelfTest.java | 14 +-
.../IgniteHadoopFileSystemIpcCacheSelfTest.java | 10 +-
...niteHadoopFileSystemLoggerStateSelfTest.java | 12 +-
...adoopFileSystemLoopbackAbstractSelfTest.java | 12 +-
...teHadoopFileSystemSecondaryModeSelfTest.java | 23 +-
...teHadoopFileSystemShmemAbstractSelfTest.java | 12 +-
.../hadoop/HadoopCommandLineTest.java | 16 +-
.../hadoop/HadoopPopularWordsTest.java | 4 +-
.../GridCacheAbstractFieldsQuerySelfTest.java | 4 +-
.../cache/GridCacheAbstractQuerySelfTest.java | 4 +-
.../cache/GridCacheCrossCacheQuerySelfTest.java | 4 +-
.../GridCacheCrossCacheQuerySelfTestNewApi.java | 4 +-
.../cache/GridIndexingWithNoopSwapSelfTest.java | 4 +-
.../near/GridCacheQueryNodeRestartSelfTest.java | 6 +-
.../query/h2/sql/GridQueryParsingTest.java | 5 +-
.../tcp/GridOrderedMessageCancelSelfTest.java | 4 +-
.../scala/org/apache/ignite/scalar/scalar.scala | 16 +-
modules/schema-import/pom.xml | 107 ++
modules/schema-import/readme.txt | 214 +++
.../main/java/media/data_connection_48x48.png | Bin 0 -> 4443 bytes
.../src/main/java/media/error_48x48.png | Bin 0 -> 4349 bytes
.../src/main/java/media/ignite_128x128.png | Bin 0 -> 4917 bytes
.../src/main/java/media/ignite_16x16.png | Bin 0 -> 608 bytes
.../src/main/java/media/ignite_24x24.png | Bin 0 -> 930 bytes
.../src/main/java/media/ignite_32x32.png | Bin 0 -> 1203 bytes
.../src/main/java/media/ignite_48x48.png | Bin 0 -> 1868 bytes
.../src/main/java/media/ignite_64x64.png | Bin 0 -> 2453 bytes
.../src/main/java/media/information_48x48.png | Bin 0 -> 4102 bytes
.../src/main/java/media/question_48x48.png | Bin 0 -> 3857 bytes
.../src/main/java/media/sign_warning_48x48.png | Bin 0 -> 2988 bytes
.../schema-import/src/main/java/media/style.css | 117 ++
.../src/main/java/media/text_tree_48x48.png | Bin 0 -> 2567 bytes
.../ignite/schema/generator/PojoGenerator.java | 414 +++++
.../schema/generator/SnippetGenerator.java | 138 ++
.../ignite/schema/generator/XmlGenerator.java | 347 ++++
.../ignite/schema/model/PojoDescriptor.java | 510 ++++++
.../apache/ignite/schema/model/PojoField.java | 420 +++++
.../schema/parser/DatabaseMetadataParser.java | 108 ++
.../apache/ignite/schema/parser/DbColumn.java | 76 +
.../apache/ignite/schema/parser/DbTable.java | 105 ++
.../parser/dialect/DB2MetadataDialect.java | 30 +
.../parser/dialect/DatabaseMetadataDialect.java | 78 +
.../parser/dialect/JdbcMetadataDialect.java | 141 ++
.../parser/dialect/OracleMetadataDialect.java | 281 +++
.../ignite/schema/ui/ConfirmCallable.java | 81 +
.../org/apache/ignite/schema/ui/Controls.java | 661 +++++++
.../org/apache/ignite/schema/ui/GridPaneEx.java | 177 ++
.../org/apache/ignite/schema/ui/MessageBox.java | 246 +++
.../apache/ignite/schema/ui/ModalDialog.java | 50 +
.../ignite/schema/ui/SchemaImportApp.java | 1615 ++++++++++++++++++
.../ignite/schema/ui/TextColumnValidator.java | 32 +
.../schema/test/AbstractSchemaImportTest.java | 134 ++
.../test/generator/PojoGeneratorTest.java | 70 +
.../schema/test/generator/XmlGeneratorTest.java | 50 +
.../apache/ignite/schema/test/model/Ignite.xml | 390 +++++
.../apache/ignite/schema/test/model/Objects.txt | 502 ++++++
.../ignite/schema/test/model/ObjectsKey.txt | 96 ++
.../ignite/schema/test/model/Primitives.txt | 506 ++++++
.../ignite/schema/test/model/PrimitivesKey.txt | 96 ++
.../test/parser/DbMetadataParserTest.java | 118 ++
.../testsuites/IgniteSchemaImportTestSuite.java | 41 +
modules/schema-load/pom.xml | 107 --
.../main/java/media/data_connection_48x48.png | Bin 4443 -> 0 bytes
.../src/main/java/media/error_48x48.png | Bin 4349 -> 0 bytes
.../src/main/java/media/ignite_128x128.png | Bin 4917 -> 0 bytes
.../src/main/java/media/ignite_16x16.png | Bin 608 -> 0 bytes
.../src/main/java/media/ignite_24x24.png | Bin 930 -> 0 bytes
.../src/main/java/media/ignite_32x32.png | Bin 1203 -> 0 bytes
.../src/main/java/media/ignite_48x48.png | Bin 1868 -> 0 bytes
.../src/main/java/media/ignite_64x64.png | Bin 2453 -> 0 bytes
.../src/main/java/media/information_48x48.png | Bin 4102 -> 0 bytes
.../src/main/java/media/question_48x48.png | Bin 3857 -> 0 bytes
.../src/main/java/media/sign_warning_48x48.png | Bin 2988 -> 0 bytes
.../schema-load/src/main/java/media/style.css | 117 --
.../src/main/java/media/text_tree_48x48.png | Bin 2567 -> 0 bytes
.../ignite/schema/generator/PojoGenerator.java | 414 -----
.../schema/generator/SnippetGenerator.java | 138 --
.../ignite/schema/generator/XmlGenerator.java | 347 ----
.../ignite/schema/model/PojoDescriptor.java | 510 ------
.../apache/ignite/schema/model/PojoField.java | 420 -----
.../schema/parser/DatabaseMetadataParser.java | 108 --
.../apache/ignite/schema/parser/DbColumn.java | 76 -
.../apache/ignite/schema/parser/DbTable.java | 105 --
.../parser/dialect/DB2MetadataDialect.java | 30 -
.../parser/dialect/DatabaseMetadataDialect.java | 78 -
.../parser/dialect/JdbcMetadataDialect.java | 141 --
.../parser/dialect/OracleMetadataDialect.java | 281 ---
.../ignite/schema/ui/ConfirmCallable.java | 81 -
.../org/apache/ignite/schema/ui/Controls.java | 661 -------
.../org/apache/ignite/schema/ui/GridPaneEx.java | 177 --
.../org/apache/ignite/schema/ui/MessageBox.java | 246 ---
.../apache/ignite/schema/ui/ModalDialog.java | 50 -
.../apache/ignite/schema/ui/SchemaLoadApp.java | 1615 ------------------
.../ignite/schema/ui/TextColumnValidator.java | 32 -
.../schema/load/AbstractSchemaLoaderTest.java | 134 --
.../load/generator/PojoGeneratorTest.java | 70 -
.../schema/load/generator/XmlGeneratorTest.java | 50 -
.../apache/ignite/schema/load/model/Ignite.xml | 390 -----
.../apache/ignite/schema/load/model/Objects.txt | 502 ------
.../ignite/schema/load/model/ObjectsKey.txt | 96 --
.../ignite/schema/load/model/Primitives.txt | 506 ------
.../ignite/schema/load/model/PrimitivesKey.txt | 96 --
.../load/parser/DbMetadataParserTest.java | 118 --
.../testsuites/IgniteSchemaLoadTestSuite.java | 41 -
.../org/apache/ignite/IgniteSpringBean.java | 4 +-
.../ignite/visor/commands/VisorConsole.scala | 3 +-
.../commands/cache/VisorCacheCommand.scala | 30 +-
.../cache/VisorCacheCompactCommand.scala | 151 --
.../commands/events/VisorEventsCommand.scala | 4 +-
.../commands/top/VisorTopologyCommand.scala | 7 +-
.../scala/org/apache/ignite/visor/visor.scala | 12 +-
.../cache/VisorCacheCompactCommandSpec.scala | 103 --
.../testsuites/VisorConsoleSelfTestSuite.scala | 3 +-
.../apache/ignite/visor/plugin/VisorPlugin.java | 8 +-
.../cache/IgniteSqlQueryBenchmark.java | 2 +-
.../cache/IgniteSqlQueryJoinBenchmark.java | 2 +-
pom.xml | 26 +-
.../basic-concepts/async-support.md | 0
.../basic-concepts/getting-started.md | 0
.../basic-concepts/ignite-life-cycel.md | 0
.../documentation/basic-concepts/maven-setup.md | 0
.../basic-concepts/what-is-ignite.md | 0
.../basic-concepts/zero-deployment.md | 0
wiki/documentation/clustering/aws-config.md | 0
wiki/documentation/clustering/cluster-config.md | 0
wiki/documentation/clustering/cluster-groups.md | 0
wiki/documentation/clustering/cluster.md | 0
.../documentation/clustering/leader-election.md | 0
wiki/documentation/clustering/network-config.md | 0
wiki/documentation/clustering/node-local-map.md | 0
.../documentation/compute-grid/checkpointing.md | 0
.../compute-grid/collocate-compute-and-data.md | 0
wiki/documentation/compute-grid/compute-grid.md | 0
.../documentation/compute-grid/compute-tasks.md | 0
.../compute-grid/distributed-closures.md | 0
.../compute-grid/executor-service.md | 0
.../compute-grid/fault-tolerance.md | 0
.../compute-grid/job-scheduling.md | 0
.../compute-grid/load-balancing.md | 0
.../data-grid/affinity-collocation.md | 0
.../data-grid/automatic-db-integration.md | 0
wiki/documentation/data-grid/cache-modes.md | 0
wiki/documentation/data-grid/cache-queries.md | 0
wiki/documentation/data-grid/data-grid.md | 0
wiki/documentation/data-grid/data-loading.md | 0
wiki/documentation/data-grid/evictions.md | 0
.../data-grid/hibernate-l2-cache.md | 0
wiki/documentation/data-grid/jcache.md | 0
wiki/documentation/data-grid/off-heap-memory.md | 0
.../documentation/data-grid/persistent-store.md | 0
wiki/documentation/data-grid/rebalancing.md | 0
wiki/documentation/data-grid/transactions.md | 0
.../data-grid/web-session-clustering.md | 0
.../distributed-data-structures/atomic-types.md | 0
.../countdownlatch.md | 0
.../distributed-data-structures/id-generator.md | 0
.../queue-and-set.md | 0
.../distributed-events/automatic-batching.md | 0
wiki/documentation/distributed-events/events.md | 0
.../distributed-file-system/igfs.md | 0
.../distributed-messaging/messaging.md | 0
wiki/documentation/http/configuration.md | 0
wiki/documentation/http/rest-api.md | 0
.../release-notes/release-notes.md | 0
.../service-grid/cluster-singletons.md | 0
.../service-grid/service-configuration.md | 0
.../service-grid/service-example.md | 0
wiki/documentation/service-grid/service-grid.md | 0
451 files changed, 16003 insertions(+), 15323 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2f6f8cdf/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2f6f8cdf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2f6f8cdf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2f6f8cdf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2f6f8cdf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyPool.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2f6f8cdf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheRawVersionedEntry.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2f6f8cdf/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
index 0000000,85624d4..9eac46e
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
@@@ -1,0 -1,307 +1,308 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.ignite.internal.processors.datastreamer;
+
+ import org.apache.ignite.*;
+ import org.apache.ignite.internal.*;
+ import org.apache.ignite.internal.managers.communication.*;
+ import org.apache.ignite.internal.managers.deployment.*;
+ import org.apache.ignite.internal.processors.*;
+ import org.apache.ignite.internal.util.*;
+ import org.apache.ignite.internal.util.typedef.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.apache.ignite.internal.util.worker.*;
+ import org.apache.ignite.marshaller.*;
+ import org.apache.ignite.thread.*;
+ import org.jetbrains.annotations.*;
+
++import java.nio.*;
+ import java.util.*;
+ import java.util.concurrent.*;
+
+ import static org.apache.ignite.internal.GridTopic.*;
+ import static
org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
+
+ /**
+ *
+ */
+ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
+ /** Loaders map (access is not supposed to be highly concurrent). */
+ private Collection<DataStreamerImpl> ldrs = new GridConcurrentHashSet<>();
+
+ /** Busy lock. */
+ private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
+
+ /** Flushing thread. */
+ private Thread flusher;
+
+ /** */
+ private final DelayQueue<DataStreamerImpl<K, V>> flushQ = new
DelayQueue<>();
+
+ /** Marshaller. */
+ private final Marshaller marsh;
+
+ /**
+ * @param ctx Kernal context.
+ */
+ public DataStreamProcessor(GridKernalContext ctx) {
+ super(ctx);
+
+ ctx.io().addMessageListener(TOPIC_DATASTREAM, new
GridMessageListener() {
+ @Override public void onMessage(UUID nodeId, Object msg) {
+ assert msg instanceof DataStreamerRequest;
+
+ processRequest(nodeId, (DataStreamerRequest)msg);
+ }
+ });
+
+ marsh = ctx.config().getMarshaller();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void start() throws IgniteCheckedException {
+ if (ctx.config().isDaemon())
+ return;
+
+ flusher = new IgniteThread(new GridWorker(ctx.gridName(),
"grid-data-loader-flusher", log) {
+ @Override protected void body() throws InterruptedException {
+ while (!isCancelled()) {
+ DataStreamerImpl<K, V> ldr = flushQ.take();
+
+ if (!busyLock.enterBusy())
+ return;
+
+ try {
+ if (ldr.isClosed())
+ continue;
+
+ ldr.tryFlush();
+
+ flushQ.offer(ldr);
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
+ }
+ }
+ });
+
+ flusher.start();
+
+ if (log.isDebugEnabled())
+ log.debug("Started data streamer processor.");
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onKernalStop(boolean cancel) {
+ if (ctx.config().isDaemon())
+ return;
+
+ ctx.io().removeMessageListener(TOPIC_DATASTREAM);
+
+ busyLock.block();
+
+ U.interrupt(flusher);
+ U.join(flusher, log);
+
+ for (DataStreamerImpl<?, ?> ldr : ldrs) {
+ if (log.isDebugEnabled())
+ log.debug("Closing active data streamer on grid stop [ldr=" +
ldr + ", cancel=" + cancel + ']');
+
+ try {
+ ldr.closeEx(cancel);
+ }
+ catch (IgniteInterruptedCheckedException e) {
+ U.warn(log, "Interrupted while waiting for completion of the
data streamer: " + ldr, e);
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to close data streamer: " + ldr, e);
+ }
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Stopped data streamer processor.");
+ }
+
+ /**
+ * @param cacheName Cache name ({@code null} for default cache).
+ * @return Data loader.
+ */
+ public DataStreamerImpl<K, V> dataStreamer(@Nullable String cacheName) {
+ if (!busyLock.enterBusy())
+ throw new IllegalStateException("Failed to create data streamer
(grid is stopping).");
+
+ try {
+ final DataStreamerImpl<K, V> ldr = new DataStreamerImpl<>(ctx,
cacheName, flushQ);
+
+ ldrs.add(ldr);
+
+ ldr.internalFuture().listen(new CI1<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> f) {
+ boolean b = ldrs.remove(ldr);
+
+ assert b : "Loader has not been added to set: " + ldr;
+
+ if (log.isDebugEnabled())
+ log.debug("Loader has been completed: " + ldr);
+ }
+ });
+
+ return ldr;
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ /**
+ * @param nodeId Sender ID.
+ * @param req Request.
+ */
+ private void processRequest(UUID nodeId, DataStreamerRequest req) {
+ if (!busyLock.enterBusy()) {
+ if (log.isDebugEnabled())
+ log.debug("Ignoring data load request (node is stopping): " +
req);
+
+ return;
+ }
+
+ try {
+ if (log.isDebugEnabled())
+ log.debug("Processing data load request: " + req);
+
+ Object topic;
+
+ try {
+ topic = marsh.unmarshal(req.responseTopicBytes(), null);
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to unmarshal topic from request: " +
req, e);
+
+ return;
+ }
+
+ ClassLoader clsLdr;
+
+ if (req.forceLocalDeployment())
+ clsLdr = U.gridClassLoader();
+ else {
+ GridDeployment dep = ctx.deploy().getGlobalDeployment(
+ req.deploymentMode(),
+ req.sampleClassName(),
+ req.sampleClassName(),
+ req.userVersion(),
+ nodeId,
+ req.classLoaderId(),
+ req.participants(),
+ null);
+
+ if (dep == null) {
+ sendResponse(nodeId,
+ topic,
+ req.requestId(),
+ new IgniteCheckedException("Failed to get deployment
for request [sndId=" + nodeId +
+ ", req=" + req + ']'),
+ false);
+
+ return;
+ }
+
+ clsLdr = dep.classLoader();
+ }
+
+ IgniteDataStreamer.Updater<K, V> updater;
+
+ try {
+ updater = marsh.unmarshal(req.updaterBytes(), clsLdr);
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to unmarshal message [nodeId=" + nodeId
+ ", req=" + req + ']', e);
+
+ sendResponse(nodeId, topic, req.requestId(), e, false);
+
+ return;
+ }
+
+ Collection<DataStreamerEntry> col = req.entries();
+
+ DataStreamerUpdateJob job = new DataStreamerUpdateJob(ctx,
+ log,
+ req.cacheName(),
+ col,
+ req.ignoreDeploymentOwnership(),
+ req.skipStore(),
+ updater);
+
+ Exception err = null;
+
+ try {
+ job.call();
+ }
+ catch (Exception e) {
+ U.error(log, "Failed to finish update job.", e);
+
+ err = e;
+ }
+
+ sendResponse(nodeId, topic, req.requestId(), err,
req.forceLocalDeployment());
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ /**
+ * @param nodeId Node ID.
+ * @param resTopic Response topic.
+ * @param reqId Request ID.
+ * @param err Error.
+ * @param forceLocDep Force local deployment.
+ */
+ private void sendResponse(UUID nodeId, Object resTopic, long reqId,
@Nullable Throwable err,
+ boolean forceLocDep) {
- byte[] errBytes;
++ ByteBuffer errBytes;
+
+ try {
+ errBytes = err != null ? marsh.marshal(err) : null;
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to marshal message.", e);
+
+ return;
+ }
+
+ DataStreamerResponse res = new DataStreamerResponse(reqId, errBytes,
forceLocDep);
+
+ try {
+ ctx.io().send(nodeId, resTopic, res, PUBLIC_POOL);
+ }
+ catch (IgniteCheckedException e) {
+ if (ctx.discovery().alive(nodeId))
+ U.error(log, "Failed to respond to node [nodeId=" + nodeId +
", res=" + res + ']', e);
+ else if (log.isDebugEnabled())
+ log.debug("Node has left the grid: " + nodeId);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void printMemoryStats() {
+ X.println(">>>");
+ X.println(">>> Data streamer processor memory stats [grid=" +
ctx.gridName() + ']');
+ X.println(">>> ldrsSize: " + ldrs.size());
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2f6f8cdf/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --cc
modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index 0000000,dd8df35..432c6bf
mode 000000,100644..100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@@ -1,0 -1,1403 +1,1404 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.ignite.internal.processors.datastreamer;
+
+ import org.apache.ignite.*;
+ import org.apache.ignite.cluster.*;
+ import org.apache.ignite.events.*;
+ import org.apache.ignite.internal.*;
+ import org.apache.ignite.internal.cluster.*;
+ import org.apache.ignite.internal.managers.communication.*;
+ import org.apache.ignite.internal.managers.deployment.*;
+ import org.apache.ignite.internal.managers.eventstorage.*;
+ import org.apache.ignite.internal.processors.affinity.*;
+ import org.apache.ignite.internal.processors.cache.*;
+ import org.apache.ignite.internal.processors.cache.distributed.dht.*;
+ import org.apache.ignite.internal.processors.cache.version.*;
+ import org.apache.ignite.internal.processors.cacheobject.*;
+ import org.apache.ignite.internal.processors.dr.*;
+ import org.apache.ignite.internal.util.*;
+ import org.apache.ignite.internal.util.future.*;
+ import org.apache.ignite.internal.util.lang.*;
+ import org.apache.ignite.internal.util.tostring.*;
+ import org.apache.ignite.internal.util.typedef.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.apache.ignite.lang.*;
+ import org.jdk8.backport.*;
+ import org.jetbrains.annotations.*;
+
++import java.nio.*;
+ import java.util.*;
+ import java.util.Map.*;
+ import java.util.concurrent.*;
+ import java.util.concurrent.atomic.*;
+
+ import static org.apache.ignite.events.EventType.*;
+ import static org.apache.ignite.internal.GridTopic.*;
+ import static
org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
+
+ /**
+ * Data streamer implementation.
+ */
+ @SuppressWarnings("unchecked")
+ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>,
Delayed {
+ /** Isolated updater. */
+ private static final Updater ISOLATED_UPDATER = new IsolatedUpdater();
+
+ /** Cache updater. */
+ private Updater<K, V> updater = ISOLATED_UPDATER;
+
+ /** */
- private byte[] updaterBytes;
++ private ByteBuffer updaterBytes;
+
+ /** Max remap count before issuing an error. */
+ private static final int DFLT_MAX_REMAP_CNT = 32;
+
+ /** Log reference. */
+ private static final AtomicReference<IgniteLogger> logRef = new
AtomicReference<>();
+
+ /** Logger. */
+ private static IgniteLogger log;
+
+ /** Cache name ({@code null} for default cache). */
+ private final String cacheName;
+
+
+ /** Per-node buffer size. */
+ @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+ private int bufSize = DFLT_PER_NODE_BUFFER_SIZE;
+
+ /** */
+ private int parallelOps = DFLT_MAX_PARALLEL_OPS;
+
+ /** */
+ private long autoFlushFreq;
+
+ /** Mapping. */
+ @GridToStringInclude
+ private ConcurrentMap<UUID, Buffer> bufMappings = new
ConcurrentHashMap8<>();
+
+ /** Discovery listener. */
+ private final GridLocalEventListener discoLsnr;
+
+ /** Context. */
+ private final GridKernalContext ctx;
+
+ /** */
+ private final IgniteCacheObjectProcessor cacheObjProc;
+
+ /** */
+ private final CacheObjectContext cacheObjCtx;
+
+ /** Communication topic for responses. */
+ private final Object topic;
+
+ /** */
- private byte[] topicBytes;
++ private ByteBuffer topicBytes;
+
+ /** {@code True} if data loader has been cancelled. */
+ private volatile boolean cancelled;
+
+ /** Active futures of this data loader. */
+ @GridToStringInclude
+ private final Collection<IgniteInternalFuture<?>> activeFuts = new
GridConcurrentHashSet<>();
+
+ /** Closure to remove from active futures. */
+ @GridToStringExclude
+ private final IgniteInClosure<IgniteInternalFuture<?>> rmvActiveFut = new
IgniteInClosure<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> t) {
+ boolean rmv = activeFuts.remove(t);
+
+ assert rmv;
+ }
+ };
+
+ /** Job peer deploy aware. */
+ private volatile GridPeerDeployAware jobPda;
+
+ /** Deployment class. */
+ private Class<?> depCls;
+
+ /** Future to track loading finish. */
+ private final GridFutureAdapter<?> fut;
+
+ /** Public API future to track loading finish. */
+ private final IgniteFuture<?> publicFut;
+
+ /** Busy lock. */
+ private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
+
+ /** Closed flag. */
+ private final AtomicBoolean closed = new AtomicBoolean();
+
+ /** */
+ private volatile long lastFlushTime = U.currentTimeMillis();
+
+ /** */
+ private final DelayQueue<DataStreamerImpl<K, V>> flushQ;
+
+ /** */
+ private boolean skipStore;
+
+ /** */
+ private int maxRemapCnt = DFLT_MAX_REMAP_CNT;
+
+ /** Whether the {@link DataStreamerImpl#allowOverwrite()} warning has already been printed. */
+ private static boolean isWarningPrinted;
+
+ /**
+ * @param ctx Grid kernal context.
+ * @param cacheName Cache name.
+ * @param flushQ Flush queue.
+ */
+ public DataStreamerImpl(
+ final GridKernalContext ctx,
+ @Nullable final String cacheName,
+ DelayQueue<DataStreamerImpl<K, V>> flushQ
+ ) {
+ assert ctx != null;
+
+ this.ctx = ctx;
+ this.cacheObjProc = ctx.cacheObjects();
+
+ if (log == null)
+ log = U.logger(ctx, logRef, DataStreamerImpl.class);
+
+ ClusterNode node =
F.first(ctx.grid().cluster().forCacheNodes(cacheName).nodes());
+
+ if (node == null)
+ throw new IllegalStateException("Cache doesn't exist: " +
cacheName);
+
+ this.cacheObjCtx = ctx.cacheObjects().contextForCache(node,
cacheName);
+ this.cacheName = cacheName;
+ this.flushQ = flushQ;
+
+ discoLsnr = new GridLocalEventListener() {
+ @Override public void onEvent(Event evt) {
+ assert evt.type() == EVT_NODE_FAILED || evt.type() ==
EVT_NODE_LEFT;
+
+ DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
+
+ UUID id = discoEvt.eventNode().id();
+
+ // Remap regular mappings.
+ final Buffer buf = bufMappings.remove(id);
+
+ if (buf != null) {
+ // Only async notification is possible since
+ // discovery thread may be trapped otherwise.
+ ctx.closure().callLocalSafe(
+ new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ buf.onNodeLeft();
+
+ return null;
+ }
+ },
+ true /* system pool */
+ );
+ }
+ }
+ };
+
+ ctx.event().addLocalEventListener(discoLsnr, EVT_NODE_FAILED,
EVT_NODE_LEFT);
+
+ // Generate unique topic for this loader.
+ topic =
TOPIC_DATASTREAM.topic(IgniteUuid.fromUuid(ctx.localNodeId()));
+
+ ctx.io().addMessageListener(topic, new GridMessageListener() {
+ /**
+  * Routes a data streamer response to the buffer mapped to the sending node.
+  *
+  * @param nodeId ID of the node the response came from.
+  * @param msg Received message; asserted to be a {@link DataStreamerResponse}.
+  */
+ @Override public void onMessage(UUID nodeId, Object msg) {
+     assert msg instanceof DataStreamerResponse;
+
+     DataStreamerResponse res = (DataStreamerResponse)msg;
+
+     if (log.isDebugEnabled())
+         log.debug("Received data load response: " + res);
+
+     Buffer buf = bufMappings.get(nodeId);
+
+     if (buf != null)
+         buf.onResponse(res);
+     // No buffer is mapped for the sender any more, so the response is only logged.
+     else if (log.isDebugEnabled())
+         log.debug("Ignoring response since node has left [nodeId=" + nodeId + ", res=" + res + ']');
+ }
+ });
+
+ if (log.isDebugEnabled())
+ log.debug("Added response listener within topic: " + topic);
+
+ fut = new DataStreamerFuture(this);
+
+ publicFut = new IgniteFutureImpl<>(fut);
+ }
+
+ /**
+ * @return Cache object context.
+ */
+ public CacheObjectContext cacheObjectContext() {
+ return cacheObjCtx;
+ }
+
+ /**
+ * Enters busy lock.
+ */
+ private void enterBusy() {
+ if (!busyLock.enterBusy())
+ throw new IllegalStateException("Data streamer has been closed.");
+ }
+
+ /**
+ * Leaves busy lock.
+ */
+ private void leaveBusy() {
+ busyLock.leaveBusy();
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteFuture<?> future() {
+ return publicFut;
+ }
+
+ /**
+ * @return Internal future.
+ */
+ public IgniteInternalFuture<?> internalFuture() {
+ return fut;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void deployClass(Class<?> depCls) {
+ this.depCls = depCls;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void updater(Updater<K, V> updater) {
+ A.notNull(updater, "updater");
+
+ this.updater = updater;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean allowOverwrite() {
+ return updater != ISOLATED_UPDATER;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void allowOverwrite(boolean allow) {
+ if (allow == allowOverwrite())
+ return;
+
+ ClusterNode node =
F.first(ctx.grid().cluster().forCacheNodes(cacheName).nodes());
+
+ if (node == null)
+ throw new IgniteException("Failed to get node for cache: " +
cacheName);
+
+ updater = allow ? DataStreamerCacheUpdaters.<K, V>individual() :
ISOLATED_UPDATER;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean skipStore() {
+ return skipStore;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void skipStore(boolean skipStore) {
+ this.skipStore = skipStore;
+ }
+
+ /** {@inheritDoc} */
+ @Override @Nullable public String cacheName() {
+ return cacheName;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int perNodeBufferSize() {
+ return bufSize;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void perNodeBufferSize(int bufSize) {
+ A.ensure(bufSize > 0, "bufSize > 0");
+
+ this.bufSize = bufSize;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int perNodeParallelOperations() {
+ return parallelOps;
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * The value sizes the per-node semaphore created in {@code Buffer}, so it must be positive:
+  * with zero or negative permits no batch could ever be submitted.
+  *
+  * @throws IllegalArgumentException If {@code parallelOps} is not positive.
+  */
+ @Override public void perNodeParallelOperations(int parallelOps) {
+     A.ensure(parallelOps > 0, "parallelOps > 0");
+
+     this.parallelOps = parallelOps;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long autoFlushFrequency() {
+ return autoFlushFreq;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void autoFlushFrequency(long autoFlushFreq) {
+ A.ensure(autoFlushFreq >= 0, "autoFlushFreq >= 0");
+
+ long old = this.autoFlushFreq;
+
+ if (autoFlushFreq != old) {
+ this.autoFlushFreq = autoFlushFreq;
+
+ if (autoFlushFreq != 0 && old == 0)
+ flushQ.add(this);
+ else if (autoFlushFreq == 0)
+ flushQ.remove(this);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteFuture<?> addData(Map<K, V> entries) throws
IllegalStateException {
+ A.notNull(entries, "entries");
+
+ return addData(entries.entrySet());
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Keys and values are converted to cache objects and the batch is handed to {@code load0}.
+  * The result future is registered in {@code activeFuts} and removed from it by
+  * {@code rmvActiveFut} once it completes.
+  */
+ @Override public IgniteFuture<?> addData(Collection<? extends Map.Entry<K, V>> entries) {
+     A.notEmpty(entries, "entries");
+
+     enterBusy();
+
+     // Created outside of 'try' so that the failure path below can complete it.
+     GridFutureAdapter<Object> resFut = new GridFutureAdapter<>();
+
+     try {
+         resFut.listen(rmvActiveFut);
+
+         activeFuts.add(resFut);
+
+         Collection<KeyCacheObject> keys = null;
+
+         // Outstanding keys are tracked only for batches with more than one entry.
+         if (entries.size() > 1) {
+             keys = new GridConcurrentHashSet<>(entries.size(), U.capacity(entries.size()), 1);
+
+             for (Map.Entry<K, V> entry : entries)
+                 keys.add(cacheObjProc.toCacheKeyObject(cacheObjCtx, entry.getKey(), true));
+         }
+
+         Collection<? extends DataStreamerEntry> entries0 =
+             F.viewReadOnly(entries, new C1<Entry<K, V>, DataStreamerEntry>() {
+                 @Override public DataStreamerEntry apply(Entry<K, V> e) {
+                     KeyCacheObject key = cacheObjProc.toCacheKeyObject(cacheObjCtx, e.getKey(), true);
+                     CacheObject val = cacheObjProc.toCacheObject(cacheObjCtx, e.getValue(), true);
+
+                     return new DataStreamerEntry(key, val);
+                 }
+             });
+
+         load0(entries0, resFut, keys, 0);
+
+         return new IgniteFutureImpl<>(resFut);
+     }
+     catch (IgniteException e) {
+         // Complete the tracked future as well: otherwise it stays in 'activeFuts' unfinished
+         // and a later flush keeps waiting for it.
+         resFut.onDone(e);
+
+         return new IgniteFinishedFutureImpl<>(e);
+     }
+     finally {
+         leaveBusy();
+     }
+ }
+
+ /**
+ * @param key Key.
+ * @param val Value.
+ * @return Future.
+ */
+ public IgniteFuture<?> addDataInternal(KeyCacheObject key, CacheObject
val) {
+ return addDataInternal(Collections.singleton(new
DataStreamerEntry(key, val)));
+ }
+
+ /**
+ * @param key Key.
+ * @return Future.
+ */
+ public IgniteFuture<?> removeDataInternal(KeyCacheObject key) {
+ return addDataInternal(Collections.singleton(new
DataStreamerEntry(key, null)));
+ }
+
+ /**
+ * @param entries Entries.
+ * @return Future.
+ */
+ public IgniteFuture<?> addDataInternal(Collection<? extends
DataStreamerEntry> entries) {
+ enterBusy();
+
+ GridFutureAdapter<Object> resFut = new GridFutureAdapter<>();
+
+ try {
+ resFut.listen(rmvActiveFut);
+
+ activeFuts.add(resFut);
+
+ Collection<KeyCacheObject> keys = null;
+
+ if (entries.size() > 1) {
+ keys = new GridConcurrentHashSet<>(entries.size(),
U.capacity(entries.size()), 1);
+
+ for (DataStreamerEntry entry : entries)
+ keys.add(entry.getKey());
+ }
+
+ load0(entries, resFut, keys, 0);
+
+ return new IgniteFutureImpl<>(resFut);
+ }
+ catch (Throwable e) {
+ resFut.onDone(e);
+
+ if (e instanceof Error)
+ throw e;
+
+ return new IgniteFinishedFutureImpl<>(e);
+ }
+ finally {
+ leaveBusy();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteFuture<?> addData(Map.Entry<K, V> entry) {
+ A.notNull(entry, "entry");
+
+ return addData(F.asList(entry));
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteFuture<?> addData(K key, V val) {
+ A.notNull(key, "key");
+
+ KeyCacheObject key0 = cacheObjProc.toCacheKeyObject(cacheObjCtx, key,
true);
+ CacheObject val0 = cacheObjProc.toCacheObject(cacheObjCtx, val, true);
+
+ return addDataInternal(Collections.singleton(new
DataStreamerEntry(key0, val0)));
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteFuture<?> removeData(K key) {
+ return addData(key, null);
+ }
+
+ /**
+ * @param entries Entries.
+ * @param resFut Result future.
+ * @param activeKeys Active keys.
+ * @param remaps Remaps count.
+ */
+ private void load0(
+ Collection<? extends DataStreamerEntry> entries,
+ final GridFutureAdapter<Object> resFut,
+ @Nullable final Collection<KeyCacheObject> activeKeys,
+ final int remaps
+ ) {
+ assert entries != null;
+
+ if (!isWarningPrinted) {
+ synchronized (this) {
+ if (!allowOverwrite() && !isWarningPrinted) {
+ U.warn(log, "Data streamer will not overwrite existing
cache entries for better performance " +
+ "(to change, set allowOverwrite to true)");
+ }
+
+ isWarningPrinted = true;
+ }
+ }
+
+ Map<ClusterNode, Collection<DataStreamerEntry>> mappings = new
HashMap<>();
+
+ boolean initPda = ctx.deploy().enabled() && jobPda == null;
+
+ for (DataStreamerEntry entry : entries) {
+ List<ClusterNode> nodes;
+
+ try {
+ KeyCacheObject key = entry.getKey();
+
+ assert key != null;
+
+ if (initPda) {
+ jobPda = new DataStreamerPda(key.value(cacheObjCtx,
false),
+ entry.getValue() != null ?
entry.getValue().value(cacheObjCtx, false) : null,
+ updater);
+
+ initPda = false;
+ }
+
+ nodes = nodes(key);
+ }
+ catch (IgniteCheckedException e) {
+ resFut.onDone(e);
+
+ return;
+ }
+
+ if (F.isEmpty(nodes)) {
+ resFut.onDone(new ClusterTopologyException("Failed to map key
to node " +
+ "(no nodes with cache found in topology) [infos=" +
entries.size() +
+ ", cacheName=" + cacheName + ']'));
+
+ return;
+ }
+
+ for (ClusterNode node : nodes) {
+ Collection<DataStreamerEntry> col = mappings.get(node);
+
+ if (col == null)
+ mappings.put(node, col = new ArrayList<>());
+
+ col.add(entry);
+ }
+ }
+
+ for (final Map.Entry<ClusterNode, Collection<DataStreamerEntry>> e :
mappings.entrySet()) {
+ final UUID nodeId = e.getKey().id();
+
+ Buffer buf = bufMappings.get(nodeId);
+
+ if (buf == null) {
+ Buffer old = bufMappings.putIfAbsent(nodeId, buf = new
Buffer(e.getKey()));
+
+ if (old != null)
+ buf = old;
+ }
+
+ final Collection<DataStreamerEntry> entriesForNode = e.getValue();
+
+ IgniteInClosure<IgniteInternalFuture<?>> lsnr = new
IgniteInClosure<IgniteInternalFuture<?>>() {
+ /**
+  * Handles completion of the per-node batch future: on success removes the finished keys from
+  * the set of active keys and completes the result future when nothing is left; on failure
+  * either fails the result future (streamer cancelled or remap limit exceeded) or remaps the
+  * node's entries through {@code load0}.
+  *
+  * @param t Completed future of the batch sent to the node.
+  */
+ @Override public void apply(IgniteInternalFuture<?> t) {
+     try {
+         t.get();
+
+         if (activeKeys != null) {
+             for (DataStreamerEntry e : entriesForNode)
+                 activeKeys.remove(e.getKey());
+
+             if (activeKeys.isEmpty())
+                 resFut.onDone();
+         }
+         else {
+             assert entriesForNode.size() == 1;
+
+             // That has been a single key,
+             // so complete result future right away.
+             resFut.onDone();
+         }
+     }
+     catch (IgniteCheckedException e1) {
+         if (log.isDebugEnabled())
+             log.debug("Future finished with error [nodeId=" + nodeId + ", err=" + e1 + ']');
+
+         if (cancelled) {
+             resFut.onDone(new IgniteCheckedException("Data streamer has been cancelled: " +
+                 DataStreamerImpl.this, e1));
+         }
+         else if (remaps + 1 > maxRemapCnt) {
+             // The original failure is attached as the cause of the remap-limit error, the same
+             // way the cancellation branch above does it.
+             resFut.onDone(new IgniteCheckedException("Failed to finish operation (too many remaps): " +
+                 remaps, e1));
+         }
+         else
+             load0(entriesForNode, resFut, activeKeys, remaps + 1);
+     }
+ }
+ };
+
+ GridFutureAdapter<?> f;
+
+ try {
+ f = buf.update(entriesForNode, lsnr);
+ }
+ catch (IgniteInterruptedCheckedException e1) {
+ resFut.onDone(e1);
+
+ return;
+ }
+
+ if (ctx.discovery().node(nodeId) == null) {
+ if (bufMappings.remove(nodeId, buf))
+ buf.onNodeLeft();
+
+ if (f != null)
+ f.onDone(new ClusterTopologyCheckedException("Failed to
wait for request completion " +
+ "(node has left): " + nodeId));
+ }
+ }
+ }
+
+ /**
+ * @param key Key to map.
+ * @return Nodes to send requests to.
+ * @throws IgniteCheckedException If failed.
+ */
+ private List<ClusterNode> nodes(KeyCacheObject key) throws
IgniteCheckedException {
+ GridAffinityProcessor aff = ctx.affinity();
+
+ return !allowOverwrite() ? aff.mapKeyToPrimaryAndBackups(cacheName,
key) :
+ Collections.singletonList(aff.mapKeyToNode(cacheName, key));
+ }
+
+ /**
+ * Performs flush: repeatedly flushes every node buffer and waits until all
+ * futures that were active when the method was entered are done.
+ *
+ * @throws IgniteCheckedException If failed.
+ */
+ private void doFlush() throws IgniteCheckedException {
+ lastFlushTime = U.currentTimeMillis();
+
+ // Snapshot of the futures that are still running; allocated lazily.
+ List<IgniteInternalFuture> activeFuts0 = null;
+
+ // NOTE(review): the value counted in the first loop is reset to 0 below before it is read.
+ int doneCnt = 0;
+
+ for (IgniteInternalFuture<?> f : activeFuts) {
+ if (!f.isDone()) {
+ if (activeFuts0 == null)
+ activeFuts0 = new ArrayList<>((int)(activeFuts.size() *
1.2));
+
+ activeFuts0.add(f);
+ }
+ else {
+ // Already finished: get() is called so that a failure of this future fails the flush.
+ f.get();
+
+ doneCnt++;
+ }
+ }
+
+ // Nothing is pending - nothing to wait for.
+ if (activeFuts0 == null || activeFuts0.isEmpty())
+ return;
+
+ while (true) {
+ // Futures returned by the buffers for this flush round; allocated lazily.
+ Queue<IgniteInternalFuture<?>> q = null;
+
+ for (Buffer buf : bufMappings.values()) {
+ IgniteInternalFuture<?> flushFut = buf.flush();
+
+ if (flushFut != null) {
+ if (q == null)
+ q = new ArrayDeque<>(bufMappings.size() * 2);
+
+ q.add(flushFut);
+ }
+ }
+
+ if (q != null) {
+ assert !q.isEmpty();
+
+ boolean err = false;
+
+ // Wait for every flush future of this round; errors are only logged at debug level and remembered.
+ for (IgniteInternalFuture fut = q.poll(); fut != null; fut =
q.poll()) {
+ try {
+ fut.get();
+ }
+ catch (IgniteCheckedException e) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to flush buffer: " + e);
+
+ err = true;
+ }
+ }
+
+ if (err)
+ // Remaps needed - flush buffers.
+ continue;
+ }
+
+ doneCnt = 0;
+
+ // Count the snapshot futures that are done. Finished slots are nulled so that get()
+ // is called on each of them only once; the scan stops at the first unfinished future.
+ for (int i = 0; i < activeFuts0.size(); i++) {
+ IgniteInternalFuture f = activeFuts0.get(i);
+
+ if (f == null)
+ doneCnt++;
+ else if (f.isDone()) {
+ f.get();
+
+ doneCnt++;
+
+ activeFuts0.set(i, null);
+ }
+ else
+ break;
+ }
+
+ // Every future from the snapshot has finished - the flush is complete.
+ if (doneCnt == activeFuts0.size())
+ return;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("ForLoopReplaceableByForEach")
+ @Override public void flush() throws IgniteException {
+ enterBusy();
+
+ try {
+ doFlush();
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ finally {
+ leaveBusy();
+ }
+ }
+
+ /**
+ * Flushes every internal buffer if buffer was flushed before passed in
+ * threshold.
+ * <p>
+ * Does not wait for result and does not fail on errors assuming that
this method
+ * should be called periodically.
+ */
+ @Override public void tryFlush() throws IgniteInterruptedException {
+ if (!busyLock.enterBusy())
+ return;
+
+ try {
+ for (Buffer buf : bufMappings.values())
+ buf.flush();
+
+ lastFlushTime = U.currentTimeMillis();
+ }
+ catch (IgniteInterruptedCheckedException e) {
+ throw U.convertException(e);
+ }
+ finally {
+ leaveBusy();
+ }
+ }
+
+ /**
+ * @param cancel {@code True} to close with cancellation.
+ * @throws IgniteException If failed.
+ */
+ @Override public void close(boolean cancel) throws IgniteException {
+ try {
+ closeEx(cancel);
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ }
+
+ /**
+  * Closes the streamer. Only the first call has an effect: it blocks the busy lock, then either
+  * cancels all buffers or flushes the remaining data, removes the discovery and response
+  * listeners and completes the streamer future.
+  *
+  * @param cancel {@code True} to close with cancellation.
+  * @throws IgniteCheckedException If failed.
+  */
+ public void closeEx(boolean cancel) throws IgniteCheckedException {
+     if (!closed.compareAndSet(false, true))
+         return;
+
+     busyLock.block();
+
+     if (log.isDebugEnabled())
+         log.debug("Closing data streamer [ldr=" + this + ", cancel=" + cancel + ']');
+
+     IgniteCheckedException e = null;
+
+     try {
+         // Assuming that no methods are called on this loader after this method is called.
+         if (cancel) {
+             cancelled = true;
+
+             for (Buffer buf : bufMappings.values())
+                 buf.cancelAll();
+         }
+         else
+             doFlush();
+     }
+     catch (IgniteCheckedException e0) {
+         e = e0;
+     }
+     finally {
+         // Unregister the listeners even when the flush failed: the streamer is closed
+         // either way, so leaving them registered would leak them.
+         ctx.event().removeLocalEventListener(discoLsnr);
+
+         ctx.io().removeMessageListener(topic);
+     }
+
+     fut.onDone(null, e);
+
+     if (e != null)
+         throw e;
+ }
+
+ /**
+ * @return {@code true} If the loader is closed.
+ */
+ boolean isClosed() {
+ return fut.isDone();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() throws IgniteException {
+ close(false);
+ }
+
+ /**
+ * @return Max remap count.
+ */
+ public int maxRemapCount() {
+ return maxRemapCnt;
+ }
+
+ /**
+ * @param maxRemapCnt New max remap count.
+ */
+ public void maxRemapCount(int maxRemapCnt) {
+ this.maxRemapCnt = maxRemapCnt;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(DataStreamerImpl.class, this);
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getDelay(TimeUnit unit) {
+ return unit.convert(nextFlushTime() - U.currentTimeMillis(),
TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * @return Next flush time.
+ */
+ private long nextFlushTime() {
+ return lastFlushTime + autoFlushFreq;
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Streamers are ordered by their next flush time. Equal flush times compare as {@code 0},
+  * as the {@link Comparable} contract requires.
+  */
+ @Override public int compareTo(Delayed o) {
+     return Long.compare(nextFlushTime(), ((DataStreamerImpl)o).nextFlushTime());
+ }
+
+ /**
+ *
+ */
+ private class Buffer {
+ /** Node. */
+ private final ClusterNode node;
+
+ /** Active futures. */
+ private final Collection<IgniteInternalFuture<Object>> locFuts;
+
+ /** Buffered entries. */
+ private List<DataStreamerEntry> entries;
+
+ /** */
+ @GridToStringExclude
+ private GridFutureAdapter<Object> curFut;
+
+ /** Local node flag. */
+ private final boolean isLocNode;
+
+ /** ID generator. */
+ private final AtomicLong idGen = new AtomicLong();
+
+ /** Active futures. */
+ private final ConcurrentMap<Long, GridFutureAdapter<Object>> reqs;
+
+ /** */
+ private final Semaphore sem;
+
+ /** Closure to signal on task finish. */
+ @GridToStringExclude
+ private final IgniteInClosure<IgniteInternalFuture<Object>> signalC =
new IgniteInClosure<IgniteInternalFuture<Object>>() {
+ @Override public void apply(IgniteInternalFuture<Object> t) {
+ signalTaskFinished(t);
+ }
+ };
+
+ /**
+ * @param node Node.
+ */
+ Buffer(ClusterNode node) {
+ assert node != null;
+
+ this.node = node;
+
+ locFuts = new GridConcurrentHashSet<>();
+ reqs = new ConcurrentHashMap8<>();
+
+ // Cache local node flag.
+ isLocNode = node.equals(ctx.discovery().localNode());
+
+ entries = newEntries();
+ curFut = new GridFutureAdapter<>();
+ curFut.listen(signalC);
+
+ sem = new Semaphore(parallelOps);
+ }
+
+ /**
+ * @param newEntries Infos.
+ * @param lsnr Listener for the operation future.
+ * @throws IgniteInterruptedCheckedException If failed.
+ * @return Future for operation.
+ */
+ @Nullable GridFutureAdapter<?> update(Iterable<DataStreamerEntry>
newEntries,
+ IgniteInClosure<IgniteInternalFuture<?>> lsnr) throws
IgniteInterruptedCheckedException {
+ List<DataStreamerEntry> entries0 = null;
+ GridFutureAdapter<Object> curFut0;
+
+ synchronized (this) {
+ curFut0 = curFut;
+
+ curFut0.listen(lsnr);
+
+ for (DataStreamerEntry entry : newEntries)
+ entries.add(entry);
+
+ if (entries.size() >= bufSize) {
+ entries0 = entries;
+
+ entries = newEntries();
+ curFut = new GridFutureAdapter<>();
+ curFut.listen(signalC);
+ }
+ }
+
+ if (entries0 != null) {
+ submit(entries0, curFut0);
+
+ if (cancelled)
+ curFut0.onDone(new IgniteCheckedException("Data streamer
has been cancelled: " + DataStreamerImpl.this));
+ }
+
+ return curFut0;
+ }
+
+ /**
+ * @return Fresh collection with some space for outgrowth.
+ */
+ private List<DataStreamerEntry> newEntries() {
+ return new ArrayList<>((int)(bufSize * 1.2));
+ }
+
+ /**
+ * @return Future if any submitted.
+ *
+ * @throws IgniteInterruptedCheckedException If thread has been
interrupted.
+ */
+ @Nullable IgniteInternalFuture<?> flush() throws
IgniteInterruptedCheckedException {
+ List<DataStreamerEntry> entries0 = null;
+ GridFutureAdapter<Object> curFut0 = null;
+
+ synchronized (this) {
+ if (!entries.isEmpty()) {
+ entries0 = entries;
+ curFut0 = curFut;
+
+ entries = newEntries();
+ curFut = new GridFutureAdapter<>();
+ curFut.listen(signalC);
+ }
+ }
+
+ if (entries0 != null)
+ submit(entries0, curFut0);
+
+ // Create compound future for this flush.
+ GridCompoundFuture<Object, Object> res = null;
+
+ for (IgniteInternalFuture<Object> f : locFuts) {
+ if (res == null)
+ res = new GridCompoundFuture<>();
+
+ res.add(f);
+ }
+
+ for (IgniteInternalFuture<Object> f : reqs.values()) {
+ if (res == null)
+ res = new GridCompoundFuture<>();
+
+ res.add(f);
+ }
+
+ if (res != null)
+ res.markInitialized();
+
+ return res;
+ }
+
+ /**
+ * Increments active tasks count.
+ *
+ * @throws IgniteInterruptedCheckedException If thread has been
interrupted.
+ */
+ private void incrementActiveTasks() throws
IgniteInterruptedCheckedException {
+ U.acquire(sem);
+ }
+
+ /**
+ * @param f Future that finished.
+ */
+ private void signalTaskFinished(IgniteInternalFuture<Object> f) {
+ assert f != null;
+
+ sem.release();
+ }
+
+ /**
+  * Submits a batch of entries to this buffer's node. For the local node an update job is
+  * executed through the closure processor; for a remote node the entries, updater and
+  * response topic are marshalled and a {@link DataStreamerRequest} is sent.
+  * <p>
+  * A permit is acquired from the semaphore before submitting. The futures this buffer creates
+  * register {@code signalC}, which releases the permit on completion, so every exit path
+  * has to complete {@code curFut} (or hand it over to a pending request).
+  *
+  * @param entries Entries to submit.
+  * @param curFut Current future.
+  * @throws IgniteInterruptedCheckedException If interrupted.
+  */
+ private void submit(final Collection<DataStreamerEntry> entries, final GridFutureAdapter<Object> curFut)
+     throws IgniteInterruptedCheckedException {
+     assert entries != null;
+     assert !entries.isEmpty();
+     assert curFut != null;
+
+     incrementActiveTasks();
+
+     IgniteInternalFuture<Object> fut;
+
+     if (isLocNode) {
+         fut = ctx.closure().callLocalSafe(
+             new DataStreamerUpdateJob(ctx, log, cacheName, entries, false, skipStore, updater), false);
+
+         locFuts.add(fut);
+
+         // Propagate the result of the local job to the batch future.
+         fut.listen(new IgniteInClosure<IgniteInternalFuture<Object>>() {
+             @Override public void apply(IgniteInternalFuture<Object> t) {
+                 try {
+                     boolean rmv = locFuts.remove(t);
+
+                     assert rmv;
+
+                     curFut.onDone(t.get());
+                 }
+                 catch (IgniteCheckedException e) {
+                     curFut.onDone(e);
+                 }
+             }
+         });
+     }
+     else {
+         try {
+             for (DataStreamerEntry e : entries) {
+                 e.getKey().prepareMarshal(cacheObjCtx);
+
+                 CacheObject val = e.getValue();
+
+                 if (val != null)
+                     val.prepareMarshal(cacheObjCtx);
+             }
+
+             // Updater and topic are marshalled once and reused for subsequent requests.
+             if (updaterBytes == null) {
+                 assert updater != null;
+
+                 updaterBytes = ctx.config().getMarshaller().marshal(updater);
+             }
+
+             if (topicBytes == null)
+                 topicBytes = ctx.config().getMarshaller().marshal(topic);
+         }
+         catch (IgniteCheckedException e) {
+             U.error(log, "Failed to marshal (request will not be sent).", e);
+
+             // Fail the batch future: nothing else would ever complete it or release the permit.
+             curFut.onDone(e);
+
+             return;
+         }
+
+         GridDeployment dep = null;
+         GridPeerDeployAware jobPda0 = null;
+
+         if (ctx.deploy().enabled()) {
+             try {
+                 jobPda0 = jobPda;
+
+                 assert jobPda0 != null;
+
+                 dep = ctx.deploy().deploy(jobPda0.deployClass(), jobPda0.classLoader());
+
+                 GridCacheAdapter<Object, Object> cache = ctx.cache().internalCache(cacheName);
+
+                 if (cache != null)
+                     cache.context().deploy().onEnter();
+             }
+             catch (IgniteCheckedException e) {
+                 U.error(log, "Failed to deploy class (request will not be sent): " + jobPda0.deployClass(), e);
+
+                 // Same as above: the request is not sent, so the future must be failed here.
+                 curFut.onDone(e);
+
+                 return;
+             }
+
+             if (dep == null)
+                 U.warn(log, "Failed to deploy class (request will be sent): " + jobPda0.deployClass());
+         }
+
+         long reqId = idGen.incrementAndGet();
+
+         fut = curFut;
+
+         // Registered before sending so that the response handler can find the future.
+         reqs.put(reqId, (GridFutureAdapter<Object>)fut);
+
+         DataStreamerRequest req = new DataStreamerRequest(
+             reqId,
+             topicBytes,
+             cacheName,
+             updaterBytes,
+             entries,
+             true,
+             skipStore,
+             dep != null ? dep.deployMode() : null,
+             dep != null ? jobPda0.deployClass().getName() : null,
+             dep != null ? dep.userVersion() : null,
+             dep != null ? dep.participants() : null,
+             dep != null ? dep.classLoaderId() : null,
+             dep == null);
+
+         try {
+             ctx.io().send(node, TOPIC_DATASTREAM, req, PUBLIC_POOL);
+
+             if (log.isDebugEnabled())
+                 log.debug("Sent request to node [nodeId=" + node.id() + ", req=" + req + ']');
+         }
+         catch (IgniteCheckedException e) {
+             // Report the send error as is if the node is still reachable, otherwise as a topology error.
+             if (ctx.discovery().alive(node) && ctx.discovery().pingNode(node.id()))
+                 ((GridFutureAdapter<Object>)fut).onDone(e);
+             else
+                 ((GridFutureAdapter<Object>)fut).onDone(new ClusterTopologyCheckedException("Failed to send " +
+                     "request (node has left): " + node.id()));
+         }
+     }
+ }
+
+ /**
+ *
+ */
+ void onNodeLeft() {
+ assert !isLocNode;
+ assert bufMappings.get(node.id()) != this;
+
+ if (log.isDebugEnabled())
+ log.debug("Forcibly completing futures (node has left): " +
node.id());
+
+ Exception e = new ClusterTopologyCheckedException("Failed to wait
for request completion " +
+ "(node has left): " + node.id());
+
+ for (GridFutureAdapter<Object> f : reqs.values())
+ f.onDone(e);
+
+ // Make sure to complete current future.
+ GridFutureAdapter<Object> curFut0;
+
+ synchronized (this) {
+ curFut0 = curFut;
+ }
+
+ curFut0.onDone(e);
+ }
+
+ /**
+ * @param res Response.
+ */
+ void onResponse(DataStreamerResponse res) {
+ if (log.isDebugEnabled())
+ log.debug("Received data load response: " + res);
+
+ GridFutureAdapter<?> f = reqs.remove(res.requestId());
+
+ if (f == null) {
+ if (log.isDebugEnabled())
+ log.debug("Future for request has not been found: " +
res.requestId());
+
+ return;
+ }
+
+ Throwable err = null;
+
- byte[] errBytes = res.errorBytes();
++ ByteBuffer errBytes = res.errorBytes();
+
+ if (errBytes != null) {
+ try {
+ GridPeerDeployAware jobPda0 = jobPda;
+
+ err = ctx.config().getMarshaller().unmarshal(
+ errBytes,
+ jobPda0 != null ? jobPda0.classLoader() :
U.gridClassLoader());
+ }
+ catch (IgniteCheckedException e) {
+ f.onDone(null, new IgniteCheckedException("Failed to
unmarshal response.", e));
+
+ return;
+ }
+ }
+
+ f.onDone(null, err);
+
+ if (log.isDebugEnabled())
+ log.debug("Finished future [fut=" + f + ", reqId=" +
res.requestId() + ", err=" + err + ']');
+ }
+
+ /**
+ * Cancels every future in {@code locFuts} and passes an
+ * {@code IgniteCheckedException} ("Data streamer has been cancelled: ...")
+ * to {@code onDone(...)} of every future in {@code reqs}.
+ * <p>
+ * A failure to cancel an individual local future is logged as an error and
+ * does not stop processing of the remaining futures.
+ */
+ void cancelAll() {
+ IgniteCheckedException err = new IgniteCheckedException("Data
streamer has been cancelled: " + DataStreamerImpl.this);
+
+ for (IgniteInternalFuture<?> f : locFuts) {
+ try {
+ f.cancel();
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to cancel mini-future.", e);
+ }
+ }
+
+ for (GridFutureAdapter<?> f : reqs.values())
+ f.onDone(err); // One shared exception instance for all request futures.
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * The string is built by {@code S.toString(...)} for {@code Buffer.class}
+ * with three extra values: the number of entries and the sizes of
+ * {@code locFuts} and {@code reqs}.
+ */
+ @Override public String toString() {
+ int size;
+
+ synchronized (this) { // entries.size() is read while holding this buffer's monitor.
+ size = entries.size();
+ }
+
+ return S.toString(Buffer.class, this,
+ "entriesCnt", size,
+ "locFutsSize", locFuts.size(),
+ "reqsSize", reqs.size());
+ }
+ }
+
+ /**
+ * Data streamer peer-deploy aware.
+ * <p>
+ * Resolves a deploy class and a class loader on first request and caches
+ * both in instance fields. The class is an inner (non-static) class because
+ * {@link #deployClass()} reads {@code depCls} of the enclosing instance.
+ */
+ private class DataStreamerPda implements GridPeerDeployAware {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /** Deploy class; {@code null} until resolved by the first {@link #deployClass()} call. */
+ private Class<?> cls;
+
+ /** Class loader; {@code null} until resolved by the first {@link #classLoader()} call. */
+ private ClassLoader ldr;
+
+ /** Collection of objects to detect deploy class and class loader. */
+ private Collection<Object> objs;
+
+ /**
+ * Constructs data streamer peer-deploy aware. The passed array is wrapped
+ * with {@code Arrays.asList(...)}.
+ *
+ * @param objs Collection of objects to detect deploy class and class loader.
+ */
+ private DataStreamerPda(Object... objs) {
+ this.objs = Arrays.asList(objs);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Resolution order on the first call:
+ * <ol>
+ * <li>{@code depCls} of the enclosing instance, when it is not {@code null};</li>
+ * <li>otherwise the class detected by {@code U.detectClass(...)} for the
+ * objects in {@code objs}, scanned in order and skipping {@code null}
+ * elements, until a class is found that {@code U.isJdk(...)} does not
+ * report as a JDK class;</li>
+ * <li>otherwise (nothing detected, or only JDK classes)
+ * {@code DataStreamerImpl.class}.</li>
+ * </ol>
+ * The result is stored in {@code cls} and returned by later calls.
+ */
+ @Override public Class<?> deployClass() {
+ if (cls == null) {
+ Class<?> cls0 = null;
+
+ if (depCls != null)
+ cls0 = depCls;
+ else {
+ for (Iterator<Object> it = objs.iterator(); (cls0 == null
|| U.isJdk(cls0)) && it.hasNext();) {
+ Object o = it.next();
+
+ if (o != null)
+ cls0 = U.detectClass(o);
+ }
+
+ if (cls0 == null || U.isJdk(cls0))
+ cls0 = DataStreamerImpl.class;
+ }
+
+ assert cls0 != null : "Failed to detect deploy class [objs="
+ objs + ']';
+
+ cls = cls0;
+ }
+
+ return cls;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * On the first call takes the class loader of {@link #deployClass()} and
+ * falls back to {@code U.gridClassLoader()} when that loader is
+ * {@code null}. The result is stored in {@code ldr} and returned by later
+ * calls.
+ */
+ @Override public ClassLoader classLoader() {
+ if (ldr == null) {
+ ClassLoader ldr0 = deployClass().getClassLoader();
+
+ // Safety.
+ if (ldr0 == null)
+ ldr0 = U.gridClassLoader();
+
+ assert ldr0 != null : "Failed to detect classloader [objs=" +
objs + ']';
+
+ ldr = ldr0;
+ }
+
+ return ldr;
+ }
+ }
+
+ /**
+ * Isolated updater which only loads entry initial value.
+ * <p>
+ * Works directly against the internal cache obtained from the passed cache
+ * proxy: each entry is looked up with {@code entryEx(...)} and populated via
+ * {@code initialValue(...)}.
+ */
+ private static class IsolatedUpdater implements Updater<KeyCacheObject,
CacheObject>,
+ DataStreamerCacheUpdaters.InternalUpdater {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * The passed cache is cast to {@code IgniteCacheProxy}; when its internal
+ * cache is a near cache, the corresponding DHT cache
+ * ({@code context().near().dht()}) is used instead. The affinity topology
+ * version and one cache version are obtained once and shared by all
+ * entries of the batch.
+ * <p>
+ * For each entry the key is finish-unmarshalled, the cache entry is
+ * obtained and unswapped, its initial value is set with
+ * {@code CU.TTL_ETERNAL}, {@code CU.EXPIRE_TIME_ETERNAL} and
+ * {@code GridDrType.DR_LOAD}, and the entry is touched in the eviction
+ * manager. {@code GridDhtInvalidPartitionException} and
+ * {@code GridCacheEntryRemovedException} are ignored for the affected
+ * entry; any other {@code IgniteCheckedException} is logged as an error.
+ * In both cases the loop continues with the next entry.
+ */
+ @Override public void update(IgniteCache<KeyCacheObject, CacheObject>
cache,
+ Collection<Map.Entry<KeyCacheObject, CacheObject>> entries) {
+ IgniteCacheProxy<KeyCacheObject, CacheObject> proxy =
(IgniteCacheProxy<KeyCacheObject, CacheObject>)cache;
+
+ GridCacheAdapter<KeyCacheObject, CacheObject> internalCache =
proxy.context().cache();
+
+ if (internalCache.isNear())
+ internalCache = internalCache.context().near().dht();
+
+ GridCacheContext cctx = internalCache.context();
+
+ long topVer = cctx.affinity().affinityTopologyVersion();
+
+ GridCacheVersion ver = cctx.versions().next(topVer); // Obtained once; every entry of this batch gets the same version.
+
+ for (Map.Entry<KeyCacheObject, CacheObject> e : entries) {
+ try {
+ e.getKey().finishUnmarshal(cctx.cacheObjectContext(),
cctx.deploy().globalLoader());
+
+ GridCacheEntryEx entry =
internalCache.entryEx(e.getKey(), topVer);
+
+ entry.unswap(true, false);
+
+ entry.initialValue(e.getValue(),
+ ver,
+ CU.TTL_ETERNAL,
+ CU.EXPIRE_TIME_ETERNAL,
+ false,
+ topVer,
+ GridDrType.DR_LOAD);
+
+ cctx.evicts().touch(entry, topVer);
+ }
+ catch (GridDhtInvalidPartitionException |
GridCacheEntryRemovedException ignored) {
+ // No-op. The affected entry is skipped and the loop moves on.
+ }
+ catch (IgniteCheckedException ex) {
+ IgniteLogger log = cache.unwrap(Ignite.class).log();
+
+ U.error(log, "Failed to set initial value for cache
entry: " + e, ex);
+ }
+ }
+ }
+ }
+ }