GEODE-37 change package name from io.pivotal.geode (./geode-functions/src/main/java/io/pivotal) to org.apache.geode (./geode-functions/src/main/java/org/apache)
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/54cf6bf5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/54cf6bf5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/54cf6bf5 Branch: refs/heads/develop Commit: 54cf6bf5d826814a4cd82b100a9f638896e48945 Parents: f9a022a Author: Hitesh Khamesra <hkhame...@pivotal.io> Authored: Tue Sep 20 15:44:10 2016 -0700 Committer: Hitesh Khamesra <hkhame...@pivotal.io> Committed: Tue Sep 20 16:01:02 2016 -0700 ---------------------------------------------------------------------- .../connector/internal/RegionMetadata.java | 93 -------- .../internal/geodefunctions/QueryFunction.java | 99 --------- .../geodefunctions/RetrieveRegionFunction.java | 208 ------------------ .../RetrieveRegionMetadataFunction.java | 118 ---------- .../StructStreamingResultSender.java | 219 ------------------- .../connector/internal/RegionMetadata.java | 93 ++++++++ .../internal/geodefunctions/QueryFunction.java | 99 +++++++++ .../geodefunctions/RetrieveRegionFunction.java | 208 ++++++++++++++++++ .../RetrieveRegionMetadataFunction.java | 118 ++++++++++ .../StructStreamingResultSender.java | 219 +++++++++++++++++++ 10 files changed, 737 insertions(+), 737 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/54cf6bf5/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/RegionMetadata.java ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/RegionMetadata.java b/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/RegionMetadata.java deleted file mode 100644 index 4fee0e0..0000000 --- a/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/RegionMetadata.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.pivotal.geode.spark.connector.internal; - -import org.apache.geode.distributed.internal.ServerLocation; - -import java.util.HashMap; -import java.util.HashSet; -import java.io.Serializable; - -/** - * This class contains all info required by GemFire RDD partitioner to create partitions. - */ -public class RegionMetadata implements Serializable { - - private String regionPath; - private boolean isPartitioned; - private int totalBuckets; - private HashMap<ServerLocation, HashSet<Integer>> serverBucketMap; - private String keyTypeName; - private String valueTypeName; - - /** - * Default constructor. 
- * @param regionPath the full path of the given region - * @param isPartitioned true for partitioned region, false otherwise - * @param totalBuckets number of total buckets for partitioned region, ignored otherwise - * @param serverBucketMap geode server (host:port pair) to bucket set map - * @param keyTypeName region key class name - * @param valueTypeName region value class name - */ - public RegionMetadata(String regionPath, boolean isPartitioned, int totalBuckets, HashMap<ServerLocation, HashSet<Integer>> serverBucketMap, - String keyTypeName, String valueTypeName) { - this.regionPath = regionPath; - this.isPartitioned = isPartitioned; - this.totalBuckets = totalBuckets; - this.serverBucketMap = serverBucketMap; - this.keyTypeName = keyTypeName; - this.valueTypeName = valueTypeName; - } - - public RegionMetadata(String regionPath, boolean isPartitioned, int totalBuckets, HashMap<ServerLocation, HashSet<Integer>> serverBucketMap) { - this(regionPath, isPartitioned, totalBuckets, serverBucketMap, null, null); - } - - public String getRegionPath() { - return regionPath; - } - - public boolean isPartitioned() { - return isPartitioned; - } - - public int getTotalBuckets() { - return totalBuckets; - } - - public HashMap<ServerLocation, HashSet<Integer>> getServerBucketMap() { - return serverBucketMap; - } - - public String getKeyTypeName() { - return keyTypeName; - } - - public String getValueTypeName() { - return valueTypeName; - } - - public String toString() { - StringBuilder buf = new StringBuilder(); - buf.append("RegionMetadata(region=").append(regionPath) - .append("(").append(keyTypeName).append(", ").append(valueTypeName).append(")") - .append(", partitioned=").append(isPartitioned).append(", #buckets=").append(totalBuckets) - .append(", map=").append(serverBucketMap).append(")"); - return buf.toString(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/54cf6bf5/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/geodefunctions/QueryFunction.java ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/geodefunctions/QueryFunction.java b/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/geodefunctions/QueryFunction.java deleted file mode 100644 index 6e6e295..0000000 --- a/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/geodefunctions/QueryFunction.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package io.pivotal.geode.spark.connector.internal.geodefunctions; - -import org.apache.geode.DataSerializer; -import org.apache.geode.cache.CacheFactory; -import org.apache.geode.cache.execute.*; -import org.apache.geode.cache.query.SelectResults; -import org.apache.geode.cache.query.Query; -import org.apache.geode.internal.HeapDataOutputStream; -import org.apache.geode.internal.cache.LocalRegion; -import org.apache.geode.internal.cache.execute.InternalRegionFunctionContext; -import org.apache.geode.internal.logging.LogService; -import org.apache.logging.log4j.Logger; -import java.util.Iterator; - -public class QueryFunction implements Function { - - private static final long serialVersionUID = 4866641340803692882L; - - public final static String ID = "geode-spark-query-function"; - - private final static QueryFunction instance = new QueryFunction(); - - private static final Logger logger = LogService.getLogger(); - - private static final int CHUNK_SIZE = 1024; - - @Override - public String getId() { - return ID; - } - - public static QueryFunction getInstance() { - return instance; - } - - @Override - public boolean optimizeForWrite() { - return true; - } - - @Override - public boolean isHA() { - return true; - } - - @Override - public boolean hasResult() { - return true; - } - - @Override - public void execute(FunctionContext context) { - try { - String[] args = (String[]) context.getArguments(); - String queryString = args[0]; - String bucketSet = args[1]; - InternalRegionFunctionContext irfc = (InternalRegionFunctionContext) context; - LocalRegion localRegion = (LocalRegion) irfc.getDataSet(); - boolean partitioned = localRegion.getDataPolicy().withPartitioning(); - Query query = CacheFactory.getAnyInstance().getQueryService().newQuery(queryString); - Object result = partitioned ? query.execute((InternalRegionFunctionContext) context) : query.execute(); - ResultSender<Object> sender = context.getResultSender(); - HeapDataOutputStream buf = new HeapDataOutputStream(CHUNK_SIZE, null); - Iterator<Object> iter = ((SelectResults) result).asList().iterator(); - while (iter.hasNext()) { - Object row = iter.next(); - DataSerializer.writeObject(row, buf); - if (buf.size() > CHUNK_SIZE) { - sender.sendResult(buf.toByteArray()); - logger.debug("OQL query=" + queryString + " bucket set=" + bucketSet + " sendResult(), data size=" + buf.size()); - buf.reset(); - } - } - sender.lastResult(buf.toByteArray()); - logger.debug("OQL query=" + queryString + " bucket set=" + bucketSet + " lastResult(), data size=" + buf.size()); - buf.reset(); - } - catch(Exception e) { - throw new FunctionException(e); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/54cf6bf5/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/geodefunctions/RetrieveRegionFunction.java ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/geodefunctions/RetrieveRegionFunction.java b/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/geodefunctions/RetrieveRegionFunction.java deleted file mode 100644 index d3a2572..0000000 --- a/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/geodefunctions/RetrieveRegionFunction.java +++ /dev/null @@ -1,208 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.pivotal.geode.spark.connector.internal.geodefunctions; - -import java.util.Iterator; -import org.apache.logging.log4j.Logger; - -import org.apache.geode.cache.Cache; -import org.apache.geode.cache.CacheFactory; -import org.apache.geode.cache.execute.FunctionException; -import org.apache.geode.cache.query.Query; -import org.apache.geode.cache.query.QueryService; -import org.apache.geode.cache.query.SelectResults; -import org.apache.geode.cache.query.Struct; -import org.apache.geode.internal.cache.*; -import org.apache.geode.cache.Region; -import org.apache.geode.cache.execute.Function; -import org.apache.geode.cache.execute.FunctionContext; -import org.apache.geode.cache.partition.PartitionRegionHelper; -import org.apache.geode.internal.cache.execute.InternalRegionFunctionContext; -import org.apache.geode.internal.cache.execute.InternalResultSender; -import org.apache.geode.internal.cache.partitioned.PREntriesIterator; -import org.apache.geode.internal.logging.LogService; - -/** - * GemFire function that is used by `SparkContext.geodeRegion(regionPath, whereClause)` - * to retrieve region data set for the given bucket set as a RDD partition - **/ -public class RetrieveRegionFunction implements Function { - - public final static String ID = "spark-geode-retrieve-region"; - private static final Logger logger = LogService.getLogger(); - private static final RetrieveRegionFunction instance = new RetrieveRegionFunction(); - - public RetrieveRegionFunction() { - } - - /** ------------------------------------------ */ - /** interface Function implementation */ - /** ------------------------------------------ */ - - public static RetrieveRegionFunction getInstance() { - return instance; - } - - @Override - public String getId() { - return ID; - } - - @Override - public boolean hasResult() { - return true; - } - - @Override - public boolean optimizeForWrite() { - return true; - } - - @Override - public boolean isHA() { - return true; - } - - @Override - public void execute(FunctionContext context) { - String[] args = (String[]) context.getArguments(); - String where = args[0]; - String taskDesc = args[1]; - InternalRegionFunctionContext irfc = (InternalRegionFunctionContext) context; - LocalRegion localRegion = (LocalRegion) irfc.getDataSet(); - boolean partitioned = localRegion.getDataPolicy().withPartitioning(); - if (where.trim().isEmpty()) - retrieveFullRegion(irfc, partitioned, taskDesc); - else - retrieveRegionWithWhereClause(irfc, localRegion, partitioned, where, taskDesc); - } - - /** ------------------------------------------ */ - /** Retrieve region data with where clause */ - /** ------------------------------------------ */ - - private void retrieveRegionWithWhereClause( - InternalRegionFunctionContext context, LocalRegion localRegion, boolean partitioned, String where, String desc) { - 
String regionPath = localRegion.getFullPath(); - String qstr = "select key, value from " + regionPath + ".entries where " + where; - logger.info(desc + ": " + qstr); - - try { - Cache cache = CacheFactory.getAnyInstance(); - QueryService queryService = cache.getQueryService(); - Query query = queryService.newQuery(qstr); - SelectResults<Struct> results = - (SelectResults<Struct>) (partitioned ? query.execute(context) : query.execute()); - - Iterator<Object[]> entries = getStructIteratorWrapper(results.asList().iterator()); - InternalResultSender irs = (InternalResultSender) context.getResultSender(); - StructStreamingResultSender sender = new StructStreamingResultSender(irs, null, entries, desc); - sender.send(); - } catch (Exception e) { - throw new FunctionException(e); - } - } - - private Iterator<Object[]> getStructIteratorWrapper(Iterator<Struct> entries) { - return new WrapperIterator<Struct, Iterator<Struct>>(entries) { - @Override public Object[] next() { - return delegate.next().getFieldValues(); - } - }; - } - - /** ------------------------------------------ */ - /** Retrieve full region data */ - /** ------------------------------------------ */ - - private void retrieveFullRegion(InternalRegionFunctionContext context, boolean partitioned, String desc) { - Iterator<Object[]> entries; - if (partitioned) { - PREntriesIterator<Region.Entry> iter = (PREntriesIterator<Region.Entry>) - ((LocalDataSet) PartitionRegionHelper.getLocalDataForContext(context)).entrySet().iterator(); - // entries = getPREntryIterator(iter); - entries = getSimpleEntryIterator(iter); - } else { - LocalRegion owner = (LocalRegion) context.getDataSet(); - Iterator<Region.Entry> iter = (Iterator<Region.Entry>) owner.entrySet().iterator(); - // entries = getRREntryIterator(iter, owner); - entries = getSimpleEntryIterator(iter); - } - InternalResultSender irs = (InternalResultSender) context.getResultSender(); - StructStreamingResultSender sender = new StructStreamingResultSender(irs, null, entries, desc); - sender.send(); - } - -// /** An iterator for partitioned region that uses internal API to get serialized value */ -// private Iterator<Object[]> getPREntryIterator(PREntriesIterator<Region.Entry> iterator) { -// return new WrapperIterator<Region.Entry, PREntriesIterator<Region.Entry>>(iterator) { -// @Override public Object[] next() { -// Region.Entry entry = delegate.next(); -// int bucketId = delegate.getBucketId(); -// KeyInfo keyInfo = new KeyInfo(entry.getKey(), null, bucketId); -// // owner needs to be the bucket region not the enclosing partition region -// LocalRegion owner = ((PartitionedRegion) entry.getRegion()).getDataStore().getLocalBucketById(bucketId); -// Object value = owner.getDeserializedValue(keyInfo, false, true, true, null, false); -// return new Object[] {keyInfo.getKey(), value}; -// } -// }; -// } -// -// /** An iterator for replicated region that uses internal API to get serialized value */ -// private Iterator<Object[]> getRREntryIterator(Iterator<Region.Entry> iterator, LocalRegion region) { -// final LocalRegion owner = region; -// return new WrapperIterator<Region.Entry, Iterator<Region.Entry>>(iterator) { -// @Override public Object[] next() { -// Region.Entry entry = delegate.next(); -// KeyInfo keyInfo = new KeyInfo(entry.getKey(), null, null); -// Object value = owner.getDeserializedValue(keyInfo, false, true, true, null, false); -// return new Object[] {keyInfo.getKey(), value}; -// } -// }; -// } - - // todo. 
compare performance of regular and simple iterator - /** An general iterator for both partitioned and replicated region that returns un-serialized value */ - private Iterator<Object[]> getSimpleEntryIterator(Iterator<Region.Entry> iterator) { - return new WrapperIterator<Region.Entry, Iterator<Region.Entry>>(iterator) { - @Override public Object[] next() { - Region.Entry entry = delegate.next(); - return new Object[] {entry.getKey(), entry.getValue()}; - } - }; - } - - /** ------------------------------------------ */ - /** abstract wrapper iterator */ - /** ------------------------------------------ */ - - /** An abstract wrapper iterator to reduce duplicated code of anonymous iterators */ - abstract class WrapperIterator<T, S extends Iterator<T>> implements Iterator<Object[]> { - - final S delegate; - - protected WrapperIterator(S delegate) { - this.delegate = delegate; - } - - @Override public boolean hasNext() { - return delegate.hasNext(); - } - - @Override public void remove() { } - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/54cf6bf5/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/geodefunctions/RetrieveRegionMetadataFunction.java ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/geodefunctions/RetrieveRegionMetadataFunction.java b/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/geodefunctions/RetrieveRegionMetadataFunction.java deleted file mode 100644 index 6041b70..0000000 --- a/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/geodefunctions/RetrieveRegionMetadataFunction.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package io.pivotal.geode.spark.connector.internal.geodefunctions; - -import org.apache.geode.cache.execute.Function; -import org.apache.geode.cache.execute.FunctionContext; -import org.apache.geode.cache.execute.ResultSender; -import org.apache.geode.distributed.internal.ServerLocation; -import org.apache.geode.internal.cache.BucketServerLocation66; -import org.apache.geode.internal.cache.LocalRegion; -import org.apache.geode.internal.cache.PartitionedRegion; -import org.apache.geode.internal.cache.execute.InternalRegionFunctionContext; -import io.pivotal.geode.spark.connector.internal.RegionMetadata; - -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; - -/** - * This GemFire function retrieve region metadata - */ -public class RetrieveRegionMetadataFunction implements Function { - - public final static String ID = "geode-spark-retrieve-region-metadata"; - - private static final RetrieveRegionMetadataFunction instance = new RetrieveRegionMetadataFunction(); - - public RetrieveRegionMetadataFunction() { - } - - public static RetrieveRegionMetadataFunction getInstance() { - return instance; - } - - @Override - public String getId() { - return ID; - } - - @Override - public boolean optimizeForWrite() { - return false; - } - - @Override - public boolean isHA() { - return true; - } - - @Override - public boolean hasResult() { - return true; - } - - @Override - public void execute(FunctionContext context) { - LocalRegion region = (LocalRegion) ((InternalRegionFunctionContext) context).getDataSet(); - String regionPath = region.getFullPath(); - boolean isPartitioned = region.getDataPolicy().withPartitioning(); - String kTypeName = getTypeClassName(region.getAttributes().getKeyConstraint()); - String vTypeName = getTypeClassName(region.getAttributes().getValueConstraint()); - - RegionMetadata metadata; - if (! isPartitioned) { - metadata = new RegionMetadata(regionPath, false, 0, null, kTypeName, vTypeName); - } else { - PartitionedRegion pregion = (PartitionedRegion) region; - int totalBuckets = pregion.getAttributes().getPartitionAttributes().getTotalNumBuckets(); - Map<Integer, List<BucketServerLocation66>> bucketMap = pregion.getRegionAdvisor().getAllClientBucketProfiles(); - HashMap<ServerLocation, HashSet<Integer>> serverMap = bucketServerMap2ServerBucketSetMap(bucketMap); - metadata = new RegionMetadata(regionPath, true, totalBuckets, serverMap, kTypeName, vTypeName); - } - - ResultSender<RegionMetadata> sender = context.getResultSender(); - sender.lastResult(metadata); - } - - private String getTypeClassName(Class clazz) { - return clazz == null ? 
null : clazz.getCanonicalName(); - } - - /** convert bucket to server map to server to bucket set map */ - private HashMap<ServerLocation, HashSet<Integer>> - bucketServerMap2ServerBucketSetMap(Map<Integer, List<BucketServerLocation66>> map) { - HashMap<ServerLocation, HashSet<Integer>> serverBucketMap = new HashMap<>(); - for (Integer id : map.keySet()) { - List<BucketServerLocation66> locations = map.get(id); - for (BucketServerLocation66 location : locations) { - ServerLocation server = new ServerLocation(location.getHostName(), location.getPort()); - if (location.isPrimary()) { - HashSet<Integer> set = serverBucketMap.get(server); - if (set == null) { - set = new HashSet<>(); - serverBucketMap.put(server, set); - } - set.add(id); - break; - } - } - } - return serverBucketMap; - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/54cf6bf5/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/geodefunctions/StructStreamingResultSender.java ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/geodefunctions/StructStreamingResultSender.java b/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/geodefunctions/StructStreamingResultSender.java deleted file mode 100644 index 9a7dc9d..0000000 --- a/geode-spark-connector/geode-functions/src/main/java/io/pivotal/geode/spark/connector/internal/geodefunctions/StructStreamingResultSender.java +++ /dev/null @@ -1,219 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.pivotal.geode.spark.connector.internal.geodefunctions; - -import org.apache.geode.DataSerializer; -import org.apache.geode.cache.execute.ResultSender; -import org.apache.geode.cache.query.internal.types.ObjectTypeImpl; -import org.apache.geode.cache.query.internal.types.StructTypeImpl; -import org.apache.geode.cache.query.types.ObjectType; -import org.apache.geode.cache.query.types.StructType; -import org.apache.geode.internal.HeapDataOutputStream; -import org.apache.geode.internal.cache.CachedDeserializable; -import org.apache.geode.internal.logging.LogService; -import org.apache.logging.log4j.Logger; - -import java.io.IOException; -import java.util.Iterator; - -/** - * StructStreamingResultSender and StructStreamingResultCollector are paired - * to transfer result of list of `org.apache.geode.cache.query.Struct` - * from GemFire server to Spark Connector (the client of GemFire server) - * in streaming, i.e., while sender sending the result, the collector can - * start processing the arrived result without waiting for full result to - * become available. 
- */ -public class StructStreamingResultSender { - - public static final byte TYPE_CHUNK = 0x30; - public static final byte DATA_CHUNK = 0x31; - public static final byte ERROR_CHUNK = 0x32; - public static final byte SER_DATA = 0x41; - public static final byte UNSER_DATA = 0x42; - public static final byte BYTEARR_DATA = 0x43; - - private static ObjectTypeImpl ObjField = new ObjectTypeImpl(java.lang.Object.class); - public static StructTypeImpl KeyValueType = new StructTypeImpl(new String[]{"key", "value"}, new ObjectType[]{ObjField, ObjField}); - - private static final Logger logger = LogService.getLogger(); - private static final int CHUNK_SIZE = 4096; - - // Note: The type of ResultSender returned from GemFire FunctionContext is - // always ResultSender<Object>, so can't use ResultSender<byte[]> here - private final ResultSender<Object> sender; - private final StructType structType; - private final Iterator<Object[]> rows; - private String desc; - private boolean closed = false; - - /** - * the Constructor - * @param sender the base ResultSender that send data in byte array - * @param type the StructType of result record - * @param rows the iterator of the collection of results - * @param desc description of this result (used for logging) - */ - public StructStreamingResultSender( - ResultSender<Object> sender, StructType type, Iterator<Object[]> rows, String desc) { - if (sender == null || rows == null) - throw new NullPointerException("sender=" + sender + ", rows=" + rows); - this.sender = sender; - this.structType = type; - this.rows = rows; - this.desc = desc; - } - - /** the Constructor with default `desc` */ - public StructStreamingResultSender( - ResultSender<Object> sender, StructType type, Iterator<Object[]> rows) { - this(sender, type, rows, "StructStreamingResultSender"); - } - - /** - * Send the result in chunks. There are 3 types of chunk: TYPE, DATA, and ERROR. - * TYPE chunk for sending struct type info, DATA chunk for sending data, and - * ERROR chunk for sending exception. There are at most 1 TYPE chunk (omitted - * for `KeyValueType`) and 1 ERROR chunk (if there's error), but usually - * there are multiple DATA chunks. Each DATA chunk contains multiple rows - * of data. The chunk size is determined by the const `CHUNK_SIZE`. If an - * exception is thrown, it is serialized and sent as the last chunk of the - * result (in the form of ERROR chunk). - */ - public void send() { - if (closed) throw new RuntimeException("sender is closed."); - - HeapDataOutputStream buf = new HeapDataOutputStream(CHUNK_SIZE + 2048, null); - String dataType = null; - int typeSize = 0; - int rowCount = 0; - int dataSize = 0; - try { - if (rows.hasNext()) { - // Note: only send type info if there's data with it - typeSize = sendType(buf); - buf.writeByte(DATA_CHUNK); - int rowSize = structType == null ? 2 : structType.getFieldNames().length; - while (rows.hasNext()) { - rowCount ++; - Object[] row = rows.next(); - if (rowCount < 2) dataType = entryDataType(row); - if (rowSize != row.length) - throw new IOException(rowToString("Expect " + rowSize + " columns, but got ", row)); - serializeRowToBuffer(row, buf); - if (buf.size() > CHUNK_SIZE) { - dataSize += sendBufferredData(buf, false); - buf.writeByte(DATA_CHUNK); - } - } - } - // send last piece of data or empty byte array - dataSize += sendBufferredData(buf, true); - logger.info(desc + ": " + rowCount + " rows, type=" + dataType + ", type.size=" + - typeSize + ", data.size=" + dataSize + ", row.avg.size=" + - (rowCount == 0 ? 
"NaN" : String.format("%.1f", ((float) dataSize)/rowCount))); - } catch (IOException | RuntimeException e) { - sendException(buf, e); - } finally { - closed = true; - } - } - - private String rowToString(String rowDesc, Object[] row) { - StringBuilder buf = new StringBuilder(); - buf.append(rowDesc).append("("); - for (int i = 0; i < row.length; i++) buf.append(i ==0 ? "" : " ,").append(row[i]); - return buf.append(")") .toString(); - } - - private String entryDataType(Object[] row) { - StringBuilder buf = new StringBuilder(); - buf.append("("); - for (int i = 0; i < row.length; i++) { - if (i != 0) buf.append(", "); - buf.append(row[i].getClass().getCanonicalName()); - } - return buf.append(")").toString(); - } - - private void serializeRowToBuffer(Object[] row, HeapDataOutputStream buf) throws IOException { - for (Object data : row) { - if (data instanceof CachedDeserializable) { - buf.writeByte(SER_DATA); - DataSerializer.writeByteArray(((CachedDeserializable) data).getSerializedValue(), buf); - } else if (data instanceof byte[]) { - buf.writeByte(BYTEARR_DATA); - DataSerializer.writeByteArray((byte[]) data, buf); - } else { - buf.writeByte(UNSER_DATA); - DataSerializer.writeObject(data, buf); - } - } - } - - /** return the size of type data */ - private int sendType(HeapDataOutputStream buf) throws IOException { - // logger.info(desc + " struct type: " + structType); - if (structType != null) { - buf.writeByte(TYPE_CHUNK); - DataSerializer.writeObject(structType, buf); - return sendBufferredData(buf, false); - } else { - return 0; // default KeyValue type, no type info send - } - } - - private int sendBufferredData(HeapDataOutputStream buf, boolean isLast) throws IOException { - if (isLast) sender.lastResult(buf.toByteArray()); - else sender.sendResult(buf.toByteArray()); - // logData(buf.toByteArray(), desc); - int s = buf.size(); - buf.reset(); - return s; - } - - /** Send the exception as the last chunk of the result. */ - private void sendException(HeapDataOutputStream buf, Exception e) { - // Note: if exception happens during the serialization, the `buf` may contain - // partial serialized data, which may cause de-serialization hang or error. - // Therefore, always empty the buffer before sending the exception - if (buf.size() > 0) buf.reset(); - - try { - buf.writeByte(ERROR_CHUNK); - DataSerializer.writeObject(e, buf); - } catch (IOException ioe) { - logger.error("StructStreamingResultSender failed to send the result:", e); - logger.error("StructStreamingResultSender failed to serialize the exception:", ioe); - buf.reset(); - } - // Note: send empty chunk as the last result if serialization of exception - // failed, and the error is logged on the GemFire server side. 
- sender.lastResult(buf.toByteArray()); - // logData(buf.toByteArray(), desc); - } - -// private void logData(byte[] data, String desc) { -// StringBuilder buf = new StringBuilder(); -// buf.append(desc); -// for (byte b : data) { -// buf.append(" ").append(b); -// } -// logger.info(buf.toString()); -// } - -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/54cf6bf5/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/RegionMetadata.java ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/RegionMetadata.java b/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/RegionMetadata.java new file mode 100644 index 0000000..4fee0e0 --- /dev/null +++ b/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/RegionMetadata.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.pivotal.geode.spark.connector.internal; + +import org.apache.geode.distributed.internal.ServerLocation; + +import java.util.HashMap; +import java.util.HashSet; +import java.io.Serializable; + +/** + * This class contains all info required by GemFire RDD partitioner to create partitions. + */ +public class RegionMetadata implements Serializable { + + private String regionPath; + private boolean isPartitioned; + private int totalBuckets; + private HashMap<ServerLocation, HashSet<Integer>> serverBucketMap; + private String keyTypeName; + private String valueTypeName; + + /** + * Default constructor. 
+ * @param regionPath the full path of the given region + * @param isPartitioned true for partitioned region, false otherwise + * @param totalBuckets number of total buckets for partitioned region, ignored otherwise + * @param serverBucketMap geode server (host:port pair) to bucket set map + * @param keyTypeName region key class name + * @param valueTypeName region value class name + */ + public RegionMetadata(String regionPath, boolean isPartitioned, int totalBuckets, HashMap<ServerLocation, HashSet<Integer>> serverBucketMap, + String keyTypeName, String valueTypeName) { + this.regionPath = regionPath; + this.isPartitioned = isPartitioned; + this.totalBuckets = totalBuckets; + this.serverBucketMap = serverBucketMap; + this.keyTypeName = keyTypeName; + this.valueTypeName = valueTypeName; + } + + public RegionMetadata(String regionPath, boolean isPartitioned, int totalBuckets, HashMap<ServerLocation, HashSet<Integer>> serverBucketMap) { + this(regionPath, isPartitioned, totalBuckets, serverBucketMap, null, null); + } + + public String getRegionPath() { + return regionPath; + } + + public boolean isPartitioned() { + return isPartitioned; + } + + public int getTotalBuckets() { + return totalBuckets; + } + + public HashMap<ServerLocation, HashSet<Integer>> getServerBucketMap() { + return serverBucketMap; + } + + public String getKeyTypeName() { + return keyTypeName; + } + + public String getValueTypeName() { + return valueTypeName; + } + + public String toString() { + StringBuilder buf = new StringBuilder(); + buf.append("RegionMetadata(region=").append(regionPath) + .append("(").append(keyTypeName).append(", ").append(valueTypeName).append(")") + .append(", partitioned=").append(isPartitioned).append(", #buckets=").append(totalBuckets) + .append(", map=").append(serverBucketMap).append(")"); + return buf.toString(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/54cf6bf5/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/geodefunctions/QueryFunction.java ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/geodefunctions/QueryFunction.java b/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/geodefunctions/QueryFunction.java new file mode 100644 index 0000000..6e6e295 --- /dev/null +++ b/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/geodefunctions/QueryFunction.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.pivotal.geode.spark.connector.internal.geodefunctions; + +import org.apache.geode.DataSerializer; +import org.apache.geode.cache.CacheFactory; +import org.apache.geode.cache.execute.*; +import org.apache.geode.cache.query.SelectResults; +import org.apache.geode.cache.query.Query; +import org.apache.geode.internal.HeapDataOutputStream; +import org.apache.geode.internal.cache.LocalRegion; +import org.apache.geode.internal.cache.execute.InternalRegionFunctionContext; +import org.apache.geode.internal.logging.LogService; +import org.apache.logging.log4j.Logger; +import java.util.Iterator; + +public class QueryFunction implements Function { + + private static final long serialVersionUID = 4866641340803692882L; + + public final static String ID = "geode-spark-query-function"; + + private final static QueryFunction instance = new QueryFunction(); + + private static final Logger logger = LogService.getLogger(); + + private static final int CHUNK_SIZE = 1024; + + @Override + public String getId() { + return ID; + } + + public static QueryFunction getInstance() { + return instance; + } + + @Override + public boolean optimizeForWrite() { + return true; + } + + @Override + public boolean isHA() { + return true; + } + + @Override + public boolean hasResult() { + return true; + } + + @Override + public void execute(FunctionContext context) { + try { + String[] args = (String[]) context.getArguments(); + String queryString = args[0]; + String bucketSet = args[1]; + InternalRegionFunctionContext irfc = (InternalRegionFunctionContext) context; + LocalRegion localRegion = (LocalRegion) irfc.getDataSet(); + boolean partitioned = localRegion.getDataPolicy().withPartitioning(); + Query query = CacheFactory.getAnyInstance().getQueryService().newQuery(queryString); + Object result = partitioned ? query.execute((InternalRegionFunctionContext) context) : query.execute(); + ResultSender<Object> sender = context.getResultSender(); + HeapDataOutputStream buf = new HeapDataOutputStream(CHUNK_SIZE, null); + Iterator<Object> iter = ((SelectResults) result).asList().iterator(); + while (iter.hasNext()) { + Object row = iter.next(); + DataSerializer.writeObject(row, buf); + if (buf.size() > CHUNK_SIZE) { + sender.sendResult(buf.toByteArray()); + logger.debug("OQL query=" + queryString + " bucket set=" + bucketSet + " sendResult(), data size=" + buf.size()); + buf.reset(); + } + } + sender.lastResult(buf.toByteArray()); + logger.debug("OQL query=" + queryString + " bucket set=" + bucketSet + " lastResult(), data size=" + buf.size()); + buf.reset(); + } + catch(Exception e) { + throw new FunctionException(e); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/54cf6bf5/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/geodefunctions/RetrieveRegionFunction.java ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/geodefunctions/RetrieveRegionFunction.java b/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/geodefunctions/RetrieveRegionFunction.java new file mode 100644 index 0000000..d3a2572 --- /dev/null +++ b/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/geodefunctions/RetrieveRegionFunction.java @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.pivotal.geode.spark.connector.internal.geodefunctions; + +import java.util.Iterator; +import org.apache.logging.log4j.Logger; + +import org.apache.geode.cache.Cache; +import org.apache.geode.cache.CacheFactory; +import org.apache.geode.cache.execute.FunctionException; +import org.apache.geode.cache.query.Query; +import org.apache.geode.cache.query.QueryService; +import org.apache.geode.cache.query.SelectResults; +import org.apache.geode.cache.query.Struct; +import org.apache.geode.internal.cache.*; +import org.apache.geode.cache.Region; +import org.apache.geode.cache.execute.Function; +import org.apache.geode.cache.execute.FunctionContext; +import org.apache.geode.cache.partition.PartitionRegionHelper; +import org.apache.geode.internal.cache.execute.InternalRegionFunctionContext; +import org.apache.geode.internal.cache.execute.InternalResultSender; +import org.apache.geode.internal.cache.partitioned.PREntriesIterator; +import org.apache.geode.internal.logging.LogService; + +/** + * GemFire function that is used by `SparkContext.geodeRegion(regionPath, whereClause)` + * to retrieve region data set for the given bucket set as a RDD partition + **/ +public class RetrieveRegionFunction implements Function { + + public final static String ID = "spark-geode-retrieve-region"; + private static final Logger logger = LogService.getLogger(); + private static final RetrieveRegionFunction instance = new RetrieveRegionFunction(); + + public RetrieveRegionFunction() { + } + + /** ------------------------------------------ */ + /** interface Function implementation */ + /** ------------------------------------------ */ + + public static RetrieveRegionFunction getInstance() { + return instance; + } + + @Override + public String getId() { + return ID; + } + + @Override + public boolean hasResult() { + return true; + } + + @Override + public boolean optimizeForWrite() { + return true; + } + + @Override + public boolean isHA() { + return true; + } + + @Override + public void execute(FunctionContext context) { + String[] args = (String[]) context.getArguments(); + String where = args[0]; + String taskDesc = args[1]; + InternalRegionFunctionContext irfc = (InternalRegionFunctionContext) context; + LocalRegion localRegion = (LocalRegion) irfc.getDataSet(); + boolean partitioned = localRegion.getDataPolicy().withPartitioning(); + if (where.trim().isEmpty()) + retrieveFullRegion(irfc, partitioned, taskDesc); + else + retrieveRegionWithWhereClause(irfc, localRegion, partitioned, where, taskDesc); + } + + /** ------------------------------------------ */ + /** Retrieve region data with where clause */ + /** ------------------------------------------ */ + + private void retrieveRegionWithWhereClause( + InternalRegionFunctionContext context, LocalRegion localRegion, boolean partitioned, String where, String desc) { + 
String regionPath = localRegion.getFullPath(); + String qstr = "select key, value from " + regionPath + ".entries where " + where; + logger.info(desc + ": " + qstr); + + try { + Cache cache = CacheFactory.getAnyInstance(); + QueryService queryService = cache.getQueryService(); + Query query = queryService.newQuery(qstr); + SelectResults<Struct> results = + (SelectResults<Struct>) (partitioned ? query.execute(context) : query.execute()); + + Iterator<Object[]> entries = getStructIteratorWrapper(results.asList().iterator()); + InternalResultSender irs = (InternalResultSender) context.getResultSender(); + StructStreamingResultSender sender = new StructStreamingResultSender(irs, null, entries, desc); + sender.send(); + } catch (Exception e) { + throw new FunctionException(e); + } + } + + private Iterator<Object[]> getStructIteratorWrapper(Iterator<Struct> entries) { + return new WrapperIterator<Struct, Iterator<Struct>>(entries) { + @Override public Object[] next() { + return delegate.next().getFieldValues(); + } + }; + } + + /** ------------------------------------------ */ + /** Retrieve full region data */ + /** ------------------------------------------ */ + + private void retrieveFullRegion(InternalRegionFunctionContext context, boolean partitioned, String desc) { + Iterator<Object[]> entries; + if (partitioned) { + PREntriesIterator<Region.Entry> iter = (PREntriesIterator<Region.Entry>) + ((LocalDataSet) PartitionRegionHelper.getLocalDataForContext(context)).entrySet().iterator(); + // entries = getPREntryIterator(iter); + entries = getSimpleEntryIterator(iter); + } else { + LocalRegion owner = (LocalRegion) context.getDataSet(); + Iterator<Region.Entry> iter = (Iterator<Region.Entry>) owner.entrySet().iterator(); + // entries = getRREntryIterator(iter, owner); + entries = getSimpleEntryIterator(iter); + } + InternalResultSender irs = (InternalResultSender) context.getResultSender(); + StructStreamingResultSender sender = new StructStreamingResultSender(irs, null, entries, desc); + sender.send(); + } + +// /** An iterator for partitioned region that uses internal API to get serialized value */ +// private Iterator<Object[]> getPREntryIterator(PREntriesIterator<Region.Entry> iterator) { +// return new WrapperIterator<Region.Entry, PREntriesIterator<Region.Entry>>(iterator) { +// @Override public Object[] next() { +// Region.Entry entry = delegate.next(); +// int bucketId = delegate.getBucketId(); +// KeyInfo keyInfo = new KeyInfo(entry.getKey(), null, bucketId); +// // owner needs to be the bucket region not the enclosing partition region +// LocalRegion owner = ((PartitionedRegion) entry.getRegion()).getDataStore().getLocalBucketById(bucketId); +// Object value = owner.getDeserializedValue(keyInfo, false, true, true, null, false); +// return new Object[] {keyInfo.getKey(), value}; +// } +// }; +// } +// +// /** An iterator for replicated region that uses internal API to get serialized value */ +// private Iterator<Object[]> getRREntryIterator(Iterator<Region.Entry> iterator, LocalRegion region) { +// final LocalRegion owner = region; +// return new WrapperIterator<Region.Entry, Iterator<Region.Entry>>(iterator) { +// @Override public Object[] next() { +// Region.Entry entry = delegate.next(); +// KeyInfo keyInfo = new KeyInfo(entry.getKey(), null, null); +// Object value = owner.getDeserializedValue(keyInfo, false, true, true, null, false); +// return new Object[] {keyInfo.getKey(), value}; +// } +// }; +// } + + // todo. 
compare performance of regular and simple iterator + /** An general iterator for both partitioned and replicated region that returns un-serialized value */ + private Iterator<Object[]> getSimpleEntryIterator(Iterator<Region.Entry> iterator) { + return new WrapperIterator<Region.Entry, Iterator<Region.Entry>>(iterator) { + @Override public Object[] next() { + Region.Entry entry = delegate.next(); + return new Object[] {entry.getKey(), entry.getValue()}; + } + }; + } + + /** ------------------------------------------ */ + /** abstract wrapper iterator */ + /** ------------------------------------------ */ + + /** An abstract wrapper iterator to reduce duplicated code of anonymous iterators */ + abstract class WrapperIterator<T, S extends Iterator<T>> implements Iterator<Object[]> { + + final S delegate; + + protected WrapperIterator(S delegate) { + this.delegate = delegate; + } + + @Override public boolean hasNext() { + return delegate.hasNext(); + } + + @Override public void remove() { } + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/54cf6bf5/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/geodefunctions/RetrieveRegionMetadataFunction.java ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/geodefunctions/RetrieveRegionMetadataFunction.java b/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/geodefunctions/RetrieveRegionMetadataFunction.java new file mode 100644 index 0000000..6041b70 --- /dev/null +++ b/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/geodefunctions/RetrieveRegionMetadataFunction.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.pivotal.geode.spark.connector.internal.geodefunctions; + +import org.apache.geode.cache.execute.Function; +import org.apache.geode.cache.execute.FunctionContext; +import org.apache.geode.cache.execute.ResultSender; +import org.apache.geode.distributed.internal.ServerLocation; +import org.apache.geode.internal.cache.BucketServerLocation66; +import org.apache.geode.internal.cache.LocalRegion; +import org.apache.geode.internal.cache.PartitionedRegion; +import org.apache.geode.internal.cache.execute.InternalRegionFunctionContext; +import io.pivotal.geode.spark.connector.internal.RegionMetadata; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; + +/** + * This GemFire function retrieve region metadata + */ +public class RetrieveRegionMetadataFunction implements Function { + + public final static String ID = "geode-spark-retrieve-region-metadata"; + + private static final RetrieveRegionMetadataFunction instance = new RetrieveRegionMetadataFunction(); + + public RetrieveRegionMetadataFunction() { + } + + public static RetrieveRegionMetadataFunction getInstance() { + return instance; + } + + @Override + public String getId() { + return ID; + } + + @Override + public boolean optimizeForWrite() { + return false; + } + + @Override + public boolean isHA() { + return true; + } + + @Override + public boolean hasResult() { + return true; + } + + @Override + public void execute(FunctionContext context) { + LocalRegion region = (LocalRegion) ((InternalRegionFunctionContext) context).getDataSet(); + String regionPath = region.getFullPath(); + boolean isPartitioned = region.getDataPolicy().withPartitioning(); + String kTypeName = getTypeClassName(region.getAttributes().getKeyConstraint()); + String vTypeName = getTypeClassName(region.getAttributes().getValueConstraint()); + + RegionMetadata metadata; + if (! isPartitioned) { + metadata = new RegionMetadata(regionPath, false, 0, null, kTypeName, vTypeName); + } else { + PartitionedRegion pregion = (PartitionedRegion) region; + int totalBuckets = pregion.getAttributes().getPartitionAttributes().getTotalNumBuckets(); + Map<Integer, List<BucketServerLocation66>> bucketMap = pregion.getRegionAdvisor().getAllClientBucketProfiles(); + HashMap<ServerLocation, HashSet<Integer>> serverMap = bucketServerMap2ServerBucketSetMap(bucketMap); + metadata = new RegionMetadata(regionPath, true, totalBuckets, serverMap, kTypeName, vTypeName); + } + + ResultSender<RegionMetadata> sender = context.getResultSender(); + sender.lastResult(metadata); + } + + private String getTypeClassName(Class clazz) { + return clazz == null ? 
null : clazz.getCanonicalName(); + } + + /** convert bucket to server map to server to bucket set map */ + private HashMap<ServerLocation, HashSet<Integer>> + bucketServerMap2ServerBucketSetMap(Map<Integer, List<BucketServerLocation66>> map) { + HashMap<ServerLocation, HashSet<Integer>> serverBucketMap = new HashMap<>(); + for (Integer id : map.keySet()) { + List<BucketServerLocation66> locations = map.get(id); + for (BucketServerLocation66 location : locations) { + ServerLocation server = new ServerLocation(location.getHostName(), location.getPort()); + if (location.isPrimary()) { + HashSet<Integer> set = serverBucketMap.get(server); + if (set == null) { + set = new HashSet<>(); + serverBucketMap.put(server, set); + } + set.add(id); + break; + } + } + } + return serverBucketMap; + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/54cf6bf5/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/geodefunctions/StructStreamingResultSender.java ---------------------------------------------------------------------- diff --git a/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/geodefunctions/StructStreamingResultSender.java b/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/geodefunctions/StructStreamingResultSender.java new file mode 100644 index 0000000..9a7dc9d --- /dev/null +++ b/geode-spark-connector/geode-functions/src/main/java/org/apache/geode/spark/connector/internal/geodefunctions/StructStreamingResultSender.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.pivotal.geode.spark.connector.internal.geodefunctions; + +import org.apache.geode.DataSerializer; +import org.apache.geode.cache.execute.ResultSender; +import org.apache.geode.cache.query.internal.types.ObjectTypeImpl; +import org.apache.geode.cache.query.internal.types.StructTypeImpl; +import org.apache.geode.cache.query.types.ObjectType; +import org.apache.geode.cache.query.types.StructType; +import org.apache.geode.internal.HeapDataOutputStream; +import org.apache.geode.internal.cache.CachedDeserializable; +import org.apache.geode.internal.logging.LogService; +import org.apache.logging.log4j.Logger; + +import java.io.IOException; +import java.util.Iterator; + +/** + * StructStreamingResultSender and StructStreamingResultCollector are paired + * to transfer result of list of `org.apache.geode.cache.query.Struct` + * from GemFire server to Spark Connector (the client of GemFire server) + * in streaming, i.e., while sender sending the result, the collector can + * start processing the arrived result without waiting for full result to + * become available. 
+ */
+public class StructStreamingResultSender {
+
+  public static final byte TYPE_CHUNK = 0x30;
+  public static final byte DATA_CHUNK = 0x31;
+  public static final byte ERROR_CHUNK = 0x32;
+  public static final byte SER_DATA = 0x41;
+  public static final byte UNSER_DATA = 0x42;
+  public static final byte BYTEARR_DATA = 0x43;
+
+  private static ObjectTypeImpl ObjField = new ObjectTypeImpl(java.lang.Object.class);
+  public static StructTypeImpl KeyValueType = new StructTypeImpl(
+    new String[]{"key", "value"}, new ObjectType[]{ObjField, ObjField});
+
+  private static final Logger logger = LogService.getLogger();
+  private static final int CHUNK_SIZE = 4096;
+
+  // Note: the type of ResultSender returned from the GemFire FunctionContext
+  // is always ResultSender<Object>, so we can't use ResultSender<byte[]> here
+  private final ResultSender<Object> sender;
+  private final StructType structType;
+  private final Iterator<Object[]> rows;
+  private String desc;
+  private boolean closed = false;
+
+  /**
+   * Constructor.
+   * @param sender the base ResultSender that sends data as byte arrays
+   * @param type the StructType of the result records
+   * @param rows the iterator over the collection of results
+   * @param desc description of this result (used for logging)
+   */
+  public StructStreamingResultSender(
+    ResultSender<Object> sender, StructType type, Iterator<Object[]> rows, String desc) {
+    if (sender == null || rows == null)
+      throw new NullPointerException("sender=" + sender + ", rows=" + rows);
+    this.sender = sender;
+    this.structType = type;
+    this.rows = rows;
+    this.desc = desc;
+  }
+
+  /** Constructor with a default `desc`. */
+  public StructStreamingResultSender(
+    ResultSender<Object> sender, StructType type, Iterator<Object[]> rows) {
+    this(sender, type, rows, "StructStreamingResultSender");
+  }
+
+  /**
+   * Send the result in chunks. There are three types of chunk: TYPE, DATA,
+   * and ERROR. A TYPE chunk carries struct type info, a DATA chunk carries
+   * data, and an ERROR chunk carries an exception. There is at most one
+   * TYPE chunk (omitted for `KeyValueType`) and at most one ERROR chunk
+   * (if there's an error), but usually there are multiple DATA chunks.
+   * Each DATA chunk contains multiple rows of data. The chunk size is
+   * determined by the constant `CHUNK_SIZE`. If an exception is thrown, it
+   * is serialized and sent as the last chunk of the result (in the form of
+   * an ERROR chunk).
+   */
+  public void send() {
+    if (closed) throw new RuntimeException("sender is closed.");
+
+    HeapDataOutputStream buf = new HeapDataOutputStream(CHUNK_SIZE + 2048, null);
+    String dataType = null;
+    int typeSize = 0;
+    int rowCount = 0;
+    int dataSize = 0;
+    try {
+      if (rows.hasNext()) {
+        // Note: only send type info if there's data to go with it
+        typeSize = sendType(buf);
+        buf.writeByte(DATA_CHUNK);
+        int rowSize = structType == null ? 2 : structType.getFieldNames().length;
+        while (rows.hasNext()) {
+          rowCount++;
+          Object[] row = rows.next();
+          if (rowCount < 2) dataType = entryDataType(row);
+          if (rowSize != row.length)
+            throw new IOException(rowToString("Expect " + rowSize + " columns, but got ", row));
+          serializeRowToBuffer(row, buf);
+          if (buf.size() > CHUNK_SIZE) {
+            dataSize += sendBufferredData(buf, false);
+            buf.writeByte(DATA_CHUNK);
+          }
+        }
+      }
+      // send the last piece of data, or an empty byte array
+      dataSize += sendBufferredData(buf, true);
+      logger.info(desc + ": " + rowCount + " rows, type=" + dataType + ", type.size=" +
+          typeSize + ", data.size=" + dataSize + ", row.avg.size=" +
+          (rowCount == 0 ?
"NaN" : String.format("%.1f", ((float) dataSize)/rowCount))); + } catch (IOException | RuntimeException e) { + sendException(buf, e); + } finally { + closed = true; + } + } + + private String rowToString(String rowDesc, Object[] row) { + StringBuilder buf = new StringBuilder(); + buf.append(rowDesc).append("("); + for (int i = 0; i < row.length; i++) buf.append(i ==0 ? "" : " ,").append(row[i]); + return buf.append(")") .toString(); + } + + private String entryDataType(Object[] row) { + StringBuilder buf = new StringBuilder(); + buf.append("("); + for (int i = 0; i < row.length; i++) { + if (i != 0) buf.append(", "); + buf.append(row[i].getClass().getCanonicalName()); + } + return buf.append(")").toString(); + } + + private void serializeRowToBuffer(Object[] row, HeapDataOutputStream buf) throws IOException { + for (Object data : row) { + if (data instanceof CachedDeserializable) { + buf.writeByte(SER_DATA); + DataSerializer.writeByteArray(((CachedDeserializable) data).getSerializedValue(), buf); + } else if (data instanceof byte[]) { + buf.writeByte(BYTEARR_DATA); + DataSerializer.writeByteArray((byte[]) data, buf); + } else { + buf.writeByte(UNSER_DATA); + DataSerializer.writeObject(data, buf); + } + } + } + + /** return the size of type data */ + private int sendType(HeapDataOutputStream buf) throws IOException { + // logger.info(desc + " struct type: " + structType); + if (structType != null) { + buf.writeByte(TYPE_CHUNK); + DataSerializer.writeObject(structType, buf); + return sendBufferredData(buf, false); + } else { + return 0; // default KeyValue type, no type info send + } + } + + private int sendBufferredData(HeapDataOutputStream buf, boolean isLast) throws IOException { + if (isLast) sender.lastResult(buf.toByteArray()); + else sender.sendResult(buf.toByteArray()); + // logData(buf.toByteArray(), desc); + int s = buf.size(); + buf.reset(); + return s; + } + + /** Send the exception as the last chunk of the result. */ + private void sendException(HeapDataOutputStream buf, Exception e) { + // Note: if exception happens during the serialization, the `buf` may contain + // partial serialized data, which may cause de-serialization hang or error. + // Therefore, always empty the buffer before sending the exception + if (buf.size() > 0) buf.reset(); + + try { + buf.writeByte(ERROR_CHUNK); + DataSerializer.writeObject(e, buf); + } catch (IOException ioe) { + logger.error("StructStreamingResultSender failed to send the result:", e); + logger.error("StructStreamingResultSender failed to serialize the exception:", ioe); + buf.reset(); + } + // Note: send empty chunk as the last result if serialization of exception + // failed, and the error is logged on the GemFire server side. + sender.lastResult(buf.toByteArray()); + // logData(buf.toByteArray(), desc); + } + +// private void logData(byte[] data, String desc) { +// StringBuilder buf = new StringBuilder(); +// buf.append(desc); +// for (byte b : data) { +// buf.append(" ").append(b); +// } +// logger.info(buf.toString()); +// } + +}