http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2RowRange.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2RowRange.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2RowRange.java new file mode 100644 index 0000000..bc8aa75 --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2RowRange.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2.twostep.msg; + +import java.nio.ByteBuffer; +import java.util.List; +import org.apache.ignite.internal.GridDirectCollection; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +/** + * Range of rows. + */ +public class GridH2RowRange implements Message { + /** */ + private static int FLAG_PARTIAL = 1; + + /** */ + private int rangeId; + + /** */ + @GridDirectCollection(Message.class) + @GridToStringInclude + private List<GridH2RowMessage> rows; + + /** */ + private byte flags; + + /** + * @param rangeId Range ID. + */ + public void rangeId(int rangeId) { + this.rangeId = rangeId; + } + + /** + * @return Range ID. + */ + public int rangeId() { + return rangeId; + } + + /** + * @param rows Rows. + */ + public void rows(List<GridH2RowMessage> rows) { + this.rows = rows; + } + + /** + * @return Rows. + */ + public List<GridH2RowMessage> rows() { + return rows; + } + + /** + * Sets that this is a partial range. + */ + public void setPartial() { + flags |= FLAG_PARTIAL; + } + + /** + * @return {@code true} If this is a partial range. + */ + public boolean isPartial() { + return (flags & FLAG_PARTIAL) == FLAG_PARTIAL; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeByte("flags", flags)) + return false; + + writer.incrementState(); + + case 1: + if (!writer.writeInt("rangeId", rangeId)) + return false; + + writer.incrementState(); + + case 2: + if (!writer.writeCollection("rows", rows, MessageCollectionItemType.MSG)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + flags = reader.readByte("flags"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 1: + rangeId = reader.readInt("rangeId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 2: + rows = reader.readCollection("rows", MessageCollectionItemType.MSG); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(GridH2RowRange.class); + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return -34; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 3; + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridH2RowRange.class, this, "rowsSize", rows != null ? rows.size() : 0); + } +}
http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2RowRangeBounds.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2RowRangeBounds.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2RowRangeBounds.java new file mode 100644 index 0000000..e32e449 --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2RowRangeBounds.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2.twostep.msg; + +import java.nio.ByteBuffer; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +/** + * Bounds of row range. + */ +public class GridH2RowRangeBounds implements Message { + /** */ + private int rangeId; + + /** */ + private GridH2RowMessage first; + + /** */ + private GridH2RowMessage last; + + /** + * @param rangeId Range ID. + * @param first First. + * @param last Last. + * @return Range bounds. + */ + public static GridH2RowRangeBounds rangeBounds(int rangeId, GridH2RowMessage first, GridH2RowMessage last) { + GridH2RowRangeBounds res = new GridH2RowRangeBounds(); + + res.rangeId(rangeId); + res.first(first); + res.last(last); + + return res; + } + + /** + * @param rangeId Range ID. + */ + public void rangeId(int rangeId) { + this.rangeId = rangeId; + } + + /** + * @return Range ID. + */ + public int rangeId() { + return rangeId; + } + + /** + * @param first First. + */ + public void first(GridH2RowMessage first) { + this.first = first; + } + + /** + * @return First. + */ + public GridH2RowMessage first() { + return first; + } + + /** + * @param last Last. + */ + public void last(GridH2RowMessage last) { + this.last = last; + } + + /** + * @return Last. + */ + public GridH2RowMessage last() { + return last; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeMessage("first", first)) + return false; + + writer.incrementState(); + + case 1: + if (!writer.writeMessage("last", last)) + return false; + + writer.incrementState(); + + case 2: + if (!writer.writeInt("rangeId", rangeId)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + first = reader.readMessage("first"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 1: + last = reader.readMessage("last"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 2: + rangeId = reader.readInt("rangeId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(GridH2RowRangeBounds.class); + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return -35; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 3; + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridH2RowRangeBounds.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Short.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Short.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Short.java index 841e01e..ebeca9d 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Short.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Short.java @@ -99,7 +99,7 @@ public class GridH2Short extends GridH2ValueMessage { } - return true; + return reader.afterMessageRead(GridH2Short.class); } /** {@inheritDoc} */ @@ -111,4 +111,9 @@ public class GridH2Short extends GridH2ValueMessage { @Override public byte fieldsCount() { return 1; } -} \ No newline at end of file + + /** {@inheritDoc} */ + @Override public String toString() { + return String.valueOf(x); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2String.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2String.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2String.java index 50b4d58..f2f9fdc 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2String.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2String.java @@ -101,7 +101,7 @@ public class GridH2String extends GridH2ValueMessage { } - return true; + return reader.afterMessageRead(GridH2String.class); } /** {@inheritDoc} */ @@ -113,4 +113,9 @@ public class GridH2String extends GridH2ValueMessage { @Override public byte fieldsCount() { return 1; } -} \ No newline at end of file + + /** {@inheritDoc} */ + @Override public String toString() { + return x; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Time.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Time.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Time.java index 1c6c7ae..172d695 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Time.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Time.java @@ -102,7 +102,7 @@ public class GridH2Time extends GridH2ValueMessage { } - return true; + return reader.afterMessageRead(GridH2Time.class); } /** {@inheritDoc} */ @@ -114,4 +114,9 @@ public class GridH2Time extends GridH2ValueMessage { @Override public byte fieldsCount() { return 1; } -} \ No newline at end of file + + /** {@inheritDoc} */ + @Override public String toString() { + return String.valueOf(nanos); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Timestamp.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Timestamp.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Timestamp.java index ccdba92..b020799 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Timestamp.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Timestamp.java @@ -50,7 +50,7 @@ public class GridH2Timestamp extends GridH2ValueMessage { ValueTimestamp t = (ValueTimestamp)val; date = t.getDateValue(); - nanos = t.getNanos(); + nanos = t.getTimeNanos(); } /** {@inheritDoc} */ @@ -119,7 +119,7 @@ public class GridH2Timestamp extends GridH2ValueMessage { } - return true; + return reader.afterMessageRead(GridH2Timestamp.class); } /** {@inheritDoc} */ @@ -131,4 +131,9 @@ public class GridH2Timestamp extends GridH2ValueMessage { @Override public byte fieldsCount() { return 2; } -} \ No newline at end of file + + /** {@inheritDoc} */ + @Override public String toString() { + return date + "_" + nanos; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Uuid.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Uuid.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Uuid.java index fd14d90..fa9360b 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Uuid.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Uuid.java @@ -119,7 +119,7 @@ public class GridH2Uuid extends GridH2ValueMessage { } - return true; + return reader.afterMessageRead(GridH2Uuid.class); } /** {@inheritDoc} */ @@ -131,4 +131,9 @@ public class GridH2Uuid extends GridH2ValueMessage { @Override public byte fieldsCount() { return 2; } -} \ No newline at end of file + + /** {@inheritDoc} */ + @Override public String toString() { + return ValueUuid.get(high, low).getString(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessage.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessage.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessage.java index d528c47..18f8880 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessage.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessage.java @@ -52,4 +52,4 @@ public abstract class GridH2ValueMessage implements Message { @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { return true; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessageFactory.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessageFactory.java index d414e5a..aa84e4b 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessageFactory.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessageFactory.java @@ -90,6 +90,24 @@ public class GridH2ValueMessageFactory implements MessageFactory { case -22: return new GridH2CacheObject(); + + case -30: + return new GridH2IndexRangeRequest(); + + case -31: + return new GridH2IndexRangeResponse(); + + case -32: + return new GridH2RowMessage(); + + case -33: + return new GridH2QueryRequest(); + + case -34: + return new GridH2RowRange(); + + case -35: + return new GridH2RowRangeBounds(); } return null; @@ -118,7 +136,7 @@ public class GridH2ValueMessageFactory implements MessageFactory { * @return Filled array. * @throws IgniteCheckedException If failed. */ - public static Value[] fillArray(Iterator<Message> src, Value[] dst, GridKernalContext ctx) + public static Value[] fillArray(Iterator<? extends Message> src, Value[] dst, GridKernalContext ctx) throws IgniteCheckedException { for (int i = 0; i < dst.length; i++) { Message msg = src.next(); @@ -134,7 +152,7 @@ public class GridH2ValueMessageFactory implements MessageFactory { * @return Message. * @throws IgniteCheckedException If failed. */ - public static Message toMessage(Value v) throws IgniteCheckedException { + public static GridH2ValueMessage toMessage(Value v) throws IgniteCheckedException { switch (v.getType()) { case Value.NULL: return GridH2Null.INSTANCE; http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReduceQueryMultithreadedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReduceQueryMultithreadedSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReduceQueryMultithreadedSelfTest.java deleted file mode 100644 index 170ab65..0000000 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReduceQueryMultithreadedSelfTest.java +++ /dev/null @@ -1,168 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache; - -import java.util.Collection; -import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.processors.cache.query.CacheQuery; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.R1; -import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; -import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; - -import static org.apache.ignite.cache.CacheMode.PARTITIONED; -import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; - -/** - * Multithreaded reduce query tests with lots of data. - */ -public class GridCacheReduceQueryMultithreadedSelfTest extends GridCacheAbstractSelfTest { - /** */ - private static final int GRID_CNT = 5; - - /** */ - private static final int TEST_TIMEOUT = 2 * 60 * 1000; - - /** */ - private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); - - /** {@inheritDoc} */ - @Override protected int gridCount() { - return GRID_CNT; - } - - /** {@inheritDoc} */ - @Override protected long getTestTimeout() { - return TEST_TIMEOUT; - } - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - TcpDiscoverySpi disco = new TcpDiscoverySpi(); - - disco.setIpFinder(ipFinder); - - cfg.setDiscoverySpi(disco); - - return cfg; - } - - /** {@inheritDoc} */ - @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception { - CacheConfiguration<?,?> cfg = super.cacheConfiguration(gridName); - - cfg.setCacheMode(PARTITIONED); - cfg.setBackups(1); - cfg.setWriteSynchronizationMode(FULL_SYNC); - - cfg.setIndexedTypes( - String.class, Integer.class - ); - - return cfg; - } - - /** - * @throws Exception In case of error. - */ - public void testReduceQuery() throws Exception { - final int keyCnt = 5000; - final int logFreq = 500; - - final GridCacheAdapter<String, Integer> c = internalCache(jcache()); - - final CountDownLatch startLatch = new CountDownLatch(1); - - IgniteInternalFuture<?> fut1 = multithreadedAsync(new Callable() { - @Override public Object call() throws Exception { - for (int i = 1; i < keyCnt; i++) { - c.getAndPut(String.valueOf(i), i); - - startLatch.countDown(); - - if (i % logFreq == 0) - info("Stored entries: " + i); - } - - return null; - } - }, 1); - - // Create query. - final CacheQuery<List<?>> sumQry = c.context().queries(). - createSqlFieldsQuery("select _val from Integer", false).timeout(TEST_TIMEOUT); - - final R1<List<?>, Integer> rmtRdc = new R1<List<?>, Integer>() { - /** */ - private AtomicInteger sum = new AtomicInteger(); - - @Override public boolean collect(List<?> e) { - sum.addAndGet((Integer)e.get(0)); - - return true; - } - - @Override public Integer reduce() { - return sum.get(); - } - }; - - final AtomicBoolean stop = new AtomicBoolean(); - - startLatch.await(); - - IgniteInternalFuture<?> fut2 = multithreadedAsync(new Callable() { - @Override public Object call() throws Exception { - int cnt = 0; - - while (!stop.get()) { - Collection<Integer> res = sumQry.execute(rmtRdc).get(); - - int sum = F.sumInt(res); - - cnt++; - - assertTrue(sum > 0); - - if (cnt % logFreq == 0) { - info("Reduced value: " + sum); - info("Executed queries: " + cnt); - } - } - - return null; - } - }, 1); - - fut1.get(); - - stop.set(true); - - fut2.get(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java index 65d479d..7c5b472 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java @@ -69,7 +69,6 @@ import org.apache.ignite.events.Event; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.binary.BinaryMarshaller; import org.apache.ignite.internal.processors.cache.distributed.replicated.IgniteCacheReplicatedQuerySelfTest; -import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager; import org.apache.ignite.internal.processors.cache.query.QueryCursorEx; import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata; import org.apache.ignite.internal.util.tostring.GridToStringExclude; @@ -969,22 +968,6 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac /** * @throws Exception If failed. */ - public void testEmptyObject() throws Exception { - IgniteCache<EmptyObject, EmptyObject> cache = ignite().cache(null); - - cache.put(new EmptyObject(1), new EmptyObject(2)); - - for (int i = 0; i < gridCount(); i++) { - GridCacheQueryManager<Object, Object> qryMgr = - ((IgniteKernal)grid(i)).internalCache().context().queries(); - - assert !hasIndexTable(EmptyObject.class, qryMgr); - } - } - - /** - * @throws Exception If failed. - */ public void testPrimitiveType() throws Exception { IgniteCache<Integer, Integer> cache = ignite().cache(null); @@ -1536,7 +1519,7 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac } try { - IgniteCache<UUID, Person> cache = ignite().cache(null); + IgniteCache<UUID,Person> cache = ignite().cache(null); for (int i = 1; i <= 20; i++) cache.put(UUID.randomUUID(), new Person("Person " + i, i)); @@ -1555,17 +1538,6 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac } /** - * @param cls Class to check index table for. - * @param qryMgr Query manager. - * @return {@code true} if index has a table for given class. - * @throws IgniteCheckedException If failed. - */ - private boolean hasIndexTable(Class<?> cls, GridCacheQueryManager<Object, Object> qryMgr) - throws IgniteCheckedException { - return qryMgr.size(cls) != -1; - } - - /** * */ private static class ArrayObject implements Serializable { http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheCrossCacheJoinRandomTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheCrossCacheJoinRandomTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheCrossCacheJoinRandomTest.java new file mode 100644 index 0000000..4d5a44e --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheCrossCacheJoinRandomTest.java @@ -0,0 +1,442 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.io.Serializable; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Stack; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.QueryEntity; +import org.apache.ignite.cache.QueryIndex; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.processors.query.h2.sql.AbstractH2CompareQueryTest; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.T2; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY; +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheMode.REPLICATED; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * + */ +public class IgniteCacheCrossCacheJoinRandomTest extends AbstractH2CompareQueryTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private boolean client; + + /** */ + private static final int OBJECTS = 200; + + /** */ + private static final int MAX_CACHES = 5; + + /** */ + private static Random rnd; + + /** */ + private static List<Map<Integer, Integer>> cachesData; + + /** */ + private static final List<T2<CacheMode, Integer>> MODES_1 = F.asList( + //new T2<>(REPLICATED, 0), + new T2<>(PARTITIONED, 0), + new T2<>(PARTITIONED, 1), + new T2<>(PARTITIONED, 2)); + + /** */ + private static final List<T2<CacheMode, Integer>> MODES_2 = F.asList( + //new T2<>(REPLICATED, 0), + new T2<>(PARTITIONED, 0), + new T2<>(PARTITIONED, 1)); + + /** {@inheritDoc} */ + @Override protected void setIndexedTypes(CacheConfiguration<?, ?> cc, CacheMode mode) { + // No-op. + } + + /** {@inheritDoc} */ + @Override protected void initCacheAndDbData() throws Exception { + // No-op. + } + + /** {@inheritDoc} */ + @Override protected void checkAllDataEquals() throws Exception { + // No-op. + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 10 * 60_000; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi spi = ((TcpDiscoverySpi)cfg.getDiscoverySpi()); + + spi.setIpFinder(IP_FINDER); + + cfg.setClientMode(client); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + client = true; + + startGrid(SRVS); + + long seed = System.currentTimeMillis(); + + rnd = new Random(seed); + + log.info("Random seed: " + seed); + + cachesData = new ArrayList<>(MAX_CACHES); + + for (int i = 0; i < MAX_CACHES; i++) { + Map<Integer, Integer> data = createData(OBJECTS * 2); + + insertH2(data, i); + + cachesData.add(data); + } + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + cachesData = null; + + super.afterTestsStopped(); + } + + /** {@inheritDoc} */ + @Override protected Statement initializeH2Schema() throws SQLException { + Statement st = super.initializeH2Schema(); + + for (int i = 0; i < MAX_CACHES; i++) { + st.execute("CREATE SCHEMA \"cache" + i + "\""); + + st.execute("create table \"cache" + i + "\".TESTOBJECT" + + " (_key int not null," + + " _val other not null," + + " parentId int)"); + } + + return st; + } + + /** + * @param name Cache name. + * @param cacheMode Cache mode. + * @param backups Number of backups. + * @return Cache configuration. + */ + private CacheConfiguration configuration(String name, CacheMode cacheMode, int backups) { + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setName(name); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setAtomicWriteOrderMode(PRIMARY); + ccfg.setAtomicityMode(ATOMIC); + ccfg.setCacheMode(cacheMode); + + if (cacheMode == PARTITIONED) + ccfg.setBackups(backups); + + QueryEntity entity = new QueryEntity(); + entity.setKeyType(Integer.class.getName()); + entity.setValueType(TestObject.class.getName()); + entity.addQueryField("parentId", Integer.class.getName(), null); + entity.setIndexes(F.asList(new QueryIndex("parentId"))); + + ccfg.setQueryEntities(F.asList(entity)); + + return ccfg; + } + + /** + * @throws Exception If failed. + */ + public void testJoin2Caches() throws Exception { + testJoin(2, MODES_1); + } + + /** + * @throws Exception If failed. + */ + public void testJoin3Caches() throws Exception { + testJoin(3, MODES_1); + } + + /** + * @throws Exception If failed. + */ + public void testJoin4Caches() throws Exception { + testJoin(4, MODES_2); + } + + /** + * @throws Exception If failed. + */ + public void testJoin5Caches() throws Exception { + testJoin(5, MODES_2); + } + + /** + * @param caches Number of caches. + * @param allModes Cache modes. + * @throws Exception If failed. + */ + private void testJoin(int caches, List<T2<CacheMode, Integer>> allModes) throws Exception { + checkJoin(cachesData, allModes, new Stack<T2<CacheMode, Integer>>(), caches); + } + + /** + * @param cachesData Caches data. + * @param allModes Modes to test. + * @param modes Select modes. + * @param caches Caches number. + * @throws Exception If failed. + */ + private void checkJoin(List<Map<Integer, Integer>> cachesData, + List<T2<CacheMode, Integer>> allModes, + Stack<T2<CacheMode, Integer>> modes, + int caches) throws Exception { + if (modes.size() == caches) { + List<CacheConfiguration> ccfgs = new ArrayList<>(); + + for (int i = 0; i < modes.size(); i++) { + T2<CacheMode, Integer> mode = modes.get(i); + + CacheConfiguration ccfg = configuration("cache" + i, mode.get1(), mode.get2()); + + ccfgs.add(ccfg); + } + + log.info("Check configurations: " + modes); + + checkJoinQueries(ccfgs, cachesData); + } + else { + for (T2<CacheMode, Integer> mode : allModes) { + modes.push(mode); + + checkJoin(cachesData, allModes, modes, caches); + + modes.pop(); + } + } + } + + /** + * @param ccfgs Configurations. + * @param cachesData Caches data. + * @throws Exception If failed. + */ + private void checkJoinQueries(List<CacheConfiguration> ccfgs, List<Map<Integer, Integer>> cachesData) throws Exception { + Ignite client = ignite(SRVS); + + final int CACHES = ccfgs.size(); + + try { + IgniteCache cache = null; + + boolean hasReplicated = false; + + for (int i = 0; i < CACHES; i++) { + CacheConfiguration ccfg = ccfgs.get(i); + + IgniteCache cache0 = client.createCache(ccfg); + + if (ccfg.getCacheMode() == REPLICATED) + hasReplicated = true; + + if (cache == null && ccfg.getCacheMode() == PARTITIONED) + cache = cache0; + + insertCache(cachesData.get(i), cache0); + } + + boolean distributedJoin = true; + + // Do not use distributed join if all caches are REPLICATED. + if (cache == null) { + cache = client.cache(ccfgs.get(0).getName()); + + distributedJoin = false; + } + + Object[] args = {}; + + compareQueryRes0(cache, createQuery(CACHES, false, null), distributedJoin, false, args, Ordering.RANDOM); + + if (!hasReplicated) { + compareQueryRes0(cache, createQuery(CACHES, false, null), distributedJoin, true, args, Ordering.RANDOM); + + compareQueryRes0(cache, createQuery(CACHES, true, null), distributedJoin, true, args, Ordering.RANDOM); + } + + Map<Integer, Integer> data = cachesData.get(CACHES - 1); + + final int QRY_CNT = CACHES > 4 ? 2 : 50; + + int cnt = 0; + + for (Integer objId : data.keySet()) { + compareQueryRes0(cache, createQuery(CACHES, false, objId), distributedJoin, false, args, Ordering.RANDOM); + + if (!hasReplicated) { + compareQueryRes0(cache, createQuery(CACHES, false, objId), distributedJoin, true, args, Ordering.RANDOM); + + compareQueryRes0(cache, createQuery(CACHES, true, objId), distributedJoin, true, args, Ordering.RANDOM); + } + + if (cnt++ == QRY_CNT) + break; + } + } + finally { + for (CacheConfiguration ccfg : ccfgs) + client.destroyCache(ccfg.getName()); + } + } + + /** + * @param caches Number of caches to join. + * @param outer If {@code true} creates outer join query, otherwise inner join. + * @param objId Object ID. + * @return SQL. + */ + @SuppressWarnings("StringConcatenationInsideStringBufferAppend") + private String createQuery(int caches, boolean outer, @Nullable Integer objId) { + StringBuilder qry = new StringBuilder("select "); + + for (int i = 0; i < caches; i++) { + if (i != 0) + qry.append(", "); + + qry.append("o" + i + "._key"); + } + + qry.append(" from \"cache0\".TestObject o0 "); + + for (int i = 1; i < caches; i++) { + String cacheName = "cache" + i; + + String cur = "o" + i; + String prev = "o" + (i - 1); + + qry.append(outer ? "left outer join " : "inner join "); + qry.append("\"" + cacheName + "\".TestObject " + cur); + + if (i == caches - 1 && objId != null) + qry.append(" on (" + prev + ".parentId=" + cur + "._key and " + cur + "._key=" + objId + ") "); + else + qry.append(" on (" + prev + ".parentId=" + cur + "._key) "); + } + + return qry.toString(); + } + + /** + * @param data Data. + * @param cache Cache. + */ + private void insertCache(Map<Integer, Integer> data, IgniteCache<Object, Object> cache) { + for (Map.Entry<Integer, Integer> e : data.entrySet()) + cache.put(e.getKey(), new TestObject(e.getValue())); + } + + /** + * @param data Data. + * @param cache Cache index. + * @throws Exception If failed. + */ + private void insertH2(Map<Integer, Integer> data, int cache) throws Exception { + for (Map.Entry<Integer, Integer> e : data.entrySet()) { + try (PreparedStatement st = conn.prepareStatement("insert into \"cache" + cache + "\".TESTOBJECT " + + "(_key, _val, parentId) values(?, ?, ?)")) { + st.setObject(1, e.getKey()); + st.setObject(2, new TestObject(e.getValue())); + st.setObject(3, e.getValue()); + + st.executeUpdate(); + } + } + } + + /** + * @param cnt Objects count. + * @return Generated data. + */ + private Map<Integer, Integer> createData(int cnt) { + Map<Integer, Integer> res = new LinkedHashMap<>(); + + while (res.size() < cnt) + res.put(rnd.nextInt(cnt), rnd.nextInt(OBJECTS + 1)); + + return res; + } + + /** + * + */ + static class TestObject implements Serializable { + /** */ + int parentId; + + /** + * @param parentId Parent object ID. + */ + public TestObject(int parentId) { + this.parentId = parentId; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TestObject.class, this); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinCollocatedAndNotTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinCollocatedAndNotTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinCollocatedAndNotTest.java new file mode 100644 index 0000000..7881c44 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinCollocatedAndNotTest.java @@ -0,0 +1,365 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheKeyConfiguration; +import org.apache.ignite.cache.QueryEntity; +import org.apache.ignite.cache.QueryIndex; +import org.apache.ignite.cache.affinity.Affinity; +import org.apache.ignite.cache.affinity.AffinityKeyMapped; +import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY; +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * + */ +public class IgniteCacheDistributedJoinCollocatedAndNotTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final String PERSON_CACHE = "person"; + + /** */ + private static final String ORG_CACHE = "org"; + + /** */ + private static final String ACCOUNT_CACHE = "acc"; + + /** */ + private boolean client; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + CacheKeyConfiguration keyCfg = new CacheKeyConfiguration(PersonKey.class.getName(), "affKey"); + + cfg.setCacheKeyConfiguration(keyCfg); + + TcpDiscoverySpi spi = ((TcpDiscoverySpi)cfg.getDiscoverySpi()); + + spi.setIpFinder(IP_FINDER); + + List<CacheConfiguration> ccfgs = new ArrayList<>(); + + { + CacheConfiguration ccfg = configuration(PERSON_CACHE); + + QueryEntity entity = new QueryEntity(); + entity.setKeyType(PersonKey.class.getName()); + entity.setValueType(Person.class.getName()); + entity.addQueryField("id", Integer.class.getName(), null); + entity.addQueryField("affKey", Integer.class.getName(), null); + entity.addQueryField("name", String.class.getName(), null); + + ccfg.setQueryEntities(F.asList(entity)); + + ccfgs.add(ccfg); + } + + { + CacheConfiguration ccfg = configuration(ORG_CACHE); + + QueryEntity entity = new QueryEntity(); + entity.setKeyType(Integer.class.getName()); + entity.setValueType(Organization.class.getName()); + entity.addQueryField("name", String.class.getName(), null); + entity.setIndexes(F.asList(new QueryIndex("name"))); + + ccfg.setQueryEntities(F.asList(entity)); + + ccfgs.add(ccfg); + } + + { + CacheConfiguration ccfg = configuration(ACCOUNT_CACHE); + + QueryEntity entity = new QueryEntity(); + entity.setKeyType(Integer.class.getName()); + entity.setValueType(Account.class.getName()); + entity.addQueryField("personId", Integer.class.getName(), null); + entity.addQueryField("name", String.class.getName(), null); + entity.setIndexes(F.asList(new QueryIndex("personId"), new QueryIndex("name"))); + + ccfg.setQueryEntities(F.asList(entity)); + + ccfgs.add(ccfg); + } + + cfg.setCacheConfiguration(ccfgs.toArray(new CacheConfiguration[ccfgs.size()])); + + cfg.setClientMode(client); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGridsMultiThreaded(2); + + client = true; + + startGrid(2); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + super.afterTestsStopped(); + } + + /** + * @param name Cache name. + * @return Cache configuration. + */ + private CacheConfiguration configuration(String name) { + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setName(name); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setAtomicWriteOrderMode(PRIMARY); + ccfg.setAtomicityMode(ATOMIC); + ccfg.setBackups(1); + + return ccfg; + } + + /** + * @throws Exception If failed. + */ + public void testJoin() throws Exception { + Ignite client = grid(2); + + IgniteCache<Object, Object> personCache = client.cache(PERSON_CACHE); + IgniteCache<Object, Object> orgCache = client.cache(ORG_CACHE); + IgniteCache<Object, Object> accCache = client.cache(ACCOUNT_CACHE); + + Affinity<Object> aff = client.affinity(PERSON_CACHE); + + AtomicInteger orgKey = new AtomicInteger(); + AtomicInteger accKey = new AtomicInteger(); + + ClusterNode node0 = ignite(0).cluster().localNode(); + ClusterNode node1 = ignite(1).cluster().localNode(); + + /** + * One organization, one person, two accounts. + */ + + int orgId1 = keyForNode(aff, orgKey, node0); + + orgCache.put(orgId1, new Organization("obj-" + orgId1)); + + personCache.put(new PersonKey(1, orgId1), new Person(1, "o1-p1")); + personCache.put(new PersonKey(2, orgId1), new Person(2, "o1-p2")); + + accCache.put(keyForNode(aff, accKey, node0), new Account(1, "a0")); + accCache.put(keyForNode(aff, accKey, node1), new Account(1, "a1")); + + // Join on affinity keys equals condition should not be distributed. + String qry = "select o.name, p._key, p.name " + + "from \"org\".Organization o, \"person\".Person p " + + "where p.affKey = o._key"; + + assertFalse(plan(qry, orgCache, false).contains("batched")); + + checkQuery(qry, orgCache, false, 2); + + checkQuery("select o.name, p._key, p.name, a.name " + + "from \"org\".Organization o, \"person\".Person p, \"acc\".Account a " + + "where p.affKey = o._key and p.id = a.personId", orgCache, true, 2); + } + + /** + * @param sql SQL. + * @param cache Cache. + * @param enforceJoinOrder Enforce join order flag. + * @return Query plan. + */ + private String plan(String sql, + IgniteCache<?, ?> cache, + boolean enforceJoinOrder) { + return (String)cache.query(new SqlFieldsQuery("explain " + sql) + .setDistributedJoins(true) + .setEnforceJoinOrder(enforceJoinOrder)) + .getAll().get(0).get(0); + } + + /** + * @param sql SQL. + * @param cache Cache. + * @param enforceJoinOrder Enforce join order flag. + * @param expSize Expected results size. + */ + private void checkQuery(String sql, + IgniteCache<Object, Object> cache, + boolean enforceJoinOrder, + int expSize) { + String plan = (String)cache.query(new SqlFieldsQuery("explain " + sql) + .setDistributedJoins(true) + .setEnforceJoinOrder(enforceJoinOrder)) + .getAll().get(0).get(0); + + log.info("Plan: " + plan); + + SqlFieldsQuery qry = new SqlFieldsQuery(sql); + + qry.setDistributedJoins(true); + qry.setEnforceJoinOrder(enforceJoinOrder); + + QueryCursor<List<?>> cur = cache.query(qry); + + List<List<?>> res = cur.getAll(); + + if (expSize != res.size()) + log.info("Results: " + res); + + assertEquals(expSize, res.size()); + } + /** + * + */ + public static class PersonKey { + /** */ + private int id; + + /** */ + @AffinityKeyMapped + private int affKey; + + /** + * @param id Key. + * @param affKey Affinity key. + */ + public PersonKey(int id, int affKey) { + this.id = id; + this.affKey = affKey; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + PersonKey other = (PersonKey)o; + + return id == other.id; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return id; + } + } + + /** + * + */ + private static class Account implements Serializable { + /** */ + int personId; + + /** */ + String name; + + /** + * @param personId Person ID. + * @param name Name. + */ + public Account(int personId, String name) { + this.personId = personId; + this.name = name; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(Account.class, this); + } + } + + /** + * + */ + private static class Person implements Serializable { + /** */ + int id; + + /** */ + String name; + + /** + * @param id Person ID. + * @param name Name. + */ + public Person(int id, String name) { + this.id = id; + this.name = name; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(Person.class, this); + } + } + + /** + * + */ + private static class Organization implements Serializable { + /** */ + String name; + + /** + * @param name Name. + */ + public Organization(String name) { + this.name = name; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(Organization.class, this); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinCustomAffinityMapper.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinCustomAffinityMapper.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinCustomAffinityMapper.java new file mode 100644 index 0000000..1d4f7b2 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinCustomAffinityMapper.java @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import javax.cache.CacheException; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.QueryEntity; +import org.apache.ignite.cache.QueryIndex; +import org.apache.ignite.cache.affinity.AffinityKeyMapper; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY; +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheMode.REPLICATED; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * + */ +public class IgniteCacheDistributedJoinCustomAffinityMapper extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final String PERSON_CACHE = "person"; + + /** */ + private static final String PERSON_CACHE_CUSTOM_AFF = "personCustomAff"; + + /** */ + private static final String ORG_CACHE = "org"; + + /** */ + private static final String ORG_CACHE_REPL_CUSTOM = "orgReplCustomAff"; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER); + + List<CacheConfiguration> ccfgs = new ArrayList<>(); + + { + CacheConfiguration ccfg = configuration(PERSON_CACHE); + + QueryEntity entity = new QueryEntity(); + entity.setKeyType(Integer.class.getName()); + entity.setValueType(Person.class.getName()); + entity.addQueryField("orgId", Integer.class.getName(), null); + entity.setIndexes(F.asList(new QueryIndex("orgId"))); + + ccfg.setQueryEntities(F.asList(entity)); + + ccfgs.add(ccfg); + } + + { + CacheConfiguration ccfg = configuration(PERSON_CACHE_CUSTOM_AFF); + + ccfg.setAffinityMapper(new TestMapper()); + + QueryEntity entity = new QueryEntity(); + entity.setKeyType(Integer.class.getName()); + entity.setValueType(Person.class.getName()); + entity.addQueryField("orgId", Integer.class.getName(), null); + entity.setIndexes(F.asList(new QueryIndex("orgId"))); + + ccfg.setQueryEntities(F.asList(entity)); + + ccfgs.add(ccfg); + } + + { + CacheConfiguration ccfg = configuration(ORG_CACHE); + + QueryEntity entity = new QueryEntity(); + entity.setKeyType(Integer.class.getName()); + entity.setValueType(Organization.class.getName()); + + ccfg.setQueryEntities(F.asList(entity)); + + ccfgs.add(ccfg); + } + + { + CacheConfiguration ccfg = configuration(ORG_CACHE_REPL_CUSTOM); + + ccfg.setCacheMode(REPLICATED); + ccfg.setAffinityMapper(new TestMapper()); + + QueryEntity entity = new QueryEntity(); + entity.setKeyType(Integer.class.getName()); + entity.setValueType(Organization.class.getName()); + + ccfg.setQueryEntities(F.asList(entity)); + + ccfgs.add(ccfg); + } + + cfg.setCacheConfiguration(ccfgs.toArray(new CacheConfiguration[ccfgs.size()])); + + return cfg; + } + + /** + * @param name Cache name. + * @return Cache configuration. + */ + private CacheConfiguration configuration(String name) { + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setName(name); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setAtomicWriteOrderMode(PRIMARY); + ccfg.setAtomicityMode(ATOMIC); + ccfg.setBackups(0); + + return ccfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGridsMultiThreaded(3); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + super.afterTestsStopped(); + } + + /** + * @throws Exception If failed. + */ + public void testJoinCustomAffinityMapper() throws Exception { + Ignite ignite = ignite(0); + + IgniteCache<Object, Object> cache = ignite.cache(PERSON_CACHE); + + checkQueryFails(cache, "select o._key k1, p._key k2 " + + "from \"org\".Organization o, \"personCustomAff\".Person p where o._key=p.orgId", false); + + checkQueryFails(cache, "select o._key k1, p._key k2 " + + "from \"personCustomAff\".Person p, \"org\".Organization o where o._key=p.orgId", false); + + { + // Check regular query does not fail. + SqlFieldsQuery qry = new SqlFieldsQuery("select o._key k1, p._key k2 " + + "from \"org\".Organization o, \"personCustomAff\".Person p where o._key=p.orgId"); + + cache.query(qry).getAll(); + } + + { + // Should not check affinity for replicated cache. + SqlFieldsQuery qry = new SqlFieldsQuery("select o1._key k1, p._key k2, o2._key k3 " + + "from \"org\".Organization o1, \"person\".Person p, \"orgReplCustomAff\".Organization o2 where " + + "o1._key=p.orgId and o2._key=p.orgId"); + + cache.query(qry).getAll(); + } + } + + /** + * @param cache Cache. + * @param sql SQL. + * @param enforceJoinOrder Enforce join order flag. + */ + private void checkQueryFails(final IgniteCache<Object, Object> cache, + String sql, + boolean enforceJoinOrder) { + final SqlFieldsQuery qry = new SqlFieldsQuery(sql); + + qry.setDistributedJoins(true); + qry.setEnforceJoinOrder(enforceJoinOrder); + + Throwable err = GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + cache.query(qry); + + return null; + } + }, CacheException.class, null); + + assertTrue("Unexpected error message: " + err.getMessage(), + err.getMessage().contains("can not use distributed joins for cache with custom AffinityKeyMapper configured.")); + } + + /** + * + */ + static class TestMapper implements AffinityKeyMapper { + /** {@inheritDoc} */ + @Override public Object affinityKey(Object key) { + return key; + } + + /** {@inheritDoc} */ + @Override public void reset() { + // No-op. + } + } + /** + * + */ + private static class Person implements Serializable { + /** */ + int orgId; + + /** + * @param orgId Organization ID. + */ + public Person(int orgId) { + this.orgId = orgId; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(Person.class, this); + } + } + + /** + * + */ + private static class Organization implements Serializable { + // No-op. + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/68891e89/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinNoIndexTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinNoIndexTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinNoIndexTest.java new file mode 100644 index 0000000..95c56fa --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinNoIndexTest.java @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicInteger; +import javax.cache.CacheException; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.QueryEntity; +import org.apache.ignite.cache.affinity.Affinity; +import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY; +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * + */ +public class IgniteCacheDistributedJoinNoIndexTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final String PERSON_CACHE = "person"; + + /** */ + private static final String ORG_CACHE = "org"; + + /** */ + private boolean client; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi spi = ((TcpDiscoverySpi)cfg.getDiscoverySpi()); + + spi.setIpFinder(IP_FINDER); + + List<CacheConfiguration> ccfgs = new ArrayList<>(); + + { + CacheConfiguration ccfg = configuration(PERSON_CACHE); + + QueryEntity entity = new QueryEntity(); + entity.setKeyType(Integer.class.getName()); + entity.setValueType(Person.class.getName()); + entity.addQueryField("orgId", Integer.class.getName(), null); + entity.addQueryField("orgName", String.class.getName(), null); + + ccfg.setQueryEntities(F.asList(entity)); + + ccfgs.add(ccfg); + } + + { + CacheConfiguration ccfg = configuration(ORG_CACHE); + + QueryEntity entity = new QueryEntity(); + entity.setKeyType(Integer.class.getName()); + entity.setValueType(Organization.class.getName()); + entity.addQueryField("name", String.class.getName(), null); + + ccfg.setQueryEntities(F.asList(entity)); + + ccfgs.add(ccfg); + } + + cfg.setCacheConfiguration(ccfgs.toArray(new CacheConfiguration[ccfgs.size()])); + + cfg.setClientMode(client); + + return cfg; + } + + /** + * @param name Cache name. + * @return Cache configuration. + */ + private CacheConfiguration configuration(String name) { + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setName(name); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setAtomicWriteOrderMode(PRIMARY); + ccfg.setAtomicityMode(ATOMIC); + ccfg.setBackups(0); + + return ccfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGridsMultiThreaded(2); + + client = true; + + startGrid(2); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + super.afterTestsStopped(); + } + + /** + * @throws Exception If failed. + */ + public void testJoin() throws Exception { + Ignite client = grid(2); + + Affinity<Object> aff = client.affinity(PERSON_CACHE); + + final IgniteCache<Object, Object> personCache = client.cache(PERSON_CACHE); + IgniteCache<Object, Object> orgCache = client.cache(ORG_CACHE); + + AtomicInteger pKey = new AtomicInteger(100_000); + AtomicInteger orgKey = new AtomicInteger(); + + ClusterNode node0 = ignite(0).cluster().localNode(); + ClusterNode node1 = ignite(1).cluster().localNode(); + + for (int i = 0; i < 3; i++) { + int orgId = keyForNode(aff, orgKey, node0); + + orgCache.put(orgId, new Organization("org-" + i)); + + for (int j = 0; j < i; j++) + personCache.put(keyForNode(aff, pKey, node1), new Person(orgId, "org-" + i)); + } + + checkNoIndexError(personCache, "select o.name, p._key, p.orgName " + + "from \"org\".Organization o, \"person\".Person p " + + "where p.orgName = o.name"); + + checkNoIndexError(personCache, "select o.name, p._key, p.orgName " + + "from \"org\".Organization o inner join \"person\".Person p " + + "on p.orgName = o.name"); + + checkNoIndexError(personCache, "select o.name, p._key, p.orgName " + + "from \"org\".Organization o, \"person\".Person p " + + "where p.orgName > o.name"); + + checkNoIndexError(personCache, "select o.name, p._key, p.orgName " + + "from (select * from \"org\".Organization) o, \"person\".Person p " + + "where p.orgName = o.name"); + + checkNoIndexError(personCache, "select o.name, p._key, p.orgName " + + "from \"org\".Organization o, (select * from \"person\".Person) p " + + "where p.orgName = o.name"); + + checkNoIndexError(personCache, "select o.name, p._key, p.orgName " + + "from (select * from \"org\".Organization) o, (select * from \"person\".Person) p " + + "where p.orgName = o.name"); + + checkNoIndexError(personCache, "select o.name, p._key, p.orgName " + + "from \"org\".Organization o, \"person\".Person p"); + + checkNoIndexError(personCache, "select o.name, p._key, p.orgName " + + "from \"org\".Organization o, \"person\".Person p where o._key != p._key"); + + checkQuery("select o.name, p._key, p.orgName " + + "from \"org\".Organization o, \"person\".Person p " + + "where p._key = o._key and o.name=?", personCache, 0, "aaa"); + } + + /** + * @param cache Cache. + * @param sql SQL. + */ + private void checkNoIndexError(final IgniteCache<Object, Object> cache, final String sql) { + Throwable err = GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + SqlFieldsQuery qry = new SqlFieldsQuery(sql); + + qry.setDistributedJoins(true); + + cache.query(qry).getAll(); + + return null; + } + }, CacheException.class, null); + + log.info("Error: " + err.getMessage()); + + assertTrue("Unexpected error message: " + err.getMessage(), + err.getMessage().contains("join condition does not use index")); + } + + /** + * @param sql SQL. + * @param cache Cache. + * @param expSize Expected results size. + * @param args Arguments. + * @return Results. + */ + private List<List<?>> checkQuery(String sql, + IgniteCache<Object, Object> cache, + int expSize, + Object... args) { + SqlFieldsQuery qry = new SqlFieldsQuery(sql); + + qry.setDistributedJoins(true); + qry.setArgs(args); + + log.info("Plan: " + queryPlan(cache, qry)); + + QueryCursor<List<?>> cur = cache.query(qry); + + List<List<?>> res = cur.getAll(); + + if (expSize != res.size()) + log.info("Results: " + res); + + assertEquals(expSize, res.size()); + + return res; + } + + /** + * + */ + private static class Person implements Serializable { + /** */ + int orgId; + + /** */ + String orgName; + + /** + * @param orgId Organization ID. + * @param orgName Organization name. + */ + public Person(int orgId, String orgName) { + this.orgId = orgId; + this.orgName = orgName; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(Person.class, this); + } + } + + /** + * + */ + private static class Organization implements Serializable { + /** */ + String name; + + /** + * @param name Name. + */ + public Organization(String name) { + this.name = name; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(Organization.class, this); + } + } +}
