GEODE-2062: Added new tests for PDX queries, order-by queries, and queries
using indexes.
* PdxInstance and PdxInstanceFactoryImpl were used to validate the multiple
class version tests rather than writing dummy PDX classes (see the sketch below)
* JUnit4CacheTestCase was used instead of JUnit 3 elements.
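
[Editor's note, not part of the commit: a minimal sketch of the technique the
bullet above describes. A PdxInstanceFactory can stand in for a second version
of a domain class by registering a different field set under the same class
name, so no dummy PDX classes are needed. The class and field names below are
hypothetical; only the factory API itself comes from Geode.]

    import org.apache.geode.cache.CacheFactory;
    import org.apache.geode.cache.GemFireCache;
    import org.apache.geode.pdx.PdxInstance;
    import org.apache.geode.pdx.PdxInstanceFactory;

    public class PdxVersionSketch {
      public static void main(String[] args) {
        GemFireCache cache =
            new CacheFactory().set("mcast-port", "0").setPdxReadSerialized(true).create();
        // "Version 1" of the hypothetical class com.example.Portfolio: id only.
        PdxInstanceFactory v1 = cache.createPdxInstanceFactory("com.example.Portfolio");
        v1.writeInt("id", 1);
        PdxInstance p1 = v1.create();
        // "Version 2" of the same class name adds a status field; no
        // com.example.Portfolio class needs to exist on the classpath.
        PdxInstanceFactory v2 = cache.createPdxInstanceFactory("com.example.Portfolio");
        v2.writeInt("id", 2);
        v2.writeString("status", "active");
        PdxInstance p2 = v2.create();
        System.out.println(p1.getField("id") + " / " + p2.getField("status"));
      }
    }
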
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/a64f7f60
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/a64f7f60
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/a64f7f60
Branch: refs/heads/develop
Commit: a64f7f60e8d2e45519d7c0c7a52688e70c986a5e
Parents: 4265fa5
Author: nabarun <[email protected]>
Authored: Mon Oct 31 11:16:09 2016 -0700
Committer: nabarun <[email protected]>
Committed: Wed Nov 2 13:19:00 2016 -0700
----------------------------------------------------------------------
.../cache/query/dunit/HashIndexDUnitTest.java | 2 +-
.../dunit/OrderByPartitionedDUnitTest.java | 370 ++
.../cache/query/dunit/PDXQueryTestBase.java | 451 +++
.../PdxGroupByPartitionedQueryDUnitTest.java | 90 +
.../query/dunit/PdxLocalQueryDUnitTest.java | 924 +++++
.../PdxLocalQueryVersionedClassDUnitTest.java | 173 +
.../cache/query/dunit/PdxQueryDUnitTest.java | 3590 ++++++++++++++++++
.../cache/query/dunit/PortfolioPdxVersion.java | 103 +
.../cache/query/dunit/PositionPdxVersion.java | 163 +
.../cache/query/dunit/QueryIndexDUnitTest.java | 1331 +++++++
10 files changed, 7196 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a64f7f60/geode-core/src/test/java/org/apache/geode/cache/query/dunit/HashIndexDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/cache/query/dunit/HashIndexDUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache/query/dunit/HashIndexDUnitTest.java
index 78b092f..dd4cfc7 100755
--- a/geode-core/src/test/java/org/apache/geode/cache/query/dunit/HashIndexDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/cache/query/dunit/HashIndexDUnitTest.java
@@ -71,7 +71,7 @@ public class HashIndexDUnitTest extends JUnit4DistributedTestCase {
@Test
public void testHashIndexForConcurrentHashSet() throws Exception {
-    doPut(333); // 111 entries for a key in the index (> 100 so creates a ConcurrentHashSet)
+    doPut(333); // 111 entries for a key in the index (> 100 so creates a ConcurrentHashSet)//test
doQuery();
doUpdate(333);
doQuery();
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a64f7f60/geode-core/src/test/java/org/apache/geode/cache/query/dunit/OrderByPartitionedDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/cache/query/dunit/OrderByPartitionedDUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache/query/dunit/OrderByPartitionedDUnitTest.java
new file mode 100644
index 0000000..323559e
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/cache/query/dunit/OrderByPartitionedDUnitTest.java
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.query.dunit;
+
+import static org.apache.geode.test.dunit.Assert.*;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.query.Index;
+import org.apache.geode.cache.query.IndexExistsException;
+import org.apache.geode.cache.query.IndexInvalidException;
+import org.apache.geode.cache.query.IndexNameConflictException;
+import org.apache.geode.cache.query.IndexType;
+import org.apache.geode.cache.query.RegionNotFoundException;
+import org.apache.geode.cache.query.functional.OrderByPartitionedJUnitTest;
+import org.apache.geode.test.dunit.Host;
+import org.apache.geode.test.dunit.SerializableRunnable;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
+import org.apache.geode.test.junit.categories.DistributedTest;
+
+@Category(DistributedTest.class)
+public class OrderByPartitionedDUnitTest extends JUnit4CacheTestCase {
+
+ private OrderByPartitionedJUnitTest createTestInstance() {
+ Host host = Host.getHost(0);
+ final VM vm0 = host.getVM(0);
+ final VM vm1 = host.getVM(1);
+ final VM vm2 = host.getVM(2);
+ final VM vm3 = host.getVM(3);
+
+ OrderByPartitionedJUnitTest test = new OrderByPartitionedJUnitTest() {
+ @Override
+ public Region createRegion(String regionName, Class valueConstraint) {
+ // TODO Auto-generated method stub
+ Region rgn = createAccessor(regionName, valueConstraint);
+ createPR(vm1, regionName, valueConstraint);
+ createPR(vm2, regionName, valueConstraint);
+ createPR(vm3, regionName, valueConstraint);
+ return rgn;
+ }
+
+ @Override
+      public Index createIndex(String indexName, String indexedExpression, String regionPath)
+          throws IndexInvalidException, IndexNameConflictException, IndexExistsException,
+          RegionNotFoundException, UnsupportedOperationException {
+        Index indx = createIndexOnAccessor(indexName, indexedExpression, regionPath);
+ return indx;
+ }
+
+ @Override
+      public Index createIndex(String indexName, IndexType indexType, String indexedExpression,
+          String fromClause) throws IndexInvalidException, IndexNameConflictException,
+          IndexExistsException, RegionNotFoundException, UnsupportedOperationException {
+        Index indx = createIndexOnAccessor(indexName, indexType, indexedExpression, fromClause);
+ return indx;
+ }
+
+ @Override
+ public boolean assertIndexUsedOnQueryNode() {
+ return false;
+ }
+ };
+ return test;
+ }
+
+ @Test
+ public void testOrderByWithIndexResultDefaultProjection() throws Exception {
+ Host host = Host.getHost(0);
+ final VM vm0 = host.getVM(0);
+ final VM vm1 = host.getVM(1);
+ final VM vm2 = host.getVM(2);
+ final VM vm3 = host.getVM(3);
+ Cache cache = this.getCache();
+ OrderByPartitionedJUnitTest test = createTestInstance();
+ test.testOrderByWithIndexResultDefaultProjection();
+ this.closeCache(vm0, vm1, vm2, vm3);
+ }
+
+ @Test
+ public void testOrderByWithIndexResultWithProjection() throws Exception {
+ Host host = Host.getHost(0);
+ final VM vm0 = host.getVM(0);
+ final VM vm1 = host.getVM(1);
+ final VM vm2 = host.getVM(2);
+ final VM vm3 = host.getVM(3);
+ Cache cache = this.getCache();
+ OrderByPartitionedJUnitTest test = createTestInstance();
+ test.testOrderByWithIndexResultWithProjection();
+ this.closeCache(vm0, vm1, vm2, vm3);
+ }
+
+ @Test
+  public void testMultiColOrderByWithIndexResultDefaultProjection() throws Exception {
+ Host host = Host.getHost(0);
+ final VM vm0 = host.getVM(0);
+ final VM vm1 = host.getVM(1);
+ final VM vm2 = host.getVM(2);
+ final VM vm3 = host.getVM(3);
+ Cache cache = this.getCache();
+ OrderByPartitionedJUnitTest test = createTestInstance();
+ test.testMultiColOrderByWithIndexResultDefaultProjection();
+ this.closeCache(vm0, vm1, vm2, vm3);
+ }
+
+ @Test
+  public void testMultiColOrderByWithIndexResultWithProjection() throws Exception {
+ Host host = Host.getHost(0);
+ final VM vm0 = host.getVM(0);
+ final VM vm1 = host.getVM(1);
+ final VM vm2 = host.getVM(2);
+ final VM vm3 = host.getVM(3);
+ Cache cache = this.getCache();
+ OrderByPartitionedJUnitTest test = createTestInstance();
+ test.testMultiColOrderByWithIndexResultWithProjection();
+ this.closeCache(vm0, vm1, vm2, vm3);
+ }
+
+ @Test
+  public void testMultiColOrderByWithMultiIndexResultDefaultProjection() throws Exception {
+ Host host = Host.getHost(0);
+ final VM vm0 = host.getVM(0);
+ final VM vm1 = host.getVM(1);
+ final VM vm2 = host.getVM(2);
+ final VM vm3 = host.getVM(3);
+ Cache cache = this.getCache();
+ OrderByPartitionedJUnitTest test = createTestInstance();
+ test.testMultiColOrderByWithMultiIndexResultDefaultProjection();
+ this.closeCache(vm0, vm1, vm2, vm3);
+ }
+
+ @Test
+  public void testMultiColOrderByWithMultiIndexResultProjection() throws Exception {
+ Host host = Host.getHost(0);
+ final VM vm0 = host.getVM(0);
+ final VM vm1 = host.getVM(1);
+ final VM vm2 = host.getVM(2);
+ final VM vm3 = host.getVM(3);
+ Cache cache = this.getCache();
+ OrderByPartitionedJUnitTest test = createTestInstance();
+ test.testMultiColOrderByWithMultiIndexResultProjection();
+ this.closeCache(vm0, vm1, vm2, vm3);
+ }
+
+ @Test
+ public void testLimitNotAppliedIfOrderByNotUsingIndex() throws Exception {
+ Host host = Host.getHost(0);
+ final VM vm0 = host.getVM(0);
+ final VM vm1 = host.getVM(1);
+ final VM vm2 = host.getVM(2);
+ final VM vm3 = host.getVM(3);
+ Cache cache = this.getCache();
+ OrderByPartitionedJUnitTest test = createTestInstance();
+ test.testLimitNotAppliedIfOrderByNotUsingIndex();
+ this.closeCache(vm0, vm1, vm2, vm3);
+ }
+
+ @Test
+ public void testOrderByWithNullValuesUseIndex() throws Exception {
+ Host host = Host.getHost(0);
+ final VM vm0 = host.getVM(0);
+ final VM vm1 = host.getVM(1);
+ final VM vm2 = host.getVM(2);
+ final VM vm3 = host.getVM(3);
+ Cache cache = this.getCache();
+ OrderByPartitionedJUnitTest test = createTestInstance();
+ test.testOrderByWithNullValuesUseIndex();
+ this.closeCache(vm0, vm1, vm2, vm3);
+ }
+
+ @Test
+ public void testOrderByForUndefined() throws Exception {
+ Host host = Host.getHost(0);
+ final VM vm0 = host.getVM(0);
+ final VM vm1 = host.getVM(1);
+ final VM vm2 = host.getVM(2);
+ final VM vm3 = host.getVM(3);
+ Cache cache = this.getCache();
+ OrderByPartitionedJUnitTest test = createTestInstance();
+ test.testOrderByForUndefined();
+ this.closeCache(vm0, vm1, vm2, vm3);
+ }
+
+ @Test
+  public void testOrderedResultsPartitionedRegion_Bug43514_1() throws Exception {
+ Host host = Host.getHost(0);
+ final VM vm0 = host.getVM(0);
+ final VM vm1 = host.getVM(1);
+ final VM vm2 = host.getVM(2);
+ final VM vm3 = host.getVM(3);
+ Cache cache = this.getCache();
+ OrderByPartitionedJUnitTest test = createTestInstance();
+ test.testOrderedResultsPartitionedRegion_Bug43514_1();
+ this.closeCache(vm0, vm1, vm2, vm3);
+ }
+
+ @Test
+  public void testOrderedResultsPartitionedRegion_Bug43514_2() throws Exception {
+ Host host = Host.getHost(0);
+ final VM vm0 = host.getVM(0);
+ final VM vm1 = host.getVM(1);
+ final VM vm2 = host.getVM(2);
+ final VM vm3 = host.getVM(3);
+ Cache cache = this.getCache();
+ OrderByPartitionedJUnitTest test = createTestInstance();
+ test.testOrderedResultsPartitionedRegion_Bug43514_2();
+ this.closeCache(vm0, vm1, vm2, vm3);
+ }
+
+ @Test
+ public void testOrderByWithNullValues() throws Exception {
+ Host host = Host.getHost(0);
+ final VM vm0 = host.getVM(0);
+ final VM vm1 = host.getVM(1);
+ final VM vm2 = host.getVM(2);
+ final VM vm3 = host.getVM(3);
+ Cache cache = this.getCache();
+ OrderByPartitionedJUnitTest test = createTestInstance();
+ test.testOrderByWithNullValues();
+ this.closeCache(vm0, vm1, vm2, vm3);
+ }
+
+ private void createBuckets(VM vm) {
+ vm.invoke(new SerializableRunnable("create accessor") {
+ public void run() {
+ Cache cache = getCache();
+ Region region = cache.getRegion("region");
+ for (int i = 0; i < 10; i++) {
+ region.put(i, i);
+ }
+ }
+ });
+ }
+
+  private void createPR(VM vm, final String regionName, final Class valueConstraint) {
+ vm.invoke(new SerializableRunnable("create data store") {
+ public void run() {
+ Cache cache = getCache();
+ PartitionAttributesFactory paf = new PartitionAttributesFactory();
+ paf.setTotalNumBuckets(10);
+        cache.createRegionFactory(RegionShortcut.PARTITION).setValueConstraint(valueConstraint)
+ .setPartitionAttributes(paf.create()).create(regionName);
+ }
+ });
+ }
+
+ private Region createAccessor(String regionName, Class valueConstraint) {
+ Cache cache = getCache();
+ PartitionAttributesFactory paf = new PartitionAttributesFactory();
+ paf.setTotalNumBuckets(10);
+ paf.setLocalMaxMemory(0);
+ return cache.createRegionFactory(RegionShortcut.PARTITION_PROXY)
+        .setValueConstraint(valueConstraint).setPartitionAttributes(paf.create())
+ .create(regionName);
+ }
+
+  private void createIndex(VM vm, final String indexName, final String indexedExpression,
+ final String regionPath) {
+ vm.invoke(new SerializableRunnable("create index") {
+ public void run() {
+ try {
+ Cache cache = getCache();
+          cache.getQueryService().createIndex(indexName, indexedExpression, regionPath);
+ } catch (RegionNotFoundException e) {
+ fail(e.toString());
+ } catch (IndexExistsException e) {
+ fail(e.toString());
+ } catch (IndexNameConflictException e) {
+ fail(e.toString());
+ }
+ }
+ });
+ }
+
+ private void createIndex(VM vm, final String indexName, IndexType indexType,
+ final String indexedExpression, final String fromClause) {
+ int indxTypeCode = -1;
+ if (indexType.equals(IndexType.FUNCTIONAL)) {
+ indxTypeCode = 0;
+ } else if (indexType.equals(IndexType.PRIMARY_KEY)) {
+ indxTypeCode = 1;
+ } else if (indexType.equals(IndexType.HASH)) {
+ indxTypeCode = 2;
+ }
+ final int finalIndxTypeCode = indxTypeCode;
+ vm.invoke(new SerializableRunnable("create index") {
+ public void run() {
+ try {
+ Cache cache = getCache();
+ IndexType indxType = null;
+ if (finalIndxTypeCode == 0) {
+ indxType = IndexType.FUNCTIONAL;
+ } else if (finalIndxTypeCode == 1) {
+ indxType = IndexType.PRIMARY_KEY;
+ } else if (finalIndxTypeCode == 2) {
+ indxType = IndexType.HASH;
+ }
+          cache.getQueryService().createIndex(indexName, indxType, indexedExpression, fromClause);
+ } catch (RegionNotFoundException e) {
+ fail(e.toString());
+ } catch (IndexExistsException e) {
+ fail(e.toString());
+ } catch (IndexNameConflictException e) {
+ fail(e.toString());
+ }
+ }
+ });
+ }
+
+  private Index createIndexOnAccessor(final String indexName, final String indexedExpression,
+ final String regionPath) {
+ try {
+ Cache cache = getCache();
+      return cache.getQueryService().createIndex(indexName, indexedExpression, regionPath);
+ } catch (RegionNotFoundException e) {
+ fail(e.toString());
+ } catch (IndexExistsException e) {
+ fail(e.toString());
+ } catch (IndexNameConflictException e) {
+ fail(e.toString());
+ }
+ return null;
+ }
+
+  private Index createIndexOnAccessor(final String indexName, IndexType indexType,
+ final String indexedExpression, final String fromClause) {
+ try {
+ Cache cache = getCache();
+      return cache.getQueryService().createIndex(indexName, indexType, indexedExpression,
+ fromClause);
+ } catch (RegionNotFoundException e) {
+ fail(e.toString());
+ } catch (IndexExistsException e) {
+ fail(e.toString());
+ } catch (IndexNameConflictException e) {
+ fail(e.toString());
+ }
+ return null;
+ }
+
+ private void closeCache(VM... vms) {
+ for (VM vm : vms) {
+ vm.invoke(new SerializableRunnable() {
+ public void run() {
+ getCache().close();
+ }
+ });
+ }
+ }
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a64f7f60/geode-core/src/test/java/org/apache/geode/cache/query/dunit/PDXQueryTestBase.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/cache/query/dunit/PDXQueryTestBase.java b/geode-core/src/test/java/org/apache/geode/cache/query/dunit/PDXQueryTestBase.java
new file mode 100644
index 0000000..2636d84
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/cache/query/dunit/PDXQueryTestBase.java
@@ -0,0 +1,451 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.query.dunit;
+
+import static org.apache.geode.test.dunit.Assert.*;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.geode.LogWriter;
+import org.apache.geode.cache.AttributesFactory;
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.CacheException;
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.PartitionAttributes;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.Scope;
+import org.apache.geode.cache.client.PoolFactory;
+import org.apache.geode.cache.client.PoolManager;
+import org.apache.geode.cache.query.CacheUtils;
+import org.apache.geode.cache.query.Query;
+import org.apache.geode.cache.query.QueryService;
+import org.apache.geode.cache.query.SelectResults;
+import org.apache.geode.cache.query.Struct;
+import org.apache.geode.cache.query.data.PortfolioPdx;
+import org.apache.geode.cache.query.data.PositionPdx;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.cache30.CacheSerializableRunnable;
+import org.apache.geode.compression.Compressor;
+import org.apache.geode.compression.SnappyCompressor;
+import org.apache.geode.i18n.LogWriterI18n;
+import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.pdx.PdxReader;
+import org.apache.geode.pdx.PdxSerializable;
+import org.apache.geode.pdx.PdxWriter;
+import org.apache.geode.test.dunit.Assert;
+import org.apache.geode.test.dunit.Host;
+import org.apache.geode.test.dunit.SerializableRunnable;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
+
+public abstract class PDXQueryTestBase extends JUnit4CacheTestCase {
+
+ /** The port on which the bridge server was started in this VM */
+ private static int bridgeServerPort;
+  protected static final Compressor compressor = SnappyCompressor.getDefaultInstance();
+ protected final String rootRegionName = "root";
+ protected final String regionName = "PdxTest";
+ protected final String regionName2 = "PdxTest2";
+ protected final String regName = "/" + rootRegionName + "/" + regionName;
+ protected final String regName2 = "/" + rootRegionName + "/" + regionName2;
+  protected final String[] queryString = new String[] {"SELECT DISTINCT id FROM " + regName, // 0
+ "SELECT * FROM " + regName, // 1
+ "SELECT ticker FROM " + regName, // 2
+ "SELECT * FROM " + regName + " WHERE id > 5", // 3
+      "SELECT p FROM " + regName + " p, p.idTickers idTickers WHERE p.ticker = 'vmware'", // 4
+ };
+
+ protected static int getCacheServerPort() {
+ return bridgeServerPort;
+ }
+
+ @Override
+ public final void preTearDownCacheTestCase() throws Exception {
+ preTearDownPDXQueryTestBase();
+ disconnectAllFromDS(); // tests all expect to create a new ds
+    // Reset the TestObject numInstance for the next test.
+ TestObject.numInstance = 0;
+ // In all VM.
+ resetTestObjectInstanceCount();
+ }
+
+ protected void preTearDownPDXQueryTestBase() throws Exception {}
+
+ private void resetTestObjectInstanceCount() {
+ final Host host = Host.getHost(0);
+ for (int i = 0; i < 4; i++) {
+ VM vm = host.getVM(i);
+      vm.invoke(new CacheSerializableRunnable("Reset instance counts") {
+ public void run2() throws CacheException {
+ TestObject.numInstance = 0;
+ PortfolioPdx.numInstance = 0;
+ PositionPdx.numInstance = 0;
+ PositionPdx.cnt = 0;
+ TestObject2.numInstance = 0;
+ }
+ });
+ }
+ }
+
+ public void createPool(VM vm, String poolName, String server, int port,
+ boolean subscriptionEnabled) {
+    createPool(vm, poolName, new String[] {server}, new int[] {port}, subscriptionEnabled);
+ }
+
+ public void createPool(VM vm, String poolName, String server, int port) {
+ createPool(vm, poolName, new String[] {server}, new int[] {port}, false);
+ }
+
+  public void createPool(VM vm, final String poolName, final String[] servers, final int[] ports,
+ final boolean subscriptionEnabled) {
+ createPool(vm, poolName, servers, ports, subscriptionEnabled, 0);
+ }
+
+  public void createPool(VM vm, final String poolName, final String[] servers, final int[] ports,
+ final boolean subscriptionEnabled, final int redundancy) {
+ vm.invoke(new CacheSerializableRunnable("createPool :" + poolName) {
+ public void run2() throws CacheException {
+ // Create Cache.
+ Properties props = new Properties();
+ props.setProperty("mcast-port", "0");
+ props.setProperty("locators", "");
+ getSystem(props);
+ getCache();
+ PoolFactory cpf = PoolManager.createFactory();
+ cpf.setSubscriptionEnabled(subscriptionEnabled);
+ cpf.setSubscriptionRedundancy(redundancy);
+ for (int i = 0; i < servers.length; i++) {
+ cpf.addServer(servers[i], ports[i]);
+ }
+ cpf.create(poolName);
+ }
+ });
+ }
+
+  public void executeClientQueries(VM vm, final String poolName, final String queryStr) {
+ vm.invoke(new CacheSerializableRunnable("Execute queries") {
+ public void run2() throws CacheException {
+ QueryService remoteQueryService = null;
+ QueryService localQueryService = null;
+ SelectResults[][] rs = new SelectResults[1][2];
+
+ try {
+ remoteQueryService = (PoolManager.find(poolName)).getQueryService();
+ localQueryService = getCache().getQueryService();
+ } catch (Exception e) {
+ Assert.fail("Failed to get QueryService.", e);
+ }
+
+ try {
+ Query query = remoteQueryService.newQuery(queryStr);
+ rs[0][0] = (SelectResults) query.execute();
+ query = localQueryService.newQuery(queryStr);
+ rs[0][1] = (SelectResults) query.execute();
+ // Compare local and remote query results.
+ if (!CacheUtils.compareResultsOfWithAndWithoutIndex(rs)) {
+            fail("Local and Remote Query Results are not matching for query :" + queryStr);
+ }
+ } catch (Exception e) {
+ Assert.fail("Failed executing " + queryStr, e);
+ }
+ }
+ });
+ }
+
+ public void printResults(SelectResults results, String message) {
+ Object r;
+ Struct s;
+ LogWriterI18n logger = GemFireCacheImpl.getInstance().getLoggerI18n();
+ logger.fine(message);
+ int row = 0;
+ for (Iterator iter = results.iterator(); iter.hasNext();) {
+ r = iter.next();
+ row++;
+ if (r instanceof Struct) {
+ s = (Struct) r;
+ String[] fieldNames = ((Struct) r).getStructType().getFieldNames();
+ for (int i = 0; i < fieldNames.length; i++) {
+          logger.fine("### Row " + row + "\n" + "Field: " + fieldNames[i] + " > "
+ + s.get(fieldNames[i]).toString());
+ }
+ } else {
+ logger.fine("#### Row " + row + "\n" + r);
+ }
+ }
+ }
+
+ protected void configAndStartBridgeServer() {
+ configAndStartBridgeServer(false, false, false, null);
+ }
+
+ protected void configAndStartBridgeServer(boolean isPr, boolean isAccessor) {
+    configAndStartBridgeServer(isPr, isAccessor, false, null);
+ }
+
+  protected void configAndStartBridgeServer(boolean isPr, boolean isAccessor, boolean asyncIndex,
+ Compressor compressor) {
+ AttributesFactory factory = new AttributesFactory();
+ if (isPr) {
+ PartitionAttributesFactory paf = new PartitionAttributesFactory();
+ if (isAccessor) {
+ paf.setLocalMaxMemory(0);
+ }
+      PartitionAttributes prAttr = paf.setTotalNumBuckets(20).setRedundantCopies(0).create();
+ factory.setPartitionAttributes(prAttr);
+ } else {
+ factory.setScope(Scope.DISTRIBUTED_ACK);
+ factory.setDataPolicy(DataPolicy.REPLICATE);
+ }
+ if (asyncIndex) {
+ factory.setIndexMaintenanceSynchronous(!asyncIndex);
+ }
+ if (compressor != null) {
+ factory.setCompressor(compressor);
+ }
+
+ createRegion(this.regionName, this.rootRegionName, factory.create());
+ createRegion(this.regionName2, this.rootRegionName, factory.create());
+
+ try {
+ startBridgeServer(0, false);
+ } catch (Exception ex) {
+ Assert.fail("While starting CacheServer", ex);
+ }
+ }
+
+ protected void executeCompiledQueries(String poolName, Object[][] params) {
+ SelectResults results = null;
+ QueryService qService = null;
+
+ try {
+ qService = (PoolManager.find(poolName)).getQueryService();
+ } catch (Exception e) {
+ Assert.fail("Failed to get QueryService.", e);
+ }
+
+ for (int i = 0; i < queryString.length; i++) {
+ try {
+ Query query = qService.newQuery(queryString[i]);
+ results = (SelectResults) query.execute(params[i]);
+ } catch (Exception e) {
+ Assert.fail("Failed executing " + queryString[i], e);
+ }
+ }
+ }
+
+ /**
+   * Starts a bridge server on the given port, using the given deserializeValues and
+ * notifyBySubscription to serve up the given region.
+ */
+  protected void startBridgeServer(int port, boolean notifyBySubscription) throws IOException {
+ Cache cache = getCache();
+ CacheServer bridge = cache.addCacheServer();
+ bridge.setPort(port);
+ bridge.setNotifyBySubscription(notifyBySubscription);
+ bridge.start();
+ bridgeServerPort = bridge.getPort();
+ }
+
+ /**
+ * Stops the bridge server that serves up the given cache.
+ */
+ protected void stopBridgeServer(Cache cache) {
+    CacheServer bridge = (CacheServer) cache.getCacheServers().iterator().next();
+ bridge.stop();
+ assertFalse(bridge.isRunning());
+ }
+
+ public void closeClient(VM client) {
+    SerializableRunnable closeCache = new CacheSerializableRunnable("Close Client") {
+ public void run2() throws CacheException {
+ try {
+ closeCache();
+ disconnectFromDS();
+ } catch (Exception ex) {
+ }
+ }
+ };
+
+ client.invoke(closeCache);
+ }
+
+ /**
+   * Starts a bridge server on the given port, using the given deserializeValues and
+ * notifyBySubscription to serve up the given region.
+ */
+  protected void startCacheServer(int port, boolean notifyBySubscription) throws IOException {
+ Cache cache = CacheFactory.getAnyInstance();
+ CacheServer bridge = cache.addCacheServer();
+ bridge.setPort(port);
+ bridge.setNotifyBySubscription(notifyBySubscription);
+ bridge.start();
+ bridgeServerPort = bridge.getPort();
+ }
+
+ public static class TestObject2 implements PdxSerializable {
+ public int _id;
+ public static int numInstance = 0;
+
+ public TestObject2() {
+ numInstance++;
+ }
+
+ public TestObject2(int id) {
+ this._id = id;
+ numInstance++;
+ }
+
+ public int getId() {
+ return this._id;
+ }
+
+ public void toData(PdxWriter out) {
+ out.writeInt("id", this._id);
+ }
+
+ public void fromData(PdxReader in) {
+ this._id = in.readInt("id");
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ GemFireCacheImpl.getInstance().getLoggerI18n()
+ .fine("In TestObject2.equals() this: " + this + " other :" + o);
+ TestObject2 other = (TestObject2) o;
+ if (_id == other._id) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ GemFireCacheImpl.getInstance().getLoggerI18n()
+ .fine("In TestObject2.hashCode() : " + this._id);
+ return this._id;
+ }
+ }
+
+ public static class TestObject implements PdxSerializable {
+ public static LogWriter log;
+ protected String _ticker;
+ protected int _price;
+ public int id;
+ public int important;
+ public int selection;
+ public int select;
+ public static int numInstance = 0;
+ public Map idTickers = new HashMap();
+ public HashMap positions = new HashMap();
+ public TestObject2 test;
+
+ public TestObject() {
+ if (log != null) {
+ log.info("TestObject ctor stack trace", new Exception());
+ }
+ numInstance++;
+ }
+
+ public TestObject(int id, String ticker) {
+ if (log != null) {
+ log.info("TestObject ctor stack trace", new Exception());
+ }
+ this.id = id;
+ this._ticker = ticker;
+ this._price = id;
+ this.important = id;
+ this.selection = id;
+ this.select = id;
+ numInstance++;
+ idTickers.put(id + "", ticker);
+ this.test = new TestObject2(id);
+ }
+
+ public TestObject(int id, String ticker, int numPositions) {
+ this(id, ticker);
+ for (int i = 0; i < numPositions; i++) {
+        positions.put(id + i, new PositionPdx(ticker + ":" + id + ":" + i, (id + 100)));
+ }
+ }
+
+ public int getIdValue() {
+ return this.id;
+ }
+
+ public String getTicker() {
+ return this._ticker;
+ }
+
+ public int getPriceValue() {
+ return this._price;
+ }
+
+ public HashMap getPositions(String id) {
+ return this.positions;
+ }
+
+ public String getStatus() {
+ return (id % 2 == 0) ? "active" : "inactive";
+ }
+
+ public void toData(PdxWriter out) {
+ out.writeInt("id", this.id);
+ out.writeString("ticker", this._ticker);
+ out.writeInt("price", this._price);
+ out.writeObject("idTickers", this.idTickers);
+ out.writeObject("positions", this.positions);
+ out.writeObject("test", this.test);
+ }
+
+ public void fromData(PdxReader in) {
+ this.id = in.readInt("id");
+ this._ticker = in.readString("ticker");
+ this._price = in.readInt("price");
+ this.idTickers = (Map) in.readObject("idTickers");
+ this.positions = (HashMap) in.readObject("positions");
+ this.test = (TestObject2) in.readObject("test");
+ }
+
+ public String toString() {
+ StringBuffer buffer = new StringBuffer();
+      buffer.append("TestObject [").append("id=").append(this.id).append("; ticker=")
+          .append(this._ticker).append("; price=").append(this._price).append("]");
+ return buffer.toString();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ TestObject other = (TestObject) o;
+ if ((id == other.id) && (_ticker.equals(other._ticker))) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public int hashCode() {
+      GemFireCacheImpl.getInstance().getLoggerI18n().fine("In TestObject.hashCode() : " + this.id);
+ return this.id;
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a64f7f60/geode-core/src/test/java/org/apache/geode/cache/query/dunit/PdxGroupByPartitionedQueryDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/cache/query/dunit/PdxGroupByPartitionedQueryDUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache/query/dunit/PdxGroupByPartitionedQueryDUnitTest.java
new file mode 100644
index 0000000..9528693
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/cache/query/dunit/PdxGroupByPartitionedQueryDUnitTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.query.dunit;
+
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.query.functional.PdxGroupByTestImpl;
+import org.apache.geode.test.dunit.Host;
+import org.apache.geode.test.dunit.SerializableRunnable;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.junit.categories.DistributedTest;
+
+@Category(DistributedTest.class)
+public class PdxGroupByPartitionedQueryDUnitTest extends GroupByDUnitImpl {
+
+ @Override
+ protected PdxGroupByTestImpl createTestInstance() {
+ Host host = Host.getHost(0);
+ final VM vm0 = host.getVM(0);
+ final VM vm1 = host.getVM(1);
+ final VM vm2 = host.getVM(2);
+ final VM vm3 = host.getVM(3);
+
+ PdxGroupByTestImpl test = new PdxGroupByTestImpl() {
+
+ @Override
+ public Region createRegion(String regionName, Class valueConstraint) {
+ Region rgn = createAccessor(regionName, valueConstraint);
+ createPR(vm1, regionName, valueConstraint);
+ createPR(vm2, regionName, valueConstraint);
+ createPR(vm3, regionName, valueConstraint);
+ return rgn;
+ }
+ };
+ return test;
+ }
+
+ private void createBuckets(VM vm) {
+ vm.invoke(new SerializableRunnable("create accessor") {
+ public void run() {
+ Cache cache = getCache();
+ Region region = cache.getRegion("region");
+ for (int i = 0; i < 10; i++) {
+ region.put(i, i);
+ }
+ }
+ });
+ }
+
+  private void createPR(VM vm, final String regionName, final Class valueConstraint) {
+ vm.invoke(new SerializableRunnable("create data store") {
+ public void run() {
+ Cache cache = getCache();
+ PartitionAttributesFactory paf = new PartitionAttributesFactory();
+ paf.setTotalNumBuckets(10);
+        cache.createRegionFactory(RegionShortcut.PARTITION).setValueConstraint(valueConstraint)
+ .setPartitionAttributes(paf.create()).create(regionName);
+ }
+ });
+ }
+
+ private Region createAccessor(String regionName, Class valueConstraint) {
+
+ Cache cache = getCache();
+ PartitionAttributesFactory paf = new PartitionAttributesFactory();
+ paf.setTotalNumBuckets(10);
+ paf.setLocalMaxMemory(0);
+ return cache.createRegionFactory(RegionShortcut.PARTITION_PROXY)
+        .setValueConstraint(valueConstraint).setPartitionAttributes(paf.create())
+ .create(regionName);
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a64f7f60/geode-core/src/test/java/org/apache/geode/cache/query/dunit/PdxLocalQueryDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/cache/query/dunit/PdxLocalQueryDUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache/query/dunit/PdxLocalQueryDUnitTest.java
new file mode 100644
index 0000000..b934240
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/cache/query/dunit/PdxLocalQueryDUnitTest.java
@@ -0,0 +1,924 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.query.dunit;
+
+import static org.apache.geode.test.dunit.Assert.*;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.CacheException;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientCacheFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.query.QueryService;
+import org.apache.geode.cache.query.SelectResults;
+import org.apache.geode.cache.query.Struct;
+import org.apache.geode.cache.query.data.PortfolioPdx;
+import org.apache.geode.cache.query.data.PositionPdx;
+import org.apache.geode.cache.query.functional.StructSetOrResultsSet;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.cache30.CacheSerializableRunnable;
+import org.apache.geode.internal.AvailablePortHelper;
+import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.pdx.PdxInstance;
+import org.apache.geode.pdx.PdxInstanceFactory;
+import org.apache.geode.pdx.internal.PdxInstanceEnum;
+import org.apache.geode.pdx.internal.PdxInstanceFactoryImpl;
+import org.apache.geode.pdx.internal.PdxString;
+import org.apache.geode.test.dunit.Assert;
+import org.apache.geode.test.dunit.Host;
+import org.apache.geode.test.dunit.LogWriterUtils;
+import org.apache.geode.test.dunit.NetworkUtils;
+import org.apache.geode.test.dunit.SerializableCallable;
+import org.apache.geode.test.dunit.SerializableRunnable;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.junit.categories.DistributedTest;
+
+@Category(DistributedTest.class)
+public class PdxLocalQueryDUnitTest extends PDXQueryTestBase {
+
+ @Test
+ public void testLocalPdxQueriesVerifyNoDeserialization() throws Exception {
+ final Host host = Host.getHost(0);
+ final VM server1 = host.getVM(0);
+ final VM server2 = host.getVM(1);
+
+ final int numberOfEntries = 10;
+ final String name = "/" + regionName;
+
+    final String[] queries = {"select * from " + name + " where status = 'inactive'",
+        "select p from " + name + " p where p.status = 'inactive'",
+        "select * from " + name + " p, p.positions.values v where v.secId = 'IBM'",
+        "select p.status from " + name + " p where p.status = 'inactive' or p.ID > 0",
+        "select * from " + name + " p where p.status = 'inactive' and p.ID >= 0",
+        "select p.status from " + name + " p where p.status in set ('inactive', 'active')",
+        "select * from " + name + " p where p.ID > 0 and p.ID < 10",};
+
+ // Start server1
+ server1.invoke(new SerializableCallable("Create Server1") {
+ @Override
+ public Object call() throws Exception {
+        Region r1 = getCache().createRegionFactory(RegionShortcut.REPLICATE).create(regionName);
+
+ for (int i = 0; i < numberOfEntries; i++) {
+ PortfolioPdx p = new PortfolioPdx(i);
+ r1.put("key-" + i, p);
+ }
+ return null;
+ }
+ });
+
+ // Start server2
+ server2.invoke(new SerializableCallable("Create Server2") {
+ @Override
+ public Object call() throws Exception {
+        Region r1 = getCache().createRegionFactory(RegionShortcut.REPLICATE).create(regionName);
+
+ QueryService qs = null;
+ SelectResults sr = null;
+ // Execute query locally
+ try {
+ qs = getCache().getQueryService();
+ } catch (Exception e) {
+ Assert.fail("Failed to get QueryService.", e);
+ }
+
+ for (int i = 0; i < queries.length; i++) {
+ try {
+ sr = (SelectResults) qs.newQuery(queries[i]).execute();
+            assertTrue("Size of resultset should be greater than 0 for query: " + queries[i],
+ sr.size() > 0);
+ } catch (Exception e) {
+ Assert.fail("Failed executing query " + queries[i], e);
+ }
+ }
+        assertEquals("Unexpected number of objects deserialized ", 0, PortfolioPdx.numInstance);
+ return null;
+ }
+ });
+ this.closeClient(server1);
+ this.closeClient(server2);
+
+ }
+
+ @Test
+ public void testLocalPdxQueriesReadSerialized() throws Exception {
+ final Host host = Host.getHost(0);
+ final VM server1 = host.getVM(0);
+ final VM server2 = host.getVM(1);
+
+ final int numberOfEntries = 10;
+ final String name = "/" + regionName;
+
+    final String[] queries = {"select * from " + name + " where position1 = $1",
+        "select * from " + name + " where aDay = $1",
+        "select * from " + name + " where status = 'inactive'",
+        "select distinct * from " + name + " where status = 'inactive'",
+        "select p from " + name + " p where p.status = 'inactive'",
+        "select * from " + name + " p, p.positions.values v where v.secId = 'IBM'",
+        "select * from " + name + " p where p.status = 'inactive' or p.ID > 0",
+        "select * from " + name + " p where p.status = 'inactive' and p.ID >= 0",
+        "select * from " + name + " p where p.status in set ('inactive', 'active')",
+        "select * from " + name + " p where p.ID > 0 and p.ID < 10",};
+
+ // Start server1
+ server1.invoke(new SerializableCallable("Create Server1") {
+ @Override
+ public Object call() throws Exception {
+        Region r1 = getCache().createRegionFactory(RegionShortcut.REPLICATE).create(regionName);
+
+ for (int i = 0; i < numberOfEntries; i++) {
+ PortfolioPdx p = new PortfolioPdx(i);
+ r1.put("key-" + i, p);
+ }
+ return null;
+ }
+ });
+
+ // Start server2
+ server2.invoke(new SerializableCallable("Create Server2") {
+ @Override
+ public Object call() throws Exception {
+ ((GemFireCacheImpl) getCache()).setReadSerialized(true);
+        Region r1 = getCache().createRegionFactory(RegionShortcut.REPLICATE).create(regionName);
+
+ QueryService qs = null;
+ SelectResults sr = null;
+ // Execute query locally
+ try {
+ qs = getCache().getQueryService();
+ } catch (Exception e) {
+ Assert.fail("Failed to get QueryService.", e);
+ }
+
+ PositionPdx pos = new PositionPdx("IBM", 100);
+ PdxInstanceFactory out = PdxInstanceFactoryImpl
+            .newCreator("org.apache.geode.cache.query.data.PositionPdx", false);
+ out.writeLong("avg20DaysVol", 0);
+ out.writeString("bondRating", "");
+ out.writeDouble("convRatio", 0);
+ out.writeString("country", "");
+ out.writeDouble("delta", 0);
+ out.writeLong("industry", 0);
+ out.writeLong("issuer", 0);
+ out.writeDouble("mktValue", pos.getMktValue());
+ out.writeDouble("qty", 0);
+ out.writeString("secId", pos.secId);
+ out.writeString("secIdIndexed", pos.secIdIndexed);
+ out.writeString("secLinks", "");
+ out.writeDouble("sharesOutstanding", pos.getSharesOutstanding());
+ out.writeString("underlyer", "");
+ out.writeLong("volatility", 0);
+ out.writeInt("pid", pos.getPid());
+ out.writeInt("portfolioId", 0);
+ out.markIdentityField("secId");
+ PdxInstance pi = out.create();
+
+ PortfolioPdx.Day pDay = new PortfolioPdx(1).aDay;
+ PdxInstanceEnum pdxEnum = new PdxInstanceEnum(pDay);
+
+ for (int i = 0; i < queries.length; i++) {
+ try {
+ if (i == 0) {
+              sr = (SelectResults) qs.newQuery(queries[i]).execute(new Object[] {pi});
+ } else if (i == 1) {
+              sr = (SelectResults) qs.newQuery(queries[i]).execute(new Object[] {pdxEnum});
+ } else {
+ sr = (SelectResults) qs.newQuery(queries[i]).execute();
+ }
+            assertTrue("Size of resultset should be greater than 0 for query: " + queries[i],
+ sr.size() > 0);
+ for (Object result : sr) {
+ if (result instanceof Struct) {
+ Object[] r = ((Struct) result).getFieldValues();
+ for (int j = 0; j < r.length; j++) {
+ if (!(r[j] instanceof PdxInstance)) {
+                  fail("Result object should be a PdxInstance and not an instance of "
+ + r[j].getClass() + " for query: " + queries[i]);
+ }
+ }
+ } else if (!(result instanceof PdxInstance)) {
+              fail("Result object should be a PdxInstance and not an instance of "
+ + result.getClass() + " for query: " + queries[i]);
+ }
+ }
+ } catch (Exception e) {
+ Assert.fail("Failed executing query " + queries[i], e);
+ }
+ }
+ return null;
+ }
+ });
+ this.closeClient(server1);
+ this.closeClient(server2);
+
+ }
+
+ @Test
+ public void testLocalPdxQueries() throws Exception {
+ final Host host = Host.getHost(0);
+ final VM server1 = host.getVM(1);
+ final VM client = host.getVM(2);
+
+ final int numberOfEntries = 10;
+ final String name = "/" + regionName;
+ final String name2 = "/" + regionName2;
+    final String[] queries = {"select * from " + name + " where position1 = $1",
+        "select * from " + name + " where aDay = $1",
+        "select distinct * from " + name + " p where p.status = 'inactive'", // numberOfEntries
+        "select distinct p.status from " + name + " p where p.status = 'inactive'", // 1
+        "select p from " + name + " p where p.status = 'inactive'", // numberOfEntries
+        "select * from " + name + " p, p.positions.values v where v.secId = 'IBM'", // 4
+        "select v from " + name + " p, p.positions.values v where v.secId = 'IBM'", // 4
+        "select p.status from " + name + " p where p.status = 'inactive'", // numberOfEntries
+        "select distinct * from " + name + " p where p.status = 'inactive' order by p.ID", // numberOfEntries
+        "select * from " + name + " p where p.status = 'inactive' or p.ID > 0", // 19
+        "select * from " + name + " p where p.status = 'inactive' and p.ID >= 0", // numberOfEntries
+        "select * from " + name + " p where p.status in set ('inactive', 'active')", // numberOfEntries*2
+        "select * from " + name + " p where p.ID > 0 and p.ID < 10", // 9
+        "select v from " + name + " p, p.positions.values v where p.status = 'inactive'", // numberOfEntries*2
+        "select v.secId from " + name + " p, p.positions.values v where p.status = 'inactive'", // numberOfEntries*2
+        "select distinct p from " + name
+            + " p, p.positions.values v where p.status = 'inactive' and v.pid >= 0", // numberOfEntries
+        "select distinct p from " + name
+            + " p, p.positions.values v where p.status = 'inactive' or v.pid > 0", // numberOfEntries*2
+        "select distinct * from " + name + " p, p.positions.values v where p.status = 'inactive'", // numberOfEntries*2
+        "select * from " + name + ".values v where v.status = 'inactive'", // numberOfEntries
+        "select v from " + name + " v where v in (select p from " + name + " p where p.ID > 0)", // 19
+        "select v from " + name + " v where v.status in (select distinct p.status from " + name
+            + " p where p.status = 'inactive')", // numberOfEntries
+        "select * from " + name + " r1, " + name2 + " r2 where r1.status = r2.status", // 200
+        "select * from " + name + " r1, " + name2
+            + " r2 where r1.status = r2.status and r1.status = 'active'", // 100
+        "select r2.status from " + name + " r1, " + name2
+            + " r2 where r1.status = r2.status and r1.status = 'active'", // 100
+        "select distinct r2.status from " + name + " r1, " + name2
+            + " r2 where r1.status = r2.status and r1.status = 'active'", // 1
+        "select * from " + name + " v where v.status = ELEMENT (select distinct p.status from "
+            + name + " p where p.status = 'inactive')", // numberOfEntries
+    };
+
+    final int[] results = {2, 3, numberOfEntries, 1, numberOfEntries, 4, 4, numberOfEntries,
+        numberOfEntries, 19, numberOfEntries, numberOfEntries * 2, 9, numberOfEntries * 2,
+        numberOfEntries * 2, numberOfEntries, numberOfEntries * 2, numberOfEntries * 2,
+        numberOfEntries, 19, numberOfEntries, 200, 100, 100, 1, numberOfEntries};
+
+ // Start server1
+    final int port1 = (Integer) server1.invoke(new SerializableCallable("Create Server1") {
+ @Override
+ public Object call() throws Exception {
+        Region r1 = getCache().createRegionFactory(RegionShortcut.REPLICATE).create(regionName);
+        Region r2 = getCache().createRegionFactory(RegionShortcut.REPLICATE).create(regionName2);
+
+ for (int i = 0; i < numberOfEntries; i++) {
+ PortfolioPdx p = new PortfolioPdx(i);
+ r1.put("key-" + i, p);
+ r2.put("key-" + i, p);
+ }
+
+ CacheServer server = getCache().addCacheServer();
+ int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
+ server.setPort(port);
+ server.start();
+ return port;
+ }
+ });
+
+ // client loads pdx objects on server
+ client.invoke(new SerializableCallable("Create client") {
+ @Override
+ public Object call() throws Exception {
+ ClientCacheFactory cf = new ClientCacheFactory();
+        cf.addPoolServer(NetworkUtils.getServerHostName(server1.getHost()), port1);
+ ClientCache cache = getClientCache(cf);
+
+        Region region =
+            cache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create(regionName);
+        Region region2 =
+            cache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create(regionName2);
+
+ for (int i = numberOfEntries; i < numberOfEntries * 2; i++) {
+ PortfolioPdx p = new PortfolioPdx(i);
+ region.put("key-" + i, p);
+ region2.put("key-" + i, p);
+ }
+ return null;
+ }
+ });
+
+ // query locally on server1 to verify pdx objects are not deserialized
+ server1.invoke(new SerializableCallable("query locally on server1") {
+ @Override
+ public Object call() throws Exception {
+ GemFireCacheImpl cache = (GemFireCacheImpl) getCache();
+
+ QueryService qs = null;
+ SelectResults sr = null;
+ // Execute query locally
+ try {
+ qs = getCache().getQueryService();
+ } catch (Exception e) {
+ Assert.fail("Failed to get QueryService.", e);
+ }
+
+ PositionPdx pos = new PositionPdx("IBM", 100);
+ PortfolioPdx.Day pDay = new PortfolioPdx(1).aDay;
+
+ for (int i = 0; i < queries.length; i++) {
+ try {
+ if (i == 0) {
+            sr = (SelectResults) qs.newQuery(queries[i]).execute(new Object[] {pos});
+ } else if (i == 1) {
+            sr = (SelectResults) qs.newQuery(queries[i]).execute(new Object[] {pDay});
+ } else {
+ sr = (SelectResults) qs.newQuery(queries[i]).execute();
+ }
+
+            assertTrue("Size of resultset should be greater than 0 for query: " + queries[i],
+ sr.size() > 0);
+            assertEquals("Expected and actual results do not match for query: " + queries[i],
+ results[i], sr.size());
+ } catch (Exception e) {
+ Assert.fail("Failed executing query " + queries[i], e);
+ }
+ }
+
+ int extra = 0;
+ if (cache.getLogger().fineEnabled()) {
+ extra = 20;
+ }
+        assertEquals(numberOfEntries * 6 + 1 + extra, PortfolioPdx.numInstance);
+
+        // set readSerialized and query
+ ((GemFireCacheImpl) getCache()).setReadSerialized(true);
+
+ PdxInstanceFactory out = PdxInstanceFactoryImpl
+            .newCreator("org.apache.geode.cache.query.data.PositionPdx", false);
+ out.writeLong("avg20DaysVol", 0);
+ out.writeString("bondRating", "");
+ out.writeDouble("convRatio", 0);
+ out.writeString("country", "");
+ out.writeDouble("delta", 0);
+ out.writeLong("industry", 0);
+ out.writeLong("issuer", 0);
+ out.writeDouble("mktValue", pos.getMktValue());
+ out.writeDouble("qty", 0);
+ out.writeString("secId", pos.secId);
+ out.writeString("secIdIndexed", pos.secIdIndexed);
+ out.writeString("secLinks", "");
+ out.writeDouble("sharesOutstanding", pos.getSharesOutstanding());
+ out.writeString("underlyer", "");
+ out.writeLong("volatility", 0);
+ out.writeInt("pid", pos.getPid());
+ out.writeInt("portfolioId", 0);
+ // Identity Field.
+ out.markIdentityField("secId");
+ PdxInstance pi = out.create();
+
+ PdxInstanceEnum pdxEnum = new PdxInstanceEnum(pDay);
+
+ for (int i = 0; i < queries.length; i++) {
+ try {
+ if (i == 0) {
+              sr = (SelectResults) qs.newQuery(queries[i]).execute(new Object[] {pi});
+ } else if (i == 1) {
+              sr = (SelectResults) qs.newQuery(queries[i]).execute(new Object[] {pdxEnum});
+ } else {
+ sr = (SelectResults) qs.newQuery(queries[i]).execute();
+ }
+            assertTrue("Size of resultset should be greater than 0 for query: " + queries[i],
+ sr.size() > 0);
+          // For distinct queries over a mix of pdx and non-pdx objects, the
+          // hash codes must be equal for comparison, which is not the case
+          // for PortfolioPdx
+ if (queries[i].indexOf("distinct") == -1) {
+ if (i == 0 || i == 1) {
+              assertEquals("Expected and actual results do not match for query: " + queries[i], 1,
+ sr.size());
+ } else {
+              assertEquals("Expected and actual results do not match for query: " + queries[i],
+ results[i], sr.size());
+ }
+ }
+ } catch (Exception e) {
+ Assert.fail("Failed executing query " + queries[i], e);
+ }
+ }
+
+        // reset readSerialized and query
+ ((GemFireCacheImpl) getCache()).setReadSerialized(false);
+ return null;
+ }
+ });
+
+ // query from client
+ client.invoke(new SerializableCallable("Create client") {
+ @Override
+ public Object call() throws Exception {
+ ClientCacheFactory cf = new ClientCacheFactory();
+        cf.addPoolServer(NetworkUtils.getServerHostName(server1.getHost()), port1);
+ ClientCache cache = getClientCache(cf);
+
+ QueryService qs = null;
+ SelectResults sr = null;
+ // Execute query remotely
+ try {
+ qs = cache.getQueryService();
+ } catch (Exception e) {
+ Assert.fail("Failed to get QueryService.", e);
+ }
+
+ PositionPdx pos = new PositionPdx("IBM", 100);
+ PortfolioPdx.Day pDay = new PortfolioPdx(1).aDay;
+
+ for (int i = 0; i < queries.length; i++) {
+ try {
+ if (i == 0) {
+            sr = (SelectResults) qs.newQuery(queries[i]).execute(new Object[] {pos});
+ } else if (i == 1) {
+            sr = (SelectResults) qs.newQuery(queries[i]).execute(new Object[] {pDay});
+ } else {
+ sr = (SelectResults) qs.newQuery(queries[i]).execute();
+ }
+            assertTrue("Size of resultset should be greater than 0 for query: " + queries[i],
+ sr.size() > 0);
+            assertEquals("Expected and actual results do not match for query: " + queries[i],
+ results[i], sr.size());
+ for (Object result : sr) {
+ if (result instanceof Struct) {
+ Object[] r = ((Struct) result).getFieldValues();
+ for (int j = 0; j < r.length; j++) {
+                if (r[j] instanceof PdxInstance || r[j] instanceof PdxString) {
+                  fail("Result object should be a domain object and not an instance of "
+ + r[j].getClass() + " for query: " + queries[i]);
+ }
+ }
+              } else if (result instanceof PdxInstance || result instanceof PdxString) {
+                fail("Result object should be a domain object and not an instance of "
+ + result.getClass() + " for query: " + queries[i]);
+ }
+ }
+ } catch (Exception e) {
+ Assert.fail("Failed executing query " + queries[i], e);
+ }
+ }
+
+ return null;
+ }
+ });
+
+ // query locally on server1
+ server1.invoke(new SerializableCallable("query locally on server1") {
+ @Override
+ public Object call() throws Exception {
+ GemFireCacheImpl cache = (GemFireCacheImpl) getCache();
+
+ QueryService qs = null;
+ SelectResults[][] sr = new SelectResults[queries.length][2];
+ // Execute query locally
+ try {
+ qs = getCache().getQueryService();
+ } catch (Exception e) {
+ Assert.fail("Failed to get QueryService.", e);
+ }
+ int cnt = PositionPdx.cnt;
+ PositionPdx pos = new PositionPdx("IBM", 100);
+ PortfolioPdx.Day pDay = new PortfolioPdx(1).aDay;
+
+ for (int i = 0; i < queries.length; i++) {
+ try {
+ if (i == 0) {
+            sr[i][0] = (SelectResults) qs.newQuery(queries[i]).execute(new Object[] {pos});
+ } else if (i == 1) {
+            sr[i][0] = (SelectResults) qs.newQuery(queries[i]).execute(new Object[] {pDay});
+ } else {
+ sr[i][0] = (SelectResults) qs.newQuery(queries[i]).execute();
+ }
+
+            assertTrue("Size of resultset should be greater than 0 for query: " + queries[i],
+ sr[i][0].size() > 0);
+            assertEquals("Expected and actual results do not match for query: " + queries[i],
+ results[i], sr[i][0].size());
+ for (Object result : sr[i][0]) {
+ if (result instanceof Struct) {
+ Object[] r = ((Struct) result).getFieldValues();
+ for (int j = 0; j < r.length; j++) {
+                if (r[j] instanceof PdxInstance || r[j] instanceof PdxString) {
+                  fail("Result object should be a domain object and not an instance of "
+ + r[j].getClass() + " for query: " + queries[i]);
+ }
+ }
+              } else if (result instanceof PdxInstance || result instanceof PdxString) {
+                fail("Result object should be a domain object and not an instance of "
+ + result.getClass() + " for query: " + queries[i]);
+ }
+ }
+ } catch (Exception e) {
+ Assert.fail("Failed executing query " + queries[i], e);
+ }
+ }
+
+ // create index
+ qs.createIndex("statusIndex", "status", name);
+ qs.createIndex("IDIndex", "ID", name);
+        qs.createIndex("pIdIndex", "pos.getPid()", name + " p, p.positions.values pos");
+        qs.createIndex("secIdIndex", "pos.secId", name + " p, p.positions.values pos");
+
+ for (int i = 0; i < queries.length; i++) {
+ try {
+ if (i == 0) {
+            sr[i][1] = (SelectResults) qs.newQuery(queries[i]).execute(new Object[] {pos});
+ } else if (i == 1) {
+            sr[i][1] = (SelectResults) qs.newQuery(queries[i]).execute(new Object[] {pDay});
+ } else {
+ sr[i][1] = (SelectResults) qs.newQuery(queries[i]).execute();
+ }
+
+            assertTrue("Size of resultset should be greater than 0 for query: " + queries[i],
+ sr[i][1].size() > 0);
+            assertEquals("Expected and actual results do not match for query: " + queries[i],
+ results[i], sr[i][1].size());
+ for (Object result : sr[i][1]) {
+ if (result instanceof Struct) {
+ Object[] r = ((Struct) result).getFieldValues();
+ for (int j = 0; j < r.length; j++) {
+                if (r[j] instanceof PdxInstance || r[j] instanceof PdxString) {
+                  fail("Result object should be a domain object and not an instance of "
+ + r[j].getClass() + " for query: " + queries[i]);
+ }
+ }
+              } else if (result instanceof PdxInstance || result instanceof PdxString) {
+                fail("Result object should be a domain object and not an instance of "
+ + result.getClass() + " for query: " + queries[i]);
+ }
+ }
+ } catch (Exception e) {
+ Assert.fail("Failed executing query " + queries[i], e);
+ }
+ }
+
+ StructSetOrResultsSet ssOrrs = new StructSetOrResultsSet();
+        ssOrrs.CompareQueryResultsWithoutAndWithIndexes(sr, queries.length, queries);
+ return null;
+ }
+ });
+
+ this.closeClient(client);
+ this.closeClient(server1);
+
+ }
+
+ @Test
+ public void testLocalPdxQueriesOnPR() throws Exception {
+ final Host host = Host.getHost(0);
+ final VM server1 = host.getVM(0);
+ final VM server2 = host.getVM(1);
+ final VM client = host.getVM(2);
+
+ final int numberOfEntries = 10;
+ final String name = "/" + regionName;
+    final String[] queries = {"select * from " + name + " where position1 = $1",
+        "select * from " + name + " where aDay = $1",
+        "select distinct * from " + name + " p where p.status = 'inactive'", // numberOfEntries
+        "select distinct p.status from " + name + " p where p.status = 'inactive'", // 1
+        "select p from " + name + " p where p.status = 'inactive'", // numberOfEntries
+        "select * from " + name + " p, p.positions.values v where v.secId = 'IBM'", // 4
+        "select v from " + name + " p, p.positions.values v where v.secId = 'IBM'", // 4
+        "select p.status from " + name + " p where p.status = 'inactive'", // numberOfEntries
+        "select distinct * from " + name + " p where p.status = 'inactive' order by p.ID", // numberOfEntries
+        "select * from " + name + " p where p.status = 'inactive' or p.ID > 0", // 19
+        "select * from " + name + " p where p.status = 'inactive' and p.ID >= 0", // numberOfEntries
+        "select * from " + name + " p where p.status in set ('inactive', 'active')", // numberOfEntries*2
+        "select * from " + name + " p where p.ID > 0 and p.ID < 10", // 9
+        "select v from " + name + " p, p.positions.values v where p.status = 'inactive'", // numberOfEntries*2
+        "select v.secId from " + name + " p, p.positions.values v where p.status = 'inactive'", // numberOfEntries*2
+        "select distinct p from " + name
+            + " p, p.positions.values v where p.status = 'inactive' and v.pid >= 0", // numberOfEntries
+        "select distinct p from " + name
+            + " p, p.positions.values v where p.status = 'inactive' or v.pid > 0", // numberOfEntries*2
+        "select distinct * from " + name + " p, p.positions.values v where p.status = 'inactive'", // numberOfEntries*2
+        "select * from " + name + ".values v where v.status = 'inactive'", // numberOfEntries
+        "select v from " + name + " v where v in (select p from " + name + " p where p.ID > 0)", // 19
+        "select v from " + name + " v where v.status in (select distinct p.status from " + name
+            + " p where p.status = 'inactive')", // numberOfEntries
+        "select * from " + name + " v where v.status = ELEMENT (select distinct p.status from "
+            + name + " p where p.status = 'inactive')", // numberOfEntries
+    };
+
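+    // Expected result sizes, one per query above; the region ends up holding numberOfEntries * 2 entries.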
+    final int[] results = {2, 3, numberOfEntries, 1, numberOfEntries, 4, 4, numberOfEntries,
+        numberOfEntries, 19, numberOfEntries, numberOfEntries * 2, 9, numberOfEntries * 2,
+        numberOfEntries * 2, numberOfEntries, numberOfEntries * 2, numberOfEntries * 2,
+        numberOfEntries, 19, numberOfEntries, numberOfEntries};
+
+ // Start server1
+    final int port1 = (Integer) server1.invoke(new SerializableCallable("Create Server1") {
+      @Override
+      public Object call() throws Exception {
+        Region r1 = getCache().createRegionFactory(RegionShortcut.PARTITION).create(regionName);
+ for (int i = 0; i < numberOfEntries; i++) {
+ r1.put("key-" + i, new PortfolioPdx(i));
+ }
+ CacheServer server = getCache().addCacheServer();
+ int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
+ server.setPort(port);
+ server.start();
+ return port;
+ }
+ });
+
+ // Start server2
+    final int port2 = (Integer) server2.invoke(new SerializableCallable("Create Server2") {
+      @Override
+      public Object call() throws Exception {
+        Region r1 = getCache().createRegionFactory(RegionShortcut.PARTITION).create(regionName);
+ CacheServer server = getCache().addCacheServer();
+ int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
+ server.setPort(port);
+ server.start();
+ return port;
+ }
+ });
+
+ // client loads pdx objects on server
+ client.invoke(new SerializableCallable("Create client") {
+ @Override
+ public Object call() throws Exception {
+ ClientCacheFactory cf = new ClientCacheFactory();
+        cf.addPoolServer(NetworkUtils.getServerHostName(server1.getHost()), port1);
+        ClientCache cache = getClientCache(cf);
+        Region region =
+            cache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create(regionName);
+
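+        // Load a second batch of entries (keys numberOfEntries .. numberOfEntries * 2 - 1) from the client.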
+ for (int i = numberOfEntries; i < numberOfEntries * 2; i++) {
+ region.put("key-" + i, new PortfolioPdx(i));
+ }
+
+ QueryService qs = null;
+ SelectResults sr = null;
+ // Execute query remotely
+ try {
+ qs = cache.getQueryService();
+ } catch (Exception e) {
+ Assert.fail("Failed to get QueryService.", e);
+ }
+ PositionPdx pos = new PositionPdx("IBM", 100);
+ PortfolioPdx.Day pDay = new PortfolioPdx(1).aDay;
+
+ for (int i = 0; i < queries.length; i++) {
+ try {
+ if (i == 0) {
+              sr = (SelectResults) qs.newQuery(queries[i]).execute(new Object[] {pos});
+            } else if (i == 1) {
+              sr = (SelectResults) qs.newQuery(queries[i]).execute(new Object[] {pDay});
+            } else {
+              sr = (SelectResults) qs.newQuery(queries[i]).execute();
+            }
+            assertTrue("Size of resultset should be greater than 0 for query: " + queries[i],
+                sr.size() > 0);
+            assertEquals("Expected and actual results do not match for query: " + queries[i],
+                results[i], sr.size());
+
+ for (Object result : sr) {
+ if (result instanceof Struct) {
+ Object[] r = ((Struct) result).getFieldValues();
+ for (int j = 0; j < r.length; j++) {
+                  if (r[j] instanceof PdxInstance || r[j] instanceof PdxString) {
+                    fail("Result object should be a domain object and not an instance of "
+                        + r[j].getClass() + " for query: " + queries[i]);
+                  }
+                }
+              } else if (result instanceof PdxInstance || result instanceof PdxString) {
+                fail("Result object should be a domain object and not an instance of "
+                    + result.getClass() + " for query: " + queries[i]);
+              }
+            }
+ } catch (Exception e) {
+ Assert.fail("Failed executing query " + queries[i], e);
+ }
+ }
+
+ return null;
+ }
+ });
+
+ // query locally on server1
+ server1.invoke(new SerializableCallable("query locally on server1") {
+ @Override
+ public Object call() throws Exception {
+ GemFireCacheImpl cache = (GemFireCacheImpl) getCache();
+
+ QueryService qs = null;
+ SelectResults sr = null;
+ // Execute query locally
+ try {
+ qs = getCache().getQueryService();
+ } catch (Exception e) {
+ Assert.fail("Failed to get QueryService.", e);
+ }
+
+ PositionPdx pos = new PositionPdx("IBM", 100);
+ PortfolioPdx.Day pDay = new PortfolioPdx(1).aDay;
+
+ for (int i = 0; i < queries.length; i++) {
+ try {
+ if (i == 0) {
+              sr = (SelectResults) qs.newQuery(queries[i]).execute(new Object[] {pos});
+            } else if (i == 1) {
+              sr = (SelectResults) qs.newQuery(queries[i]).execute(new Object[] {pDay});
+            } else {
+              sr = (SelectResults) qs.newQuery(queries[i]).execute();
+            }
+            assertTrue("Size of resultset should be greater than 0 for query: " + queries[i],
+                sr.size() > 0);
+            assertEquals("Expected and actual results do not match for query: " + queries[i],
+                results[i], sr.size());
+
+ for (Object result : sr) {
+ if (result instanceof Struct) {
+ Object[] r = ((Struct) result).getFieldValues();
+ for (int j = 0; j < r.length; j++) {
+                  if (r[j] instanceof PdxInstance || r[j] instanceof PdxString) {
+                    fail("Result object should be a domain object and not an instance of "
+                        + r[j].getClass() + " for query: " + queries[i]);
+                  }
+                }
+              } else if (result instanceof PdxInstance || result instanceof PdxString) {
+                fail("Result object should be a domain object and not an instance of "
+                    + result.getClass() + " for query: " + queries[i]);
+              }
+            }
+ } catch (Exception e) {
+ Assert.fail("Failed executing query " + queries[i], e);
+ }
+ }
+
+ return null;
+ }
+ });
+
+ // query locally on server2
+ server2.invoke(new SerializableCallable("query locally on server2") {
+ @Override
+ public Object call() throws Exception {
+ GemFireCacheImpl cache = (GemFireCacheImpl) getCache();
+
+ QueryService qs = null;
+ SelectResults[][] sr = new SelectResults[queries.length][2];
+ // Execute query locally
+ try {
+ qs = getCache().getQueryService();
+ } catch (Exception e) {
+ Assert.fail("Failed to get QueryService.", e);
+ }
+
+ PositionPdx pos = new PositionPdx("IBM", 100);
+ PortfolioPdx.Day pDay = new PortfolioPdx(1).aDay;
+
+ for (int i = 0; i < queries.length; i++) {
+ try {
+ if (i == 0) {
+              sr[i][0] = (SelectResults) qs.newQuery(queries[i]).execute(new Object[] {pos});
+            } else if (i == 1) {
+              sr[i][0] = (SelectResults) qs.newQuery(queries[i]).execute(new Object[] {pDay});
+            } else {
+              sr[i][0] = (SelectResults) qs.newQuery(queries[i]).execute();
+            }
+            assertTrue("Size of resultset should be greater than 0 for query: " + queries[i],
+                sr[i][0].size() > 0);
+            assertEquals("Expected and actual results do not match for query: " + queries[i],
+                results[i], sr[i][0].size());
+
+ for (Object result : sr[i][0]) {
+ if (result instanceof Struct) {
+ Object[] r = ((Struct) result).getFieldValues();
+ for (int j = 0; j < r.length; j++) {
+                  if (r[j] instanceof PdxInstance || r[j] instanceof PdxString) {
+                    fail("Result object should be a domain object and not an instance of "
+                        + r[j].getClass() + " for query: " + queries[i]);
+                  }
+                }
+              } else if (result instanceof PdxInstance || result instanceof PdxString) {
+                fail("Result object should be a domain object and not an instance of "
+                    + result.getClass() + " for query: " + queries[i]);
+              }
+            }
+ } catch (Exception e) {
+ Assert.fail("Failed executing query " + queries[i], e);
+ }
+ }
+
+        // Create indexes so the queries below are index-backed.
+        qs.createIndex("statusIndex", "p.status", name + " p");
+        qs.createIndex("IDIndex", "ID", name);
+        qs.createIndex("pIdIndex", "pos.getPid()", name + " p, p.positions.values pos");
+        qs.createIndex("secIdIndex", "pos.secId", name + " p, p.positions.values pos");
+
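+        // Second run: with the indexes in place, results stored in sr[i][1] should match the un-indexed run in sr[i][0].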
+ for (int i = 0; i < queries.length; i++) {
+ try {
+ if (i == 0) {
+              sr[i][1] = (SelectResults) qs.newQuery(queries[i]).execute(new Object[] {pos});
+            } else if (i == 1) {
+              sr[i][1] = (SelectResults) qs.newQuery(queries[i]).execute(new Object[] {pDay});
+            } else {
+              sr[i][1] = (SelectResults) qs.newQuery(queries[i]).execute();
+            }
+            assertTrue("Size of resultset should be greater than 0 for query: " + queries[i],
+                sr[i][1].size() > 0);
+            assertEquals("Expected and actual results do not match for query: " + queries[i],
+                results[i], sr[i][1].size());
+
+ for (Object result : sr[i][1]) {
+ if (result instanceof Struct) {
+ Object[] r = ((Struct) result).getFieldValues();
+ for (int j = 0; j < r.length; j++) {
+                  if (r[j] instanceof PdxInstance || r[j] instanceof PdxString) {
+                    fail("Result object should be a domain object and not an instance of "
+                        + r[j].getClass() + " for query: " + queries[i]);
+                  }
+                }
+              } else if (result instanceof PdxInstance || result instanceof PdxString) {
+                fail("Result object should be a domain object and not an instance of "
+                    + result.getClass() + " for query: " + queries[i]);
+              }
+            }
+ } catch (Exception e) {
+ Assert.fail("Failed executing query " + queries[i], e);
+ }
+ }
+
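+          // Verify that the runs without indexes (sr[i][0]) and with indexes (sr[i][1]) returned the same results.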
+ StructSetOrResultsSet ssOrrs = new StructSetOrResultsSet();
+          ssOrrs.CompareQueryResultsWithoutAndWithIndexes(sr, queries.length, queries);
+
+ return null;
+ }
+ });
+
+ this.closeClient(client);
+ this.closeClient(server1);
+ this.closeClient(server2);
+ }
+
+ /* Close Client */
+ public void closeClient(VM client) {
+    SerializableRunnable closeCache = new CacheSerializableRunnable("Close Client") {
+      public void run2() throws CacheException {
+        LogWriterUtils.getLogWriter().info("### Close Client. ###");
+        try {
+          closeCache();
+          disconnectFromDS();
+        } catch (Exception ex) {
+          LogWriterUtils.getLogWriter().info("### Failed to close client. ###");
+ }
+ }
+ };
+
+ client.invoke(closeCache);
+ }
+
+ @Override
+ protected final void preTearDownPDXQueryTestBase() throws Exception {
+    disconnectAllFromDS(); // every test expects to create a new distributed system
+    // Reset TestObject.numInstance for the next test.
+    TestObject.numInstance = 0;
+    PortfolioPdx.DEBUG = false;
+    // Reset the instance counters in all VMs.
+ resetTestObjectInstanceCount();
+ }
+
+ @Override
+ public final void postSetUp() throws Exception {
+ resetTestObjectInstanceCount();
+ }
+
+ private void resetTestObjectInstanceCount() {
+ final Host host = Host.getHost(0);
+ for (int i = 0; i < 4; i++) {
+ VM vm = host.getVM(i);
+      vm.invoke(new CacheSerializableRunnable("Reset Instance Counts") {
+ public void run2() throws CacheException {
+ TestObject.numInstance = 0;
+ PortfolioPdx.numInstance = 0;
+ PositionPdx.numInstance = 0;
+ PositionPdx.cnt = 0;
+ TestObject2.numInstance = 0;
+ }
+ });
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a64f7f60/geode-core/src/test/java/org/apache/geode/cache/query/dunit/PdxLocalQueryVersionedClassDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/cache/query/dunit/PdxLocalQueryVersionedClassDUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache/query/dunit/PdxLocalQueryVersionedClassDUnitTest.java
new file mode 100644
index 0000000..f83e2ff
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/cache/query/dunit/PdxLocalQueryVersionedClassDUnitTest.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.query.dunit;
+
+import org.apache.geode.test.junit.categories.DistributedTest;
+import org.junit.Test;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientCacheFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.query.QueryService;
+import org.apache.geode.cache.query.SelectResults;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.internal.AvailablePortHelper;
+import org.apache.geode.pdx.PdxInstance;
+import org.apache.geode.pdx.PdxInstanceFactory;
+import org.apache.geode.pdx.internal.PdxInstanceFactoryImpl;
+import org.apache.geode.test.dunit.Assert;
+import org.apache.geode.test.dunit.AsyncInvocation;
+import org.apache.geode.test.dunit.Host;
+import org.apache.geode.test.dunit.NetworkUtils;
+import org.apache.geode.test.dunit.SerializableCallable;
+import org.apache.geode.test.dunit.ThreadUtils;
+import org.apache.geode.test.dunit.VM;
+import org.junit.experimental.categories.Category;
+
+@Category(DistributedTest.class)
+public class PdxLocalQueryVersionedClassDUnitTest extends PDXQueryTestBase {
+
+ /**
+   * Tests the isRemote flag, which could become inconsistent when the same bind query is
+   * executed concurrently from multiple threads. Bug #49662 was caused by this inconsistent
+   * behavior.
+ */
+ @Test
+ public void testIsRemoteFlagForRemoteQueries() throws Exception {
+ final Host host = Host.getHost(0);
+ final VM server = host.getVM(0);
+ final VM client = host.getVM(1);
+
+ final int numberOfEntries = 1000;
+ final String name = "/" + regionName;
+
+ final String query =
+        "select distinct * from " + name + " where id > $1 and id < $2 and status = 'active'";
+
+ // Start server
+    final int port1 = (Integer) server.invoke(new SerializableCallable("Create Server") {
+      @Override
+      public Object call() throws Exception {
+        Region r1 = getCache().createRegionFactory(RegionShortcut.REPLICATE).create(regionName);
+ CacheServer server = getCache().addCacheServer();
+ int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
+ server.setPort(port);
+ server.start();
+ return port;
+ }
+ });
+
+ // Start client and put version1 objects on server
+ // Server does not have version1 classes in classpath
+ client.invoke(new SerializableCallable("Create client") {
+ @Override
+ public Object call() throws Exception {
+ ClientCacheFactory cf = new ClientCacheFactory();
+        cf.addPoolServer(NetworkUtils.getServerHostName(server.getHost()), port1);
+        ClientCache cache = getClientCache(cf);
+        Region region = cache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY)
+            .create(regionName);
+
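+        // Create the versioned objects as PdxInstances so the server can store them without having the domain class.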
+ for (int i = 0; i < numberOfEntries; i++) {
+ PdxInstanceFactory pdxInstanceFactory =
+              PdxInstanceFactoryImpl.newCreator("PdxVersionedNewPortfolio", false);
+          pdxInstanceFactory.writeInt("id", i);
+          pdxInstanceFactory.writeString("status", (i % 2 == 0 ? "active" : "inactive"));
+ PdxInstance pdxInstance = pdxInstanceFactory.create();
+ region.put("key-" + i, pdxInstance);
+ }
+
+ return null;
+ }
+ });
+
+ // Execute same query remotely from client using 2 threads
+ // Since this is a bind query, the query object will be shared
+ // between the 2 threads.
+    AsyncInvocation a1 = client.invokeAsync(new SerializableCallable("Query from client") {
+ @Override
+ public Object call() throws Exception {
+ QueryService qs = null;
+ SelectResults sr = null;
+ // Execute query remotely
+ try {
+ qs = getCache().getQueryService();
+ } catch (Exception e) {
+ Assert.fail("Failed to get QueryService.", e);
+ }
+
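+        // Execute the query repeatedly so the two async threads are likely to interleave on the shared query object.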
+ try {
+ for (int i = 0; i < 100; i++) {
+            sr = (SelectResults) qs.newQuery(query).execute(new Object[] {1, 1000});
+          }
+          Assert.assertTrue("Size of resultset should be greater than 0 for query: " + query,
+ sr.size() > 0);
+ } catch (Exception e) {
+ Assert.fail("Failed executing query " + query, e);
+ }
+
+ return null;
+ }
+ });
+
+    AsyncInvocation a2 = client.invokeAsync(new SerializableCallable("Query from client") {
+ @Override
+ public Object call() throws Exception {
+
+ QueryService qs = null;
+ SelectResults sr = null;
+ // Execute query remotely
+ try {
+ qs = getCache().getQueryService();
+ } catch (Exception e) {
+ Assert.fail("Failed to get QueryService.", e);
+ }
+
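+        // Same query from a second thread, with different bind parameters.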
+ try {
+ for (int i = 0; i < 100; i++) {
+            sr = (SelectResults) qs.newQuery(query).execute(new Object[] {997, 1000});
+          }
+          Assert.assertTrue("Size of resultset should be greater than 0 for query: " + query,
+ sr.size() > 0);
+ } catch (Exception e) {
+ Assert.fail("Failed executing query " + query, e);
+ }
+
+ return null;
+ }
+ });
+
+ ThreadUtils.join(a1, 60 * 1000);
+ ThreadUtils.join(a2, 60 * 1000);
+
+ if (a1.exceptionOccurred()) {
+ Assert.fail("Failed query execution " + a1.getException().getMessage());
+ }
+
+ if (a2.exceptionOccurred()) {
+      Assert.fail("Failed query execution " + a2.getException().getMessage());
+ }
+
+ this.closeClient(client);
+ this.closeClient(server);
+
+ }
+
+}