GEODE-37 change package name from io.pivotal.geode (for 
./geode-functions/src/main/java/io/pivotal) to org.apache.geode (for 
./geode-functions/src/main/java/org/apache)


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/54cf6bf5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/54cf6bf5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/54cf6bf5

Branch: refs/heads/develop
Commit: 54cf6bf5d826814a4cd82b100a9f638896e48945
Parents: f9a022a
Author: Hitesh Khamesra <hkhame...@pivotal.io>
Authored: Tue Sep 20 15:44:10 2016 -0700
Committer: Hitesh Khamesra <hkhame...@pivotal.io>
Committed: Tue Sep 20 16:01:02 2016 -0700

----------------------------------------------------------------------
 .../connector/internal/RegionMetadata.java      |  93 --------
 .../internal/geodefunctions/QueryFunction.java  |  99 ---------
 .../geodefunctions/RetrieveRegionFunction.java  | 208 ------------------
 .../RetrieveRegionMetadataFunction.java         | 118 ----------
 .../StructStreamingResultSender.java            | 219 -------------------
 .../connector/internal/RegionMetadata.java      |  93 ++++++++
 .../internal/geodefunctions/QueryFunction.java  |  99 +++++++++
 .../geodefunctions/RetrieveRegionFunction.java  | 208 ++++++++++++++++++
 .../RetrieveRegionMetadataFunction.java         | 118 ++++++++++
 .../StructStreamingResultSender.java            | 219 +++++++++++++++++++
 10 files changed, 737 insertions(+), 737 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/54cf6bf5/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/RegionMetadata.java
----------------------------------------------------------------------
diff --git 
a/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/RegionMetadata.java
 
b/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/RegionMetadata.java
deleted file mode 100644
index 4fee0e0..0000000
--- 
a/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/RegionMetadata.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.geode.spark.connector.internal;
-
-import org.apache.geode.distributed.internal.ServerLocation;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.io.Serializable;
-
-/**
- * This class contains all info required by GemFire RDD partitioner to create 
partitions.
- */
-public class RegionMetadata implements Serializable {
-
-  private String  regionPath;
-  private boolean isPartitioned;
-  private int     totalBuckets;
-  private HashMap<ServerLocation, HashSet<Integer>> serverBucketMap;
-  private String  keyTypeName;
-  private String  valueTypeName;
-
-  /**
-   * Default constructor.
-   * @param regionPath the full path of the given region
-   * @param isPartitioned true for partitioned region, false otherwise
-   * @param totalBuckets number of total buckets for partitioned region, 
ignored otherwise
-   * @param serverBucketMap geode server (host:port pair) to bucket set map
-   * @param keyTypeName region key class name
-   * @param valueTypeName region value class name                    
-   */
-  public RegionMetadata(String regionPath, boolean isPartitioned, int 
totalBuckets, HashMap<ServerLocation, HashSet<Integer>> serverBucketMap,
-                        String keyTypeName, String valueTypeName) {
-    this.regionPath = regionPath;
-    this.isPartitioned = isPartitioned;
-    this.totalBuckets = totalBuckets;
-    this.serverBucketMap = serverBucketMap;
-    this.keyTypeName = keyTypeName;
-    this.valueTypeName = valueTypeName;
-  }
-
-  public RegionMetadata(String regionPath, boolean isPartitioned, int 
totalBuckets, HashMap<ServerLocation, HashSet<Integer>> serverBucketMap) {
-    this(regionPath, isPartitioned, totalBuckets, serverBucketMap, null, null);
-  }
-
-  public String getRegionPath() {
-    return regionPath;
-  }
-
-  public boolean isPartitioned() {
-    return isPartitioned;
-  }
-
-  public int getTotalBuckets() {
-    return totalBuckets;
-  }
-  
-  public HashMap<ServerLocation, HashSet<Integer>> getServerBucketMap() {
-    return serverBucketMap;
-  }
-
-  public String getKeyTypeName() {
-    return keyTypeName;
-  }
-
-  public String getValueTypeName() {
-    return valueTypeName;
-  }
-
-  public String toString() {
-    StringBuilder buf = new StringBuilder();
-    buf.append("RegionMetadata(region=").append(regionPath)
-       .append("(").append(keyTypeName).append(", 
").append(valueTypeName).append(")")
-       .append(", partitioned=").append(isPartitioned).append(", 
#buckets=").append(totalBuckets)
-       .append(", map=").append(serverBucketMap).append(")");
-    return buf.toString();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/54cf6bf5/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/geodefunctions/QueryFunction.java
----------------------------------------------------------------------
diff --git 
a/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/geodefunctions/QueryFunction.java
 
b/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/geodefunctions/QueryFunction.java
deleted file mode 100644
index 6e6e295..0000000
--- 
a/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/geodefunctions/QueryFunction.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.geode.spark.connector.internal.geodefunctions;
-
-import org.apache.geode.DataSerializer;
-import org.apache.geode.cache.CacheFactory;
-import org.apache.geode.cache.execute.*;
-import org.apache.geode.cache.query.SelectResults;
-import org.apache.geode.cache.query.Query;
-import org.apache.geode.internal.HeapDataOutputStream;
-import org.apache.geode.internal.cache.LocalRegion;
-import org.apache.geode.internal.cache.execute.InternalRegionFunctionContext;
-import org.apache.geode.internal.logging.LogService;
-import org.apache.logging.log4j.Logger;
-import java.util.Iterator;
-
-public class QueryFunction implements Function {
-
-  private static final long serialVersionUID = 4866641340803692882L;
-
-  public final static String ID = "geode-spark-query-function";
-
-  private final static QueryFunction instance = new QueryFunction();
-
-  private static final Logger logger = LogService.getLogger();
-
-  private static final int CHUNK_SIZE = 1024;
-
-  @Override
-  public String getId() {
-    return ID;
-  }
-
-  public static QueryFunction getInstance() {
-    return instance;
-  }
-
-  @Override
-  public boolean optimizeForWrite() {
-    return true;
-  }
-
-  @Override
-  public boolean isHA() {
-    return true;
-  }
-
-  @Override
-  public boolean hasResult() {
-    return true;
-  }
-
-  @Override
-  public void execute(FunctionContext context) {
-    try {
-      String[] args = (String[]) context.getArguments();
-      String queryString = args[0];
-      String bucketSet = args[1];
-      InternalRegionFunctionContext irfc = (InternalRegionFunctionContext) 
context;
-      LocalRegion localRegion = (LocalRegion) irfc.getDataSet();
-      boolean partitioned = localRegion.getDataPolicy().withPartitioning();
-      Query query = 
CacheFactory.getAnyInstance().getQueryService().newQuery(queryString);
-      Object result = partitioned ? 
query.execute((InternalRegionFunctionContext) context) : query.execute();
-      ResultSender<Object> sender = context.getResultSender();
-      HeapDataOutputStream buf = new HeapDataOutputStream(CHUNK_SIZE, null);
-      Iterator<Object> iter = ((SelectResults) result).asList().iterator();
-      while (iter.hasNext()) {
-        Object row = iter.next();
-        DataSerializer.writeObject(row, buf);
-        if (buf.size() > CHUNK_SIZE) {
-          sender.sendResult(buf.toByteArray());
-          logger.debug("OQL query=" + queryString + " bucket set=" + bucketSet 
+ " sendResult(), data size=" + buf.size());
-          buf.reset();
-        }
-      }
-      sender.lastResult(buf.toByteArray());
-      logger.debug("OQL query=" + queryString + " bucket set=" + bucketSet + " 
lastResult(), data size=" + buf.size());
-      buf.reset();
-    }
-    catch(Exception e) {
-      throw new FunctionException(e);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/54cf6bf5/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/geodefunctions/RetrieveRegionFunction.java
----------------------------------------------------------------------
diff --git 
a/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/geodefunctions/RetrieveRegionFunction.java
 
b/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/geodefunctions/RetrieveRegionFunction.java
deleted file mode 100644
index d3a2572..0000000
--- 
a/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/geodefunctions/RetrieveRegionFunction.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.geode.spark.connector.internal.geodefunctions;
-
-import java.util.Iterator;
-import org.apache.logging.log4j.Logger;
-
-import org.apache.geode.cache.Cache;
-import org.apache.geode.cache.CacheFactory;
-import org.apache.geode.cache.execute.FunctionException;
-import org.apache.geode.cache.query.Query;
-import org.apache.geode.cache.query.QueryService;
-import org.apache.geode.cache.query.SelectResults;
-import org.apache.geode.cache.query.Struct;
-import org.apache.geode.internal.cache.*;
-import org.apache.geode.cache.Region;
-import org.apache.geode.cache.execute.Function;
-import org.apache.geode.cache.execute.FunctionContext;
-import org.apache.geode.cache.partition.PartitionRegionHelper;
-import org.apache.geode.internal.cache.execute.InternalRegionFunctionContext;
-import org.apache.geode.internal.cache.execute.InternalResultSender;
-import org.apache.geode.internal.cache.partitioned.PREntriesIterator;
-import org.apache.geode.internal.logging.LogService;
-
-/**
- * GemFire function that is used by `SparkContext.geodeRegion(regionPath, 
whereClause)`
- * to retrieve region data set for the given bucket set as a RDD partition 
- **/
-public class RetrieveRegionFunction implements Function {
-
-  public final static String ID = "spark-geode-retrieve-region";
-  private static final Logger logger = LogService.getLogger();
-  private static final RetrieveRegionFunction instance = new 
RetrieveRegionFunction();
-
-  public RetrieveRegionFunction() {
-  }
-
-  /** ------------------------------------------ */
-  /**     interface Function implementation      */
-  /** ------------------------------------------ */
-
-  public static RetrieveRegionFunction getInstance() {
-    return instance;
-  }
-
-  @Override
-  public String getId() {
-    return ID;
-  }
-
-  @Override
-  public boolean hasResult() {
-    return true;
-  }
-
-  @Override
-  public boolean optimizeForWrite() {
-    return true;
-  }
-
-  @Override
-  public boolean isHA() {
-    return true;
-  }
-
-  @Override
-  public void execute(FunctionContext context) {
-    String[] args = (String[]) context.getArguments();
-    String where = args[0];
-    String taskDesc = args[1];
-    InternalRegionFunctionContext irfc = (InternalRegionFunctionContext) 
context;
-    LocalRegion localRegion = (LocalRegion) irfc.getDataSet();
-    boolean partitioned = localRegion.getDataPolicy().withPartitioning();
-    if (where.trim().isEmpty())
-      retrieveFullRegion(irfc, partitioned, taskDesc);
-    else
-      retrieveRegionWithWhereClause(irfc, localRegion, partitioned, where, 
taskDesc);
-  }
-
-  /** ------------------------------------------ */
-  /**    Retrieve region data with where clause  */
-  /** ------------------------------------------ */
-
-  private void retrieveRegionWithWhereClause(
-    InternalRegionFunctionContext context, LocalRegion localRegion, boolean 
partitioned, String where, String desc) {
-    String regionPath = localRegion.getFullPath();
-    String qstr = "select key, value from " + regionPath + ".entries where " + 
where;
-    logger.info(desc + ": " + qstr);
-
-    try {
-      Cache cache = CacheFactory.getAnyInstance();
-      QueryService queryService = cache.getQueryService();
-      Query query = queryService.newQuery(qstr);
-      SelectResults<Struct> results =
-        (SelectResults<Struct>) (partitioned ?  query.execute(context) : 
query.execute());
-
-      Iterator<Object[]> entries = 
getStructIteratorWrapper(results.asList().iterator());
-      InternalResultSender irs = (InternalResultSender) 
context.getResultSender();
-      StructStreamingResultSender sender = new 
StructStreamingResultSender(irs, null, entries, desc);
-      sender.send();
-    } catch (Exception e) {
-      throw new FunctionException(e);
-    }
-  }
-
-  private Iterator<Object[]> getStructIteratorWrapper(Iterator<Struct> 
entries) {
-    return new WrapperIterator<Struct, Iterator<Struct>>(entries) {
-      @Override public Object[] next() {
-        return  delegate.next().getFieldValues();
-      }
-    };
-  }
-
-  /** ------------------------------------------ */
-  /**         Retrieve full region data          */
-  /** ------------------------------------------ */
-
-  private void retrieveFullRegion(InternalRegionFunctionContext context, 
boolean partitioned, String desc) {
-    Iterator<Object[]> entries;
-    if (partitioned) {
-      PREntriesIterator<Region.Entry> iter = (PREntriesIterator<Region.Entry>)
-              ((LocalDataSet) 
PartitionRegionHelper.getLocalDataForContext(context)).entrySet().iterator();
-      // entries = getPREntryIterator(iter);
-      entries = getSimpleEntryIterator(iter);
-    } else {
-      LocalRegion owner = (LocalRegion) context.getDataSet();
-      Iterator<Region.Entry> iter = (Iterator<Region.Entry>) 
owner.entrySet().iterator();
-      // entries = getRREntryIterator(iter, owner);
-      entries = getSimpleEntryIterator(iter);
-    }
-    InternalResultSender irs = (InternalResultSender) 
context.getResultSender();
-    StructStreamingResultSender sender = new StructStreamingResultSender(irs, 
null, entries, desc);
-    sender.send();
-  }
-
-//  /** An iterator for partitioned region that uses internal API to get 
serialized value */
-//  private Iterator<Object[]> 
getPREntryIterator(PREntriesIterator<Region.Entry> iterator) {
-//    return new WrapperIterator<Region.Entry, 
PREntriesIterator<Region.Entry>>(iterator) {
-//      @Override public Object[] next() {
-//        Region.Entry entry = delegate.next();
-//        int bucketId = delegate.getBucketId();
-//        KeyInfo keyInfo = new KeyInfo(entry.getKey(), null, bucketId);
-//        // owner needs to be the bucket region not the enclosing partition 
region
-//        LocalRegion owner = ((PartitionedRegion) 
entry.getRegion()).getDataStore().getLocalBucketById(bucketId);
-//        Object value = owner.getDeserializedValue(keyInfo, false, true, 
true, null, false);
-//        return new Object[] {keyInfo.getKey(), value};
-//      }
-//    };
-//  }
-//
-//  /** An iterator for replicated region that uses internal API to get 
serialized value */
-//  private Iterator<Object[]> getRREntryIterator(Iterator<Region.Entry> 
iterator, LocalRegion region) {
-//    final LocalRegion owner = region;
-//    return new WrapperIterator<Region.Entry, 
Iterator<Region.Entry>>(iterator) {
-//      @Override public Object[] next() {
-//        Region.Entry entry =  delegate.next();
-//        KeyInfo keyInfo = new KeyInfo(entry.getKey(), null, null);
-//        Object value = owner.getDeserializedValue(keyInfo, false, true, 
true, null, false);
-//        return new Object[] {keyInfo.getKey(), value};
-//      }
-//    };
-//  }
-
-  // todo. compare performance of regular and simple iterator
-  /** An general iterator for both partitioned and replicated region that 
returns un-serialized value */
-  private Iterator<Object[]> getSimpleEntryIterator(Iterator<Region.Entry> 
iterator) {
-    return new WrapperIterator<Region.Entry, Iterator<Region.Entry>>(iterator) 
{
-      @Override public Object[] next() {
-        Region.Entry entry = delegate.next();
-        return new Object[] {entry.getKey(), entry.getValue()};
-      }
-    };
-  }
-
-  /** ------------------------------------------ */
-  /**        abstract wrapper iterator           */
-  /** ------------------------------------------ */
-
-  /** An abstract wrapper iterator to reduce duplicated code of anonymous 
iterators */
-  abstract class WrapperIterator<T, S extends Iterator<T>> implements 
Iterator<Object[]> {
-
-    final S delegate;
-
-    protected WrapperIterator(S delegate) {
-      this.delegate = delegate;
-    }
-
-    @Override public boolean hasNext() {
-      return delegate.hasNext();
-    }
-
-    @Override public void remove() { }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/54cf6bf5/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/geodefunctions/RetrieveRegionMetadataFunction.java
----------------------------------------------------------------------
diff --git 
a/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/geodefunctions/RetrieveRegionMetadataFunction.java
 
b/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/geodefunctions/RetrieveRegionMetadataFunction.java
deleted file mode 100644
index 6041b70..0000000
--- 
a/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/geodefunctions/RetrieveRegionMetadataFunction.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.geode.spark.connector.internal.geodefunctions;
-
-import org.apache.geode.cache.execute.Function;
-import org.apache.geode.cache.execute.FunctionContext;
-import org.apache.geode.cache.execute.ResultSender;
-import org.apache.geode.distributed.internal.ServerLocation;
-import org.apache.geode.internal.cache.BucketServerLocation66;
-import org.apache.geode.internal.cache.LocalRegion;
-import org.apache.geode.internal.cache.PartitionedRegion;
-import org.apache.geode.internal.cache.execute.InternalRegionFunctionContext;
-import io.pivotal.geode.spark.connector.internal.RegionMetadata;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-
-/**
- * This GemFire function retrieve region metadata
- */
-public class RetrieveRegionMetadataFunction implements Function {
-
-  public final static String ID = "geode-spark-retrieve-region-metadata";
-  
-  private static final RetrieveRegionMetadataFunction instance = new 
RetrieveRegionMetadataFunction();
-
-  public RetrieveRegionMetadataFunction() {
-  }
-
-  public static RetrieveRegionMetadataFunction getInstance() {
-    return instance;    
-  }
-  
-  @Override
-  public String getId() {
-    return ID;
-  }
-
-  @Override
-  public boolean optimizeForWrite() {
-    return false;
-  }
-
-  @Override
-  public boolean isHA() {
-    return true;
-  }
-
-  @Override
-  public boolean hasResult() {
-    return true;
-  }
-
-  @Override
-  public void execute(FunctionContext context) {
-    LocalRegion region = (LocalRegion) ((InternalRegionFunctionContext) 
context).getDataSet();
-    String regionPath = region.getFullPath();
-    boolean isPartitioned = region.getDataPolicy().withPartitioning();
-    String kTypeName = 
getTypeClassName(region.getAttributes().getKeyConstraint());
-    String vTypeName = 
getTypeClassName(region.getAttributes().getValueConstraint());
-
-    RegionMetadata metadata;
-    if (! isPartitioned) {
-      metadata = new RegionMetadata(regionPath, false, 0, null, kTypeName, 
vTypeName);
-    } else {
-      PartitionedRegion pregion = (PartitionedRegion) region;
-      int totalBuckets = 
pregion.getAttributes().getPartitionAttributes().getTotalNumBuckets();
-      Map<Integer, List<BucketServerLocation66>> bucketMap = 
pregion.getRegionAdvisor().getAllClientBucketProfiles();
-      HashMap<ServerLocation, HashSet<Integer>> serverMap = 
bucketServerMap2ServerBucketSetMap(bucketMap);
-      metadata = new RegionMetadata(regionPath, true, totalBuckets, serverMap, 
kTypeName, vTypeName);
-    }
-    
-    ResultSender<RegionMetadata> sender = context.getResultSender();
-    sender.lastResult(metadata);
-  }
-  
-  private String getTypeClassName(Class clazz) {
-    return clazz == null ? null : clazz.getCanonicalName();
-  }
-  
-  /** convert bucket to server map to server to bucket set map */
-  private HashMap<ServerLocation, HashSet<Integer>>
-    bucketServerMap2ServerBucketSetMap(Map<Integer, 
List<BucketServerLocation66>> map) {
-    HashMap<ServerLocation, HashSet<Integer>> serverBucketMap = new 
HashMap<>();
-    for (Integer id : map.keySet()) {
-      List<BucketServerLocation66> locations = map.get(id);
-      for (BucketServerLocation66 location : locations) {
-        ServerLocation server = new ServerLocation(location.getHostName(), 
location.getPort());
-        if (location.isPrimary()) {
-          HashSet<Integer> set = serverBucketMap.get(server);
-          if (set == null) {
-            set = new HashSet<>();
-            serverBucketMap.put(server, set);
-          }
-          set.add(id);
-          break;
-        }
-      }
-    }
-    return serverBucketMap;    
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/54cf6bf5/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/geodefunctions/StructStreamingResultSender.java
----------------------------------------------------------------------
diff --git 
a/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/geodefunctions/StructStreamingResultSender.java
 
b/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/geodefunctions/StructStreamingResultSender.java
deleted file mode 100644
index 9a7dc9d..0000000
--- 
a/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/geodefunctions/StructStreamingResultSender.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.pivotal.geode.spark.connector.internal.geodefunctions;
-
-import org.apache.geode.DataSerializer;
-import org.apache.geode.cache.execute.ResultSender;
-import org.apache.geode.cache.query.internal.types.ObjectTypeImpl;
-import org.apache.geode.cache.query.internal.types.StructTypeImpl;
-import org.apache.geode.cache.query.types.ObjectType;
-import org.apache.geode.cache.query.types.StructType;
-import org.apache.geode.internal.HeapDataOutputStream;
-import org.apache.geode.internal.cache.CachedDeserializable;
-import org.apache.geode.internal.logging.LogService;
-import org.apache.logging.log4j.Logger;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-/**
- * StructStreamingResultSender and StructStreamingResultCollector  are paired
- * to transfer result of list of `org.apache.geode.cache.query.Struct`
- * from GemFire server to Spark Connector (the client of GemFire server)
- * in streaming, i.e., while sender sending the result, the collector can
- * start processing the arrived result without waiting for full result to
- * become available.
- */
-public class StructStreamingResultSender {
-
-  public static final byte TYPE_CHUNK   = 0x30;
-  public static final byte DATA_CHUNK   = 0x31;
-  public static final byte ERROR_CHUNK  = 0x32;
-  public static final byte SER_DATA     = 0x41;
-  public static final byte UNSER_DATA   = 0x42;
-  public static final byte BYTEARR_DATA = 0x43;
-
-  private static ObjectTypeImpl ObjField = new 
ObjectTypeImpl(java.lang.Object.class);
-  public static StructTypeImpl KeyValueType = new StructTypeImpl(new 
String[]{"key", "value"}, new ObjectType[]{ObjField, ObjField});
-
-  private static final Logger logger = LogService.getLogger();
-  private static final int CHUNK_SIZE = 4096;
-  
-  // Note: The type of ResultSender returned from GemFire FunctionContext is
-  //  always ResultSender<Object>, so can't use ResultSender<byte[]> here
-  private final ResultSender<Object> sender;
-  private final StructType structType;
-  private final Iterator<Object[]> rows;
-  private String desc;
-  private boolean closed = false;
-
-  /**
-   * the Constructor 
-   * @param sender the base ResultSender that send data in byte array
-   * @param type the StructType of result record
-   * @param rows the iterator of the collection of results
-   * @param desc description of this result (used for logging)           
-   */
-  public StructStreamingResultSender(
-    ResultSender<Object> sender, StructType type, Iterator<Object[]> rows, 
String desc) {
-    if (sender == null || rows == null)
-      throw new NullPointerException("sender=" + sender + ", rows=" + rows);
-    this.sender = sender;
-    this.structType = type;
-    this.rows = rows;
-    this.desc = desc;
-  }
-
-  /** the Constructor with default `desc` */
-  public StructStreamingResultSender(
-          ResultSender<Object> sender, StructType type, Iterator<Object[]> 
rows) {
-    this(sender, type, rows, "StructStreamingResultSender");
-  }
-  
-  /**
-   * Send the result in chunks. There are 3 types of chunk: TYPE, DATA, and 
ERROR.
-   * TYPE chunk for sending struct type info, DATA chunk for sending data, and
-   * ERROR chunk for sending exception. There are at most 1 TYPE chunk (omitted
-   * for `KeyValueType`) and 1 ERROR chunk (if there's error), but usually
-   * there are multiple DATA chunks. Each DATA chunk contains multiple rows
-   * of data. The chunk size is determined by the const `CHUNK_SIZE`. If an
-   * exception is thrown, it is serialized and sent as the last chunk  of the 
-   * result (in the form of ERROR chunk).
-   */
-  public void send() {
-    if (closed) throw new RuntimeException("sender is closed.");
-
-    HeapDataOutputStream buf = new HeapDataOutputStream(CHUNK_SIZE + 2048, 
null);
-    String dataType = null;
-    int typeSize = 0;
-    int rowCount = 0;
-    int dataSize = 0;            
-    try {
-      if (rows.hasNext()) {
-        // Note: only send type info if there's data with it
-        typeSize = sendType(buf);
-        buf.writeByte(DATA_CHUNK);
-        int rowSize = structType == null ? 2 : 
structType.getFieldNames().length;
-        while (rows.hasNext()) {
-          rowCount ++;
-          Object[] row = rows.next();          
-          if (rowCount < 2) dataType = entryDataType(row);
-          if (rowSize != row.length) 
-            throw new IOException(rowToString("Expect "  + rowSize + " 
columns, but got ", row));
-          serializeRowToBuffer(row, buf);
-          if (buf.size() > CHUNK_SIZE) {
-            dataSize += sendBufferredData(buf, false);
-            buf.writeByte(DATA_CHUNK);
-          }
-        }
-      }
-      // send last piece of data or empty byte array
-      dataSize += sendBufferredData(buf, true);
-      logger.info(desc + ": " + rowCount + " rows, type=" + dataType + ", 
type.size=" +
-                  typeSize + ", data.size=" + dataSize + ", row.avg.size=" +
-                  (rowCount == 0 ? "NaN" : String.format("%.1f", ((float) 
dataSize)/rowCount)));
-    } catch (IOException | RuntimeException e) {
-      sendException(buf, e);
-    } finally {
-      closed = true;
-    }
-  }
-
-  private String rowToString(String rowDesc, Object[] row) {
-    StringBuilder buf = new StringBuilder();
-    buf.append(rowDesc).append("(");
-    for (int i = 0; i < row.length; i++) buf.append(i ==0 ? "" : " 
,").append(row[i]);
-    return buf.append(")") .toString();    
-  }
-
-  private String entryDataType(Object[] row) {
-    StringBuilder buf = new StringBuilder();
-    buf.append("(");
-    for (int i = 0; i < row.length; i++) {
-      if (i != 0) buf.append(", ");
-      buf.append(row[i].getClass().getCanonicalName());
-    }
-    return buf.append(")").toString();
-  }
-  
-  private void serializeRowToBuffer(Object[] row, HeapDataOutputStream buf) 
throws IOException {
-    for (Object data : row) {
-      if (data instanceof CachedDeserializable) {
-        buf.writeByte(SER_DATA);
-        DataSerializer.writeByteArray(((CachedDeserializable) 
data).getSerializedValue(), buf);
-      } else if (data instanceof byte[]) {
-        buf.writeByte(BYTEARR_DATA);
-        DataSerializer.writeByteArray((byte[]) data, buf);
-      } else {
-        buf.writeByte(UNSER_DATA);
-        DataSerializer.writeObject(data, buf);
-      }
-    }
-  }
-  
-  /** return the size of type data */
-  private int sendType(HeapDataOutputStream buf) throws IOException {
-    // logger.info(desc + " struct type: " + structType);
-    if (structType != null) {
-      buf.writeByte(TYPE_CHUNK);
-      DataSerializer.writeObject(structType, buf);
-      return sendBufferredData(buf, false);      
-    } else {
-      return 0;  // default KeyValue type, no type info send
-    }
-  }
-  
-  private int sendBufferredData(HeapDataOutputStream buf, boolean isLast) 
throws IOException {
-    if (isLast) sender.lastResult(buf.toByteArray());
-    else sender.sendResult(buf.toByteArray());
-    // logData(buf.toByteArray(), desc);
-    int s = buf.size();
-    buf.reset();
-    return s;    
-  }
-
-  /** Send the exception as the last chunk of the result. */
-  private void sendException(HeapDataOutputStream buf, Exception e) {
-    // Note: if exception happens during the serialization, the `buf` may 
contain
-    // partial serialized data, which may cause de-serialization hang or error.
-    // Therefore, always empty the buffer before sending the exception
-    if (buf.size() > 0) buf.reset();
-    
-    try {
-      buf.writeByte(ERROR_CHUNK);
-      DataSerializer.writeObject(e, buf);
-    } catch (IOException ioe) {
-      logger.error("StructStreamingResultSender failed to send the result:", 
e);
-      logger.error("StructStreamingResultSender failed to serialize the 
exception:", ioe);
-      buf.reset();
-    }
-    // Note: send empty chunk as the last result if serialization of exception 
-    // failed, and the error is logged on the GemFire server side.
-    sender.lastResult(buf.toByteArray());
-    // logData(buf.toByteArray(), desc);
-  }
-
-//  private void logData(byte[] data, String desc) {
-//    StringBuilder buf = new StringBuilder();
-//    buf.append(desc);
-//    for (byte b : data) {
-//      buf.append(" ").append(b);
-//    }
-//    logger.info(buf.toString());
-//  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/54cf6bf5/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/RegionMetadata.java
----------------------------------------------------------------------
diff --git 
a/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/RegionMetadata.java
 
b/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/RegionMetadata.java
new file mode 100644
index 0000000..4fee0e0
--- /dev/null
+++ 
b/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/RegionMetadata.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector.internal;
+
+import org.apache.geode.distributed.internal.ServerLocation;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.io.Serializable;
+
+/**
+ * This class contains all info required by GemFire RDD partitioner to create partitions.
+ */
+public class RegionMetadata implements Serializable {
+
+  private String  regionPath;
+  private boolean isPartitioned;
+  private int     totalBuckets;
+  private HashMap<ServerLocation, HashSet<Integer>> serverBucketMap;
+  private String  keyTypeName;
+  private String  valueTypeName;
+
+  /**
+   * Default constructor.
+   * @param regionPath the full path of the given region
+   * @param isPartitioned true for partitioned region, false otherwise
+   * @param totalBuckets number of total buckets for partitioned region, ignored otherwise
+   * @param serverBucketMap geode server (host:port pair) to bucket set map
+   * @param keyTypeName region key class name
+   * @param valueTypeName region value class name                    
+   */
+  public RegionMetadata(String regionPath, boolean isPartitioned, int 
totalBuckets, HashMap<ServerLocation, HashSet<Integer>> serverBucketMap,
+                        String keyTypeName, String valueTypeName) {
+    this.regionPath = regionPath;
+    this.isPartitioned = isPartitioned;
+    this.totalBuckets = totalBuckets;
+    this.serverBucketMap = serverBucketMap;
+    this.keyTypeName = keyTypeName;
+    this.valueTypeName = valueTypeName;
+  }
+
+  public RegionMetadata(String regionPath, boolean isPartitioned, int 
totalBuckets, HashMap<ServerLocation, HashSet<Integer>> serverBucketMap) {
+    this(regionPath, isPartitioned, totalBuckets, serverBucketMap, null, null);
+  }
+
+  public String getRegionPath() {
+    return regionPath;
+  }
+
+  public boolean isPartitioned() {
+    return isPartitioned;
+  }
+
+  public int getTotalBuckets() {
+    return totalBuckets;
+  }
+  
+  public HashMap<ServerLocation, HashSet<Integer>> getServerBucketMap() {
+    return serverBucketMap;
+  }
+
+  public String getKeyTypeName() {
+    return keyTypeName;
+  }
+
+  public String getValueTypeName() {
+    return valueTypeName;
+  }
+
+  public String toString() {
+    StringBuilder buf = new StringBuilder();
+    buf.append("RegionMetadata(region=").append(regionPath)
+       .append("(").append(keyTypeName).append(", ").append(valueTypeName).append(")")
+       .append(", partitioned=").append(isPartitioned).append(", #buckets=").append(totalBuckets)
+       .append(", map=").append(serverBucketMap).append(")");
+    return buf.toString();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/54cf6bf5/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/geodefunctions/QueryFunction.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/geodefunctions/QueryFunction.java b/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/geodefunctions/QueryFunction.java
new file mode 100644
index 0000000..6e6e295
--- /dev/null
+++ b/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/geodefunctions/QueryFunction.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector.internal.geodefunctions;
+
+import org.apache.geode.DataSerializer;
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.execute.*;
+import org.apache.geode.cache.query.SelectResults;
+import org.apache.geode.cache.query.Query;
+import org.apache.geode.internal.HeapDataOutputStream;
+import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.cache.execute.InternalRegionFunctionContext;
+import org.apache.geode.internal.logging.LogService;
+import org.apache.logging.log4j.Logger;
+import java.util.Iterator;
+
+public class QueryFunction implements Function {
+
+  private static final long serialVersionUID = 4866641340803692882L;
+
+  public final static String ID = "geode-spark-query-function";
+
+  private final static QueryFunction instance = new QueryFunction();
+
+  private static final Logger logger = LogService.getLogger();
+
+  private static final int CHUNK_SIZE = 1024;
+
+  @Override
+  public String getId() {
+    return ID;
+  }
+
+  public static QueryFunction getInstance() {
+    return instance;
+  }
+
+  @Override
+  public boolean optimizeForWrite() {
+    return true;
+  }
+
+  @Override
+  public boolean isHA() {
+    return true;
+  }
+
+  @Override
+  public boolean hasResult() {
+    return true;
+  }
+
+  @Override
+  public void execute(FunctionContext context) {
+    try {
+      String[] args = (String[]) context.getArguments();
+      String queryString = args[0];
+      String bucketSet = args[1];
+      InternalRegionFunctionContext irfc = (InternalRegionFunctionContext) 
context;
+      LocalRegion localRegion = (LocalRegion) irfc.getDataSet();
+      boolean partitioned = localRegion.getDataPolicy().withPartitioning();
+      Query query = 
CacheFactory.getAnyInstance().getQueryService().newQuery(queryString);
+      Object result = partitioned ? 
query.execute((InternalRegionFunctionContext) context) : query.execute();
+      ResultSender<Object> sender = context.getResultSender();
+      HeapDataOutputStream buf = new HeapDataOutputStream(CHUNK_SIZE, null);
+      Iterator<Object> iter = ((SelectResults) result).asList().iterator();
+      while (iter.hasNext()) {
+        Object row = iter.next();
+        DataSerializer.writeObject(row, buf);
+        if (buf.size() > CHUNK_SIZE) {
+          sender.sendResult(buf.toByteArray());
+          logger.debug("OQL query=" + queryString + " bucket set=" + bucketSet + " sendResult(), data size=" + buf.size());
+          buf.reset();
+        }
+      }
+      sender.lastResult(buf.toByteArray());
+      logger.debug("OQL query=" + queryString + " bucket set=" + bucketSet + " lastResult(), data size=" + buf.size());
+      buf.reset();
+    }
+    catch(Exception e) {
+      throw new FunctionException(e);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/54cf6bf5/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/geodefunctions/RetrieveRegionFunction.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/geodefunctions/RetrieveRegionFunction.java b/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/geodefunctions/RetrieveRegionFunction.java
new file mode 100644
index 0000000..d3a2572
--- /dev/null
+++ b/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/geodefunctions/RetrieveRegionFunction.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector.internal.geodefunctions;
+
+import java.util.Iterator;
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.execute.FunctionException;
+import org.apache.geode.cache.query.Query;
+import org.apache.geode.cache.query.QueryService;
+import org.apache.geode.cache.query.SelectResults;
+import org.apache.geode.cache.query.Struct;
+import org.apache.geode.internal.cache.*;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.execute.Function;
+import org.apache.geode.cache.execute.FunctionContext;
+import org.apache.geode.cache.partition.PartitionRegionHelper;
+import org.apache.geode.internal.cache.execute.InternalRegionFunctionContext;
+import org.apache.geode.internal.cache.execute.InternalResultSender;
+import org.apache.geode.internal.cache.partitioned.PREntriesIterator;
+import org.apache.geode.internal.logging.LogService;
+
+/**
+ * GemFire function that is used by `SparkContext.geodeRegion(regionPath, whereClause)`
+ * to retrieve region data set for the given bucket set as a RDD partition 
+ **/
+public class RetrieveRegionFunction implements Function {
+
+  public final static String ID = "spark-geode-retrieve-region";
+  private static final Logger logger = LogService.getLogger();
+  private static final RetrieveRegionFunction instance = new 
RetrieveRegionFunction();
+
+  public RetrieveRegionFunction() {
+  }
+
+  /** ------------------------------------------ */
+  /**     interface Function implementation      */
+  /** ------------------------------------------ */
+
+  public static RetrieveRegionFunction getInstance() {
+    return instance;
+  }
+
+  @Override
+  public String getId() {
+    return ID;
+  }
+
+  @Override
+  public boolean hasResult() {
+    return true;
+  }
+
+  @Override
+  public boolean optimizeForWrite() {
+    return true;
+  }
+
+  @Override
+  public boolean isHA() {
+    return true;
+  }
+
+  @Override
+  public void execute(FunctionContext context) {
+    String[] args = (String[]) context.getArguments();
+    String where = args[0];
+    String taskDesc = args[1];
+    InternalRegionFunctionContext irfc = (InternalRegionFunctionContext) 
context;
+    LocalRegion localRegion = (LocalRegion) irfc.getDataSet();
+    boolean partitioned = localRegion.getDataPolicy().withPartitioning();
+    if (where.trim().isEmpty())
+      retrieveFullRegion(irfc, partitioned, taskDesc);
+    else
+      retrieveRegionWithWhereClause(irfc, localRegion, partitioned, where, 
taskDesc);
+  }
+
+  /** ------------------------------------------ */
+  /**    Retrieve region data with where clause  */
+  /** ------------------------------------------ */
+
+  private void retrieveRegionWithWhereClause(
+    InternalRegionFunctionContext context, LocalRegion localRegion, boolean 
partitioned, String where, String desc) {
+    String regionPath = localRegion.getFullPath();
+    String qstr = "select key, value from " + regionPath + ".entries where " + 
where;
+    logger.info(desc + ": " + qstr);
+
+    try {
+      Cache cache = CacheFactory.getAnyInstance();
+      QueryService queryService = cache.getQueryService();
+      Query query = queryService.newQuery(qstr);
+      SelectResults<Struct> results =
+        (SelectResults<Struct>) (partitioned ?  query.execute(context) : 
query.execute());
+
+      Iterator<Object[]> entries = 
getStructIteratorWrapper(results.asList().iterator());
+      InternalResultSender irs = (InternalResultSender) 
context.getResultSender();
+      StructStreamingResultSender sender = new 
StructStreamingResultSender(irs, null, entries, desc);
+      sender.send();
+    } catch (Exception e) {
+      throw new FunctionException(e);
+    }
+  }
+
+  private Iterator<Object[]> getStructIteratorWrapper(Iterator<Struct> 
entries) {
+    return new WrapperIterator<Struct, Iterator<Struct>>(entries) {
+      @Override public Object[] next() {
+        return  delegate.next().getFieldValues();
+      }
+    };
+  }
+
+  /** ------------------------------------------ */
+  /**         Retrieve full region data          */
+  /** ------------------------------------------ */
+
+  private void retrieveFullRegion(InternalRegionFunctionContext context, 
boolean partitioned, String desc) {
+    Iterator<Object[]> entries;
+    if (partitioned) {
+      PREntriesIterator<Region.Entry> iter = (PREntriesIterator<Region.Entry>)
+              ((LocalDataSet) 
PartitionRegionHelper.getLocalDataForContext(context)).entrySet().iterator();
+      // entries = getPREntryIterator(iter);
+      entries = getSimpleEntryIterator(iter);
+    } else {
+      LocalRegion owner = (LocalRegion) context.getDataSet();
+      Iterator<Region.Entry> iter = (Iterator<Region.Entry>) 
owner.entrySet().iterator();
+      // entries = getRREntryIterator(iter, owner);
+      entries = getSimpleEntryIterator(iter);
+    }
+    InternalResultSender irs = (InternalResultSender) 
context.getResultSender();
+    StructStreamingResultSender sender = new StructStreamingResultSender(irs, 
null, entries, desc);
+    sender.send();
+  }
+
+//  /** An iterator for partitioned region that uses internal API to get serialized value */
+//  private Iterator<Object[]> 
getPREntryIterator(PREntriesIterator<Region.Entry> iterator) {
+//    return new WrapperIterator<Region.Entry, 
PREntriesIterator<Region.Entry>>(iterator) {
+//      @Override public Object[] next() {
+//        Region.Entry entry = delegate.next();
+//        int bucketId = delegate.getBucketId();
+//        KeyInfo keyInfo = new KeyInfo(entry.getKey(), null, bucketId);
+//        // owner needs to be the bucket region not the enclosing partition region
+//        LocalRegion owner = ((PartitionedRegion) 
entry.getRegion()).getDataStore().getLocalBucketById(bucketId);
+//        Object value = owner.getDeserializedValue(keyInfo, false, true, 
true, null, false);
+//        return new Object[] {keyInfo.getKey(), value};
+//      }
+//    };
+//  }
+//
+//  /** An iterator for replicated region that uses internal API to get serialized value */
+//  private Iterator<Object[]> getRREntryIterator(Iterator<Region.Entry> 
iterator, LocalRegion region) {
+//    final LocalRegion owner = region;
+//    return new WrapperIterator<Region.Entry, 
Iterator<Region.Entry>>(iterator) {
+//      @Override public Object[] next() {
+//        Region.Entry entry =  delegate.next();
+//        KeyInfo keyInfo = new KeyInfo(entry.getKey(), null, null);
+//        Object value = owner.getDeserializedValue(keyInfo, false, true, 
true, null, false);
+//        return new Object[] {keyInfo.getKey(), value};
+//      }
+//    };
+//  }
+
+  // todo. compare performance of regular and simple iterator
+  /** An general iterator for both partitioned and replicated region that returns un-serialized value */
+  private Iterator<Object[]> getSimpleEntryIterator(Iterator<Region.Entry> 
iterator) {
+    return new WrapperIterator<Region.Entry, Iterator<Region.Entry>>(iterator) 
{
+      @Override public Object[] next() {
+        Region.Entry entry = delegate.next();
+        return new Object[] {entry.getKey(), entry.getValue()};
+      }
+    };
+  }
+
+  /** ------------------------------------------ */
+  /**        abstract wrapper iterator           */
+  /** ------------------------------------------ */
+
+  /** An abstract wrapper iterator to reduce duplicated code of anonymous iterators */
+  abstract class WrapperIterator<T, S extends Iterator<T>> implements 
Iterator<Object[]> {
+
+    final S delegate;
+
+    protected WrapperIterator(S delegate) {
+      this.delegate = delegate;
+    }
+
+    @Override public boolean hasNext() {
+      return delegate.hasNext();
+    }
+
+    @Override public void remove() { }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/54cf6bf5/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/geodefunctions/RetrieveRegionMetadataFunction.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/geodefunctions/RetrieveRegionMetadataFunction.java b/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/geodefunctions/RetrieveRegionMetadataFunction.java
new file mode 100644
index 0000000..6041b70
--- /dev/null
+++ b/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/geodefunctions/RetrieveRegionMetadataFunction.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector.internal.geodefunctions;
+
+import org.apache.geode.cache.execute.Function;
+import org.apache.geode.cache.execute.FunctionContext;
+import org.apache.geode.cache.execute.ResultSender;
+import org.apache.geode.distributed.internal.ServerLocation;
+import org.apache.geode.internal.cache.BucketServerLocation66;
+import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.execute.InternalRegionFunctionContext;
+import io.pivotal.geode.spark.connector.internal.RegionMetadata;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This GemFire function retrieve region metadata
+ */
+public class RetrieveRegionMetadataFunction implements Function {
+
+  public final static String ID = "geode-spark-retrieve-region-metadata";
+  
+  private static final RetrieveRegionMetadataFunction instance = new 
RetrieveRegionMetadataFunction();
+
+  public RetrieveRegionMetadataFunction() {
+  }
+
+  public static RetrieveRegionMetadataFunction getInstance() {
+    return instance;    
+  }
+  
+  @Override
+  public String getId() {
+    return ID;
+  }
+
+  @Override
+  public boolean optimizeForWrite() {
+    return false;
+  }
+
+  @Override
+  public boolean isHA() {
+    return true;
+  }
+
+  @Override
+  public boolean hasResult() {
+    return true;
+  }
+
+  @Override
+  public void execute(FunctionContext context) {
+    LocalRegion region = (LocalRegion) ((InternalRegionFunctionContext) 
context).getDataSet();
+    String regionPath = region.getFullPath();
+    boolean isPartitioned = region.getDataPolicy().withPartitioning();
+    String kTypeName = 
getTypeClassName(region.getAttributes().getKeyConstraint());
+    String vTypeName = 
getTypeClassName(region.getAttributes().getValueConstraint());
+
+    RegionMetadata metadata;
+    if (! isPartitioned) {
+      metadata = new RegionMetadata(regionPath, false, 0, null, kTypeName, 
vTypeName);
+    } else {
+      PartitionedRegion pregion = (PartitionedRegion) region;
+      int totalBuckets = 
pregion.getAttributes().getPartitionAttributes().getTotalNumBuckets();
+      Map<Integer, List<BucketServerLocation66>> bucketMap = 
pregion.getRegionAdvisor().getAllClientBucketProfiles();
+      HashMap<ServerLocation, HashSet<Integer>> serverMap = 
bucketServerMap2ServerBucketSetMap(bucketMap);
+      metadata = new RegionMetadata(regionPath, true, totalBuckets, serverMap, 
kTypeName, vTypeName);
+    }
+    
+    ResultSender<RegionMetadata> sender = context.getResultSender();
+    sender.lastResult(metadata);
+  }
+  
+  private String getTypeClassName(Class clazz) {
+    return clazz == null ? null : clazz.getCanonicalName();
+  }
+  
+  /** convert bucket to server map to server to bucket set map */
+  private HashMap<ServerLocation, HashSet<Integer>>
+    bucketServerMap2ServerBucketSetMap(Map<Integer, 
List<BucketServerLocation66>> map) {
+    HashMap<ServerLocation, HashSet<Integer>> serverBucketMap = new 
HashMap<>();
+    for (Integer id : map.keySet()) {
+      List<BucketServerLocation66> locations = map.get(id);
+      for (BucketServerLocation66 location : locations) {
+        ServerLocation server = new ServerLocation(location.getHostName(), 
location.getPort());
+        if (location.isPrimary()) {
+          HashSet<Integer> set = serverBucketMap.get(server);
+          if (set == null) {
+            set = new HashSet<>();
+            serverBucketMap.put(server, set);
+          }
+          set.add(id);
+          break;
+        }
+      }
+    }
+    return serverBucketMap;    
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/54cf6bf5/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/geodefunctions/StructStreamingResultSender.java
----------------------------------------------------------------------
diff --git a/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/geodefunctions/StructStreamingResultSender.java b/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/geodefunctions/StructStreamingResultSender.java
new file mode 100644
index 0000000..9a7dc9d
--- /dev/null
+++ b/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/geodefunctions/StructStreamingResultSender.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.pivotal.geode.spark.connector.internal.geodefunctions;
+
+import org.apache.geode.DataSerializer;
+import org.apache.geode.cache.execute.ResultSender;
+import org.apache.geode.cache.query.internal.types.ObjectTypeImpl;
+import org.apache.geode.cache.query.internal.types.StructTypeImpl;
+import org.apache.geode.cache.query.types.ObjectType;
+import org.apache.geode.cache.query.types.StructType;
+import org.apache.geode.internal.HeapDataOutputStream;
+import org.apache.geode.internal.cache.CachedDeserializable;
+import org.apache.geode.internal.logging.LogService;
+import org.apache.logging.log4j.Logger;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * StructStreamingResultSender and StructStreamingResultCollector  are paired
+ * to transfer result of list of `org.apache.geode.cache.query.Struct`
+ * from GemFire server to Spark Connector (the client of GemFire server)
+ * in streaming, i.e., while sender sending the result, the collector can
+ * start processing the arrived result without waiting for full result to
+ * become available.
+ */
+public class StructStreamingResultSender {
+
+  public static final byte TYPE_CHUNK   = 0x30;
+  public static final byte DATA_CHUNK   = 0x31;
+  public static final byte ERROR_CHUNK  = 0x32;
+  public static final byte SER_DATA     = 0x41;
+  public static final byte UNSER_DATA   = 0x42;
+  public static final byte BYTEARR_DATA = 0x43;
+
+  private static ObjectTypeImpl ObjField = new 
ObjectTypeImpl(java.lang.Object.class);
+  public static StructTypeImpl KeyValueType = new StructTypeImpl(new 
String[]{"key", "value"}, new ObjectType[]{ObjField, ObjField});
+
+  private static final Logger logger = LogService.getLogger();
+  private static final int CHUNK_SIZE = 4096;
+  
+  // Note: The type of ResultSender returned from GemFire FunctionContext is
+  //  always ResultSender<Object>, so can't use ResultSender<byte[]> here
+  private final ResultSender<Object> sender;
+  private final StructType structType;
+  private final Iterator<Object[]> rows;
+  private String desc;
+  private boolean closed = false;
+
+  /**
+   * the Constructor 
+   * @param sender the base ResultSender that send data in byte array
+   * @param type the StructType of result record
+   * @param rows the iterator of the collection of results
+   * @param desc description of this result (used for logging)           
+   */
+  public StructStreamingResultSender(
+    ResultSender<Object> sender, StructType type, Iterator<Object[]> rows, 
String desc) {
+    if (sender == null || rows == null)
+      throw new NullPointerException("sender=" + sender + ", rows=" + rows);
+    this.sender = sender;
+    this.structType = type;
+    this.rows = rows;
+    this.desc = desc;
+  }
+
+  /** the Constructor with default `desc` */
+  public StructStreamingResultSender(
+          ResultSender<Object> sender, StructType type, Iterator<Object[]> 
rows) {
+    this(sender, type, rows, "StructStreamingResultSender");
+  }
+  
+  /**
+   * Send the result in chunks. There are 3 types of chunk: TYPE, DATA, and ERROR.
+   * TYPE chunk for sending struct type info, DATA chunk for sending data, and
+   * ERROR chunk for sending exception. There are at most 1 TYPE chunk (omitted
+   * for `KeyValueType`) and 1 ERROR chunk (if there's error), but usually
+   * there are multiple DATA chunks. Each DATA chunk contains multiple rows
+   * of data. The chunk size is determined by the const `CHUNK_SIZE`. If an
+   * exception is thrown, it is serialized and sent as the last chunk  of the 
+   * result (in the form of ERROR chunk).
+   */
+  public void send() {
+    if (closed) throw new RuntimeException("sender is closed.");
+
+    HeapDataOutputStream buf = new HeapDataOutputStream(CHUNK_SIZE + 2048, 
null);
+    String dataType = null;
+    int typeSize = 0;
+    int rowCount = 0;
+    int dataSize = 0;            
+    try {
+      if (rows.hasNext()) {
+        // Note: only send type info if there's data with it
+        typeSize = sendType(buf);
+        buf.writeByte(DATA_CHUNK);
+        int rowSize = structType == null ? 2 : 
structType.getFieldNames().length;
+        while (rows.hasNext()) {
+          rowCount ++;
+          Object[] row = rows.next();          
+          if (rowCount < 2) dataType = entryDataType(row);
+          if (rowSize != row.length) 
+            throw new IOException(rowToString("Expect "  + rowSize + " columns, but got ", row));
+          serializeRowToBuffer(row, buf);
+          if (buf.size() > CHUNK_SIZE) {
+            dataSize += sendBufferredData(buf, false);
+            buf.writeByte(DATA_CHUNK);
+          }
+        }
+      }
+      // send last piece of data or empty byte array
+      dataSize += sendBufferredData(buf, true);
+      logger.info(desc + ": " + rowCount + " rows, type=" + dataType + ", type.size=" +
+                  typeSize + ", data.size=" + dataSize + ", row.avg.size=" +
+                  (rowCount == 0 ? "NaN" : String.format("%.1f", ((float) dataSize)/rowCount)));
+    } catch (IOException | RuntimeException e) {
+      sendException(buf, e);
+    } finally {
+      closed = true;
+    }
+  }
+
+  private String rowToString(String rowDesc, Object[] row) {
+    StringBuilder buf = new StringBuilder();
+    buf.append(rowDesc).append("(");
+    for (int i = 0; i < row.length; i++) buf.append(i ==0 ? "" : " ,").append(row[i]);
+    return buf.append(")") .toString();    
+  }
+
+  private String entryDataType(Object[] row) {
+    StringBuilder buf = new StringBuilder();
+    buf.append("(");
+    for (int i = 0; i < row.length; i++) {
+      if (i != 0) buf.append(", ");
+      buf.append(row[i].getClass().getCanonicalName());
+    }
+    return buf.append(")").toString();
+  }
+  
+  private void serializeRowToBuffer(Object[] row, HeapDataOutputStream buf) 
throws IOException {
+    for (Object data : row) {
+      if (data instanceof CachedDeserializable) {
+        buf.writeByte(SER_DATA);
+        DataSerializer.writeByteArray(((CachedDeserializable) 
data).getSerializedValue(), buf);
+      } else if (data instanceof byte[]) {
+        buf.writeByte(BYTEARR_DATA);
+        DataSerializer.writeByteArray((byte[]) data, buf);
+      } else {
+        buf.writeByte(UNSER_DATA);
+        DataSerializer.writeObject(data, buf);
+      }
+    }
+  }
+  
+  /** return the size of type data */
+  private int sendType(HeapDataOutputStream buf) throws IOException {
+    // logger.info(desc + " struct type: " + structType);
+    if (structType != null) {
+      buf.writeByte(TYPE_CHUNK);
+      DataSerializer.writeObject(structType, buf);
+      return sendBufferredData(buf, false);      
+    } else {
+      return 0;  // default KeyValue type, no type info send
+    }
+  }
+  
+  private int sendBufferredData(HeapDataOutputStream buf, boolean isLast) 
throws IOException {
+    if (isLast) sender.lastResult(buf.toByteArray());
+    else sender.sendResult(buf.toByteArray());
+    // logData(buf.toByteArray(), desc);
+    int s = buf.size();
+    buf.reset();
+    return s;    
+  }
+
+  /** Send the exception as the last chunk of the result. */
+  private void sendException(HeapDataOutputStream buf, Exception e) {
+    // Note: if exception happens during the serialization, the `buf` may contain
+    // partial serialized data, which may cause de-serialization hang or error.
+    // Therefore, always empty the buffer before sending the exception
+    if (buf.size() > 0) buf.reset();
+    
+    try {
+      buf.writeByte(ERROR_CHUNK);
+      DataSerializer.writeObject(e, buf);
+    } catch (IOException ioe) {
+      logger.error("StructStreamingResultSender failed to send the result:", e);
+      logger.error("StructStreamingResultSender failed to serialize the exception:", ioe);
+      buf.reset();
+    }
+    // Note: send empty chunk as the last result if serialization of exception 
+    // failed, and the error is logged on the GemFire server side.
+    sender.lastResult(buf.toByteArray());
+    // logData(buf.toByteArray(), desc);
+  }
+
+//  private void logData(byte[] data, String desc) {
+//    StringBuilder buf = new StringBuilder();
+//    buf.append(desc);
+//    for (byte b : data) {
+//      buf.append(" ").append(b);
+//    }
+//    logger.info(buf.toString());
+//  }
+
+}

Reply via email to