Repository: ignite Updated Branches: refs/heads/ignite-2684 9b5dcfe3f -> 9e55974c7
IGNITE-2648: DataStreamer doesn't stream BinaryObjects created with BinaryBuilder Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9e55974c Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9e55974c Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9e55974c Branch: refs/heads/ignite-2684 Commit: 9e55974c76e75a91256b17531521d849972ee24c Parents: 9b5dcfe Author: Denis Magda <dma...@gridgain.com> Authored: Thu Feb 18 16:45:12 2016 +0300 Committer: Denis Magda <dma...@gridgain.com> Committed: Thu Feb 18 16:45:12 2016 +0300 ---------------------------------------------------------------------- .../datastreamer/DataStreamerImpl.java | 6 +-- .../GridDataStreamerImplSelfTest.java | 49 +++++++++++++++++--- 2 files changed, 46 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/9e55974c/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java index 27eff0c..f832e4e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java @@ -628,7 +628,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed Map<ClusterNode, Collection<DataStreamerEntry>> mappings = new HashMap<>(); - boolean initPda = ctx.deploy().enabled() && jobPda == null; + boolean initPda = ctx.deploy().enabled() && cacheObjCtx.addDeploymentInfo() && jobPda == null; AffinityTopologyVersion topVer = ctx.cache().context().exchange().readyAffinityVersion(); @@ -1304,7 +1304,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed GridDeployment dep = null; GridPeerDeployAware jobPda0 = null; - if (ctx.deploy().enabled()) { + if (ctx.deploy().enabled() && cacheObjCtx.addDeploymentInfo()) { try { jobPda0 = jobPda; @@ -1619,4 +1619,4 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed } } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/9e55974c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/datastreaming/GridDataStreamerImplSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/datastreaming/GridDataStreamerImplSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/datastreaming/GridDataStreamerImplSelfTest.java index 4caa5fb..a260e6f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/datastreaming/GridDataStreamerImplSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/datastreaming/GridDataStreamerImplSelfTest.java @@ -23,19 +23,20 @@ import java.util.Random; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.binary.BinaryObject; +import org.apache.ignite.binary.BinaryObjectBuilder; +import org.apache.ignite.binary.BinaryObjectException; +import org.apache.ignite.binary.BinaryReader; +import org.apache.ignite.binary.BinaryWriter; +import org.apache.ignite.binary.Binarylizable; import org.apache.ignite.cache.CachePeekMode; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.binary.BinaryMarshaller; import org.apache.ignite.internal.processors.cache.IgniteCacheProxy; import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.internal.binary.BinaryMarshaller; -import org.apache.ignite.binary.BinaryObjectException; -import org.apache.ignite.binary.Binarylizable; -import org.apache.ignite.binary.BinaryObject; -import org.apache.ignite.binary.BinaryReader; -import org.apache.ignite.binary.BinaryWriter; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; @@ -237,6 +238,42 @@ public class GridDataStreamerImplSelfTest extends GridCommonAbstractTest { } /** + * Tries to propagate cache with binary objects created using the builder. + * + * @throws Exception If failed. + */ + public void testAddBinaryCreatedWithBuilder() throws Exception { + try { + binaries = true; + + startGrids(2); + + awaitPartitionMapExchange(); + + Ignite g0 = grid(0); + + IgniteDataStreamer<Integer, BinaryObject> dataLdr = g0.dataStreamer(null); + + for (int i = 0; i < 500; i++) { + BinaryObjectBuilder obj = g0.binary().builder("NoExistedClass"); + + obj.setField("id", i); + obj.setField("name", String.valueOf("name = " + i)); + + dataLdr.addData(i, obj.build()); + } + + dataLdr.close(false); + + assertEquals(500, g0.cache(null).size(CachePeekMode.ALL)); + assertEquals(500, grid(1).cache(null).size(CachePeekMode.ALL)); + } + finally { + G.stopAll(true); + } + } + + /** * Check that keys correctly destributed by nodes after data streamer. * * @param g Grid to check.