http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2DmlRequest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2DmlRequest.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2DmlRequest.java new file mode 100644 index 0000000..e40bc2d --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2DmlRequest.java @@ -0,0 +1,516 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ignite.internal.processors.query.h2.twostep.msg; + +import java.io.Externalizable; +import java.nio.ByteBuffer; +import java.util.List; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.GridDirectCollection; +import org.apache.ignite.internal.GridDirectTransient; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.binary.BinaryMarshaller; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.marshaller.Marshaller; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +import static org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery.EMPTY_PARAMS; + +/** + * Request for DML operation on remote node. + */ +public class GridH2DmlRequest implements Message, GridCacheQueryMarshallable { + /** */ + private static final long serialVersionUID = 0L; + + /** Request id. */ + @GridToStringInclude + private long reqId; + + /** Cache identifiers. */ + @GridToStringInclude + @GridDirectCollection(Integer.class) + private List<Integer> caches; + + /** Topology version. */ + @GridToStringInclude + private AffinityTopologyVersion topVer; + + /** Query partitions. */ + @GridToStringInclude + private int[] qryParts; + + /** Page size. */ + private int pageSize; + + /** Query. */ + @GridToStringInclude + private String qry; + + /** Flags. */ + private byte flags; + + /** Timeout. */ + private int timeout; + + /** Query parameters. */ + @GridToStringInclude(sensitive = true) + @GridDirectTransient + private Object[] params; + + /** Query parameters as bytes. */ + private byte[] paramsBytes; + + /** Schema name. */ + @GridToStringInclude + private String schemaName; + + /** + * Required by {@link Externalizable} + */ + public GridH2DmlRequest() { + // No-op. + } + + /** + * @param req Request. + */ + public GridH2DmlRequest(GridH2DmlRequest req) { + reqId = req.reqId; + caches = req.caches; + topVer = req.topVer; + qryParts = req.qryParts; + pageSize = req.pageSize; + qry = req.qry; + flags = req.flags; + timeout = req.timeout; + params = req.params; + paramsBytes = req.paramsBytes; + schemaName = req.schemaName; + } + + /** + * @return Parameters. + */ + public Object[] parameters() { + return params; + } + + /** + * @param params Parameters. + * @return {@code this}. + */ + public GridH2DmlRequest parameters(Object[] params) { + if (params == null) + params = EMPTY_PARAMS; + + this.params = params; + + return this; + } + + /** + * @param reqId Request ID. + * @return {@code this}. + */ + public GridH2DmlRequest requestId(long reqId) { + this.reqId = reqId; + + return this; + } + + /** + * @return Request ID. + */ + public long requestId() { + return reqId; + } + + /** + * @param caches Caches. + * @return {@code this}. + */ + public GridH2DmlRequest caches(List<Integer> caches) { + this.caches = caches; + + return this; + } + + /** + * @return Caches. + */ + public List<Integer> caches() { + return caches; + } + + /** + * @param topVer Topology version. + * @return {@code this}. + */ + public GridH2DmlRequest topologyVersion(AffinityTopologyVersion topVer) { + this.topVer = topVer; + + return this; + } + + /** + * @return Topology version. + */ + public AffinityTopologyVersion topologyVersion() { + return topVer; + } + + /** + * @return Query partitions. + */ + public int[] queryPartitions() { + return qryParts; + } + + /** + * @param qryParts Query partitions. + * @return {@code this}. + */ + public GridH2DmlRequest queryPartitions(int[] qryParts) { + this.qryParts = qryParts; + + return this; + } + + /** + * @param pageSize Page size. + * @return {@code this}. + */ + public GridH2DmlRequest pageSize(int pageSize) { + this.pageSize = pageSize; + + return this; + } + + /** + * @return Page size. + */ + public int pageSize() { + return pageSize; + } + + /** + * @param qry SQL Query. + * @return {@code this}. + */ + public GridH2DmlRequest query(String qry) { + this.qry = qry; + + return this; + } + + /** + * @return SQL Query. + */ + public String query() { + return qry; + } + + /** + * @param flags Flags. + * @return {@code this}. + */ + public GridH2DmlRequest flags(int flags) { + assert flags >= 0 && flags <= 255: flags; + + this.flags = (byte)flags; + + return this; + } + + /** + * @param flags Flags to check. + * @return {@code true} If all the requested flags are set to {@code true}. + */ + public boolean isFlagSet(int flags) { + return (this.flags & flags) == flags; + } + + /** + * @return Timeout. + */ + public int timeout() { + return timeout; + } + + /** + * @param timeout New timeout. + * @return {@code this}. + */ + public GridH2DmlRequest timeout(int timeout) { + this.timeout = timeout; + + return this; + } + + /** + * @return Schema name. + */ + public String schemaName() { + return schemaName; + } + + /** + * @param schemaName Schema name. + * @return {@code this}. + */ + public GridH2DmlRequest schemaName(String schemaName) { + this.schemaName = schemaName; + + return this; + } + + /** {@inheritDoc} */ + @Override public void marshall(Marshaller m) { + if (paramsBytes != null) + return; + + assert params != null; + + try { + paramsBytes = U.marshal(m, params); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + + /** {@inheritDoc} */ + @SuppressWarnings("IfMayBeConditional") + @Override public void unmarshall(Marshaller m, GridKernalContext ctx) { + if (params != null) + return; + + assert paramsBytes != null; + + try { + final ClassLoader ldr = U.resolveClassLoader(ctx.config()); + + if (m instanceof BinaryMarshaller) + // To avoid deserializing of enum types. + params = ((BinaryMarshaller)m).binaryMarshaller().unmarshal(paramsBytes, ldr); + else + params = U.unmarshal(m, paramsBytes, ldr); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeCollection("caches", caches, MessageCollectionItemType.INT)) + return false; + + writer.incrementState(); + + case 1: + if (!writer.writeByte("flags", flags)) + return false; + + writer.incrementState(); + + case 2: + if (!writer.writeInt("pageSize", pageSize)) + return false; + + writer.incrementState(); + + case 3: + if (!writer.writeByteArray("paramsBytes", paramsBytes)) + return false; + + writer.incrementState(); + + case 4: + if (!writer.writeString("qry", qry)) + return false; + + writer.incrementState(); + + case 5: + if (!writer.writeIntArray("qryParts", qryParts)) + return false; + + writer.incrementState(); + + case 6: + if (!writer.writeLong("reqId", reqId)) + return false; + + writer.incrementState(); + + case 7: + if (!writer.writeString("schemaName", schemaName)) + return false; + + writer.incrementState(); + + case 8: + if (!writer.writeInt("timeout", timeout)) + return false; + + writer.incrementState(); + + case 9: + if (!writer.writeMessage("topVer", topVer)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + caches = reader.readCollection("caches", MessageCollectionItemType.INT); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 1: + flags = reader.readByte("flags"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 2: + pageSize = reader.readInt("pageSize"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 3: + paramsBytes = reader.readByteArray("paramsBytes"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 4: + qry = reader.readString("qry"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 5: + qryParts = reader.readIntArray("qryParts"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 6: + reqId = reader.readLong("reqId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 7: + schemaName = reader.readString("schemaName"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 8: + timeout = reader.readInt("timeout"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 9: + topVer = reader.readMessage("topVer"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(GridH2DmlRequest.class); + } + + /** {@inheritDoc} */ + @Override public short directType() { + return -55; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 10; + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridH2DmlRequest.class, this); + } +}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2DmlResponse.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2DmlResponse.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2DmlResponse.java new file mode 100644 index 0000000..808ff9e --- /dev/null +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2DmlResponse.java @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.h2.twostep.msg; + +import java.nio.ByteBuffer; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.GridDirectTransient; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.binary.BinaryMarshaller; +import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.marshaller.Marshaller; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +/** + * Response to remote DML request. + */ +public class GridH2DmlResponse implements Message, GridCacheQueryMarshallable { + /** */ + private static final long serialVersionUID = 0L; + + /** Request id. */ + @GridToStringInclude + private long reqId; + + /** Number of updated rows. */ + @GridToStringInclude + private long updCnt; + + /** Error message. */ + @GridToStringInclude + private String err; + + /** Keys that failed. */ + @GridToStringInclude + @GridDirectTransient + private Object[] errKeys; + + /** Keys that failed (after marshalling). */ + private byte[] errKeysBytes; + + /** + * Default constructor. + */ + public GridH2DmlResponse() { + // No-op. + } + + /** + * Constructor. + * + * @param reqId Request id. + * @param updCnt Updated row number. + * @param errKeys Erroneous keys. + * @param error Error message. + */ + public GridH2DmlResponse(long reqId, long updCnt, Object[] errKeys, String error) { + this.reqId = reqId; + this.updCnt = updCnt; + this.errKeys = errKeys; + this.err = error; + } + + /** + * @return Request id. + */ + public long requestId() { + return reqId; + } + + /** + * @return Update counter. + */ + public long updateCounter() { + return updCnt; + } + + /** + * @return Error keys. + */ + public Object[] errorKeys() { + return errKeys; + } + + /** + * @return Error message. + */ + public String error() { + return err; + } + + /** {@inheritDoc} */ + @Override public void marshall(Marshaller m) { + if (errKeysBytes != null || errKeys == null) + return; + + try { + errKeysBytes = U.marshal(m, errKeys); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + + /** {@inheritDoc} */ + @SuppressWarnings("IfMayBeConditional") + @Override public void unmarshall(Marshaller m, GridKernalContext ctx) { + if (errKeys != null || errKeysBytes == null) + return; + + try { + final ClassLoader ldr = U.resolveClassLoader(ctx.config()); + + if (m instanceof BinaryMarshaller) + // To avoid deserializing of enum types. + errKeys = ((BinaryMarshaller)m).binaryMarshaller().unmarshal(errKeysBytes, ldr); + else + errKeys = U.unmarshal(m, errKeysBytes, ldr); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridH2DmlResponse.class, this); + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeString("err", err)) + return false; + + writer.incrementState(); + + case 1: + if (!writer.writeByteArray("errKeysBytes", errKeysBytes)) + return false; + + writer.incrementState(); + + case 2: + if (!writer.writeLong("reqId", reqId)) + return false; + + writer.incrementState(); + + case 3: + if (!writer.writeLong("updCnt", updCnt)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + err = reader.readString("err"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 1: + errKeysBytes = reader.readByteArray("errKeysBytes"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 2: + reqId = reader.readLong("reqId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 3: + updCnt = reader.readLong("updCnt"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(GridH2DmlResponse.class); + } + + /** {@inheritDoc} */ + @Override public short directType() { + return -56; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 4; + } + + @Override public void onAckReceived() { + // No-op + } +} + http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessageFactory.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessageFactory.java index 18b1afb..3c13392 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessageFactory.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessageFactory.java @@ -112,6 +112,12 @@ public class GridH2ValueMessageFactory implements MessageFactory { case -54: return new QueryTable(); + + case -55: + return new GridH2DmlRequest(); + + case -56: + return new GridH2DmlResponse(); } return null; http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSkipReducerOnUpdateDmlFlagSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSkipReducerOnUpdateDmlFlagSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSkipReducerOnUpdateDmlFlagSelfTest.java new file mode 100644 index 0000000..e5efc06 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSkipReducerOnUpdateDmlFlagSelfTest.java @@ -0,0 +1,783 @@ +package org.apache.ignite.internal.processors.query; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import javax.cache.Cache; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.QueryEntity; +import org.apache.ignite.cache.query.annotations.QuerySqlField; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * Tests for {@link SqlFieldsQueryEx#skipReducerOnUpdate} flag. + */ +public class IgniteSqlSkipReducerOnUpdateDmlFlagSelfTest extends GridCommonAbstractTest { + /** IP finder. */ + private static final TcpDiscoveryVmIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static int NODE_COUNT = 4; + + /** */ + private static String NODE_CLIENT = "client"; + + /** */ + private static String CACHE_ACCOUNT = "acc"; + + /** */ + private static String CACHE_REPORT = "rep"; + + /** */ + private static String CACHE_STOCK = "stock"; + + /** */ + private static String CACHE_TRADE = "trade"; + + /** */ + private static String CACHE_LIST = "list"; + + /** */ + private static IgniteEx client; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration c = super.getConfiguration(gridName); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(IP_FINDER); + + c.setDiscoverySpi(disco); + + List<CacheConfiguration> ccfgs = new ArrayList<>(); + + ccfgs.add(buildCacheConfiguration(CACHE_ACCOUNT)); + ccfgs.add(buildCacheConfiguration(CACHE_STOCK)); + ccfgs.add(buildCacheConfiguration(CACHE_TRADE)); + ccfgs.add(buildCacheConfiguration(CACHE_REPORT)); + ccfgs.add(buildCacheConfiguration(CACHE_LIST)); + + c.setCacheConfiguration(ccfgs.toArray(new CacheConfiguration[ccfgs.size()])); + + if (gridName.equals(NODE_CLIENT)) + c.setClientMode(true); + + return c; + } + + /** + * Creates a cache configuration. + * + * @param name Name of the cache. + * @return Cache configuration. + */ + private CacheConfiguration buildCacheConfiguration(String name) { + if (name.equals(CACHE_ACCOUNT)) { + CacheConfiguration ccfg = new CacheConfiguration(CACHE_ACCOUNT); + + ccfg.setCacheMode(CacheMode.PARTITIONED); + + QueryEntity entity = new QueryEntity(Integer.class, Account.class); + + ccfg.setQueryEntities(Collections.singletonList(entity)); + + return ccfg; + } + if (name.equals(CACHE_STOCK)) { + CacheConfiguration ccfg = new CacheConfiguration(CACHE_STOCK); + + ccfg.setCacheMode(CacheMode.REPLICATED); + + QueryEntity entity = new QueryEntity(Integer.class, Stock.class); + + ccfg.setQueryEntities(Collections.singletonList(entity)); + + return ccfg; + } + if (name.equals(CACHE_TRADE)) { + CacheConfiguration ccfg = new CacheConfiguration(CACHE_TRADE); + + ccfg.setCacheMode(CacheMode.PARTITIONED); + + QueryEntity entity = new QueryEntity(Integer.class, Trade.class); + + ccfg.setQueryEntities(Collections.singletonList(entity)); + + return ccfg; + } + if (name.equals(CACHE_REPORT)) { + CacheConfiguration ccfg = new CacheConfiguration(CACHE_REPORT); + + ccfg.setCacheMode(CacheMode.PARTITIONED); + + QueryEntity entity = new QueryEntity(Integer.class, Report.class); + + ccfg.setQueryEntities(Collections.singletonList(entity)); + + return ccfg; + } + if (name.equals(CACHE_LIST)) { + CacheConfiguration ccfg = new CacheConfiguration(CACHE_LIST); + + ccfg.setCacheMode(CacheMode.PARTITIONED); + + QueryEntity entity = new QueryEntity(Integer.class, String.class); + + ccfg.setQueryEntities(Collections.singletonList(entity)); + + return ccfg; + } + + assert false; + + return null; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGrids(NODE_COUNT); + + client = (IgniteEx)startGrid(NODE_CLIENT); + + awaitPartitionMapExchange(); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + + super.afterTestsStopped(); + + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + awaitPartitionMapExchange(); + + client.cache(CACHE_ACCOUNT).clear(); + client.cache(CACHE_STOCK).clear(); + client.cache(CACHE_TRADE).clear(); + client.cache(CACHE_REPORT).clear(); + client.cache(CACHE_LIST).clear(); + } + + /** + * + * @throws Exception If failed. + */ + public void testUpdate() throws Exception { + Map<Integer, Account> accounts = getAccounts(100, 1, 100); + + String text = "UPDATE \"acc\".Account SET depo = depo - ? WHERE depo > 0"; + + checkUpdate(client.<Integer, Account>cache(CACHE_ACCOUNT), accounts, new SqlFieldsQueryEx(text, false).setArgs(10)); + } + + /** + * + * @throws Exception If failed. + */ + public void testUpdateFastKey() throws Exception { + Map<Integer, Account> accounts = getAccounts(100, 1, 100); + + String text = "UPDATE \"acc\".Account SET depo = depo - ? WHERE _key = ?"; + + checkUpdate(client.<Integer, Account>cache(CACHE_ACCOUNT), accounts, + new SqlFieldsQueryEx(text, false).setArgs(10, 1)); + } + + /** + * + * @throws Exception If failed. + */ + public void testUpdateLimit() throws Exception { + Map<Integer, Account> accounts = getAccounts(100, 1, 100); + + String text = "UPDATE \"acc\".Account SET depo = depo - ? WHERE sn >= ? AND sn < ? LIMIT ?"; + + checkUpdate(client.<Integer, Account>cache(CACHE_ACCOUNT), accounts, + new SqlFieldsQueryEx(text, false).setArgs(10, 0, 10, 10)); + } + + /** + * + * @throws Exception If failed. + */ + public void testUpdateWhereSubquery() throws Exception { + Map<Integer, Account> accounts = getAccounts(100, 1, -100); + + Map<Integer, Trade> trades = getTrades(100, 2); + + client.cache(CACHE_ACCOUNT).putAll(accounts); + + String text = "UPDATE \"trade\".Trade t SET qty = ? " + + "WHERE accountId IN (SELECT p._key FROM \"acc\".Account p WHERE depo < ?)"; + + checkUpdate(client.<Integer, Trade>cache(CACHE_TRADE), trades, + new SqlFieldsQueryEx(text, false).setArgs(0, 0)); + } + + /** + * + * @throws Exception If failed. + */ + public void testUpdateSetSubquery() throws Exception { + Map<Integer, Account> accounts = getAccounts(100, 1, 1000); + Map<Integer, Trade> trades = getTrades(100, 2); + + client.cache(CACHE_ACCOUNT).putAll(accounts); + + String text = "UPDATE \"trade\".Trade t SET qty = " + + "(SELECT a.depo/t.price FROM \"acc\".Account a WHERE t.accountId = a._key)"; + + checkUpdate(client.<Integer, Trade>cache(CACHE_TRADE), trades, + new SqlFieldsQueryEx(text, false)); + } + + /** + * + * @throws Exception If failed. + */ + public void testUpdateSetTableSubquery() throws Exception { + Map<Integer, Account> accounts = getAccounts(100, 1, 1000); + Map<Integer, Trade> trades = getTrades(100, 2); + + client.cache(CACHE_ACCOUNT).putAll(accounts); + + String text = "UPDATE \"trade\".Trade t SET (qty) = " + + "(SELECT a.depo/t.price FROM \"acc\".Account a WHERE t.accountId = a._key)"; + + checkUpdate(client.<Integer, Trade>cache(CACHE_TRADE), trades, + new SqlFieldsQueryEx(text, false)); + } + + /** + * + * @throws Exception If failed. + */ + public void testInsertValues() throws Exception { + String text = "INSERT INTO \"acc\".Account (_key, name, sn, depo)" + + " VALUES (?, ?, ?, ?), (?, ?, ?, ?)"; + + checkUpdate(client.<Integer, Account>cache(CACHE_ACCOUNT), null, + new SqlFieldsQueryEx(text, false).setArgs(1, "John Marry", 11111, 100, 2, "Marry John", 11112, 200)); + } + + /** + * + * @throws Exception If failed. + */ + public void testInsertFromSelect() throws Exception { + Map<Integer, Account> accounts = getAccounts(100, 1, 1000); + + client.cache(CACHE_ACCOUNT).putAll(accounts); + + String text = "INSERT INTO \"trade\".Trade (_key, accountId, stockId, qty, price) " + + "SELECT a._key, a._key, ?, a.depo/?, ? FROM \"acc\".Account a"; + + checkUpdate(client.<Integer, Trade>cache(CACHE_TRADE), null, + new SqlFieldsQueryEx(text, false).setArgs(1, 10, 10)); + } + + /** + * + * @throws Exception If failed. + */ + public void testInsertFromSelectOrderBy() throws Exception { + Map<Integer, Account> accounts = getAccounts(100, 1, 1000); + + client.cache(CACHE_ACCOUNT).putAll(accounts); + + String text = "INSERT INTO \"trade\".Trade (_key, accountId, stockId, qty, price) " + + "SELECT a._key, a._key, ?, a.depo/?, ? FROM \"acc\".Account a " + + "ORDER BY a.sn DESC"; + + checkUpdate(client.<Integer, Trade>cache(CACHE_TRADE), null, + new SqlFieldsQueryEx(text, false).setArgs(1, 10, 10)); + } + + /** + * + * @throws Exception If failed. + */ + public void testInsertFromSelectUnion() throws Exception { + Map<Integer, Account> accounts = getAccounts(20, 1, 1000); + + client.cache(CACHE_ACCOUNT).putAll(accounts); + + String text = "INSERT INTO \"trade\".Trade (_key, accountId, stockId, qty, price) " + + "SELECT a._key, a._key, 0, a.depo, 1 FROM \"acc\".Account a " + + "UNION " + + "SELECT 101 + a2._key, a2._key, 1, a2.depo, 1 FROM \"acc\".Account a2"; + + checkUpdate(client.<Integer, Trade>cache(CACHE_TRADE), null, + new SqlFieldsQueryEx(text, false)); + } + + /** + * + * @throws Exception If failed. + */ + public void testInsertFromSelectGroupBy() throws Exception { + Map<Integer, Account> accounts = getAccounts(100, 1, 1000); + Map<Integer, Trade> trades = getTrades(100, 2); + + client.cache(CACHE_ACCOUNT).putAll(accounts); + client.cache(CACHE_TRADE).putAll(trades); + + String text = "INSERT INTO \"rep\".Report (_key, accountId, spends, count) " + + "SELECT accountId, accountId, SUM(qty * price), COUNT(*) " + + "FROM \"trade\".Trade " + + "GROUP BY accountId"; + + checkUpdate(client.<Integer, Report>cache(CACHE_REPORT), null, + new SqlFieldsQueryEx(text, false)); + } + + /** + * + * @throws Exception If failed. + */ + public void testInsertFromSelectDistinct() throws Exception { + Map<Integer, Account> accounts = getAccounts(100, 2, 100); + + client.cache(CACHE_ACCOUNT).putAll(accounts); + + String text = "INSERT INTO \"list\".String (_key, _val) " + + "SELECT DISTINCT sn, name FROM \"acc\".Account "; + + checkUpdate(client.<Integer, String>cache(CACHE_LIST), null, + new SqlFieldsQueryEx(text, false)); + } + + /** + * + * @throws Exception If failed. + */ + public void testInsertFromSelectJoin() throws Exception { + Map<Integer, Account> accounts = getAccounts(100, 1, 100); + Map<Integer, Stock> stocks = getStocks(5); + + client.cache(CACHE_ACCOUNT).putAll(accounts); + client.cache(CACHE_STOCK).putAll(stocks); + + String text = "INSERT INTO \"trade\".Trade(_key, accountId, stockId, qty, price) " + + "SELECT 5*a._key + s._key, a._key, s._key, ?, a.depo/? " + + "FROM \"acc\".Account a JOIN \"stock\".Stock s ON 1=1"; + + checkUpdate(client.<Integer, Trade>cache(CACHE_TRADE), null, + new SqlFieldsQueryEx(text, false).setArgs(10, 10)); + } + + /** + * + * @throws Exception If failed. + */ + public void testDelete() throws Exception { + Map<Integer, Account> accounts = getAccounts(100, 1, 100); + + client.cache(CACHE_ACCOUNT).putAll(accounts); + + String text = "DELETE FROM \"acc\".Account WHERE sn > ?"; + + checkUpdate(client.<Integer, Account>cache(CACHE_ACCOUNT), accounts, + new SqlFieldsQueryEx(text, false).setArgs(10)); + } + + /** + * + * @throws Exception If failed. + */ + public void testDeleteTop() throws Exception { + Map<Integer, Account> accounts = getAccounts(100, 1, 100); + + client.cache(CACHE_ACCOUNT).putAll(accounts); + + String text = "DELETE TOP ? FROM \"acc\".Account WHERE sn < ?"; + + checkUpdate(client.<Integer, Account>cache(CACHE_ACCOUNT), accounts, + new SqlFieldsQueryEx(text, false).setArgs(10, 10)); + } + + /** + * + * @throws Exception If failed. + */ + public void testDeleteWhereSubquery() throws Exception { + Map<Integer, Account> accounts = getAccounts(20, 1, 100); + Map<Integer, Trade> trades = getTrades(10, 2); + + client.cache(CACHE_ACCOUNT).putAll(accounts); + client.cache(CACHE_TRADE).putAll(trades); + + String text = "DELETE FROM \"acc\".Account " + + "WHERE _key IN (SELECT t.accountId FROM \"trade\".Trade t)"; + + checkUpdate(client.<Integer, Account>cache(CACHE_ACCOUNT), accounts, + new SqlFieldsQueryEx(text, false)); + } + + /** + * + * @throws Exception If failed. + */ + public void testMergeValues() throws Exception { + Map<Integer, Account> accounts = getAccounts(1, 1, 100); + + String text = "MERGE INTO \"acc\".Account (_key, name, sn, depo)" + + " VALUES (?, ?, ?, ?), (?, ?, ?, ?)"; + + checkUpdate(client.<Integer, Account>cache(CACHE_ACCOUNT), accounts, + new SqlFieldsQueryEx(text, false).setArgs(0, "John Marry", 11111, 100, 1, "Marry John", 11112, 200)); + } + + /** + * + * @throws Exception If failed. + */ + public void testMergeFromSelectJoin() throws Exception { + Map<Integer, Account> accounts = getAccounts(100, 1, 100); + Map<Integer, Stock> stocks = getStocks(5); + + client.cache(CACHE_ACCOUNT).putAll(accounts); + client.cache(CACHE_STOCK).putAll(stocks); + + Map<Integer, Trade> trades = new HashMap<>(); + + trades.put(5, new Trade(1, 1, 1, 1)); + + String text = "MERGE INTO \"trade\".Trade(_key, accountId, stockId, qty, price) " + + "SELECT 5*a._key + s._key, a._key, s._key, ?, a.depo/? " + + "FROM \"acc\".Account a JOIN \"stock\".Stock s ON 1=1"; + + checkUpdate(client.<Integer, Trade>cache(CACHE_TRADE), trades, + new SqlFieldsQueryEx(text, false).setArgs(10, 10)); + } + + /** + * + * @throws Exception If failed. + */ + public void testMergeFromSelectOrderBy() throws Exception { + Map<Integer, Account> accounts = getAccounts(100, 1, 1000); + + client.cache(CACHE_ACCOUNT).putAll(accounts); + + Map<Integer, Trade> trades = new HashMap<>(); + + trades.put(5, new Trade(1, 1, 1, 1)); + + String text = "MERGE INTO \"trade\".Trade (_key, accountId, stockId, qty, price) " + + "SELECT a._key, a._key, ?, a.depo/?, ? FROM \"acc\".Account a " + + "ORDER BY a.sn DESC"; + + checkUpdate(client.<Integer, Trade>cache(CACHE_TRADE), trades, + new SqlFieldsQueryEx(text, false).setArgs(1, 10, 10)); + } + + /** + * + * @throws Exception If failed. + */ + public void testMergeFromSelectGroupBy() throws Exception { + Map<Integer, Account> accounts = getAccounts(100, 1, 1000); + Map<Integer, Trade> trades = getTrades(100, 2); + + client.cache(CACHE_ACCOUNT).putAll(accounts); + client.cache(CACHE_TRADE).putAll(trades); + + Map<Integer, Report> reports = new HashMap<>(); + + reports.put(5, new Report(5, 1, 1)); + + String text = "MERGE INTO \"rep\".Report (_key, accountId, spends, count) " + + "SELECT accountId, accountId, SUM(qty * price), COUNT(*) " + + "FROM \"trade\".Trade " + + "GROUP BY accountId"; + + checkUpdate(client.<Integer, Report>cache(CACHE_REPORT), reports, + new SqlFieldsQueryEx(text, false)); + } + + /** + * Constructs multiple Account objects. + * + * @param num Number of accounts. + * @param numCopy Number of copies. + * @param depo Deposit amount. + * @return Map of accounts. + */ + private Map<Integer, Account> getAccounts(int num, int numCopy, int depo) { + Map<Integer, Account> res = new HashMap<>(); + + int count = 0; + + for (int i = 0; i < num; ++i) { + String name = "John doe #" + i; + + for (int j = 0; j < numCopy; ++j) + res.put(count++, new Account(name, i, depo)); + } + + return res; + } + + /** + * Constructs multiple Stock objects. + * + * @param num Number of stocks. + * @return Map of Stock objects. + */ + private Map<Integer, Stock> getStocks(int num) { + Map<Integer, Stock> res = new HashMap<>(); + + for (int i = 0; i < num; ++i) + res.put(i, new Stock("T" + i, "Stock #" + i)); + + return res; + } + + /** + * Constructs multiple Trade objects. + * + * @param numAccounts Number of accounts. + * @param numStocks Number of stocks. + * @return Map of Trade objects. + */ + private Map<Integer, Trade> getTrades(int numAccounts, int numStocks) { + Map<Integer, Trade> res = new HashMap<>(); + + int count = 0; + + for (int i = 0; i < numAccounts; ++i) { + for (int j = 0; j < numStocks; ++j) { + res.put(count++, new Trade(i, j, 100, 100)); + } + } + + return res; + } + + /** + * Executes provided sql update with skipReducerOnUpdate flag on and off and checks results are the same. + * + * @param cache Cache. + * @param initial Initial content of the cache. + * @param qry Query to execute. + * @param <K> Key type. + * @param <V> Value type. + */ + private <K, V> void checkUpdate(IgniteCache<K, V> cache, Map<K, V> initial, SqlFieldsQueryEx qry) { + cache.clear(); + + if (!F.isEmpty(initial)) + cache.putAll(initial); + + List<List<?>> updRes = cache.query(qry.setSkipReducerOnUpdate(true)).getAll(); + + Map<K, V> result = new HashMap<>(cache.size()); + + for (Cache.Entry<K, V> e : cache) + result.put(e.getKey(), e.getValue()); + + cache.clear(); + + if (!F.isEmpty(initial)) + cache.putAll(initial); + + List<List<?>> updRes2 = cache.query(qry.setSkipReducerOnUpdate(false)).getAll(); + + assertTrue(((Number)updRes.get(0).get(0)).intValue() > 0); + + assertEquals(((Number)updRes.get(0).get(0)).intValue(), ((Number)updRes2.get(0).get(0)).intValue()); + + assertEquals(result.size(), cache.size()); + + for (Cache.Entry<K, V> e : cache) + assertEquals(e.getValue(), result.get(e.getKey())); + } + + /** */ + public class Account { + /** */ + @QuerySqlField + String name; + + /** */ + @QuerySqlField + int sn; + + /** */ + @QuerySqlField + int depo; + + /** + * Constructor. + * + * @param name Name. + * @param sn ID. + * @param depo Deposit amount. + */ + Account(String name, int sn, int depo) { + this.name = name; + this.sn = sn; + this.depo = depo; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return (name == null ? 0 : name.hashCode()) ^ sn ^ depo; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object obj) { + if (obj == null) + return false; + + if (!obj.getClass().equals(Account.class)) + return false; + + Account other = (Account)obj; + + return F.eq(name, other.name) && sn == other.sn && depo == other.depo; + } + } + + /** */ + public class Stock { + /** */ + @QuerySqlField + String ticker; + + /** */ + @QuerySqlField + String name; + + /** + * Constructor. + * + * @param ticker Short name. + * @param name Name. + */ + Stock(String ticker, String name) { + this.ticker = ticker; + this.name = name; + } + } + + /** */ + public class Trade { + /** */ + @QuerySqlField + int accountId; + + /** */ + @QuerySqlField + int stockId; + + /** */ + @QuerySqlField + int qty; + + /** */ + @QuerySqlField + int price; + + /** + * Constructor. + * + * @param accountId Account id. + * @param stockId Stock id. + * @param qty Quantity. + * @param price Price. + */ + Trade(int accountId, int stockId, int qty, int price) { + this.accountId = accountId; + this.stockId = stockId; + this.qty = qty; + this.price = price; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return accountId ^ stockId ^ qty ^ price; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object obj) { + if (obj == null) + return false; + + if (!obj.getClass().equals(Trade.class)) + return false; + + Trade other = (Trade)obj; + + return accountId == other.accountId && stockId == other.stockId && + qty == other.qty && price == other.price; + } + + } + + /** */ + public class Report { + /** */ + @QuerySqlField + int accountId; + + /** */ + @QuerySqlField + int spends; + + /** */ + @QuerySqlField + int count; + + /** + * Constructor. + * + * @param accountId Account id. + * @param spends Spends. + * @param count Count. + */ + Report(int accountId, int spends, int count) { + this.accountId = accountId; + this.spends = spends; + this.count = count; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return accountId ^ spends ^ count; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object obj) { + if (obj == null) + return false; + + if (!obj.getClass().equals(Report.class)) + return false; + + Report other = (Report)obj; + + return accountId == other.accountId && spends == other.spends && + count == other.count; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSkipReducerOnUpdateDmlSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSkipReducerOnUpdateDmlSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSkipReducerOnUpdateDmlSelfTest.java new file mode 100644 index 0000000..a2a6bf8 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSkipReducerOnUpdateDmlSelfTest.java @@ -0,0 +1,755 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import javax.cache.CacheException; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cache.CacheKeyConfiguration; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.QueryEntity; +import org.apache.ignite.cache.affinity.Affinity; +import org.apache.ignite.cache.affinity.AffinityKeyMapped; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.cache.query.annotations.QuerySqlField; +import org.apache.ignite.cache.query.annotations.QuerySqlFunction; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.events.CacheQueryExecutedEvent; +import org.apache.ignite.events.Event; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx; +import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing; +import org.apache.ignite.internal.processors.query.h2.twostep.GridMapQueryExecutor; +import org.apache.ignite.internal.processors.query.h2.twostep.GridReduceQueryExecutor; +import org.apache.ignite.internal.util.lang.GridAbsPredicate; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.jsr166.ThreadLocalRandom8; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED; + +/** + * Tests for distributed DML. + */ +@SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"}) +public class IgniteSqlSkipReducerOnUpdateDmlSelfTest extends GridCommonAbstractTest { + /** IP finder. */ + private static final TcpDiscoveryVmIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static int NODE_COUNT = 4; + + /** */ + private static String NODE_CLIENT = "client"; + + /** */ + private static String CACHE_ORG = "org"; + + /** */ + private static String CACHE_PERSON = "person"; + + /** */ + private static String CACHE_POSITION = "pos"; + + /** */ + private static Ignite client; + + /** */ + private static CountDownLatch latch; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration c = super.getConfiguration(gridName); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(IP_FINDER); + + c.setDiscoverySpi(disco); + + List<CacheConfiguration> ccfgs = new ArrayList<>(); + + ccfgs.add(buildCacheConfiguration(CACHE_ORG)); + ccfgs.add(buildCacheConfiguration(CACHE_PERSON)); + ccfgs.add(buildCacheConfiguration(CACHE_POSITION)); + + c.setCacheConfiguration(ccfgs.toArray(new CacheConfiguration[ccfgs.size()])); + + c.setLongQueryWarningTimeout(10000); + + if (gridName.equals(NODE_CLIENT)) + c.setClientMode(true); + + return c; + } + + /** + * Creates cache configuration. + * + * @param name Cache name. + * @return Cache configuration. + */ + private CacheConfiguration buildCacheConfiguration(String name) { + if (name.equals(CACHE_ORG)) { + CacheConfiguration ccfg = new CacheConfiguration(CACHE_ORG); + + ccfg.setCacheMode(CacheMode.PARTITIONED); + + QueryEntity entity = new QueryEntity(Integer.class, Organization.class); + + ccfg.setQueryEntities(Collections.singletonList(entity)); + + ccfg.setSqlFunctionClasses(IgniteSqlSkipReducerOnUpdateDmlSelfTest.class); + + return ccfg; + } + if (name.equals(CACHE_PERSON)) { + CacheConfiguration ccfg = new CacheConfiguration(CACHE_PERSON); + + ccfg.setCacheMode(CacheMode.PARTITIONED); + + QueryEntity entity = new QueryEntity(PersonKey.class, Person.class); + + ccfg.setQueryEntities(Collections.singletonList(entity)); + + ccfg.setKeyConfiguration(new CacheKeyConfiguration(PersonKey.class)); + + ccfg.setSqlFunctionClasses(IgniteSqlSkipReducerOnUpdateDmlSelfTest.class); + + return ccfg; + } + if (name.equals(CACHE_POSITION)) { + CacheConfiguration ccfg = new CacheConfiguration(CACHE_POSITION); + + ccfg.setCacheMode(CacheMode.REPLICATED); + + QueryEntity entity = new QueryEntity(Integer.class, Position.class); + + ccfg.setQueryEntities(Collections.singletonList(entity)); + + ccfg.setSqlFunctionClasses(IgniteSqlSkipReducerOnUpdateDmlSelfTest.class); + + return ccfg; + } + + assert false; + + return null; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGrids(NODE_COUNT); + + client = startGrid(NODE_CLIENT); + + awaitPartitionMapExchange(); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + checkNoLeaks(); + + super.afterTestsStopped(); + + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + // Stop additional node that is started in one of the test. + stopGrid(NODE_COUNT + 1); + + awaitPartitionMapExchange(); + + client.cache(CACHE_PERSON).clear(); + client.cache(CACHE_ORG).clear(); + client.cache(CACHE_POSITION).clear(); + } + + /** + * + * @throws Exception if failed. + */ + public void testSimpleUpdateDistributedReplicated() throws Exception { + fillCaches(); + + IgniteCache<Integer, Position> cache = grid(NODE_CLIENT).cache(CACHE_POSITION); + + Position p = cache.get(1); + + List<List<?>> r = cache.query(new SqlFieldsQueryEx("UPDATE Position p SET name = CONCAT('A ', name)", false) + .setSkipReducerOnUpdate(true)).getAll(); + + assertEquals((long)cache.size(), r.get(0).get(0)); + + assertEquals(cache.get(1).name, "A " + p.name); + } + + /** + * + * @throws Exception if failed. + */ + public void testSimpleUpdateDistributedPartitioned() throws Exception { + fillCaches(); + + IgniteCache<PersonKey, Person> cache = grid(NODE_CLIENT).cache(CACHE_PERSON); + + List<List<?>> r = cache.query(new SqlFieldsQueryEx( + "UPDATE Person SET position = CASEWHEN(position = 1, 1, position - 1)", false) + .setSkipReducerOnUpdate(true)).getAll(); + + assertEquals((long)cache.size(), r.get(0).get(0)); + } + + /** + * + * @throws Exception if failed. + */ + public void testDistributedUpdateFailedKeys() throws Exception { + // UPDATE can produce failed keys due to concurrent modification + fillCaches(); + + final IgniteCache<Integer, Organization> cache = grid(NODE_CLIENT).cache(CACHE_ORG); + + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() { + return cache.query(new SqlFieldsQueryEx("UPDATE Organization SET rate = Modify(_key, rate - 1)", false) + .setSkipReducerOnUpdate(true)); + } + }, CacheException.class, "Failed to update some keys because they had been modified concurrently"); + } + + /** + * + * @throws Exception if failed. + */ + public void testDistributedUpdateFail() throws Exception { + fillCaches(); + + final IgniteCache cache = grid(NODE_CLIENT).cache(CACHE_PERSON); + + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() { + return cache.query(new SqlFieldsQueryEx("UPDATE Person SET name = Fail(name)", false) + .setSkipReducerOnUpdate(true)); + } + }, CacheException.class, "Failed to execute SQL query"); + } + + /** + * + * @throws Exception if failed. + */ + @SuppressWarnings("ConstantConditions") + public void testQueryParallelism() throws Exception { + String cacheName = CACHE_ORG + "x4"; + + CacheConfiguration cfg = buildCacheConfiguration(CACHE_ORG) + .setQueryParallelism(4) + .setName(cacheName); + + IgniteCache<Integer, Organization> cache = grid(NODE_CLIENT).createCache(cfg); + + for (int i = 0; i < 1024; i++) + cache.put(i, new Organization("Acme Inc #" + i, 0)); + + List<List<?>> r = cache.query(new SqlFieldsQueryEx("UPDATE \"" + cacheName + + "\".Organization o SET name = UPPER(name)", false).setSkipReducerOnUpdate(true)).getAll(); + + assertEquals((long)cache.size(), r.get(0).get(0)); + } + + /** + * + * @throws Exception if failed. + */ + public void testEvents() throws Exception { + final CountDownLatch latch = new CountDownLatch(NODE_COUNT); + + final IgnitePredicate<Event> pred = new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + assert evt instanceof CacheQueryExecutedEvent; + + CacheQueryExecutedEvent qe = (CacheQueryExecutedEvent)evt; + + assertNotNull(qe.clause()); + + latch.countDown(); + + return true; + } + }; + + for (int idx = 0; idx < NODE_COUNT; idx++) + grid(idx).events().localListen(pred, EVT_CACHE_QUERY_EXECUTED); + + IgniteCache<Integer, Organization> cache = grid(NODE_CLIENT).cache(CACHE_ORG); + + for (int i = 0; i < 1024; i++) + cache.put(i, new Organization("Acme Inc #" + i, 0)); + + cache.query(new SqlFieldsQueryEx("UPDATE \"org\".Organization o SET name = UPPER(name)", false) + .setSkipReducerOnUpdate(true)).getAll(); + + assertTrue(latch.await(5000, MILLISECONDS)); + + for (int idx = 0; idx < NODE_COUNT; idx++) + grid(idx).events().stopLocalListen(pred); + } + + /** + * + * @throws Exception if failed. + */ + public void testSpecificPartitionsUpdate() throws Exception { + fillCaches(); + + Affinity aff = grid(NODE_CLIENT).affinity(CACHE_PERSON); + + int numParts = aff.partitions(); + int parts[] = new int[numParts / 2]; + + for (int idx = 0; idx < numParts / 2; idx++) + parts[idx] = idx * 2; + + IgniteCache<PersonKey, Person> cache = grid(NODE_CLIENT).cache(CACHE_PERSON); + + // UPDATE over even partitions + cache.query(new SqlFieldsQueryEx("UPDATE Person SET position = 0", false) + .setSkipReducerOnUpdate(true) + .setPartitions(parts)); + + List<List<?>> rows = cache.query(new SqlFieldsQuery("SELECT _key, position FROM Person")).getAll(); + + for (List<?> row : rows) { + PersonKey personKey = (PersonKey)row.get(0); + int pos = ((Number)row.get(1)).intValue(); + int part = aff.partition(personKey); + + assertTrue((part % 2 == 0) ^ (pos != 0)); + } + } + + /** + * + * @throws Exception if failed. + */ + public void testCancel() throws Exception { + latch = new CountDownLatch(NODE_COUNT + 1); + + fillCaches(); + + final IgniteCache<Integer, Organization> cache = grid(NODE_CLIENT).cache(CACHE_ORG); + + final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() { + return cache.query(new SqlFieldsQueryEx("UPDATE Organization SET name = WAIT(name)", false) + .setSkipReducerOnUpdate(true)); + } + }); + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + Collection<GridRunningQueryInfo> qCol = + grid(NODE_CLIENT).context().query().runningQueries(0); + + if (qCol.isEmpty()) + return false; + + for (GridRunningQueryInfo queryInfo : qCol) + queryInfo.cancel(); + + return true; + } + }, 5000); + + latch.await(5000, MILLISECONDS); + + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws IgniteCheckedException { + return fut.get(); + } + }, IgniteCheckedException.class, "Future was cancelled"); + } + + /** + * + * @throws Exception if failed. + */ + public void testNodeStopDuringUpdate() throws Exception { + startGrid(NODE_COUNT + 1); + + awaitPartitionMapExchange(); + + fillCaches(); + + latch = new CountDownLatch(NODE_COUNT + 1 + 1); + + final IgniteCache<Integer, Organization> cache = grid(NODE_CLIENT).cache(CACHE_ORG); + + final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() { + return cache.query(new SqlFieldsQueryEx("UPDATE Organization SET name = WAIT(name)", false) + .setSkipReducerOnUpdate(true)); + } + }); + + final CountDownLatch finalLatch = latch; + + assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return finalLatch.getCount() == 1; + } + }, 5000)); + + latch.countDown(); + + stopGrid(NODE_COUNT + 1); + + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws IgniteCheckedException { + return fut.get(); + } + }, IgniteCheckedException.class, "Update failed because map node left topology"); + } + + /** + * Ensure there are no leaks in data structures associated with distributed dml execution. + */ + private void checkNoLeaks() { + GridQueryProcessor qryProc = grid(NODE_CLIENT).context().query(); + + IgniteH2Indexing h2Idx = GridTestUtils.getFieldValue(qryProc, GridQueryProcessor.class, "idx"); + + GridReduceQueryExecutor rdcQryExec = GridTestUtils.getFieldValue(h2Idx, IgniteH2Indexing.class, "rdcQryExec"); + + Map updRuns = GridTestUtils.getFieldValue(rdcQryExec, GridReduceQueryExecutor.class, "updRuns"); + + assertEquals(0, updRuns.size()); + + for (int idx = 0; idx < NODE_COUNT; idx++) { + qryProc = grid(idx).context().query(); + + h2Idx = GridTestUtils.getFieldValue(qryProc, GridQueryProcessor.class, "idx"); + + GridMapQueryExecutor mapQryExec = GridTestUtils.getFieldValue(h2Idx, IgniteH2Indexing.class, "mapQryExec"); + + Map qryRess = GridTestUtils.getFieldValue(mapQryExec, GridMapQueryExecutor.class, "qryRess"); + + for (Object obj : qryRess.values()) { + Map updCancels = GridTestUtils.getFieldValue(obj, "updCancels"); + + assertEquals(0, updCancels.size()); + } + } + } + + /** + * Fills caches with initial data. + */ + private void fillCaches() { + Ignite client = grid(NODE_CLIENT); + + IgniteCache<Integer, Position> posCache = client.cache(CACHE_POSITION); + + // Generate positions + Position[] positions = new Position[] { + new Position(1, "High Ranking Officer", 1), + new Position(2, "Administrative worker", 3), + new Position(3, "Worker", 7), + new Position(4, "Security", 2), + new Position(5, "Cleaner", 1) + }; + + for (Position pos: positions) + posCache.put(pos.id, pos); + + // Generate organizations + String[] forms = new String[] {" Inc", " Co", " AG", " Industries"}; + String[] orgNames = new String[] {"Acme", "Sierra", "Mesa", "Umbrella", "Robotics"}; + String[] names = new String[] {"Mary", "John", "William", "Tom", "Basil", "Ann", "Peter"}; + + IgniteCache<PersonKey, Person> personCache = client.cache(CACHE_PERSON); + + IgniteCache<Integer, Organization> orgCache = client.cache(CACHE_ORG); + + int orgId = 0; + int personId = 0; + + for (String orgName : produceCombination(orgNames, orgNames, forms)) { + Organization org = new Organization(orgName, 1 + orgId); + + orgCache.put(++orgId, org); + + // Generate persons + + List<String> personNames = produceCombination(names, names, new String[]{"s"}); + + int positionId = 0; + int posCounter = 0; + + for (String name : personNames) { + PersonKey pKey = new PersonKey(orgId, ++personId); + + if (positions[positionId].rate < posCounter++) { + posCounter = 0; + positionId = (positionId + 1) % positions.length; + } + + Person person = new Person(name, positions[positionId].id, org.rate * positions[positionId].rate); + + personCache.put(pKey, person); + } + } + } + + /** + * Produces all possible combinations. + * + * @param a First array. + * @param b Second array. + * @param ends Endings array. + * @return Result. + */ + private List<String> produceCombination(String[] a, String[] b, String[] ends) { + List<String> res = new ArrayList<>(); + + for (String s1 : a) { + for (String s2 : b) { + if (!s1.equals(s2)) { + String end = ends[ThreadLocalRandom8.current().nextInt(ends.length)]; + + res.add(s1 + " " + s2 + end); + } + } + } + + return res; + } + + /** */ + private static class Organization { + /** */ + @QuerySqlField + String name; + + /** */ + @QuerySqlField + int rate; + + /** */ + @QuerySqlField + Date updated; + + /** + * Constructor. + * + * @param name Organization name. + * @param rate Rate. + */ + public Organization(String name, int rate) { + this.name = name; + this.rate = rate; + this.updated = new Date(System.currentTimeMillis()); + } + } + + /** */ + public static class PersonKey { + /** */ + @AffinityKeyMapped + @QuerySqlField + private Integer orgId; + + /** */ + @QuerySqlField + private Integer id; + + /** + * Constructor. + * + * @param orgId Organization id. + * @param id Person id. + */ + PersonKey(int orgId, int id) { + this.orgId = orgId; + this.id = id; + } + } + + /** */ + public static class Person { + /** */ + @QuerySqlField + String name; + + /** */ + @QuerySqlField + int position; + + /** */ + @QuerySqlField + int amount; + /** */ + @QuerySqlField + Date updated; + + /** + * Constructor. + * + * @param name Name. + * @param position Position. + * @param amount Amount. + */ + private Person(String name, int position, int amount) { + this.name = name; + this.position = position; + this.amount = amount; + + this.updated = new Date(System.currentTimeMillis()); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return (name==null? 0: name.hashCode()) ^ position ^ amount ^ (updated == null ? 0 : updated.hashCode()); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object obj) { + if (obj == null) + return false; + + if (!obj.getClass().equals(Person.class)) + return false; + + Person other = (Person)obj; + + return F.eq(name, other.name) && position == other.position && + amount == other.amount && F.eq(updated, other.updated); + } + } + + /** */ + private static class Position { + /** */ + @QuerySqlField + int id; + + /** */ + @QuerySqlField + String name; + + /** */ + @QuerySqlField + int rate; + + /** + * Constructor. + * + * @param id Id. + * @param name Name. + * @param rate Rate. + */ + public Position(int id, String name, int rate) { + this.id = id; + this.name = name; + this.rate = rate; + } + } + + /** + * SQL function that always fails. + * + * @param param Arbitrary parameter. + * @return Result. + */ + @QuerySqlFunction + public static String Fail(String param) { + throw new IgniteSQLException("Fail() called"); + } + + /** + * SQL function that waits for condition. + * + * @param param Arbitrary parameter. + * @return Result. + */ + @QuerySqlFunction + public static String Wait(String param) { + try { + if (latch.getCount() > 0) { + latch.countDown(); + + latch.await(5000, MILLISECONDS); + } + else + Thread.sleep(100); + } + catch (InterruptedException ignore) { + // No-op + } + return param; + } + + /** + * SQL function that makes a concurrent modification. + * + * @param id Id. + * @param rate Rate. + * @return Result. + */ + @QuerySqlFunction + public static int Modify(final int id, final int rate) { + try { + GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() { + IgniteCache cache = client.cache(CACHE_ORG); + + cache.put(id, new Organization("Acme Inc #" + id, rate + 1)); + + return null; + } + }).get(); + } + catch (Exception e) { + // No-op + } + + return rate - 1; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java index c49649b..83b4689 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java @@ -123,9 +123,11 @@ import org.apache.ignite.internal.processors.cache.query.IgniteCacheQueryCacheDe import org.apache.ignite.internal.processors.cache.query.IndexingSpiQuerySelfTest; import org.apache.ignite.internal.processors.cache.query.IndexingSpiQueryTxSelfTest; import org.apache.ignite.internal.processors.client.ClientConnectorConfigurationValidationSelfTest; +import org.apache.ignite.internal.processors.query.IgniteSqlSkipReducerOnUpdateDmlFlagSelfTest; import org.apache.ignite.internal.processors.query.IgniteSqlParameterizedQueryTest; import org.apache.ignite.internal.processors.query.h2.IgniteSqlBigIntegerKeyTest; import org.apache.ignite.internal.processors.query.IgniteQueryDedicatedPoolTest; +import org.apache.ignite.internal.processors.query.IgniteSqlSkipReducerOnUpdateDmlSelfTest; import org.apache.ignite.internal.processors.query.IgniteSqlEntryCacheModeAgnosticTest; import org.apache.ignite.internal.processors.query.IgniteSqlKeyValueFieldsTest; import org.apache.ignite.internal.processors.query.IgniteSqlNotNullConstraintTest; @@ -243,6 +245,8 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite { suite.addTestSuite(IgniteCacheInsertSqlQuerySelfTest.class); suite.addTestSuite(IgniteCacheUpdateSqlQuerySelfTest.class); suite.addTestSuite(IgniteCacheDeleteSqlQuerySelfTest.class); + suite.addTestSuite(IgniteSqlSkipReducerOnUpdateDmlSelfTest.class); + suite.addTestSuite(IgniteSqlSkipReducerOnUpdateDmlFlagSelfTest.class); suite.addTestSuite(IgniteBinaryObjectQueryArgumentsTest.class); suite.addTestSuite(IgniteBinaryObjectLocalQueryArgumentsTest.class); http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/platforms/cpp/odbc-test/src/configuration_test.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc-test/src/configuration_test.cpp b/modules/platforms/cpp/odbc-test/src/configuration_test.cpp index 7da6757..3165c4d 100644 --- a/modules/platforms/cpp/odbc-test/src/configuration_test.cpp +++ b/modules/platforms/cpp/odbc-test/src/configuration_test.cpp @@ -43,6 +43,7 @@ namespace const bool testReplicatedOnly = true; const bool testCollocated = true; const bool testLazy = true; + const bool testSkipReducerOnUpdate = true; const std::string testAddress = testServerHost + ':' + ignite::common::LexicalCast<std::string>(testServerPort); } @@ -132,6 +133,7 @@ void CheckConnectionConfig(const Configuration& cfg) BOOST_CHECK_EQUAL(cfg.IsReplicatedOnly(), testReplicatedOnly); BOOST_CHECK_EQUAL(cfg.IsCollocated(), testCollocated); BOOST_CHECK_EQUAL(cfg.IsLazy(), testLazy); + BOOST_CHECK_EQUAL(cfg.IsSkipReducerOnUpdate(), testSkipReducerOnUpdate); std::stringstream constructor; @@ -143,7 +145,8 @@ void CheckConnectionConfig(const Configuration& cfg) << "lazy=" << BoolToStr(testLazy) << ';' << "page_size=" << testPageSize << ';' << "replicated_only=" << BoolToStr(testReplicatedOnly) << ';' - << "schema=" << testSchemaName << ';'; + << "schema=" << testSchemaName << ';' + << "skip_reducer_on_update=" << BoolToStr(testReplicatedOnly) << ';'; const std::string& expectedStr = constructor.str(); @@ -164,6 +167,7 @@ void CheckDsnConfig(const Configuration& cfg) BOOST_CHECK_EQUAL(cfg.IsReplicatedOnly(), false); BOOST_CHECK_EQUAL(cfg.IsCollocated(), false); BOOST_CHECK_EQUAL(cfg.IsLazy(), false); + BOOST_CHECK_EQUAL(cfg.IsSkipReducerOnUpdate(), false); } BOOST_AUTO_TEST_SUITE(ConfigurationTestSuite) @@ -180,6 +184,8 @@ BOOST_AUTO_TEST_CASE(CheckTestValuesNotEquealDefault) BOOST_CHECK_NE(testEnforceJoinOrder, Configuration::DefaultValue::enforceJoinOrder); BOOST_CHECK_NE(testReplicatedOnly, Configuration::DefaultValue::replicatedOnly); BOOST_CHECK_NE(testCollocated, Configuration::DefaultValue::collocated); + BOOST_CHECK_NE(testLazy, Configuration::DefaultValue::lazy); + BOOST_CHECK_NE(testSkipReducerOnUpdate, Configuration::DefaultValue::skipReducerOnUpdate); } BOOST_AUTO_TEST_CASE(TestConnectStringUppercase) @@ -196,7 +202,8 @@ BOOST_AUTO_TEST_CASE(TestConnectStringUppercase) << "COLLOCATED=" << BoolToStr(testCollocated, false) << ';' << "REPLICATED_ONLY=" << BoolToStr(testReplicatedOnly, false) << ';' << "PAGE_SIZE=" << testPageSize << ';' - << "SCHEMA=" << testSchemaName; + << "SCHEMA=" << testSchemaName << ';' + << "SKIP_REDUCER_ON_UPDATE=" << BoolToStr(testSkipReducerOnUpdate, false); const std::string& connectStr = constructor.str(); @@ -219,7 +226,8 @@ BOOST_AUTO_TEST_CASE(TestConnectStringLowercase) << "enforce_join_order=" << BoolToStr(testEnforceJoinOrder) << ';' << "replicated_only=" << BoolToStr(testReplicatedOnly) << ';' << "collocated=" << BoolToStr(testCollocated) << ';' - << "schema=" << testSchemaName; + << "schema=" << testSchemaName << ';' + << "skip_reducer_on_update=" << BoolToStr(testSkipReducerOnUpdate); const std::string& connectStr = constructor.str(); @@ -242,7 +250,8 @@ BOOST_AUTO_TEST_CASE(TestConnectStringZeroTerminated) << "collocated=" << BoolToStr(testCollocated) << ';' << "distributed_joins=" << BoolToStr(testDistributedJoins) << ';' << "enforce_join_order=" << BoolToStr(testEnforceJoinOrder) << ';' - << "schema=" << testSchemaName; + << "schema=" << testSchemaName << ';' + << "skip_reducer_on_update=" << BoolToStr(testSkipReducerOnUpdate); const std::string& connectStr = constructor.str(); @@ -265,7 +274,8 @@ BOOST_AUTO_TEST_CASE(TestConnectStringMixed) << "Enforce_Join_Order=" << BoolToStr(testEnforceJoinOrder) << ';' << "Replicated_Only=" << BoolToStr(testReplicatedOnly, false) << ';' << "Collocated=" << BoolToStr(testCollocated) << ';' - << "Schema=" << testSchemaName; + << "Schema=" << testSchemaName << ';' + << "Skip_Reducer_On_Update=" << BoolToStr(testSkipReducerOnUpdate); const std::string& connectStr = constructor.str(); @@ -288,7 +298,8 @@ BOOST_AUTO_TEST_CASE(TestConnectStringWhitepaces) << "COLLOCATED =" << BoolToStr(testCollocated, false) << " ;" << " REPLICATED_ONLY= " << BoolToStr(testReplicatedOnly, false) << ';' << "ENFORCE_JOIN_ORDER= " << BoolToStr(testEnforceJoinOrder, false) << " ;" - << "SCHEMA = \n\r" << testSchemaName; + << "SCHEMA = \n\r" << testSchemaName << ';' + << " skip_reducer_on_update=" << BoolToStr(testSkipReducerOnUpdate, false); const std::string& connectStr = constructor.str(); @@ -358,6 +369,7 @@ BOOST_AUTO_TEST_CASE(TestConnectStringInvalidBoolKeys) keys.insert("replicated_only"); keys.insert("collocated"); keys.insert("lazy"); + keys.insert("skip_reducer_on_update"); for (Set::const_iterator it = keys.begin(); it != keys.end(); ++it) { @@ -385,6 +397,7 @@ BOOST_AUTO_TEST_CASE(TestConnectStringValidBoolKeys) keys.insert("replicated_only"); keys.insert("collocated"); keys.insert("lazy"); + keys.insert("skip_reducer_on_update"); for (Set::const_iterator it = keys.begin(); it != keys.end(); ++it) { http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/platforms/cpp/odbc-test/src/queries_test.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc-test/src/queries_test.cpp b/modules/platforms/cpp/odbc-test/src/queries_test.cpp index 4c7e402..707669d 100644 --- a/modules/platforms/cpp/odbc-test/src/queries_test.cpp +++ b/modules/platforms/cpp/odbc-test/src/queries_test.cpp @@ -755,6 +755,14 @@ BOOST_AUTO_TEST_CASE(TestConnectionProtocolVersion_2_1_5) InsertTestBatch(11, 20, 9); } +BOOST_AUTO_TEST_CASE(TestConnectionProtocolVersion_2_3_0) +{ + Connect("DRIVER={Apache Ignite};ADDRESS=127.0.0.1:11110;SCHEMA=cache;PROTOCOL_VERSION=2.3.0"); + + InsertTestStrings(10, false); + InsertTestBatch(11, 20, 9); +} + BOOST_AUTO_TEST_CASE(TestTwoRowsInt8) { CheckTwoRowsInt<signed char>(SQL_C_STINYINT); http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/platforms/cpp/odbc/include/ignite/odbc/config/configuration.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc/config/configuration.h b/modules/platforms/cpp/odbc/include/ignite/odbc/config/configuration.h index 2b1ec52..419a65e 100644 --- a/modules/platforms/cpp/odbc/include/ignite/odbc/config/configuration.h +++ b/modules/platforms/cpp/odbc/include/ignite/odbc/config/configuration.h @@ -82,6 +82,9 @@ namespace ignite /** Connection attribute keyword for lazy attribute. */ static const std::string lazy; + + /** Connection attribute keyword for skipReducerOnUpdate attribute. */ + static const std::string skipReducerOnUpdate; }; /** Default values for configuration. */ @@ -125,6 +128,9 @@ namespace ignite /** Default value for lazy attribute. */ static const bool lazy; + + /** Default value for skipReducerOnUpdate attribute. */ + static const bool skipReducerOnUpdate; }; /** @@ -384,6 +390,26 @@ namespace ignite } /** + * Check update on server flag. + * + * @return True if update on server. + */ + bool IsSkipReducerOnUpdate() const + { + return GetBoolValue(Key::skipReducerOnUpdate, DefaultValue::skipReducerOnUpdate); + } + + /** + * Set update on server. + * + * @param val Value to set. + */ + void SetSkipReducerOnUpdate(bool val) + { + SetBoolValue(Key::skipReducerOnUpdate, val); + } + + /** * Get protocol version. * * @return Protocol version. http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/platforms/cpp/odbc/include/ignite/odbc/message.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc/message.h b/modules/platforms/cpp/odbc/include/ignite/odbc/message.h index 91a808c..dda0ba9 100644 --- a/modules/platforms/cpp/odbc/include/ignite/odbc/message.h +++ b/modules/platforms/cpp/odbc/include/ignite/odbc/message.h @@ -79,9 +79,10 @@ namespace ignite * @param replicatedOnly Replicated only flag. * @param collocated Collocated flag. * @param lazy Lazy flag. + * @param skipReducerOnUpdate Skip reducer on update. */ HandshakeRequest(const ProtocolVersion& version, bool distributedJoins, bool enforceJoinOrder, - bool replicatedOnly, bool collocated, bool lazy); + bool replicatedOnly, bool collocated, bool lazy, bool skipReducerOnUpdate); /** * Destructor. @@ -112,6 +113,9 @@ namespace ignite /** Lazy flag. */ bool lazy; + + /** Skip reducer on update flag. */ + bool skipReducerOnUpdate; }; /** http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/platforms/cpp/odbc/include/ignite/odbc/protocol_version.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc/protocol_version.h b/modules/platforms/cpp/odbc/include/ignite/odbc/protocol_version.h index c36d5dd..e6088a7 100644 --- a/modules/platforms/cpp/odbc/include/ignite/odbc/protocol_version.h +++ b/modules/platforms/cpp/odbc/include/ignite/odbc/protocol_version.h @@ -34,6 +34,7 @@ namespace ignite /** Current protocol version. */ static const ProtocolVersion VERSION_2_1_0; static const ProtocolVersion VERSION_2_1_5; + static const ProtocolVersion VERSION_2_3_0; typedef std::set<ProtocolVersion> VersionSet; http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/platforms/cpp/odbc/include/ignite/odbc/system/ui/dsn_configuration_window.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc/system/ui/dsn_configuration_window.h b/modules/platforms/cpp/odbc/include/ignite/odbc/system/ui/dsn_configuration_window.h index 2974b67..90286b9 100644 --- a/modules/platforms/cpp/odbc/include/ignite/odbc/system/ui/dsn_configuration_window.h +++ b/modules/platforms/cpp/odbc/include/ignite/odbc/system/ui/dsn_configuration_window.h @@ -55,6 +55,7 @@ namespace ignite REPLICATED_ONLY_CHECK_BOX, COLLOCATED_CHECK_BOX, LAZY_CHECK_BOX, + SKIP_REDUCER_ON_UPDATE_CHECK_BOX, PROTOCOL_VERSION_LABEL, PROTOCOL_VERSION_COMBO_BOX, OK_BUTTON, @@ -149,6 +150,9 @@ namespace ignite /** Lazy CheckBox. */ std::auto_ptr<Window> lazyCheckBox; + /** Update on server CheckBox. */ + std::auto_ptr<Window> skipReducerOnUpdateCheckBox; + /** Protocol version edit field. */ std::auto_ptr<Window> protocolVersionLabel;
