GEODE-2062: Added new tests for PDX queries, order-by queries, and queries 
using indexes.

        * PDXInstance and PDXFactoryImpl were used to validate the 
multiple-class-version test rather than writing dummy PDX classes.
        * JUnit4CacheTestCase was used instead of JUnit 3 elements.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/a64f7f60
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/a64f7f60
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/a64f7f60

Branch: refs/heads/develop
Commit: a64f7f60e8d2e45519d7c0c7a52688e70c986a5e
Parents: 4265fa5
Author: nabarun <[email protected]>
Authored: Mon Oct 31 11:16:09 2016 -0700
Committer: nabarun <[email protected]>
Committed: Wed Nov 2 13:19:00 2016 -0700

----------------------------------------------------------------------
 .../cache/query/dunit/HashIndexDUnitTest.java   |    2 +-
 .../dunit/OrderByPartitionedDUnitTest.java      |  370 ++
 .../cache/query/dunit/PDXQueryTestBase.java     |  451 +++
 .../PdxGroupByPartitionedQueryDUnitTest.java    |   90 +
 .../query/dunit/PdxLocalQueryDUnitTest.java     |  924 +++++
 .../PdxLocalQueryVersionedClassDUnitTest.java   |  173 +
 .../cache/query/dunit/PdxQueryDUnitTest.java    | 3590 ++++++++++++++++++
 .../cache/query/dunit/PortfolioPdxVersion.java  |  103 +
 .../cache/query/dunit/PositionPdxVersion.java   |  163 +
 .../cache/query/dunit/QueryIndexDUnitTest.java  | 1331 +++++++
 10 files changed, 7196 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a64f7f60/geode-core/src/test/java/org/apache/geode/cache/query/dunit/HashIndexDUnitTest.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/test/java/org/apache/geode/cache/query/dunit/HashIndexDUnitTest.java
 
b/geode-core/src/test/java/org/apache/geode/cache/query/dunit/HashIndexDUnitTest.java
index 78b092f..dd4cfc7 100755
--- 
a/geode-core/src/test/java/org/apache/geode/cache/query/dunit/HashIndexDUnitTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/cache/query/dunit/HashIndexDUnitTest.java
@@ -71,7 +71,7 @@ public class HashIndexDUnitTest extends 
JUnit4DistributedTestCase {
 
   @Test
   public void testHashIndexForConcurrentHashSet() throws Exception {
-    doPut(333); // 111 entries for a key in the index (> 100 so creates a 
ConcurrentHashSet)
+    doPut(333); // 111 entries for a key in the index (> 100 so creates a 
ConcurrentHashSet)//test
     doQuery();
     doUpdate(333);
     doQuery();

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a64f7f60/geode-core/src/test/java/org/apache/geode/cache/query/dunit/OrderByPartitionedDUnitTest.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/test/java/org/apache/geode/cache/query/dunit/OrderByPartitionedDUnitTest.java
 
b/geode-core/src/test/java/org/apache/geode/cache/query/dunit/OrderByPartitionedDUnitTest.java
new file mode 100644
index 0000000..323559e
--- /dev/null
+++ 
b/geode-core/src/test/java/org/apache/geode/cache/query/dunit/OrderByPartitionedDUnitTest.java
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.query.dunit;
+
+import static org.apache.geode.test.dunit.Assert.*;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.query.Index;
+import org.apache.geode.cache.query.IndexExistsException;
+import org.apache.geode.cache.query.IndexInvalidException;
+import org.apache.geode.cache.query.IndexNameConflictException;
+import org.apache.geode.cache.query.IndexType;
+import org.apache.geode.cache.query.RegionNotFoundException;
+import org.apache.geode.cache.query.functional.OrderByPartitionedJUnitTest;
+import org.apache.geode.test.dunit.Host;
+import org.apache.geode.test.dunit.SerializableRunnable;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
+import org.apache.geode.test.junit.categories.DistributedTest;
+
+@Category(DistributedTest.class)
+public class OrderByPartitionedDUnitTest extends JUnit4CacheTestCase {
+
+  private OrderByPartitionedJUnitTest createTestInstance() {
+    Host host = Host.getHost(0);
+    final VM vm0 = host.getVM(0);
+    final VM vm1 = host.getVM(1);
+    final VM vm2 = host.getVM(2);
+    final VM vm3 = host.getVM(3);
+
+    OrderByPartitionedJUnitTest test = new OrderByPartitionedJUnitTest() {
+      @Override
+      public Region createRegion(String regionName, Class valueConstraint) {
+        // TODO Auto-generated method stub
+        Region rgn = createAccessor(regionName, valueConstraint);
+        createPR(vm1, regionName, valueConstraint);
+        createPR(vm2, regionName, valueConstraint);
+        createPR(vm3, regionName, valueConstraint);
+        return rgn;
+      }
+
+      @Override
+      public Index createIndex(String indexName, String indexedExpression, 
String regionPath)
+          throws IndexInvalidException, IndexNameConflictException, 
IndexExistsException,
+          RegionNotFoundException, UnsupportedOperationException {
+        Index indx = createIndexOnAccessor(indexName, indexedExpression, 
regionPath);
+        return indx;
+      }
+
+      @Override
+      public Index createIndex(String indexName, IndexType indexType, String 
indexedExpression,
+          String fromClause) throws IndexInvalidException, 
IndexNameConflictException,
+          IndexExistsException, RegionNotFoundException, 
UnsupportedOperationException {
+        Index indx = createIndexOnAccessor(indexName, indexType, 
indexedExpression, fromClause);
+        return indx;
+      }
+
+      @Override
+      public boolean assertIndexUsedOnQueryNode() {
+        return false;
+      }
+    };
+    return test;
+  }
+
+  @Test
+  public void testOrderByWithIndexResultDefaultProjection() throws Exception {
+    Host host = Host.getHost(0);
+    final VM vm0 = host.getVM(0);
+    final VM vm1 = host.getVM(1);
+    final VM vm2 = host.getVM(2);
+    final VM vm3 = host.getVM(3);
+    Cache cache = this.getCache();
+    OrderByPartitionedJUnitTest test = createTestInstance();
+    test.testOrderByWithIndexResultDefaultProjection();
+    this.closeCache(vm0, vm1, vm2, vm3);
+  }
+
+  @Test
+  public void testOrderByWithIndexResultWithProjection() throws Exception {
+    Host host = Host.getHost(0);
+    final VM vm0 = host.getVM(0);
+    final VM vm1 = host.getVM(1);
+    final VM vm2 = host.getVM(2);
+    final VM vm3 = host.getVM(3);
+    Cache cache = this.getCache();
+    OrderByPartitionedJUnitTest test = createTestInstance();
+    test.testOrderByWithIndexResultWithProjection();
+    this.closeCache(vm0, vm1, vm2, vm3);
+  }
+
+  @Test
+  public void testMultiColOrderByWithIndexResultDefaultProjection() throws 
Exception {
+    Host host = Host.getHost(0);
+    final VM vm0 = host.getVM(0);
+    final VM vm1 = host.getVM(1);
+    final VM vm2 = host.getVM(2);
+    final VM vm3 = host.getVM(3);
+    Cache cache = this.getCache();
+    OrderByPartitionedJUnitTest test = createTestInstance();
+    test.testMultiColOrderByWithIndexResultDefaultProjection();
+    this.closeCache(vm0, vm1, vm2, vm3);
+  }
+
+  @Test
+  public void testMultiColOrderByWithIndexResultWithProjection() throws 
Exception {
+    Host host = Host.getHost(0);
+    final VM vm0 = host.getVM(0);
+    final VM vm1 = host.getVM(1);
+    final VM vm2 = host.getVM(2);
+    final VM vm3 = host.getVM(3);
+    Cache cache = this.getCache();
+    OrderByPartitionedJUnitTest test = createTestInstance();
+    test.testMultiColOrderByWithIndexResultWithProjection();
+    this.closeCache(vm0, vm1, vm2, vm3);
+  }
+
+  @Test
+  public void testMultiColOrderByWithMultiIndexResultDefaultProjection() 
throws Exception {
+    Host host = Host.getHost(0);
+    final VM vm0 = host.getVM(0);
+    final VM vm1 = host.getVM(1);
+    final VM vm2 = host.getVM(2);
+    final VM vm3 = host.getVM(3);
+    Cache cache = this.getCache();
+    OrderByPartitionedJUnitTest test = createTestInstance();
+    test.testMultiColOrderByWithMultiIndexResultDefaultProjection();
+    this.closeCache(vm0, vm1, vm2, vm3);
+  }
+
+  @Test
+  public void testMultiColOrderByWithMultiIndexResultProjection() throws 
Exception {
+    Host host = Host.getHost(0);
+    final VM vm0 = host.getVM(0);
+    final VM vm1 = host.getVM(1);
+    final VM vm2 = host.getVM(2);
+    final VM vm3 = host.getVM(3);
+    Cache cache = this.getCache();
+    OrderByPartitionedJUnitTest test = createTestInstance();
+    test.testMultiColOrderByWithMultiIndexResultProjection();
+    this.closeCache(vm0, vm1, vm2, vm3);
+  }
+
+  @Test
+  public void testLimitNotAppliedIfOrderByNotUsingIndex() throws Exception {
+    Host host = Host.getHost(0);
+    final VM vm0 = host.getVM(0);
+    final VM vm1 = host.getVM(1);
+    final VM vm2 = host.getVM(2);
+    final VM vm3 = host.getVM(3);
+    Cache cache = this.getCache();
+    OrderByPartitionedJUnitTest test = createTestInstance();
+    test.testLimitNotAppliedIfOrderByNotUsingIndex();
+    this.closeCache(vm0, vm1, vm2, vm3);
+  }
+
+  @Test
+  public void testOrderByWithNullValuesUseIndex() throws Exception {
+    Host host = Host.getHost(0);
+    final VM vm0 = host.getVM(0);
+    final VM vm1 = host.getVM(1);
+    final VM vm2 = host.getVM(2);
+    final VM vm3 = host.getVM(3);
+    Cache cache = this.getCache();
+    OrderByPartitionedJUnitTest test = createTestInstance();
+    test.testOrderByWithNullValuesUseIndex();
+    this.closeCache(vm0, vm1, vm2, vm3);
+  }
+
+  @Test
+  public void testOrderByForUndefined() throws Exception {
+    Host host = Host.getHost(0);
+    final VM vm0 = host.getVM(0);
+    final VM vm1 = host.getVM(1);
+    final VM vm2 = host.getVM(2);
+    final VM vm3 = host.getVM(3);
+    Cache cache = this.getCache();
+    OrderByPartitionedJUnitTest test = createTestInstance();
+    test.testOrderByForUndefined();
+    this.closeCache(vm0, vm1, vm2, vm3);
+  }
+
+  @Test
+  public void testOrderedResultsPartitionedRegion_Bug43514_1() throws 
Exception {
+    Host host = Host.getHost(0);
+    final VM vm0 = host.getVM(0);
+    final VM vm1 = host.getVM(1);
+    final VM vm2 = host.getVM(2);
+    final VM vm3 = host.getVM(3);
+    Cache cache = this.getCache();
+    OrderByPartitionedJUnitTest test = createTestInstance();
+    test.testOrderedResultsPartitionedRegion_Bug43514_1();
+    this.closeCache(vm0, vm1, vm2, vm3);
+  }
+
+  @Test
+  public void testOrderedResultsPartitionedRegion_Bug43514_2() throws 
Exception {
+    Host host = Host.getHost(0);
+    final VM vm0 = host.getVM(0);
+    final VM vm1 = host.getVM(1);
+    final VM vm2 = host.getVM(2);
+    final VM vm3 = host.getVM(3);
+    Cache cache = this.getCache();
+    OrderByPartitionedJUnitTest test = createTestInstance();
+    test.testOrderedResultsPartitionedRegion_Bug43514_2();
+    this.closeCache(vm0, vm1, vm2, vm3);
+  }
+
+  @Test
+  public void testOrderByWithNullValues() throws Exception {
+    Host host = Host.getHost(0);
+    final VM vm0 = host.getVM(0);
+    final VM vm1 = host.getVM(1);
+    final VM vm2 = host.getVM(2);
+    final VM vm3 = host.getVM(3);
+    Cache cache = this.getCache();
+    OrderByPartitionedJUnitTest test = createTestInstance();
+    test.testOrderByWithNullValues();
+    this.closeCache(vm0, vm1, vm2, vm3);
+  }
+
+  private void createBuckets(VM vm) {
+    vm.invoke(new SerializableRunnable("create accessor") {
+      public void run() {
+        Cache cache = getCache();
+        Region region = cache.getRegion("region");
+        for (int i = 0; i < 10; i++) {
+          region.put(i, i);
+        }
+      }
+    });
+  }
+
+  private void createPR(VM vm, final String regionName, final Class 
valueConstraint) {
+    vm.invoke(new SerializableRunnable("create data store") {
+      public void run() {
+        Cache cache = getCache();
+        PartitionAttributesFactory paf = new PartitionAttributesFactory();
+        paf.setTotalNumBuckets(10);
+        
cache.createRegionFactory(RegionShortcut.PARTITION).setValueConstraint(valueConstraint)
+            .setPartitionAttributes(paf.create()).create(regionName);
+      }
+    });
+  }
+
+  private Region createAccessor(String regionName, Class valueConstraint) {
+    Cache cache = getCache();
+    PartitionAttributesFactory paf = new PartitionAttributesFactory();
+    paf.setTotalNumBuckets(10);
+    paf.setLocalMaxMemory(0);
+    return cache.createRegionFactory(RegionShortcut.PARTITION_PROXY)
+        
.setValueConstraint(valueConstraint).setPartitionAttributes(paf.create())
+        .create(regionName);
+  }
+
+  private void createIndex(VM vm, final String indexName, final String 
indexedExpression,
+      final String regionPath) {
+    vm.invoke(new SerializableRunnable("create index") {
+      public void run() {
+        try {
+          Cache cache = getCache();
+          cache.getQueryService().createIndex(indexName, indexedExpression, 
regionPath);
+        } catch (RegionNotFoundException e) {
+          fail(e.toString());
+        } catch (IndexExistsException e) {
+          fail(e.toString());
+        } catch (IndexNameConflictException e) {
+          fail(e.toString());
+        }
+      }
+    });
+  }
+
+  private void createIndex(VM vm, final String indexName, IndexType indexType,
+      final String indexedExpression, final String fromClause) {
+    int indxTypeCode = -1;
+    if (indexType.equals(IndexType.FUNCTIONAL)) {
+      indxTypeCode = 0;
+    } else if (indexType.equals(IndexType.PRIMARY_KEY)) {
+      indxTypeCode = 1;
+    } else if (indexType.equals(IndexType.HASH)) {
+      indxTypeCode = 2;
+    }
+    final int finalIndxTypeCode = indxTypeCode;
+    vm.invoke(new SerializableRunnable("create index") {
+      public void run() {
+        try {
+          Cache cache = getCache();
+          IndexType indxType = null;
+          if (finalIndxTypeCode == 0) {
+            indxType = IndexType.FUNCTIONAL;
+          } else if (finalIndxTypeCode == 1) {
+            indxType = IndexType.PRIMARY_KEY;
+          } else if (finalIndxTypeCode == 2) {
+            indxType = IndexType.HASH;
+          }
+          cache.getQueryService().createIndex(indexName, indxType, 
indexedExpression, fromClause);
+        } catch (RegionNotFoundException e) {
+          fail(e.toString());
+        } catch (IndexExistsException e) {
+          fail(e.toString());
+        } catch (IndexNameConflictException e) {
+          fail(e.toString());
+        }
+      }
+    });
+  }
+
+  private Index createIndexOnAccessor(final String indexName, final String 
indexedExpression,
+      final String regionPath) {
+    try {
+      Cache cache = getCache();
+      return cache.getQueryService().createIndex(indexName, indexedExpression, 
regionPath);
+    } catch (RegionNotFoundException e) {
+      fail(e.toString());
+    } catch (IndexExistsException e) {
+      fail(e.toString());
+    } catch (IndexNameConflictException e) {
+      fail(e.toString());
+    }
+    return null;
+  }
+
+  private Index createIndexOnAccessor(final String indexName, IndexType 
indexType,
+      final String indexedExpression, final String fromClause) {
+    try {
+      Cache cache = getCache();
+      return cache.getQueryService().createIndex(indexName, indexType, 
indexedExpression,
+          fromClause);
+    } catch (RegionNotFoundException e) {
+      fail(e.toString());
+    } catch (IndexExistsException e) {
+      fail(e.toString());
+    } catch (IndexNameConflictException e) {
+      fail(e.toString());
+    }
+    return null;
+  }
+
+  private void closeCache(VM... vms) {
+    for (VM vm : vms) {
+      vm.invoke(new SerializableRunnable() {
+        public void run() {
+          getCache().close();
+        }
+      });
+    }
+  }
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a64f7f60/geode-core/src/test/java/org/apache/geode/cache/query/dunit/PDXQueryTestBase.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/test/java/org/apache/geode/cache/query/dunit/PDXQueryTestBase.java
 
b/geode-core/src/test/java/org/apache/geode/cache/query/dunit/PDXQueryTestBase.java
new file mode 100644
index 0000000..2636d84
--- /dev/null
+++ 
b/geode-core/src/test/java/org/apache/geode/cache/query/dunit/PDXQueryTestBase.java
@@ -0,0 +1,451 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.query.dunit;
+
+import static org.apache.geode.test.dunit.Assert.*;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.geode.LogWriter;
+import org.apache.geode.cache.AttributesFactory;
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.CacheException;
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.PartitionAttributes;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.Scope;
+import org.apache.geode.cache.client.PoolFactory;
+import org.apache.geode.cache.client.PoolManager;
+import org.apache.geode.cache.query.CacheUtils;
+import org.apache.geode.cache.query.Query;
+import org.apache.geode.cache.query.QueryService;
+import org.apache.geode.cache.query.SelectResults;
+import org.apache.geode.cache.query.Struct;
+import org.apache.geode.cache.query.data.PortfolioPdx;
+import org.apache.geode.cache.query.data.PositionPdx;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.cache30.CacheSerializableRunnable;
+import org.apache.geode.compression.Compressor;
+import org.apache.geode.compression.SnappyCompressor;
+import org.apache.geode.i18n.LogWriterI18n;
+import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.pdx.PdxReader;
+import org.apache.geode.pdx.PdxSerializable;
+import org.apache.geode.pdx.PdxWriter;
+import org.apache.geode.test.dunit.Assert;
+import org.apache.geode.test.dunit.Host;
+import org.apache.geode.test.dunit.SerializableRunnable;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
+
+public abstract class PDXQueryTestBase extends JUnit4CacheTestCase {
+
+  /** The port on which the bridge server was started in this VM */
+  private static int bridgeServerPort;
+  protected static final Compressor compressor = 
SnappyCompressor.getDefaultInstance();
+  protected final String rootRegionName = "root";
+  protected final String regionName = "PdxTest";
+  protected final String regionName2 = "PdxTest2";
+  protected final String regName = "/" + rootRegionName + "/" + regionName;
+  protected final String regName2 = "/" + rootRegionName + "/" + regionName2;
+  protected final String[] queryString = new String[] {"SELECT DISTINCT id 
FROM " + regName, // 0
+      "SELECT * FROM " + regName, // 1
+      "SELECT ticker FROM " + regName, // 2
+      "SELECT * FROM " + regName + " WHERE id > 5", // 3
+      "SELECT p FROM " + regName + " p, p.idTickers idTickers WHERE p.ticker = 
'vmware'", // 4
+  };
+
+  protected static int getCacheServerPort() {
+    return bridgeServerPort;
+  }
+
+  @Override
+  public final void preTearDownCacheTestCase() throws Exception {
+    preTearDownPDXQueryTestBase();
+    disconnectAllFromDS(); // tests all expect to create a new ds
+    // Reset the testObject numinstance for the next test.
+    TestObject.numInstance = 0;
+    // In all VM.
+    resetTestObjectInstanceCount();
+  }
+
+  protected void preTearDownPDXQueryTestBase() throws Exception {}
+
+  private void resetTestObjectInstanceCount() {
+    final Host host = Host.getHost(0);
+    for (int i = 0; i < 4; i++) {
+      VM vm = host.getVM(i);
+      vm.invoke(new CacheSerializableRunnable("Create Bridge Server") {
+        public void run2() throws CacheException {
+          TestObject.numInstance = 0;
+          PortfolioPdx.numInstance = 0;
+          PositionPdx.numInstance = 0;
+          PositionPdx.cnt = 0;
+          TestObject2.numInstance = 0;
+        }
+      });
+    }
+  }
+
+  public void createPool(VM vm, String poolName, String server, int port,
+      boolean subscriptionEnabled) {
+    createPool(vm, poolName, new String[] {server}, new int[] {port}, 
subscriptionEnabled);
+  }
+
+  public void createPool(VM vm, String poolName, String server, int port) {
+    createPool(vm, poolName, new String[] {server}, new int[] {port}, false);
+  }
+
+  public void createPool(VM vm, final String poolName, final String[] servers, 
final int[] ports,
+      final boolean subscriptionEnabled) {
+    createPool(vm, poolName, servers, ports, subscriptionEnabled, 0);
+  }
+
+  public void createPool(VM vm, final String poolName, final String[] servers, 
final int[] ports,
+      final boolean subscriptionEnabled, final int redundancy) {
+    vm.invoke(new CacheSerializableRunnable("createPool :" + poolName) {
+      public void run2() throws CacheException {
+        // Create Cache.
+        Properties props = new Properties();
+        props.setProperty("mcast-port", "0");
+        props.setProperty("locators", "");
+        getSystem(props);
+        getCache();
+        PoolFactory cpf = PoolManager.createFactory();
+        cpf.setSubscriptionEnabled(subscriptionEnabled);
+        cpf.setSubscriptionRedundancy(redundancy);
+        for (int i = 0; i < servers.length; i++) {
+          cpf.addServer(servers[i], ports[i]);
+        }
+        cpf.create(poolName);
+      }
+    });
+  }
+
+  public void executeClientQueries(VM vm, final String poolName, final String 
queryStr) {
+    vm.invoke(new CacheSerializableRunnable("Execute queries") {
+      public void run2() throws CacheException {
+        QueryService remoteQueryService = null;
+        QueryService localQueryService = null;
+        SelectResults[][] rs = new SelectResults[1][2];
+
+        try {
+          remoteQueryService = (PoolManager.find(poolName)).getQueryService();
+          localQueryService = getCache().getQueryService();
+        } catch (Exception e) {
+          Assert.fail("Failed to get QueryService.", e);
+        }
+
+        try {
+          Query query = remoteQueryService.newQuery(queryStr);
+          rs[0][0] = (SelectResults) query.execute();
+          query = localQueryService.newQuery(queryStr);
+          rs[0][1] = (SelectResults) query.execute();
+          // Compare local and remote query results.
+          if (!CacheUtils.compareResultsOfWithAndWithoutIndex(rs)) {
+            fail("Local and Remote Query Results are not matching for query :" 
+ queryStr);
+          }
+        } catch (Exception e) {
+          Assert.fail("Failed executing " + queryStr, e);
+        }
+      }
+    });
+  }
+
+  public void printResults(SelectResults results, String message) {
+    Object r;
+    Struct s;
+    LogWriterI18n logger = GemFireCacheImpl.getInstance().getLoggerI18n();
+    logger.fine(message);
+    int row = 0;
+    for (Iterator iter = results.iterator(); iter.hasNext();) {
+      r = iter.next();
+      row++;
+      if (r instanceof Struct) {
+        s = (Struct) r;
+        String[] fieldNames = ((Struct) r).getStructType().getFieldNames();
+        for (int i = 0; i < fieldNames.length; i++) {
+          logger.fine("### Row " + row + "\n" + "Field: " + fieldNames[i] + " 
> "
+              + s.get(fieldNames[i]).toString());
+        }
+      } else {
+        logger.fine("#### Row " + row + "\n" + r);
+      }
+    }
+  }
+
+  protected void configAndStartBridgeServer() {
+    configAndStartBridgeServer(false, false, false, null);
+  }
+
+  protected void configAndStartBridgeServer(boolean isPr, boolean isAccessor) {
+    configAndStartBridgeServer(false, false, false, null);
+  }
+
+  protected void configAndStartBridgeServer(boolean isPr, boolean isAccessor, 
boolean asyncIndex,
+      Compressor compressor) {
+    AttributesFactory factory = new AttributesFactory();
+    if (isPr) {
+      PartitionAttributesFactory paf = new PartitionAttributesFactory();
+      if (isAccessor) {
+        paf.setLocalMaxMemory(0);
+      }
+      PartitionAttributes prAttr = 
paf.setTotalNumBuckets(20).setRedundantCopies(0).create();
+      factory.setPartitionAttributes(prAttr);
+    } else {
+      factory.setScope(Scope.DISTRIBUTED_ACK);
+      factory.setDataPolicy(DataPolicy.REPLICATE);
+    }
+    if (asyncIndex) {
+      factory.setIndexMaintenanceSynchronous(!asyncIndex);
+    }
+    if (compressor != null) {
+      factory.setCompressor(compressor);
+    }
+
+    createRegion(this.regionName, this.rootRegionName, factory.create());
+    createRegion(this.regionName2, this.rootRegionName, factory.create());
+
+    try {
+      startBridgeServer(0, false);
+    } catch (Exception ex) {
+      Assert.fail("While starting CacheServer", ex);
+    }
+  }
+
+  protected void executeCompiledQueries(String poolName, Object[][] params) {
+    SelectResults results = null;
+    QueryService qService = null;
+
+    try {
+      qService = (PoolManager.find(poolName)).getQueryService();
+    } catch (Exception e) {
+      Assert.fail("Failed to get QueryService.", e);
+    }
+
+    for (int i = 0; i < queryString.length; i++) {
+      try {
+        Query query = qService.newQuery(queryString[i]);
+        results = (SelectResults) query.execute(params[i]);
+      } catch (Exception e) {
+        Assert.fail("Failed executing " + queryString[i], e);
+      }
+    }
+  }
+
+  /**
+   * Starts a bridge server on the given port, using the given 
deserializeValues and
+   * notifyBySubscription to serve up the given region.
+   */
+  protected void startBridgeServer(int port, boolean notifyBySubscription) 
throws IOException {
+    Cache cache = getCache();
+    CacheServer bridge = cache.addCacheServer();
+    bridge.setPort(port);
+    bridge.setNotifyBySubscription(notifyBySubscription);
+    bridge.start();
+    bridgeServerPort = bridge.getPort();
+  }
+
+  /**
+   * Stops the bridge server that serves up the given cache.
+   */
+  protected void stopBridgeServer(Cache cache) {
+    CacheServer bridge = (CacheServer) 
cache.getCacheServers().iterator().next();
+    bridge.stop();
+    assertFalse(bridge.isRunning());
+  }
+
+  public void closeClient(VM client) {
+    SerializableRunnable closeCache = new CacheSerializableRunnable("Close 
Client") {
+      public void run2() throws CacheException {
+        try {
+          closeCache();
+          disconnectFromDS();
+        } catch (Exception ex) {
+        }
+      }
+    };
+
+    client.invoke(closeCache);
+  }
+
+  /**
+   * Starts a bridge server on the given port, using the given 
deserializeValues and
+   * notifyBySubscription to serve up the given region.
+   */
+  protected void startCacheServer(int port, boolean notifyBySubscription) 
throws IOException {
+    Cache cache = CacheFactory.getAnyInstance();
+    CacheServer bridge = cache.addCacheServer();
+    bridge.setPort(port);
+    bridge.setNotifyBySubscription(notifyBySubscription);
+    bridge.start();
+    bridgeServerPort = bridge.getPort();
+  }
+
+  public static class TestObject2 implements PdxSerializable {
+    public int _id;
+    public static int numInstance = 0;
+
+    public TestObject2() {
+      numInstance++;
+    }
+
+    public TestObject2(int id) {
+      this._id = id;
+      numInstance++;
+    }
+
+    public int getId() {
+      return this._id;
+    }
+
+    public void toData(PdxWriter out) {
+      out.writeInt("id", this._id);
+    }
+
+    public void fromData(PdxReader in) {
+      this._id = in.readInt("id");
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      GemFireCacheImpl.getInstance().getLoggerI18n()
+          .fine("In TestObject2.equals() this: " + this + " other :" + o);
+      TestObject2 other = (TestObject2) o;
+      if (_id == other._id) {
+        return true;
+      } else {
+        return false;
+      }
+    }
+
+    @Override
+    public int hashCode() {
+      GemFireCacheImpl.getInstance().getLoggerI18n()
+          .fine("In TestObject2.hashCode() : " + this._id);
+      return this._id;
+    }
+  }
+
+  public static class TestObject implements PdxSerializable {
+    public static LogWriter log;
+    protected String _ticker;
+    protected int _price;
+    public int id;
+    public int important;
+    public int selection;
+    public int select;
+    public static int numInstance = 0;
+    public Map idTickers = new HashMap();
+    public HashMap positions = new HashMap();
+    public TestObject2 test;
+
+    public TestObject() {
+      if (log != null) {
+        log.info("TestObject ctor stack trace", new Exception());
+      }
+      numInstance++;
+    }
+
+    public TestObject(int id, String ticker) {
+      if (log != null) {
+        log.info("TestObject ctor stack trace", new Exception());
+      }
+      this.id = id;
+      this._ticker = ticker;
+      this._price = id;
+      this.important = id;
+      this.selection = id;
+      this.select = id;
+      numInstance++;
+      idTickers.put(id + "", ticker);
+      this.test = new TestObject2(id);
+    }
+
+    public TestObject(int id, String ticker, int numPositions) {
+      this(id, ticker);
+      for (int i = 0; i < numPositions; i++) {
+        positions.put(id + i, new PositionPdx(ticker + ":" + id + ":" + i, (id 
+ 100)));
+      }
+    }
+
+    public int getIdValue() {
+      return this.id;
+    }
+
+    public String getTicker() {
+      return this._ticker;
+    }
+
+    public int getPriceValue() {
+      return this._price;
+    }
+
+    public HashMap getPositions(String id) {
+      return this.positions;
+    }
+
+    public String getStatus() {
+      return (id % 2 == 0) ? "active" : "inactive";
+    }
+
+    public void toData(PdxWriter out) {
+      out.writeInt("id", this.id);
+      out.writeString("ticker", this._ticker);
+      out.writeInt("price", this._price);
+      out.writeObject("idTickers", this.idTickers);
+      out.writeObject("positions", this.positions);
+      out.writeObject("test", this.test);
+    }
+
+    public void fromData(PdxReader in) {
+      this.id = in.readInt("id");
+      this._ticker = in.readString("ticker");
+      this._price = in.readInt("price");
+      this.idTickers = (Map) in.readObject("idTickers");
+      this.positions = (HashMap) in.readObject("positions");
+      this.test = (TestObject2) in.readObject("test");
+    }
+
+    public String toString() {
+      StringBuffer buffer = new StringBuffer();
+      buffer.append("TestObject [").append("id=").append(this.id).append("; 
ticker=")
+          .append(this._ticker).append("; 
price=").append(this._price).append("]");
+      return buffer.toString();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      TestObject other = (TestObject) o;
+      if ((id == other.id) && (_ticker.equals(other._ticker))) {
+        return true;
+      } else {
+        return false;
+      }
+    }
+
+    @Override
+    public int hashCode() {
+      GemFireCacheImpl.getInstance().getLoggerI18n().fine("In 
TestObject.hashCode() : " + this.id);
+      return this.id;
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a64f7f60/geode-core/src/test/java/org/apache/geode/cache/query/dunit/PdxGroupByPartitionedQueryDUnitTest.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/test/java/org/apache/geode/cache/query/dunit/PdxGroupByPartitionedQueryDUnitTest.java
 
b/geode-core/src/test/java/org/apache/geode/cache/query/dunit/PdxGroupByPartitionedQueryDUnitTest.java
new file mode 100644
index 0000000..9528693
--- /dev/null
+++ 
b/geode-core/src/test/java/org/apache/geode/cache/query/dunit/PdxGroupByPartitionedQueryDUnitTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.query.dunit;
+
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.query.functional.PdxGroupByTestImpl;
+import org.apache.geode.test.dunit.Host;
+import org.apache.geode.test.dunit.SerializableRunnable;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.junit.categories.DistributedTest;
+
+/**
+ * Supplies GroupByDUnitImpl with a PdxGroupByTestImpl whose regions are partitioned: every
+ * region the test implementation asks for is created as an accessor in the calling VM and as a
+ * data store in VMs 1, 2 and 3.
+ */
+@Category(DistributedTest.class)
+public class PdxGroupByPartitionedQueryDUnitTest extends GroupByDUnitImpl {
+
+  /**
+   * Builds the test implementation; its createRegion override sets up the accessor and the three
+   * data stores for the requested region.
+   */
+  @Override
+  protected PdxGroupByTestImpl createTestInstance() {
+    Host host = Host.getHost(0);
+    final VM vm1 = host.getVM(1);
+    final VM vm2 = host.getVM(2);
+    final VM vm3 = host.getVM(3);
+
+    return new PdxGroupByTestImpl() {
+
+      @Override
+      public Region createRegion(String regionName, Class valueConstraint) {
+        // The accessor is created first, then one data store per remote VM.
+        Region rgn = createAccessor(regionName, valueConstraint);
+        createPR(vm1, regionName, valueConstraint);
+        createPR(vm2, regionName, valueConstraint);
+        createPR(vm3, regionName, valueConstraint);
+        return rgn;
+      }
+    };
+  }
+
+  /**
+   * Puts the keys 0..9 (value equal to key) into the region named "region" inside the given VM.
+   * Not called from anywhere in this class.
+   */
+  private void createBuckets(VM vm) {
+    vm.invoke(new SerializableRunnable("create buckets") {
+      @Override
+      public void run() {
+        Cache cache = getCache();
+        Region region = cache.getRegion("region");
+        for (int i = 0; i < 10; i++) {
+          region.put(i, i);
+        }
+      }
+    });
+  }
+
+  /**
+   * Creates a PARTITION region with 10 total buckets and the given value constraint inside
+   * the given VM.
+   */
+  private void createPR(VM vm, final String regionName, final Class valueConstraint) {
+    vm.invoke(new SerializableRunnable("create data store") {
+      @Override
+      public void run() {
+        Cache cache = getCache();
+        PartitionAttributesFactory paf = new PartitionAttributesFactory();
+        paf.setTotalNumBuckets(10);
+        cache.createRegionFactory(RegionShortcut.PARTITION).setValueConstraint(valueConstraint)
+            .setPartitionAttributes(paf.create()).create(regionName);
+      }
+    });
+  }
+
+  /**
+   * Creates a PARTITION_PROXY region (local max memory 0, 10 total buckets) in the calling VM
+   * and returns it.
+   */
+  private Region createAccessor(String regionName, Class valueConstraint) {
+    Cache cache = getCache();
+    PartitionAttributesFactory paf = new PartitionAttributesFactory();
+    paf.setTotalNumBuckets(10);
+    paf.setLocalMaxMemory(0);
+    return cache.createRegionFactory(RegionShortcut.PARTITION_PROXY)
+        .setValueConstraint(valueConstraint).setPartitionAttributes(paf.create())
+        .create(regionName);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a64f7f60/geode-core/src/test/java/org/apache/geode/cache/query/dunit/PdxLocalQueryDUnitTest.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/test/java/org/apache/geode/cache/query/dunit/PdxLocalQueryDUnitTest.java
 
b/geode-core/src/test/java/org/apache/geode/cache/query/dunit/PdxLocalQueryDUnitTest.java
new file mode 100644
index 0000000..b934240
--- /dev/null
+++ 
b/geode-core/src/test/java/org/apache/geode/cache/query/dunit/PdxLocalQueryDUnitTest.java
@@ -0,0 +1,924 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.query.dunit;
+
+import static org.apache.geode.test.dunit.Assert.*;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.CacheException;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientCacheFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.query.QueryService;
+import org.apache.geode.cache.query.SelectResults;
+import org.apache.geode.cache.query.Struct;
+import org.apache.geode.cache.query.data.PortfolioPdx;
+import org.apache.geode.cache.query.data.PositionPdx;
+import org.apache.geode.cache.query.functional.StructSetOrResultsSet;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.cache30.CacheSerializableRunnable;
+import org.apache.geode.internal.AvailablePortHelper;
+import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.pdx.PdxInstance;
+import org.apache.geode.pdx.PdxInstanceFactory;
+import org.apache.geode.pdx.internal.PdxInstanceEnum;
+import org.apache.geode.pdx.internal.PdxInstanceFactoryImpl;
+import org.apache.geode.pdx.internal.PdxString;
+import org.apache.geode.test.dunit.Assert;
+import org.apache.geode.test.dunit.Host;
+import org.apache.geode.test.dunit.LogWriterUtils;
+import org.apache.geode.test.dunit.NetworkUtils;
+import org.apache.geode.test.dunit.SerializableCallable;
+import org.apache.geode.test.dunit.SerializableRunnable;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.junit.categories.DistributedTest;
+
+@Category(DistributedTest.class)
+public class PdxLocalQueryDUnitTest extends PDXQueryTestBase {
+
+  @Test
+  public void testLocalPdxQueriesVerifyNoDeserialization() throws Exception {
+    final Host host = Host.getHost(0);
+    final VM server1 = host.getVM(0);
+    final VM server2 = host.getVM(1);
+
+    final int numberOfEntries = 10;
+    final String name = "/" + regionName;
+
+    final String[] queries = {"select * from " + name + " where status = 
'inactive'",
+        "select p from " + name + " p where p.status = 'inactive'",
+        "select * from " + name + " p, p.positions.values v where v.secId = 
'IBM'",
+        "select p.status from " + name + " p where p.status = 'inactive' or 
p.ID > 0",
+        "select * from " + name + " p where p.status = 'inactive' and p.ID >= 
0",
+        "select p.status from " + name + " p where p.status in set 
('inactive', 'active')",
+        "select * from " + name + " p where p.ID > 0 and p.ID < 10",};
+
+    // Start server1
+    server1.invoke(new SerializableCallable("Create Server1") {
+      @Override
+      public Object call() throws Exception {
+        Region r1 = 
getCache().createRegionFactory(RegionShortcut.REPLICATE).create(regionName);
+
+        for (int i = 0; i < numberOfEntries; i++) {
+          PortfolioPdx p = new PortfolioPdx(i);
+          r1.put("key-" + i, p);
+        }
+        return null;
+      }
+    });
+
+    // Start server2
+    server2.invoke(new SerializableCallable("Create Server2") {
+      @Override
+      public Object call() throws Exception {
+        Region r1 = 
getCache().createRegionFactory(RegionShortcut.REPLICATE).create(regionName);
+
+        QueryService qs = null;
+        SelectResults sr = null;
+        // Execute query locally
+        try {
+          qs = getCache().getQueryService();
+        } catch (Exception e) {
+          Assert.fail("Failed to get QueryService.", e);
+        }
+
+        for (int i = 0; i < queries.length; i++) {
+          try {
+            sr = (SelectResults) qs.newQuery(queries[i]).execute();
+            assertTrue("Size of resultset should be greater than 0 for query: 
" + queries[i],
+                sr.size() > 0);
+          } catch (Exception e) {
+            Assert.fail("Failed executing query " + queries[i], e);
+          }
+        }
+        assertEquals("Unexpected number of objects deserialized ", 0, 
PortfolioPdx.numInstance);
+        return null;
+      }
+    });
+    this.closeClient(server1);
+    this.closeClient(server2);
+
+  }
+
+  @Test
+  public void testLocalPdxQueriesReadSerialized() throws Exception {
+    final Host host = Host.getHost(0);
+    final VM server1 = host.getVM(0);
+    final VM server2 = host.getVM(1);
+
+    final int numberOfEntries = 10;
+    final String name = "/" + regionName;
+
+    final String[] queries = {"select * from " + name + " where position1 = 
$1",
+        "select * from " + name + " where aDay = $1",
+        "select * from " + name + " where status = 'inactive'",
+        "select distinct * from " + name + " where status = 'inactive'",
+        "select p from " + name + " p where p.status = 'inactive'",
+        "select * from " + name + " p, p.positions.values v where v.secId = 
'IBM'",
+        "select * from " + name + " p where p.status = 'inactive' or p.ID > 0",
+        "select * from " + name + " p where p.status = 'inactive' and p.ID >= 
0",
+        "select * from " + name + " p where p.status in set ('inactive', 
'active')",
+        "select * from " + name + " p where p.ID > 0 and p.ID < 10",};
+
+    // Start server1
+    server1.invoke(new SerializableCallable("Create Server1") {
+      @Override
+      public Object call() throws Exception {
+        Region r1 = 
getCache().createRegionFactory(RegionShortcut.REPLICATE).create(regionName);
+
+        for (int i = 0; i < numberOfEntries; i++) {
+          PortfolioPdx p = new PortfolioPdx(i);
+          r1.put("key-" + i, p);
+        }
+        return null;
+      }
+    });
+
+    // Start server2
+    server2.invoke(new SerializableCallable("Create Server2") {
+      @Override
+      public Object call() throws Exception {
+        ((GemFireCacheImpl) getCache()).setReadSerialized(true);
+        Region r1 = 
getCache().createRegionFactory(RegionShortcut.REPLICATE).create(regionName);
+
+        QueryService qs = null;
+        SelectResults sr = null;
+        // Execute query locally
+        try {
+          qs = getCache().getQueryService();
+        } catch (Exception e) {
+          Assert.fail("Failed to get QueryService.", e);
+        }
+
+        PositionPdx pos = new PositionPdx("IBM", 100);
+        PdxInstanceFactory out = PdxInstanceFactoryImpl
+            .newCreator("org.apache.geode.cache.query.data.PositionPdx", 
false);
+        out.writeLong("avg20DaysVol", 0);
+        out.writeString("bondRating", "");
+        out.writeDouble("convRatio", 0);
+        out.writeString("country", "");
+        out.writeDouble("delta", 0);
+        out.writeLong("industry", 0);
+        out.writeLong("issuer", 0);
+        out.writeDouble("mktValue", pos.getMktValue());
+        out.writeDouble("qty", 0);
+        out.writeString("secId", pos.secId);
+        out.writeString("secIdIndexed", pos.secIdIndexed);
+        out.writeString("secLinks", "");
+        out.writeDouble("sharesOutstanding", pos.getSharesOutstanding());
+        out.writeString("underlyer", "");
+        out.writeLong("volatility", 0);
+        out.writeInt("pid", pos.getPid());
+        out.writeInt("portfolioId", 0);
+        out.markIdentityField("secId");
+        PdxInstance pi = out.create();
+
+        PortfolioPdx.Day pDay = new PortfolioPdx(1).aDay;
+        PdxInstanceEnum pdxEnum = new PdxInstanceEnum(pDay);
+
+        for (int i = 0; i < queries.length; i++) {
+          try {
+            if (i == 0) {
+              sr = (SelectResults) qs.newQuery(queries[i]).execute(new 
Object[] {pi});
+            } else if (i == 1) {
+              sr = (SelectResults) qs.newQuery(queries[i]).execute(new 
Object[] {pdxEnum});
+            } else {
+              sr = (SelectResults) qs.newQuery(queries[i]).execute();
+            }
+            assertTrue("Size of resultset should be greater than 0 for query: 
" + queries[i],
+                sr.size() > 0);
+            for (Object result : sr) {
+              if (result instanceof Struct) {
+                Object[] r = ((Struct) result).getFieldValues();
+                for (int j = 0; j < r.length; j++) {
+                  if (!(r[j] instanceof PdxInstance)) {
+                    fail("Result object should be a PdxInstance  and not an 
instance of "
+                        + r[j].getClass() + " for query: " + queries[i]);
+                  }
+                }
+              } else if (!(result instanceof PdxInstance)) {
+                fail("Result object should be a PdxInstance  and not an 
instance of "
+                    + result.getClass() + " for query: " + queries[i]);
+              }
+            }
+          } catch (Exception e) {
+            Assert.fail("Failed executing query " + queries[i], e);
+          }
+        }
+        return null;
+      }
+    });
+    this.closeClient(server1);
+    this.closeClient(server2);
+
+  }
+
+  @Test
+  public void testLocalPdxQueries() throws Exception { // runs the same query list locally on server1 (plain, read-serialized, without/with indexes) and from a client, checking result sizes and result object types
+    final Host host = Host.getHost(0);
+    final VM server1 = host.getVM(1);
+    final VM client = host.getVM(2);
+
+    final int numberOfEntries = 10;
+    final String name = "/" + regionName;
+    final String name2 = "/" + regionName2;
+    final String[] queries = {"select * from " + name + " where position1 = 
$1",
+        "select * from " + name + " where aDay = $1",
+        "select distinct * from " + name + " p where p.status = 'inactive'", 
// numberOfEntries
+        "select distinct p.status from " + name + " p where p.status = 
'inactive'", // 1
+        "select p from " + name + " p where p.status = 'inactive'", // 
numberOfEntries
+        "select * from " + name + " p, p.positions.values v where v.secId = 
'IBM'", // 4
+        "select v from " + name + " p, p.positions.values v where v.secId = 
'IBM'", // 4
+        "select p.status from " + name + " p where p.status = 'inactive'", // 
numberOfEntries
+        "select distinct * from " + name + " p where p.status = 'inactive' 
order by p.ID", // numberOfEntries
+        "select * from " + name + " p where p.status = 'inactive' or p.ID > 
0", // 19
+        "select * from " + name + " p where p.status = 'inactive' and p.ID >= 
0", // numberOfEntries
+        "select * from " + name + " p where p.status in set ('inactive', 
'active')", // numberOfEntries*2
+        "select * from " + name + " p where p.ID > 0 and p.ID < 10", // 9
+        "select v from " + name + " p, p.positions.values v where p.status = 
'inactive'", // numberOfEntries*2
+        "select v.secId from " + name + " p, p.positions.values v where 
p.status = 'inactive'", // numberOfEntries*2
+        "select distinct p from " + name
+            + " p, p.positions.values v where p.status = 'inactive' and v.pid 
>= 0", // numberOfEntries
+        "select distinct p from " + name
+            + " p, p.positions.values v where p.status = 'inactive' or v.pid > 
0", // numberOfEntries*2
+        "select distinct * from " + name + " p, p.positions.values v where 
p.status = 'inactive'", // numberOfEntries*2
+        "select * from " + name + ".values v where v.status = 'inactive'", // 
numberOfEntries
+        "select v from " + name + " v where v in (select p from " + name + " p 
where p.ID > 0)", // 19
+        "select v from " + name + " v where v.status in (select distinct 
p.status from " + name
+            + " p where p.status = 'inactive')", // numberOfEntries
+        "select * from " + name + " r1, " + name2 + " r2 where r1.status = 
r2.status", // 200
+        "select * from " + name + " r1, " + name2
+            + " r2 where r1.status = r2.status and r1.status = 'active'", // 
100
+        "select r2.status from " + name + " r1, " + name2
+            + " r2 where r1.status = r2.status and r1.status = 'active'", // 
100
+        "select distinct r2.status from " + name + " r1, " + name2
+            + " r2 where r1.status = r2.status and r1.status = 'active'", // 1
+        "select * from " + name + " v where v.status = ELEMENT (select 
distinct p.status from "
+            + name + " p where p.status = 'inactive')", // numberOfEntries
+    };
+
+    final int[] results = {2, 3, numberOfEntries, 1, numberOfEntries, 4, 4, 
numberOfEntries,
+        numberOfEntries, 19, numberOfEntries, numberOfEntries * 2, 9, 
numberOfEntries * 2,
+        numberOfEntries * 2, numberOfEntries, numberOfEntries * 2, 
numberOfEntries * 2,
+        numberOfEntries, 19, numberOfEntries, 200, 100, 100, 1, 
numberOfEntries};
+
+    // Start server1: create both replicated regions, put entries 0..numberOfEntries-1 into each, start a cache server and return its port
+    final int port1 = (Integer) server1.invoke(new 
SerializableCallable("Create Server1") {
+      @Override
+      public Object call() throws Exception {
+        Region r1 = 
getCache().createRegionFactory(RegionShortcut.REPLICATE).create(regionName);
+        Region r2 = 
getCache().createRegionFactory(RegionShortcut.REPLICATE).create(regionName2);
+
+        for (int i = 0; i < numberOfEntries; i++) {
+          PortfolioPdx p = new PortfolioPdx(i);
+          r1.put("key-" + i, p);
+          r2.put("key-" + i, p);
+        }
+
+        CacheServer server = getCache().addCacheServer();
+        int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
+        server.setPort(port);
+        server.start();
+        return port;
+      }
+    });
+
+    // client connects to server1 and puts entries numberOfEntries..2*numberOfEntries-1 through CACHING_PROXY regions
+    client.invoke(new SerializableCallable("Create client") {
+      @Override
+      public Object call() throws Exception {
+        ClientCacheFactory cf = new ClientCacheFactory();
+        cf.addPoolServer(NetworkUtils.getServerHostName(server1.getHost()), 
port1);
+        ClientCache cache = getClientCache(cf);
+
+        Region region =
+            
cache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create(regionName);
+        Region region2 =
+            
cache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create(regionName2);
+
+        for (int i = numberOfEntries; i < numberOfEntries * 2; i++) {
+          PortfolioPdx p = new PortfolioPdx(i);
+          region.put("key-" + i, p);
+          region2.put("key-" + i, p);
+        }
+        return null;
+      }
+    });
+
+    // query locally on server1: first pass checks result sizes and the PortfolioPdx instance count, second pass repeats the queries with readSerialized set
+    server1.invoke(new SerializableCallable("query locally on server1") {
+      @Override
+      public Object call() throws Exception {
+        GemFireCacheImpl cache = (GemFireCacheImpl) getCache();
+
+        QueryService qs = null;
+        SelectResults sr = null;
+        // Execute query locally
+        try {
+          qs = getCache().getQueryService();
+        } catch (Exception e) {
+          Assert.fail("Failed to get QueryService.", e);
+        }
+
+        PositionPdx pos = new PositionPdx("IBM", 100);
+        PortfolioPdx.Day pDay = new PortfolioPdx(1).aDay;
+
+        for (int i = 0; i < queries.length; i++) {
+          try {
+            if (i == 0) {
+              sr = (SelectResults) qs.newQuery(queries[i]).execute(new 
Object[] {pos});
+            } else if (i == 1) {
+              sr = (SelectResults) qs.newQuery(queries[i]).execute(new 
Object[] {pDay});
+            } else {
+              sr = (SelectResults) qs.newQuery(queries[i]).execute();
+            }
+
+            assertTrue("Size of resultset should be greater than 0 for query: 
" + queries[i],
+                sr.size() > 0);
+            assertEquals("Expected and actual results do not match for query: 
" + queries[i],
+                results[i], sr.size());
+          } catch (Exception e) {
+            Assert.fail("Failed executing query " + queries[i], e);
+          }
+        }
+
+        int extra = 0; // the expected instance count below is 20 higher when fine logging is enabled
+        if (cache.getLogger().fineEnabled()) {
+          extra = 20;
+        }
+        assertEquals(numberOfEntries * 6 + 1 + extra, 
PortfolioPdx.numInstance);
+
+        // set readSerialized and query again, binding PDX forms of the parameters
+        ((GemFireCacheImpl) getCache()).setReadSerialized(true);
+
+        PdxInstanceFactory out = PdxInstanceFactoryImpl
+            .newCreator("org.apache.geode.cache.query.data.PositionPdx", 
false);
+        out.writeLong("avg20DaysVol", 0);
+        out.writeString("bondRating", "");
+        out.writeDouble("convRatio", 0);
+        out.writeString("country", "");
+        out.writeDouble("delta", 0);
+        out.writeLong("industry", 0);
+        out.writeLong("issuer", 0);
+        out.writeDouble("mktValue", pos.getMktValue());
+        out.writeDouble("qty", 0);
+        out.writeString("secId", pos.secId);
+        out.writeString("secIdIndexed", pos.secIdIndexed);
+        out.writeString("secLinks", "");
+        out.writeDouble("sharesOutstanding", pos.getSharesOutstanding());
+        out.writeString("underlyer", "");
+        out.writeLong("volatility", 0);
+        out.writeInt("pid", pos.getPid());
+        out.writeInt("portfolioId", 0);
+        // Identity Field.
+        out.markIdentityField("secId");
+        PdxInstance pi = out.create();
+
+        PdxInstanceEnum pdxEnum = new PdxInstanceEnum(pDay);
+
+        for (int i = 0; i < queries.length; i++) {
+          try {
+            if (i == 0) {
+              sr = (SelectResults) qs.newQuery(queries[i]).execute(new 
Object[] {pi});
+            } else if (i == 1) {
+              sr = (SelectResults) qs.newQuery(queries[i]).execute(new 
Object[] {pdxEnum});
+            } else {
+              sr = (SelectResults) qs.newQuery(queries[i]).execute();
+            }
+            assertTrue("Size of resultset should be greater than 0 for query: 
" + queries[i],
+                sr.size() > 0);
+            // For distinct queries with a mix of pdx and non pdx objects
+            // the hashcodes should be equal for comparison which are not
+            // in case of PortfolioPdx
+            if (queries[i].indexOf("distinct") == -1) {
+              if (i == 0 || i == 1) {
+                assertEquals("Expected and actual results do not match for 
query: " + queries[i], 1,
+                    sr.size());
+              } else {
+                assertEquals("Expected and actual results do not match for 
query: " + queries[i],
+                    results[i], sr.size());
+              }
+            }
+          } catch (Exception e) {
+            Assert.fail("Failed executing query " + queries[i], e);
+          }
+        }
+
+        // reset readSerialized before returning
+        ((GemFireCacheImpl) getCache()).setReadSerialized(false);
+        return null;
+      }
+    });
+
+    // query from the client: sizes must match and no result may be a PdxInstance or PdxString
+    client.invoke(new SerializableCallable("Create client") {
+      @Override
+      public Object call() throws Exception {
+        ClientCacheFactory cf = new ClientCacheFactory();
+        cf.addPoolServer(NetworkUtils.getServerHostName(server1.getHost()), 
port1);
+        ClientCache cache = getClientCache(cf);
+
+        QueryService qs = null;
+        SelectResults sr = null;
+        // Execute query remotely
+        try {
+          qs = cache.getQueryService();
+        } catch (Exception e) {
+          Assert.fail("Failed to get QueryService.", e);
+        }
+
+        PositionPdx pos = new PositionPdx("IBM", 100);
+        PortfolioPdx.Day pDay = new PortfolioPdx(1).aDay;
+
+        for (int i = 0; i < queries.length; i++) {
+          try {
+            if (i == 0) {
+              sr = (SelectResults) qs.newQuery(queries[i]).execute(new 
Object[] {pos});
+            } else if (i == 1) {
+              sr = (SelectResults) qs.newQuery(queries[i]).execute(new 
Object[] {pDay});
+            } else {
+              sr = (SelectResults) qs.newQuery(queries[i]).execute();
+            }
+            assertTrue("Size of resultset should be greater than 0 for query: 
" + queries[i],
+                sr.size() > 0);
+            assertEquals("Expected and actual results do not match for query: 
" + queries[i],
+                results[i], sr.size());
+            for (Object result : sr) {
+              if (result instanceof Struct) {
+                Object[] r = ((Struct) result).getFieldValues();
+                for (int j = 0; j < r.length; j++) {
+                  if (r[j] instanceof PdxInstance || r[j] instanceof 
PdxString) {
+                    fail("Result object should be a domain object and not an 
instance of "
+                        + r[j].getClass() + " for query: " + queries[i]);
+                  }
+                }
+              } else if (result instanceof PdxInstance || result instanceof 
PdxString) {
+                fail("Result object should be a domain object and not an 
instance of "
+                    + result.getClass() + " for query: " + queries[i]);
+              }
+            }
+          } catch (Exception e) {
+            Assert.fail("Failed executing query " + queries[i], e);
+          }
+        }
+
+        return null;
+      }
+    });
+
+    // query locally on server1 again: sr[i][0] is filled before the indexes exist, sr[i][1] after they are created
+    server1.invoke(new SerializableCallable("query locally on server1") {
+      @Override
+      public Object call() throws Exception {
+        GemFireCacheImpl cache = (GemFireCacheImpl) getCache(); // NOTE(review): cache is not read in this callable
+
+        QueryService qs = null;
+        SelectResults[][] sr = new SelectResults[queries.length][2];
+        // Execute query locally
+        try {
+          qs = getCache().getQueryService();
+        } catch (Exception e) {
+          Assert.fail("Failed to get QueryService.", e);
+        }
+        int cnt = PositionPdx.cnt; // NOTE(review): cnt is not read afterwards
+        PositionPdx pos = new PositionPdx("IBM", 100);
+        PortfolioPdx.Day pDay = new PortfolioPdx(1).aDay;
+
+        for (int i = 0; i < queries.length; i++) {
+          try {
+            if (i == 0) {
+              sr[i][0] = (SelectResults) qs.newQuery(queries[i]).execute(new 
Object[] {pos});
+            } else if (i == 1) {
+              sr[i][0] = (SelectResults) qs.newQuery(queries[i]).execute(new 
Object[] {pDay});
+            } else {
+              sr[i][0] = (SelectResults) qs.newQuery(queries[i]).execute();
+            }
+
+            assertTrue("Size of resultset should be greater than 0 for query: 
" + queries[i],
+                sr[i][0].size() > 0);
+            assertEquals("Expected and actual results do not match for query: 
" + queries[i],
+                results[i], sr[i][0].size());
+            for (Object result : sr[i][0]) {
+              if (result instanceof Struct) {
+                Object[] r = ((Struct) result).getFieldValues();
+                for (int j = 0; j < r.length; j++) {
+                  if (r[j] instanceof PdxInstance || r[j] instanceof 
PdxString) {
+                    fail("Result object should be a domain object and not an 
instance of "
+                        + r[j].getClass() + " for query: " + queries[i]);
+                  }
+                }
+              } else if (result instanceof PdxInstance || result instanceof 
PdxString) {
+                fail("Result object should be a domain object and not an 
instance of "
+                    + result.getClass() + " for query: " + queries[i]);
+              }
+            }
+          } catch (Exception e) {
+            Assert.fail("Failed executing query " + queries[i], e);
+          }
+        }
+
+        // create indexes on status, ID, position pid and position secId, then run every query a second time
+        qs.createIndex("statusIndex", "status", name);
+        qs.createIndex("IDIndex", "ID", name);
+        qs.createIndex("pIdIndex", "pos.getPid()", name + " p, 
p.positions.values pos");
+        qs.createIndex("secIdIndex", "pos.secId", name + " p, 
p.positions.values pos");
+
+        for (int i = 0; i < queries.length; i++) {
+          try {
+            if (i == 0) {
+              sr[i][1] = (SelectResults) qs.newQuery(queries[i]).execute(new 
Object[] {pos});
+            } else if (i == 1) {
+              sr[i][1] = (SelectResults) qs.newQuery(queries[i]).execute(new 
Object[] {pDay});
+            } else {
+              sr[i][1] = (SelectResults) qs.newQuery(queries[i]).execute();
+            }
+
+            assertTrue("Size of resultset should be greater than 0 for query: 
" + queries[i],
+                sr[i][1].size() > 0);
+            assertEquals("Expected and actual results do not match for query: 
" + queries[i],
+                results[i], sr[i][1].size());
+            for (Object result : sr[i][1]) {
+              if (result instanceof Struct) {
+                Object[] r = ((Struct) result).getFieldValues();
+                for (int j = 0; j < r.length; j++) {
+                  if (r[j] instanceof PdxInstance || r[j] instanceof 
PdxString) {
+                    fail("Result object should be a domain object and not an 
instance of "
+                        + r[j].getClass() + " for query: " + queries[i]);
+                  }
+                }
+              } else if (result instanceof PdxInstance || result instanceof 
PdxString) {
+                fail("Result object should be a domain object and not an 
instance of "
+                    + result.getClass() + " for query: " + queries[i]);
+              }
+            }
+          } catch (Exception e) {
+            Assert.fail("Failed executing query " + queries[i], e);
+          }
+        }
+
+        StructSetOrResultsSet ssOrrs = new StructSetOrResultsSet(); // presumably compares sr[i][0] with sr[i][1] for each query; verify against the helper
+        ssOrrs.CompareQueryResultsWithoutAndWithIndexes(sr, queries.length, 
queries);
+        return null;
+      }
+    });
+
+    this.closeClient(client);
+    this.closeClient(server1);
+
+  }
+
+  @Test
+  public void testLocalPdxQueriesOnPR() throws Exception {
+    final Host host = Host.getHost(0);
+    final VM server1 = host.getVM(0);
+    final VM server2 = host.getVM(1);
+    final VM client = host.getVM(2);
+
+    final int numberOfEntries = 10;
+    final String name = "/" + regionName;
+    final String[] queries = {"select * from " + name + " where position1 = 
$1",
+        "select * from " + name + " where aDay = $1",
+        "select distinct * from " + name + " p where p.status = 'inactive'", 
// numberOfEntries
+        "select distinct p.status from " + name + " p where p.status = 
'inactive'", // 1
+        "select p from " + name + " p where p.status = 'inactive'", // 
numberOfEntries
+        "select * from " + name + " p, p.positions.values v where v.secId = 
'IBM'", // 4
+        "select v from " + name + " p, p.positions.values v where v.secId = 
'IBM'", // 4
+        "select p.status from " + name + " p where p.status = 'inactive'", // 
numberOfEntries
+        "select distinct * from " + name + " p where p.status = 'inactive' 
order by p.ID", // numberOfEntries
+        "select * from " + name + " p where p.status = 'inactive' or p.ID > 
0", // 19
+        "select * from " + name + " p where p.status = 'inactive' and p.ID >= 
0", // numberOfEntries
+        "select * from " + name + " p where p.status in set ('inactive', 
'active')", // numberOfEntries*2
+        "select * from " + name + " p where p.ID > 0 and p.ID < 10", // 9
+        "select v from " + name + " p, p.positions.values v where p.status = 
'inactive'", // numberOfEntries*2
+        "select v.secId from " + name + " p, p.positions.values v where 
p.status = 'inactive'", // numberOfEntries*2
+        "select distinct p from " + name
+            + " p, p.positions.values v where p.status = 'inactive' and v.pid 
>= 0", // numberOfEntries
+        "select distinct p from " + name
+            + " p, p.positions.values v where p.status = 'inactive' or v.pid > 
0", // numberOfEntries*2
+        "select distinct * from " + name + " p, p.positions.values v where 
p.status = 'inactive'", // numberOfEntries*2
+        "select * from " + name + ".values v where v.status = 'inactive'", // 
numberOfEntries
+        "select v from " + name + " v where v in (select p from " + name + " p 
where p.ID > 0)", // 19
+        "select v from " + name + " v where v.status in (select distinct 
p.status from " + name
+            + " p where p.status = 'inactive')", // numberOfEntries
+        "select * from " + name + " v where v.status = ELEMENT (select 
distinct p.status from "
+            + name + " p where p.status = 'inactive')", // numberOfEntries
+    };
+
+    final int[] results = {2, 3, numberOfEntries, 1, numberOfEntries, 4, 4, 
numberOfEntries,
+        numberOfEntries, 19, numberOfEntries, numberOfEntries * 2, 9, 
numberOfEntries * 2,
+        numberOfEntries * 2, numberOfEntries, numberOfEntries * 2, 
numberOfEntries * 2,
+        numberOfEntries, 19, numberOfEntries, numberOfEntries};
+
+    // Start server1
+    final int port1 = (Integer) server1.invoke(new 
SerializableCallable("Create Server1") {
+      @Override
+      public Object call() throws Exception {
+        Region r1 = 
getCache().createRegionFactory(RegionShortcut.PARTITION).create(regionName);
+        for (int i = 0; i < numberOfEntries; i++) {
+          r1.put("key-" + i, new PortfolioPdx(i));
+        }
+        CacheServer server = getCache().addCacheServer();
+        int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
+        server.setPort(port);
+        server.start();
+        return port;
+      }
+    });
+
+    // Start server2
+    final int port2 = (Integer) server2.invoke(new 
SerializableCallable("Create Server2") {
+      @Override
+      public Object call() throws Exception {
+        Region r1 = 
getCache().createRegionFactory(RegionShortcut.PARTITION).create(regionName);
+        CacheServer server = getCache().addCacheServer();
+        int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
+        server.setPort(port);
+        server.start();
+        return port;
+      }
+    });
+
+    // client loads pdx objects on server
+    client.invoke(new SerializableCallable("Create client") {
+      @Override
+      public Object call() throws Exception {
+        ClientCacheFactory cf = new ClientCacheFactory();
+        cf.addPoolServer(NetworkUtils.getServerHostName(server1.getHost()), 
port1);
+        ClientCache cache = getClientCache(cf);
+        Region region =
+            
cache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create(regionName);
+
+        for (int i = numberOfEntries; i < numberOfEntries * 2; i++) {
+          region.put("key-" + i, new PortfolioPdx(i));
+        }
+
+        QueryService qs = null;
+        SelectResults sr = null;
+        // Execute query remotely
+        try {
+          qs = cache.getQueryService();
+        } catch (Exception e) {
+          Assert.fail("Failed to get QueryService.", e);
+        }
+        PositionPdx pos = new PositionPdx("IBM", 100);
+        PortfolioPdx.Day pDay = new PortfolioPdx(1).aDay;
+
+        for (int i = 0; i < queries.length; i++) {
+          try {
+            if (i == 0) {
+              sr = (SelectResults) qs.newQuery(queries[i]).execute(new 
Object[] {pos});
+            } else if (i == 1) {
+              sr = (SelectResults) qs.newQuery(queries[i]).execute(new 
Object[] {pDay});
+            } else {
+              sr = (SelectResults) qs.newQuery(queries[i]).execute();
+            }
+            assertTrue("Size of resultset should be greater than 0 for query: 
" + queries[i],
+                sr.size() > 0);
+            assertEquals("Expected and actual results do not match for query: 
" + queries[i],
+                results[i], sr.size());
+
+            for (Object result : sr) {
+              if (result instanceof Struct) {
+                Object[] r = ((Struct) result).getFieldValues();
+                for (int j = 0; j < r.length; j++) {
+                  if (r[j] instanceof PdxInstance || r[j] instanceof 
PdxString) {
+                    fail("Result object should be a domain object and not an 
instance of "
+                        + r[j].getClass() + " for query: " + queries[i]);
+                  }
+                }
+              } else if (result instanceof PdxInstance || result instanceof 
PdxString) {
+                fail("Result object should be a domain object and not an 
instance of "
+                    + result.getClass() + " for query: " + queries[i]);
+              }
+            }
+          } catch (Exception e) {
+            Assert.fail("Failed executing query " + queries[i], e);
+          }
+        }
+
+        return null;
+      }
+    });
+
+    // query locally on server1
+    server1.invoke(new SerializableCallable("query locally on server1") {
+      @Override
+      public Object call() throws Exception {
+        GemFireCacheImpl cache = (GemFireCacheImpl) getCache();
+
+        QueryService qs = null;
+        SelectResults sr = null;
+        // Execute query locally
+        try {
+          qs = getCache().getQueryService();
+        } catch (Exception e) {
+          Assert.fail("Failed to get QueryService.", e);
+        }
+
+        PositionPdx pos = new PositionPdx("IBM", 100);
+        PortfolioPdx.Day pDay = new PortfolioPdx(1).aDay;
+
+        for (int i = 0; i < queries.length; i++) {
+          try {
+            if (i == 0) {
+              sr = (SelectResults) qs.newQuery(queries[i]).execute(new 
Object[] {pos});
+            } else if (i == 1) {
+              sr = (SelectResults) qs.newQuery(queries[i]).execute(new 
Object[] {pDay});
+            } else {
+              sr = (SelectResults) qs.newQuery(queries[i]).execute();
+            }
+            assertTrue("Size of resultset should be greater than 0 for query: 
" + queries[i],
+                sr.size() > 0);
+            assertEquals("Expected and actual results do not match for query: 
" + queries[i],
+                results[i], sr.size());
+
+            for (Object result : sr) {
+              if (result instanceof Struct) {
+                Object[] r = ((Struct) result).getFieldValues();
+                for (int j = 0; j < r.length; j++) {
+                  if (r[j] instanceof PdxInstance || r[j] instanceof 
PdxString) {
+                    fail("Result object should be a domain object and not an 
instance of "
+                        + r[j].getClass() + " for query: " + queries[i]);
+                  }
+                }
+              } else if (result instanceof PdxInstance || result instanceof 
PdxString) {
+                fail("Result object should be a domain object and not an 
instance of "
+                    + result.getClass() + " for query: " + queries[i]);
+              }
+            }
+          } catch (Exception e) {
+            Assert.fail("Failed executing query " + queries[i], e);
+          }
+        }
+
+        return null;
+      }
+    });
+
+    // query locally on server2
+    server2.invoke(new SerializableCallable("query locally on server2") {
+      @Override
+      public Object call() throws Exception {
+        GemFireCacheImpl cache = (GemFireCacheImpl) getCache();
+
+        QueryService qs = null;
+        SelectResults[][] sr = new SelectResults[queries.length][2];
+        // Execute query locally
+        try {
+          qs = getCache().getQueryService();
+        } catch (Exception e) {
+          Assert.fail("Failed to get QueryService.", e);
+        }
+
+        PositionPdx pos = new PositionPdx("IBM", 100);
+        PortfolioPdx.Day pDay = new PortfolioPdx(1).aDay;
+
+        for (int i = 0; i < queries.length; i++) {
+          try {
+            if (i == 0) {
+              sr[i][0] = (SelectResults) qs.newQuery(queries[i]).execute(new 
Object[] {pos});
+            } else if (i == 1) {
+              sr[i][0] = (SelectResults) qs.newQuery(queries[i]).execute(new 
Object[] {pDay});
+            } else {
+              sr[i][0] = (SelectResults) qs.newQuery(queries[i]).execute();
+            }
+            assertTrue("Size of resultset should be greater than 0 for query: 
" + queries[i],
+                sr[i][0].size() > 0);
+            assertEquals("Expected and actual results do not match for query: 
" + queries[i],
+                results[i], sr[i][0].size());
+
+            for (Object result : sr[i][0]) {
+              if (result instanceof Struct) {
+                Object[] r = ((Struct) result).getFieldValues();
+                for (int j = 0; j < r.length; j++) {
+                  if (r[j] instanceof PdxInstance || r[j] instanceof 
PdxString) {
+                    fail("Result object should be a domain object and not an 
instance of "
+                        + r[j].getClass() + " for query: " + queries[i]);
+                  }
+                }
+              } else if (result instanceof PdxInstance || result instanceof 
PdxString) {
+                fail("Result object should be a domain object and not an 
instance of "
+                    + result.getClass() + " for query: " + queries[i]);
+              }
+            }
+          } catch (Exception e) {
+            Assert.fail("Failed executing query " + queries[i], e);
+          }
+        }
+
+        // create index
+        qs.createIndex("statusIndex", "p.status", name + " p");
+        qs.createIndex("IDIndex", "ID", name);
+        qs.createIndex("pIdIndex", "pos.getPid()", name + " p, 
p.positions.values pos");
+        qs.createIndex("secIdIndex", "pos.secId", name + " p, 
p.positions.values pos");
+
+        for (int i = 0; i < queries.length; i++) {
+          try {
+            if (i == 0) {
+              sr[i][1] = (SelectResults) qs.newQuery(queries[i]).execute(new 
Object[] {pos});
+            } else if (i == 1) {
+              sr[i][1] = (SelectResults) qs.newQuery(queries[i]).execute(new 
Object[] {pDay});
+            } else {
+              sr[i][1] = (SelectResults) qs.newQuery(queries[i]).execute();
+            }
+            assertTrue("Size of resultset should be greater than 0 for query: 
" + queries[i],
+                sr[i][1].size() > 0);
+            assertEquals("Expected and actual results do not match for query: 
" + queries[i],
+                results[i], sr[i][1].size());
+
+            for (Object result : sr[i][1]) {
+              if (result instanceof Struct) {
+                Object[] r = ((Struct) result).getFieldValues();
+                for (int j = 0; j < r.length; j++) {
+                  if (r[j] instanceof PdxInstance || r[j] instanceof 
PdxString) {
+                    fail("Result object should be a domain object and not an 
instance of "
+                        + r[j].getClass() + " for query: " + queries[i]);
+                  }
+                }
+              } else if (result instanceof PdxInstance || result instanceof 
PdxString) {
+                fail("Result object should be a domain object and not an 
instance of "
+                    + result.getClass() + " for query: " + queries[i]);
+              }
+            }
+          } catch (Exception e) {
+            Assert.fail("Failed executing query " + queries[i], e);
+          }
+        }
+
+        StructSetOrResultsSet ssOrrs = new StructSetOrResultsSet();
+        ssOrrs.CompareQueryResultsWithoutAndWithIndexes(sr, queries.length, 
queries);
+
+        return null;
+      }
+    });
+
+    this.closeClient(client);
+    this.closeClient(server1);
+    this.closeClient(server2);
+  }
+
+  /* Close Client */
+  public void closeClient(VM client) {
+    SerializableRunnable closeCache = new CacheSerializableRunnable("Close 
Client") {
+      public void run2() throws CacheException {
+        LogWriterUtils.getLogWriter().info("### Close Client. ###");
+        try {
+          closeCache();
+          disconnectFromDS();
+        } catch (Exception ex) {
+          LogWriterUtils.getLogWriter().info("### Failed to get close client. 
###");
+        }
+      }
+    };
+
+    client.invoke(closeCache);
+  }
+
+  /**
+   * Tear-down hook: disconnects all members from the distributed system, clears the static test
+   * state in the VM running this method, then resets the instance counters in the DUnit VMs.
+   */
+  @Override
+  protected final void preTearDownPDXQueryTestBase() throws Exception {
+    // Each test expects to start with a brand new distributed system.
+    disconnectAllFromDS();
+
+    // Static state of the VM executing the tear down, cleared for the next test.
+    PortfolioPdx.DEBUG = false;
+    TestObject.numInstance = 0;
+
+    // Same reset for the counters held by the other VMs.
+    resetTestObjectInstanceCount();
+  }
+
+  /**
+   * Set-up hook: makes sure every test starts with the domain-object instance counters at zero.
+   */
+  @Override
+  public final void postSetUp() throws Exception {
+    this.resetTestObjectInstanceCount();
+  }
+
+  private void resetTestObjectInstanceCount() {
+    final Host host = Host.getHost(0);
+    for (int i = 0; i < 4; i++) {
+      VM vm = host.getVM(i);
+      vm.invoke(new CacheSerializableRunnable("Create Bridge Server") {
+        public void run2() throws CacheException {
+          TestObject.numInstance = 0;
+          PortfolioPdx.numInstance = 0;
+          PositionPdx.numInstance = 0;
+          PositionPdx.cnt = 0;
+          TestObject2.numInstance = 0;
+        }
+      });
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a64f7f60/geode-core/src/test/java/org/apache/geode/cache/query/dunit/PdxLocalQueryVersionedClassDUnitTest.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/test/java/org/apache/geode/cache/query/dunit/PdxLocalQueryVersionedClassDUnitTest.java
 
b/geode-core/src/test/java/org/apache/geode/cache/query/dunit/PdxLocalQueryVersionedClassDUnitTest.java
new file mode 100644
index 0000000..f83e2ff
--- /dev/null
+++ 
b/geode-core/src/test/java/org/apache/geode/cache/query/dunit/PdxLocalQueryVersionedClassDUnitTest.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.query.dunit;
+
+import org.apache.geode.test.junit.categories.DistributedTest;
+import org.junit.Test;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientCacheFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.query.QueryService;
+import org.apache.geode.cache.query.SelectResults;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.internal.AvailablePortHelper;
+import org.apache.geode.pdx.PdxInstance;
+import org.apache.geode.pdx.PdxInstanceFactory;
+import org.apache.geode.pdx.internal.PdxInstanceFactoryImpl;
+import org.apache.geode.test.dunit.Assert;
+import org.apache.geode.test.dunit.AsyncInvocation;
+import org.apache.geode.test.dunit.Host;
+import org.apache.geode.test.dunit.NetworkUtils;
+import org.apache.geode.test.dunit.SerializableCallable;
+import org.apache.geode.test.dunit.ThreadUtils;
+import org.apache.geode.test.dunit.VM;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Distributed test that runs bind-parameter queries from a client against a server region whose
+ * values are {@link PdxInstance}s built through {@link PdxInstanceFactoryImpl} rather than
+ * instances of a domain class.
+ */
+@Category(DistributedTest.class)
+public class PdxLocalQueryVersionedClassDUnitTest extends PDXQueryTestBase {
+
+  /** Number of times each client thread executes the query. */
+  private static final int QUERY_REPETITIONS = 100;
+
+  /** Maximum time, in milliseconds, to wait for each asynchronous query invocation. */
+  private static final long JOIN_TIMEOUT_MILLIS = 60 * 1000;
+
+  /**
+   * Testing the isRemote flag which could be inconsistent when bind queries are being executed in
+   * multiple threads. Bug #49662 is caused because of this inconsistent behavior.
+   *
+   * @throws Exception if any of the remote invocations fails
+   */
+  @Test
+  public void testIsRemoteFlagForRemoteQueries() throws Exception {
+    final Host host = Host.getHost(0);
+    final VM server = host.getVM(0);
+    final VM client = host.getVM(1);
+
+    final int numberOfEntries = 1000;
+    final String name = "/" + regionName;
+
+    final String query =
+        "select distinct * from " + name + " where id > $1 and id < $2 and status = 'active'";
+
+    // Start server
+    final int port1 = (Integer) server.invoke(new SerializableCallable("Create Server") {
+      @Override
+      public Object call() throws Exception {
+        getCache().createRegionFactory(RegionShortcut.REPLICATE).create(regionName);
+        CacheServer cacheServer = getCache().addCacheServer();
+        int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
+        cacheServer.setPort(port);
+        cacheServer.start();
+        return port;
+      }
+    });
+
+    // Start client and put version1 objects on server
+    // Server does not have version1 classes in classpath
+    client.invoke(new SerializableCallable("Create client") {
+      @Override
+      public Object call() throws Exception {
+        ClientCacheFactory cf = new ClientCacheFactory();
+        cf.addPoolServer(NetworkUtils.getServerHostName(server.getHost()), port1);
+        ClientCache cache = getClientCache(cf);
+        Region region = cache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY)
+            .create(regionName);
+
+        // Even ids are "active", odd ids are "inactive".
+        for (int i = 0; i < numberOfEntries; i++) {
+          PdxInstanceFactory pdxInstanceFactory =
+              PdxInstanceFactoryImpl.newCreator("PdxVersionedNewPortfolio", false);
+          pdxInstanceFactory.writeInt("id", i);
+          pdxInstanceFactory.writeString("status", (i % 2 == 0 ? "active" : "inactive"));
+          PdxInstance pdxInstance = pdxInstanceFactory.create();
+          region.put("key-" + i, pdxInstance);
+        }
+
+        return null;
+      }
+    });
+
+    // Execute same query remotely from client using 2 threads
+    // Since this is a bind query, the query object will be shared
+    // between the 2 threads.
+    AsyncInvocation a1 = queryRepeatedlyFromClient(client, query, 1, 1000);
+    AsyncInvocation a2 = queryRepeatedlyFromClient(client, query, 997, 1000);
+
+    ThreadUtils.join(a1, JOIN_TIMEOUT_MILLIS);
+    ThreadUtils.join(a2, JOIN_TIMEOUT_MILLIS);
+
+    // Pass the throwable itself so the failure report keeps the remote stack trace.
+    if (a1.exceptionOccurred()) {
+      Assert.fail("Failed query execution", a1.getException());
+    }
+
+    if (a2.exceptionOccurred()) {
+      Assert.fail("Failed query execution", a2.getException());
+    }
+
+    this.closeClient(client);
+    this.closeClient(server);
+
+  }
+
+  /**
+   * Asynchronously executes {@code query} {@value #QUERY_REPETITIONS} times in the given VM with
+   * the two bind parameters, then asserts that the last execution returned at least one row.
+   *
+   * @param client the VM in which the query is executed
+   * @param query a query string with two bind parameters, {@code $1} and {@code $2}
+   * @param lowerBound value bound to {@code $1}
+   * @param upperBound value bound to {@code $2}
+   * @return the handle of the asynchronous invocation
+   */
+  private AsyncInvocation queryRepeatedlyFromClient(VM client, final String query,
+      final int lowerBound, final int upperBound) {
+    return client.invokeAsync(new SerializableCallable("Query from client") {
+      @Override
+      public Object call() throws Exception {
+        QueryService qs = null;
+        SelectResults sr = null;
+        // Execute query remotely
+        try {
+          qs = getCache().getQueryService();
+        } catch (Exception e) {
+          Assert.fail("Failed to get QueryService.", e);
+        }
+
+        try {
+          for (int i = 0; i < QUERY_REPETITIONS; i++) {
+            sr = (SelectResults) qs.newQuery(query).execute(new Object[] {lowerBound, upperBound});
+          }
+          Assert.assertTrue("Size of resultset should be greater than 0 for query: " + query,
+              sr.size() > 0);
+        } catch (Exception e) {
+          Assert.fail("Failed executing query " + query, e);
+        }
+
+        return null;
+      }
+    });
+  }
+
+}

Reply via email to