[S2GRAPH-57]: Change package names into org.apache.s2graph.

  Refactor package names across all projects.
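
  For illustration only (hypothetical file contents), the rename moves sources
  from the old com.kakao and s2 namespaces into org.apache.s2graph, with
  imports updated to match:

    // before
    package com.kakao.s2graph.core
    import com.kakao.s2graph.core.utils.Logger

    // after
    package org.apache.s2graph.core
    import org.apache.s2graph.core.utils.Logger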

JIRA:
  [S2GRAPH-57] https://issues.apache.org/jira/browse/S2GRAPH-57

Pull Request:
  Closes #40


Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/b8a15217
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/b8a15217
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/b8a15217

Branch: refs/heads/master
Commit: b8a1521764150ab7be6bb736108da924c626d045
Parents: e71264d
Author: DO YUNG YOON <[email protected]>
Authored: Tue Mar 15 18:26:19 2016 +0900
Committer: DO YUNG YOON <[email protected]>
Committed: Tue Mar 15 23:17:41 2016 +0900

----------------------------------------------------------------------
 CHANGES                                         |    2 +
 dev_support/docker-compose.yml                  |    3 -
 .../loader/spark/BulkLoadPartitioner.scala      |   56 +
 .../loader/spark/FamilyHFileWriteOptions.scala  |   35 +
 .../s2graph/loader/spark/HBaseContext.scala     |  849 ++++++++++++
 .../loader/spark/HBaseDStreamFunctions.scala    |  158 +++
 .../loader/spark/HBaseRDDFunctions.scala        |  207 +++
 .../s2graph/loader/spark/JavaHBaseContext.scala |  342 +++++
 .../loader/spark/KeyFamilyQualifier.scala       |   46 +
 .../loader/subscriber/GraphSubscriber.scala     |  335 +++++
 .../loader/subscriber/TransferToHFile.scala     |  194 +++
 .../s2graph/loader/subscriber/WalLogStat.scala  |   90 ++
 .../loader/subscriber/WalLogToHDFS.scala        |  148 +++
 .../main/scala/spark/BulkLoadPartitioner.scala  |   56 -
 .../ColumnFamilyQualifierMapKeyWrapper.scala    |   73 --
 loader/src/main/scala/spark/DefaultSource.scala |  982 --------------
 .../scala/spark/FamilyHFileWriteOptions.scala   |   35 -
 loader/src/main/scala/spark/HBaseContext.scala  |  849 ------------
 .../scala/spark/HBaseDStreamFunctions.scala     |  158 ---
 .../main/scala/spark/HBaseRDDFunctions.scala    |  207 ---
 .../src/main/scala/spark/JavaHBaseContext.scala |  342 -----
 .../main/scala/spark/KeyFamilyQualifier.scala   |   46 -
 .../main/scala/subscriber/GraphSubscriber.scala |  339 -----
 .../subscriber/GraphSubscriberStreaming.scala   |  103 --
 .../src/main/scala/subscriber/KafkaToHdfs.scala |  111 --
 .../main/scala/subscriber/TestEdgeBuilder.scala |   66 -
 .../main/scala/subscriber/TransferToHFile.scala |  195 ---
 .../scala/subscriber/VertexDegreeBuilder.scala  |  102 --
 .../src/main/scala/subscriber/WalLogStat.scala  |   94 --
 .../main/scala/subscriber/WalLogToHDFS.scala    |  149 ---
 .../loader/subscriber/GraphSubscriberTest.scala |   45 +
 .../loader/subscriber/TransferToHFileTest.scala |  169 +++
 .../scala/subscriber/GraphSubscriberTest.scala  |   45 -
 .../scala/subscriber/TransferToHFileTest.scala  |  169 ---
 .../scala/com/kakao/s2graph/core/Edge.scala     |  568 --------
 .../kakao/s2graph/core/ExceptionHandler.scala   |  162 ---
 .../scala/com/kakao/s2graph/core/Graph.scala    |  381 ------
 .../com/kakao/s2graph/core/GraphElement.scala   |   12 -
 .../kakao/s2graph/core/GraphExceptions.scala    |   29 -
 .../com/kakao/s2graph/core/GraphUtil.scala      |  138 --
 .../com/kakao/s2graph/core/JSONParser.scala     |  134 --
 .../com/kakao/s2graph/core/Management.scala     |  368 ------
 .../com/kakao/s2graph/core/OrderingUtil.scala   |  146 ---
 .../com/kakao/s2graph/core/PostProcess.scala    |  554 --------
 .../com/kakao/s2graph/core/QueryParam.scala     |  583 ---------
 .../com/kakao/s2graph/core/QueryResult.scala    |   42 -
 .../scala/com/kakao/s2graph/core/Vertex.scala   |  107 --
 .../com/kakao/s2graph/core/mysqls/Bucket.scala  |   66 -
 .../kakao/s2graph/core/mysqls/ColumnMeta.scala  |  110 --
 .../kakao/s2graph/core/mysqls/Experiment.scala  |   78 --
 .../com/kakao/s2graph/core/mysqls/Label.scala   |  372 ------
 .../kakao/s2graph/core/mysqls/LabelIndex.scala  |  143 --
 .../kakao/s2graph/core/mysqls/LabelMeta.scala   |  151 ---
 .../com/kakao/s2graph/core/mysqls/Model.scala   |  100 --
 .../com/kakao/s2graph/core/mysqls/Service.scala |   96 --
 .../s2graph/core/mysqls/ServiceColumn.scala     |   82 --
 .../s2graph/core/parsers/WhereParser.scala      |  207 ---
 .../kakao/s2graph/core/rest/RequestParser.scala |  609 ---------
 .../kakao/s2graph/core/rest/RestHandler.scala   |  245 ----
 .../s2graph/core/storage/Deserializable.scala   |   25 -
 .../kakao/s2graph/core/storage/SKeyValue.scala  |   47 -
 .../s2graph/core/storage/Serializable.scala     |   10 -
 .../kakao/s2graph/core/storage/Storage.scala    | 1224 ------------------
 .../core/storage/StorageDeserializable.scala    |   95 --
 .../core/storage/StorageSerializable.scala      |   41 -
 .../core/storage/hbase/AsynchbaseStorage.scala  |  533 --------
 .../tall/IndexEdgeDeserializable.scala          |  132 --
 .../indexedge/tall/IndexEdgeSerializable.scala  |   53 -
 .../wide/IndexEdgeDeserializable.scala          |  116 --
 .../indexedge/wide/IndexEdgeSerializable.scala  |   53 -
 .../tall/SnapshotEdgeDeserializable.scala       |   84 --
 .../tall/SnapshotEdgeSerializable.scala         |   47 -
 .../wide/SnapshotEdgeDeserializable.scala       |   70 -
 .../wide/SnapshotEdgeSerializable.scala         |   50 -
 .../serde/vertex/VertexDeserializable.scala     |   46 -
 .../serde/vertex/VertexSerializable.scala       |   20 -
 .../kakao/s2graph/core/types/HBaseType.scala    |  163 ---
 .../kakao/s2graph/core/types/InnerValLike.scala |  246 ----
 .../s2graph/core/types/LabelWithDirection.scala |   65 -
 .../com/kakao/s2graph/core/types/VertexId.scala |  145 ---
 .../kakao/s2graph/core/types/v1/InnerVal.scala  |  226 ----
 .../kakao/s2graph/core/types/v2/InnerVal.scala  |  162 ---
 .../kakao/s2graph/core/utils/DeferCache.scala   |   82 --
 .../kakao/s2graph/core/utils/Extentions.scala   |   72 --
 .../kakao/s2graph/core/utils/FutureCache.scala  |   82 --
 .../com/kakao/s2graph/core/utils/Logger.scala   |   44 -
 .../s2graph/core/utils/SafeUpdateCache.scala    |   62 -
 .../scala/org/apache/s2graph/core/Edge.scala    |  567 ++++++++
 .../apache/s2graph/core/ExceptionHandler.scala  |  159 +++
 .../scala/org/apache/s2graph/core/Graph.scala   |  379 ++++++
 .../org/apache/s2graph/core/GraphElement.scala  |   10 +
 .../apache/s2graph/core/GraphExceptions.scala   |   28 +
 .../org/apache/s2graph/core/GraphUtil.scala     |  139 ++
 .../org/apache/s2graph/core/JSONParser.scala    |  134 ++
 .../org/apache/s2graph/core/Management.scala    |  368 ++++++
 .../org/apache/s2graph/core/OrderingUtil.scala  |  146 +++
 .../org/apache/s2graph/core/PostProcess.scala   |  553 ++++++++
 .../org/apache/s2graph/core/QueryParam.scala    |  583 +++++++++
 .../org/apache/s2graph/core/QueryResult.scala   |   42 +
 .../scala/org/apache/s2graph/core/Vertex.scala  |  101 ++
 .../org/apache/s2graph/core/mysqls/Bucket.scala |   66 +
 .../apache/s2graph/core/mysqls/ColumnMeta.scala |  110 ++
 .../apache/s2graph/core/mysqls/Experiment.scala |   77 ++
 .../org/apache/s2graph/core/mysqls/Label.scala  |  372 ++++++
 .../apache/s2graph/core/mysqls/LabelIndex.scala |  143 ++
 .../apache/s2graph/core/mysqls/LabelMeta.scala  |  150 +++
 .../org/apache/s2graph/core/mysqls/Model.scala  |  100 ++
 .../apache/s2graph/core/mysqls/Service.scala    |   94 ++
 .../s2graph/core/mysqls/ServiceColumn.scala     |   82 ++
 .../s2graph/core/parsers/WhereParser.scala      |  207 +++
 .../s2graph/core/rest/RequestParser.scala       |  610 +++++++++
 .../apache/s2graph/core/rest/RestHandler.scala  |  244 ++++
 .../s2graph/core/storage/Deserializable.scala   |   24 +
 .../apache/s2graph/core/storage/SKeyValue.scala |   47 +
 .../s2graph/core/storage/Serializable.scala     |    8 +
 .../apache/s2graph/core/storage/Storage.scala   | 1224 ++++++++++++++++++
 .../core/storage/StorageDeserializable.scala    |   95 ++
 .../core/storage/StorageSerializable.scala      |   41 +
 .../core/storage/hbase/AsynchbaseStorage.scala  |  534 ++++++++
 .../tall/IndexEdgeDeserializable.scala          |  132 ++
 .../indexedge/tall/IndexEdgeSerializable.scala  |   53 +
 .../wide/IndexEdgeDeserializable.scala          |  116 ++
 .../indexedge/wide/IndexEdgeSerializable.scala  |   52 +
 .../tall/SnapshotEdgeDeserializable.scala       |   84 ++
 .../tall/SnapshotEdgeSerializable.scala         |   47 +
 .../wide/SnapshotEdgeDeserializable.scala       |   70 +
 .../wide/SnapshotEdgeSerializable.scala         |   51 +
 .../serde/vertex/VertexDeserializable.scala     |   47 +
 .../serde/vertex/VertexSerializable.scala       |   19 +
 .../apache/s2graph/core/types/HBaseType.scala   |  160 +++
 .../s2graph/core/types/InnerValLike.scala       |  243 ++++
 .../s2graph/core/types/LabelWithDirection.scala |   61 +
 .../apache/s2graph/core/types/VertexId.scala    |  142 ++
 .../apache/s2graph/core/types/v1/InnerVal.scala |  224 ++++
 .../apache/s2graph/core/types/v2/InnerVal.scala |  160 +++
 .../apache/s2graph/core/utils/DeferCache.scala  |   82 ++
 .../apache/s2graph/core/utils/Extentions.scala  |   73 ++
 .../apache/s2graph/core/utils/FutureCache.scala |   82 ++
 .../org/apache/s2graph/core/utils/Logger.scala  |   44 +
 .../s2graph/core/utils/SafeUpdateCache.scala    |   62 +
 .../scala/com/kakao/s2graph/core/EdgeTest.scala |  536 --------
 .../kakao/s2graph/core/Integrate/CrudTest.scala |  226 ----
 .../core/Integrate/IntegrateCommon.scala        |  309 -----
 .../s2graph/core/Integrate/QueryTest.scala      |  909 -------------
 .../core/Integrate/StrongLabelDeleteTest.scala  |  283 ----
 .../core/Integrate/VertexTestHelper.scala       |   71 -
 .../core/Integrate/WeakLabelDeleteTest.scala    |  136 --
 .../com/kakao/s2graph/core/JsonParserTest.scala |   66 -
 .../kakao/s2graph/core/OrderingUtilTest.scala   |  130 --
 .../com/kakao/s2graph/core/QueryParamTest.scala |   86 --
 .../com/kakao/s2graph/core/TestCommon.scala     |  192 ---
 .../s2graph/core/TestCommonWithModels.scala     |  197 ---
 .../com/kakao/s2graph/core/VertexTest.scala     |   79 --
 .../kakao/s2graph/core/models/ModelTest.scala   |  135 --
 .../s2graph/core/mysqls/ExperimentSpec.scala    |   64 -
 .../s2graph/core/parsers/WhereParserTest.scala  |  234 ----
 .../hbase/AsynchbaseQueryBuilderTest.scala      |   53 -
 .../storage/hbase/AsynchbaseStorageTest.scala   |   36 -
 .../core/storage/hbase/IndexEdgeTest.scala      |   81 --
 .../s2graph/core/types/CompositeIdTest.scala    |  109 --
 .../kakao/s2graph/core/types/EdgeTypeTest.scala |   69 -
 .../kakao/s2graph/core/types/InnerValTest.scala |  130 --
 .../s2graph/core/types/SourceVertexIdTest.scala |   52 -
 .../s2graph/core/types/TargetVertexIdTest.scala |   53 -
 .../kakao/s2graph/core/types/VertexIdTest.scala |   52 -
 .../s2graph/core/types/VertexTypeTest.scala     |   42 -
 .../org/apache/s2graph/core/EdgeTest.scala      |  535 ++++++++
 .../s2graph/core/Integrate/CrudTest.scala       |  226 ++++
 .../core/Integrate/IntegrateCommon.scala        |  309 +++++
 .../s2graph/core/Integrate/QueryTest.scala      |  906 +++++++++++++
 .../core/Integrate/StrongLabelDeleteTest.scala  |  283 ++++
 .../core/Integrate/VertexTestHelper.scala       |   71 +
 .../core/Integrate/WeakLabelDeleteTest.scala    |  136 ++
 .../apache/s2graph/core/JsonParserTest.scala    |   66 +
 .../apache/s2graph/core/OrderingUtilTest.scala  |  130 ++
 .../apache/s2graph/core/QueryParamTest.scala    |   86 ++
 .../org/apache/s2graph/core/TestCommon.scala    |  188 +++
 .../s2graph/core/TestCommonWithModels.scala     |  194 +++
 .../apache/s2graph/core/models/ModelTest.scala  |  130 ++
 .../s2graph/core/mysqls/ExperimentSpec.scala    |   64 +
 .../s2graph/core/parsers/WhereParserTest.scala  |  234 ++++
 .../storage/hbase/AsynchbaseStorageTest.scala   |   33 +
 .../core/storage/hbase/IndexEdgeTest.scala      |   81 ++
 .../s2graph/core/types/InnerValTest.scala       |  128 ++
 .../org/apache/s2graph/counter/TrxLog.scala     |   25 +
 .../counter/config/ConfigFunctions.scala        |   30 +
 .../counter/config/S2CounterConfig.scala        |   44 +
 .../apache/s2graph/counter/core/BytesUtil.scala |   13 +
 .../s2graph/counter/core/ExactCounter.scala     |  243 ++++
 .../apache/s2graph/counter/core/ExactKey.scala  |   26 +
 .../s2graph/counter/core/ExactQualifier.scala   |   73 ++
 .../s2graph/counter/core/ExactStorage.scala     |   32 +
 .../s2graph/counter/core/RankingCounter.scala   |  101 ++
 .../s2graph/counter/core/RankingKey.scala       |    4 +
 .../s2graph/counter/core/RankingResult.scala    |    3 +
 .../s2graph/counter/core/RankingStorage.scala   |   17 +
 .../s2graph/counter/core/RankingValue.scala     |   15 +
 .../s2graph/counter/core/RateRankingValue.scala |   15 +
 .../s2graph/counter/core/TimedQualifier.scala   |  215 +++
 .../s2graph/counter/core/v1/BytesUtilV1.scala   |   60 +
 .../core/v1/ExactStorageAsyncHBase.scala        |  317 +++++
 .../counter/core/v1/ExactStorageHBase.scala     |  326 +++++
 .../counter/core/v1/RankingStorageRedis.scala   |  185 +++
 .../s2graph/counter/core/v2/BytesUtilV2.scala   |   88 ++
 .../counter/core/v2/ExactStorageGraph.scala     |  343 +++++
 .../counter/core/v2/GraphOperation.scala        |   48 +
 .../counter/core/v2/RankingStorageGraph.scala   |  395 ++++++
 .../s2graph/counter/decay/DecayFormula.scala    |    6 +
 .../s2graph/counter/decay/ExpDecayFormula.scala |   25 +
 .../s2graph/counter/helper/CounterAdmin.scala   |  113 ++
 .../counter/helper/DistributedScanner.scala     |   70 +
 .../counter/helper/HashShardingJedis.scala      |  153 +++
 .../s2graph/counter/helper/Management.scala     |  143 ++
 .../s2graph/counter/helper/WithHBase.scala      |   98 ++
 .../s2graph/counter/helper/WithRedis.scala      |   59 +
 .../s2graph/counter/models/CachedDBModel.scala  |   11 +
 .../apache/s2graph/counter/models/Counter.scala |  210 +++
 .../apache/s2graph/counter/models/DBModel.scala |   24 +
 .../org/apache/s2graph/counter/package.scala    |    8 +
 .../s2graph/counter/util/CartesianProduct.scala |    8 +
 .../s2graph/counter/util/CollectionCache.scala  |   66 +
 .../s2graph/counter/util/FunctionParser.scala   |   21 +
 .../apache/s2graph/counter/util/Hashes.scala    |   21 +
 .../s2graph/counter/util/ReduceMapValue.scala   |    9 +
 .../org/apache/s2graph/counter/util/Retry.scala |   44 +
 .../s2graph/counter/util/SplitBytes.scala       |   21 +
 .../s2graph/counter/util/UnitConverter.scala    |   25 +
 .../main/scala/s2/config/ConfigFunctions.scala  |   33 -
 .../main/scala/s2/config/S2CounterConfig.scala  |   47 -
 .../src/main/scala/s2/counter/TrxLog.scala      |   28 -
 .../main/scala/s2/counter/core/BytesUtil.scala  |   15 -
 .../scala/s2/counter/core/ExactCounter.scala    |  247 ----
 .../main/scala/s2/counter/core/ExactKey.scala   |   29 -
 .../scala/s2/counter/core/ExactQualifier.scala  |   78 --
 .../scala/s2/counter/core/ExactStorage.scala    |   35 -
 .../scala/s2/counter/core/RankingCounter.scala  |  105 --
 .../main/scala/s2/counter/core/RankingKey.scala |    6 -
 .../scala/s2/counter/core/RankingResult.scala   |    6 -
 .../scala/s2/counter/core/RankingStorage.scala  |   19 -
 .../scala/s2/counter/core/RankingValue.scala    |   19 -
 .../s2/counter/core/RateRankingValue.scala      |   18 -
 .../scala/s2/counter/core/TimedQualifier.scala  |  218 ----
 .../scala/s2/counter/core/v1/BytesUtilV1.scala  |   64 -
 .../core/v1/ExactStorageAsyncHBase.scala        |  321 -----
 .../s2/counter/core/v1/ExactStorageHBase.scala  |  327 -----
 .../counter/core/v1/RankingStorageRedis.scala   |  189 ---
 .../scala/s2/counter/core/v2/BytesUtilV2.scala  |   91 --
 .../s2/counter/core/v2/ExactStorageGraph.scala  |  346 -----
 .../s2/counter/core/v2/GraphOperation.scala     |   52 -
 .../counter/core/v2/RankingStorageGraph.scala   |  399 ------
 .../scala/s2/counter/decay/DecayFormula.scala   |    9 -
 .../s2/counter/decay/ExpDecayFormula.scala      |   28 -
 .../src/main/scala/s2/counter/package.scala     |   11 -
 .../src/main/scala/s2/helper/CounterAdmin.scala |  114 --
 .../scala/s2/helper/DistributedScanner.scala    |   74 --
 .../scala/s2/helper/HashShardingJedis.scala     |  156 ---
 .../src/main/scala/s2/helper/Management.scala   |  146 ---
 .../src/main/scala/s2/helper/WithHBase.scala    |  101 --
 .../src/main/scala/s2/helper/WithRedis.scala    |   62 -
 .../main/scala/s2/models/CachedDBModel.scala    |   14 -
 .../src/main/scala/s2/models/Counter.scala      |  213 ---
 .../src/main/scala/s2/models/DBModel.scala      |   27 -
 .../main/scala/s2/util/CartesianProduct.scala   |   11 -
 .../main/scala/s2/util/CollectionCache.scala    |   69 -
 .../src/main/scala/s2/util/FunctionParser.scala |   24 -
 .../src/main/scala/s2/util/Hashes.scala         |   24 -
 .../src/main/scala/s2/util/ReduceMapValue.scala |   12 -
 .../src/main/scala/s2/util/Retry.scala          |   47 -
 .../src/main/scala/s2/util/SplitBytes.scala     |   24 -
 .../src/main/scala/s2/util/UnitConverter.scala  |   28 -
 .../counter/core/RankingCounterSpec.scala       |  166 +++
 .../counter/models/CounterModelSpec.scala       |   50 +
 .../s2graph/counter/models/CounterSpec.scala    |   33 +
 .../s2/counter/core/RankingCounterSpec.scala    |  169 ---
 .../test/scala/s2/models/CounterModelSpec.scala |   53 -
 .../src/test/scala/s2/models/CounterSpec.scala  |   36 -
 .../counter/loader/CounterBulkLoader.scala      |   78 ++
 .../counter/loader/EraseDailyCounter.scala      |  133 ++
 .../counter/loader/config/StreamingConfig.scala |   24 +
 .../loader/core/CounterEtlFunctions.scala       |   86 ++
 .../counter/loader/core/CounterEtlItem.scala    |   39 +
 .../counter/loader/core/CounterFunctions.scala  |  446 +++++++
 .../counter/loader/core/DimensionProps.scala    |  150 +++
 .../loader/models/DefaultCounterModel.scala     |    6 +
 .../counter/loader/stream/EtlStreaming.scala    |  114 ++
 .../loader/stream/ExactCounterStreaming.scala   |   69 +
 .../loader/stream/GraphToETLStreaming.scala     |   80 ++
 .../loader/stream/RankingCounterStreaming.scala |   72 ++
 .../main/scala/s2/config/StreamingConfig.scala  |   24 -
 .../scala/s2/counter/CounterBulkLoader.scala    |   78 --
 .../scala/s2/counter/EraseDailyCounter.scala    |  133 --
 .../s2/counter/core/CounterEtlFunctions.scala   |   89 --
 .../scala/s2/counter/core/CounterEtlItem.scala  |   43 -
 .../s2/counter/core/CounterFunctions.scala      |  446 -------
 .../scala/s2/counter/core/DimensionProps.scala  |  154 ---
 .../scala/s2/counter/stream/EtlStreaming.scala  |  116 --
 .../counter/stream/ExactCounterStreaming.scala  |   72 --
 .../s2/counter/stream/GraphToETLStreaming.scala |   83 --
 .../stream/RankingCounterStreaming.scala        |   74 --
 .../scala/s2/models/DefaultCounterModel.scala   |    8 -
 .../loader/core/CounterEtlFunctionsSpec.scala   |   33 +
 .../loader/core/DimensionPropsTest.scala        |   46 +
 .../stream/ExactCounterStreamingSpec.scala      |  198 +++
 .../stream/RankingCounterStreamingSpec.scala    |  451 +++++++
 .../counter/core/CounterEtlFunctionsSpec.scala  |   33 -
 .../s2/counter/core/DimensionPropsTest.scala    |   46 -
 .../stream/ExactCounterStreamingSpec.scala      |  196 ---
 .../stream/RankingCounterStreamingSpec.scala    |  448 -------
 .../src/main/resources/application.conf         |    0
 s2rest_netty/src/main/scala/Server.scala        |  202 ---
 .../org/apache/s2graph/rest/netty/Server.scala  |  200 +++
 s2rest_play/app/Bootstrap.scala                 |   81 --
 s2rest_play/app/actors/QueueActor.scala         |   92 --
 s2rest_play/app/config/Config.scala             |   41 -
 s2rest_play/app/config/CounterConfig.scala      |   10 -
 .../app/controllers/AdminController.scala       |  424 ------
 .../app/controllers/ApplicationController.scala |  101 --
 .../app/controllers/CounterController.scala     |  747 -----------
 .../app/controllers/EdgeController.scala        |  219 ----
 .../app/controllers/ExperimentController.scala  |   24 -
 .../app/controllers/JsonBodyParser.scala        |   86 --
 .../app/controllers/PublishController.scala     |   54 -
 .../app/controllers/QueryController.scala       |   52 -
 .../app/controllers/TestController.scala        |   24 -
 .../app/controllers/VertexController.scala      |   86 --
 s2rest_play/app/models/ExactCounterItem.scala   |   38 -
 s2rest_play/app/models/RankCounterItem.scala    |   40 -
 s2rest_play/app/models/package.scala            |    8 -
 .../apache/s2graph/rest/play/Bootstrap.scala    |   81 ++
 .../s2graph/rest/play/actors/QueueActor.scala   |   89 ++
 .../s2graph/rest/play/config/Config.scala       |   41 +
 .../rest/play/config/CounterConfig.scala        |   10 +
 .../rest/play/controllers/AdminController.scala |  424 ++++++
 .../controllers/ApplicationController.scala     |  101 ++
 .../play/controllers/CounterController.scala    |  744 +++++++++++
 .../rest/play/controllers/EdgeController.scala  |  219 ++++
 .../play/controllers/ExperimentController.scala |   23 +
 .../rest/play/controllers/JsonBodyParser.scala  |   86 ++
 .../play/controllers/PublishController.scala    |   54 +
 .../rest/play/controllers/QueryController.scala |   52 +
 .../play/controllers/VertexController.scala     |   85 ++
 .../rest/play/models/ExactCounterItem.scala     |   38 +
 .../rest/play/models/RankCounterItem.scala      |   40 +
 .../s2graph/rest/play/models/package.scala      |   10 +
 s2rest_play/app/util/TestDataLoader.scala       |   70 -
 s2rest_play/conf/reference.conf                 |    2 +-
 s2rest_play/conf/routes                         |  165 ++-
 .../test/benchmark/BenchmarkCommon.scala        |   24 -
 s2rest_play/test/benchmark/GraphUtilSpec.scala  |  125 --
 .../test/benchmark/JsonBenchmarkSpec.scala      |   46 -
 .../benchmark/OrderingUtilBenchmarkSpec.scala   |  100 --
 .../test/benchmark/SamplingBenchmarkSpec.scala  |   85 --
 .../test/controllers/PostProcessSpec.scala      |  112 --
 .../rest/play/benchmark/BenchmarkCommon.scala   |   24 +
 .../rest/play/benchmark/GraphUtilSpec.scala     |  124 ++
 .../rest/play/benchmark/JsonBenchmarkSpec.scala |   45 +
 .../benchmark/OrderingUtilBenchmarkSpec.scala   |   98 ++
 .../play/benchmark/SamplingBenchmarkSpec.scala  |   87 ++
 .../rest/play/controllers/PostProcessSpec.scala |  112 ++
 .../s2graph/spark/config/S2ConfigFactory.scala  |   20 +
 .../s2graph/spark/spark/HashMapParam.scala      |   55 +
 .../apache/s2graph/spark/spark/RDDUtil.scala    |   13 +
 .../apache/s2graph/spark/spark/SparkApp.scala   |  120 ++
 .../spark/spark/SubscriberListener.scala        |   20 +
 .../apache/s2graph/spark/spark/WithKafka.scala  |   69 +
 .../streaming/kafka/KafkaRDDFunctions.scala     |    3 -
 .../spark/streaming/kafka/StreamHelper.scala    |    4 -
 .../main/scala/s2/config/S2ConfigFactory.scala  |   27 -
 .../src/main/scala/s2/spark/HashMapParam.scala  |   55 -
 spark/src/main/scala/s2/spark/RDDUtil.scala     |   16 -
 spark/src/main/scala/s2/spark/SparkApp.scala    |  124 --
 .../scala/s2/spark/SubscriberListener.scala     |   24 -
 spark/src/main/scala/s2/spark/WithKafka.scala   |   69 -
 .../org/apache/s2graph/spark/SparkAppTest.scala |   17 +
 .../s2graph/spark/TestStreamingSpec.scala       |   31 +
 .../src/test/scala/s2/spark/SparkAppTest.scala  |   21 -
 .../test/scala/s2/spark/TestStreamingSpec.scala |   34 -
 377 files changed, 25150 insertions(+), 27480 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 4c3eadb..8b5fc63 100644
--- a/CHANGES
+++ b/CHANGES
@@ -69,6 +69,8 @@ Release 0.12.1 - unreleased
     S2GRAPH-5: Add Apache RAT to valid LICENSE errors. (Committed by DOYUNG YOON).
 
     S2GRAPH-17: Remove unnecessary abstraction layer, Storage. (Committed by DOYUNG YOON).
+    
+    S2GRAPH-57: Change package names into org.apache.s2graph. (Committed by DOYUNG YOON).
 
   SUB TASKS
 

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/dev_support/docker-compose.yml
----------------------------------------------------------------------
diff --git a/dev_support/docker-compose.yml b/dev_support/docker-compose.yml
index 9818251..ba9d489 100644
--- a/dev_support/docker-compose.yml
+++ b/dev_support/docker-compose.yml
@@ -2,9 +2,6 @@ graph:
     image: s2rest_play:0.12.1-SNAPSHOT
     container_name: graph
     net: container:graph_hbase
-    links:
-        - graph_mysql
-        - graph_hbase
 
 graph_mysql:
     build: graph_mysql

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/loader/src/main/scala/org/apache/s2graph/loader/spark/BulkLoadPartitioner.scala
----------------------------------------------------------------------
diff --git a/loader/src/main/scala/org/apache/s2graph/loader/spark/BulkLoadPartitioner.scala b/loader/src/main/scala/org/apache/s2graph/loader/spark/BulkLoadPartitioner.scala
new file mode 100644
index 0000000..cf73736
--- /dev/null
+++ b/loader/src/main/scala/org/apache/s2graph/loader/spark/BulkLoadPartitioner.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.s2graph.loader.spark
+
+import java.util
+import java.util.Comparator
+
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.spark.Partitioner
+
+/**
+ * A Partitioner implementation that will separate records to different
+ * HBase Regions based on region splits
+ *
+ * @param startKeys   The start keys for the given table
+ */
+class BulkLoadPartitioner(startKeys:Array[Array[Byte]])
+  extends Partitioner {
+
+  override def numPartitions: Int = startKeys.length
+
+  override def getPartition(key: Any): Int = {
+
+    val rowKey:Array[Byte] =
+      key match {
+        case qualifier: KeyFamilyQualifier =>
+          qualifier.rowKey
+        case _ =>
+          key.asInstanceOf[Array[Byte]]
+      }
+
+    val comparator: Comparator[Array[Byte]] = new Comparator[Array[Byte]] {
+      override def compare(o1: Array[Byte], o2: Array[Byte]): Int = {
+        Bytes.compareTo(o1, o2)
+      }
+    }
+    val partition = util.Arrays.binarySearch(startKeys, rowKey, comparator)
+    if (partition < 0) partition * -1 + -2
+    else partition
+  }
+}
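
A minimal usage sketch of the partitioner above, assuming a hypothetical table
with regions split at "g" and "p": the partitioner binary-searches the region
start keys, so each row key lands in the partition of the region that will
eventually host it.

    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.s2graph.loader.spark.BulkLoadPartitioner

    object BulkLoadPartitionerSketch {
      def main(args: Array[String]): Unit = {
        // assume the target table has three regions, split at "g" and "p"
        val startKeys: Array[Array[Byte]] =
          Array(Array[Byte](), Bytes.toBytes("g"), Bytes.toBytes("p"))
        val partitioner = new BulkLoadPartitioner(startKeys)

        println(partitioner.numPartitions)                    // 3
        println(partitioner.getPartition(Bytes.toBytes("a"))) // 0, before "g"
        println(partitioner.getPartition(Bytes.toBytes("k"))) // 1, "g" <= key < "p"
        println(partitioner.getPartition(Bytes.toBytes("z"))) // 2, key >= "p"
      }
    }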

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/loader/src/main/scala/org/apache/s2graph/loader/spark/FamilyHFileWriteOptions.scala
----------------------------------------------------------------------
diff --git a/loader/src/main/scala/org/apache/s2graph/loader/spark/FamilyHFileWriteOptions.scala b/loader/src/main/scala/org/apache/s2graph/loader/spark/FamilyHFileWriteOptions.scala
new file mode 100644
index 0000000..a9886a2
--- /dev/null
+++ b/loader/src/main/scala/org/apache/s2graph/loader/spark/FamilyHFileWriteOptions.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.s2graph.loader.spark
+
+import java.io.Serializable
+
+/**
+ * This object will hold optional data for how a given column family's
+ * writer will work
+ *
+ * @param compression       String to define the Compression to be used in the HFile
+ * @param bloomType         String to define the bloom type to be used in the HFile
+ * @param blockSize         The block size to be used in the HFile
+ * @param dataBlockEncoding String to define the data block encoding to be used
+ *                          in the HFile
+ */
+class FamilyHFileWriteOptions( val compression:String,
+                               val bloomType: String,
+                               val blockSize: Int,
+                               val dataBlockEncoding: String) extends Serializable
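
A minimal sketch, assuming a column family named "e" and illustrative settings
("GZ", "ROW", and "FAST_DIFF" are standard HBase enum names), of building the
per-family options map that HBaseContext.bulkLoad accepts:

    import java.util
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.s2graph.loader.spark.FamilyHFileWriteOptions

    object FamilyHFileWriteOptionsSketch {
      def main(args: Array[String]): Unit = {
        // compression, bloom type, block size, data block encoding
        val options = new FamilyHFileWriteOptions("GZ", "ROW", 65536, "FAST_DIFF")
        val familyOptionsMap = new util.HashMap[Array[Byte], FamilyHFileWriteOptions]()
        familyOptionsMap.put(Bytes.toBytes("e"), options)
        println(familyOptionsMap.size()) // 1
      }
    }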

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/loader/src/main/scala/org/apache/s2graph/loader/spark/HBaseContext.scala
----------------------------------------------------------------------
diff --git a/loader/src/main/scala/org/apache/s2graph/loader/spark/HBaseContext.scala b/loader/src/main/scala/org/apache/s2graph/loader/spark/HBaseContext.scala
new file mode 100644
index 0000000..94e12af
--- /dev/null
+++ b/loader/src/main/scala/org/apache/s2graph/loader/spark/HBaseContext.scala
@@ -0,0 +1,849 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.s2graph.loader.spark
+
+import java.net.InetSocketAddress
+import java.util
+
+import org.apache.hadoop.hbase.fs.HFileSystem
+import org.apache.hadoop.hbase._
+import org.apache.hadoop.hbase.io.compress.Compression
+import org.apache.hadoop.hbase.io.compress.Compression.Algorithm
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding
+import org.apache.hadoop.hbase.io.hfile.{CacheConfig, HFileContextBuilder}
+import org.apache.hadoop.hbase.regionserver.{HStore, StoreFile, BloomType}
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.rdd.RDD
+import org.apache.hadoop.conf.Configuration
+import HBaseRDDFunctions._
+import org.apache.hadoop.hbase.client._
+import scala.reflect.ClassTag
+import org.apache.spark.{Logging, SerializableWritable, SparkContext}
+import org.apache.hadoop.hbase.mapreduce.{TableMapReduceUtil,
+TableInputFormat, IdentityTableMapper}
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.hadoop.mapreduce.Job
+import org.apache.spark.streaming.dstream.DStream
+import java.io._
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod
+import org.apache.hadoop.fs.{Path, FileSystem}
+import scala.collection.mutable
+
+/**
+  * HBaseContext is a façade for HBase operations
+  * like bulk put, get, increment, delete, and scan
+  *
+  * HBaseContext will take the responsibilities
+  * of disseminating the configuration information
+  * to the working and managing the life cycle of HConnections.
+ */
+class HBaseContext(@transient sc: SparkContext,
+                   @transient config: Configuration,
+                   val tmpHdfsConfgFile: String = null)
+  extends Serializable with Logging {
+
+  @transient var credentials = SparkHadoopUtil.get.getCurrentUserCredentials()
+  @transient var tmpHdfsConfiguration:Configuration = config
+  @transient var appliedCredentials = false
+  @transient val job = Job.getInstance(config)
+  TableMapReduceUtil.initCredentials(job)
+  val broadcastedConf = sc.broadcast(new SerializableWritable(config))
+  val credentialsConf = sc.broadcast(new SerializableWritable(job.getCredentials))
+
+  LatestHBaseContextCache.latest = this
+
+  if (tmpHdfsConfgFile != null && config != null) {
+    val fs = FileSystem.newInstance(config)
+    val tmpPath = new Path(tmpHdfsConfgFile)
+    if (!fs.exists(tmpPath)) {
+      val outputStream = fs.create(tmpPath)
+      config.write(outputStream)
+      outputStream.close()
+    } else {
+      logWarning("tmpHdfsConfigDir " + tmpHdfsConfgFile + " exist!!")
+    }
+  }
+
+  /**
+   * A simple enrichment of the traditional Spark RDD foreachPartition.
+   * This function differs from the original in that it offers the
+   * developer access to a already connected HConnection object
+   *
+   * Note: Do not close the HConnection object.  All HConnection
+   * management is handled outside this method
+   *
+   * @param rdd  Original RDD with data to iterate over
+   * @param f    Function to be given a iterator to iterate through
+   *             the RDD values and a HConnection object to interact
+   *             with HBase
+   */
+  def foreachPartition[T](rdd: RDD[T],
+                          f: (Iterator[T], Connection) => Unit):Unit = {
+    rdd.foreachPartition(
+      it => hbaseForeachPartition(broadcastedConf, it, f))
+  }
+
+  /**
+   * A simple enrichment of the traditional Spark Streaming dStream foreach
+   * This function differs from the original in that it offers the
+   * developer access to a already connected HConnection object
+   *
+   * Note: Do not close the HConnection object.  All HConnection
+   * management is handled outside this method
+   *
+   * @param dstream  Original DStream with data to iterate over
+   * @param f        Function to be given a iterator to iterate through
+   *                 the DStream values and a HConnection object to
+   *                 interact with HBase
+   */
+  def foreachPartition[T](dstream: DStream[T],
+                    f: (Iterator[T], Connection) => Unit):Unit = {
+    dstream.foreachRDD((rdd, time) => {
+      foreachPartition(rdd, f)
+    })
+  }
+
+  /**
+   * A simple enrichment of the traditional Spark RDD mapPartition.
+   * This function differs from the original in that it offers the
+   * developer access to a already connected HConnection object
+   *
+   * Note: Do not close the HConnection object.  All HConnection
+   * management is handled outside this method
+   *
+   * @param rdd  Original RDD with data to iterate over
+   * @param mp   Function to be given a iterator to iterate through
+   *             the RDD values and a HConnection object to interact
+   *             with HBase
+   * @return     Returns a new RDD generated by the user definition
+   *             function just like normal mapPartition
+   */
+  def mapPartitions[T, R: ClassTag](rdd: RDD[T],
+                                   mp: (Iterator[T], Connection) => Iterator[R]): RDD[R] = {
+
+    rdd.mapPartitions[R](it => hbaseMapPartition[T, R](broadcastedConf,
+      it,
+      mp))
+
+  }
+
+  /**
+   * A simple enrichment of the traditional Spark Streaming DStream
+   * foreachPartition.
+   *
+   * This function differs from the original in that it offers the
+   * developer access to a already connected HConnection object
+   *
+   * Note: Do not close the HConnection object.  All HConnection
+   * management is handled outside this method
+   *
+   * Note: Make sure to partition correctly to avoid memory issue when
+   *       getting data from HBase
+   *
+   * @param dstream  Original DStream with data to iterate over
+   * @param f       Function to be given a iterator to iterate through
+   *                 the DStream values and a HConnection object to
+   *                 interact with HBase
+   * @return         Returns a new DStream generated by the user
+   *                 definition function just like normal mapPartition
+   */
+  def streamForeachPartition[T](dstream: DStream[T],
+                                f: (Iterator[T], Connection) => Unit): Unit = {
+
+    dstream.foreachRDD(rdd => this.foreachPartition(rdd, f))
+  }
+
+  /**
+   * A simple enrichment of the traditional Spark Streaming DStream
+   * mapPartition.
+   *
+   * This function differs from the original in that it offers the
+   * developer access to a already connected HConnection object
+   *
+   * Note: Do not close the HConnection object.  All HConnection
+   * management is handled outside this method
+   *
+   * Note: Make sure to partition correctly to avoid memory issue when
+   *       getting data from HBase
+   *
+   * @param dstream  Original DStream with data to iterate over
+   * @param f       Function to be given a iterator to iterate through
+   *                 the DStream values and a HConnection object to
+   *                 interact with HBase
+   * @return         Returns a new DStream generated by the user
+   *                 definition function just like normal mapPartition
+   */
+  def streamMapPartitions[T, U: ClassTag](dstream: DStream[T],
+                                f: (Iterator[T], Connection) => Iterator[U]):
+  DStream[U] = {
+    dstream.mapPartitions(it => hbaseMapPartition[T, U](
+      broadcastedConf,
+      it,
+      f))
+  }
+
+  /**
+   * A simple abstraction over the HBaseContext.foreachPartition method.
+   *
+   * It allow addition support for a user to take RDD
+   * and generate puts and send them to HBase.
+   * The complexity of managing the HConnection is
+   * removed from the developer
+   *
+   * @param rdd       Original RDD with data to iterate over
+   * @param tableName The name of the table to put into
+   * @param f         Function to convert a value in the RDD to a HBase Put
+   */
+  def bulkPut[T](rdd: RDD[T], tableName: TableName, f: (T) => Put) {
+
+    val tName = tableName.getName
+    rdd.foreachPartition(
+      it => hbaseForeachPartition[T](
+        broadcastedConf,
+        it,
+        (iterator, connection) => {
+          val m = connection.getBufferedMutator(TableName.valueOf(tName))
+          iterator.foreach(T => m.mutate(f(T)))
+          m.flush()
+          m.close()
+        }))
+  }
+
+  def applyCreds[T] (configBroadcast: Broadcast[SerializableWritable[Configuration]]){
+    credentials = SparkHadoopUtil.get.getCurrentUserCredentials()
+
+    logDebug("appliedCredentials:" + appliedCredentials + ",credentials:" + credentials)
+
+    if (!appliedCredentials && credentials != null) {
+      appliedCredentials = true
+
+      @transient val ugi = UserGroupInformation.getCurrentUser
+      ugi.addCredentials(credentials)
+      // specify that this is a proxy user
+      ugi.setAuthenticationMethod(AuthenticationMethod.PROXY)
+
+      ugi.addCredentials(credentialsConf.value.value)
+    }
+  }
+
+  /**
+   * A simple abstraction over the HBaseContext.streamMapPartition method.
+   *
+   * It allow addition support for a user to take a DStream and
+   * generate puts and send them to HBase.
+   *
+   * The complexity of managing the HConnection is
+   * removed from the developer
+   *
+   * @param dstream    Original DStream with data to iterate over
+   * @param tableName  The name of the table to put into
+   * @param f          Function to convert a value in
+   *                   the DStream to a HBase Put
+   */
+  def streamBulkPut[T](dstream: DStream[T],
+                       tableName: TableName,
+                       f: (T) => Put) = {
+    val tName = tableName.getName
+    dstream.foreachRDD((rdd, time) => {
+      bulkPut(rdd, TableName.valueOf(tName), f)
+    })
+  }
+
+  /**
+   * A simple abstraction over the HBaseContext.foreachPartition method.
+   *
+   * It allow addition support for a user to take a RDD and generate delete
+   * and send them to HBase.  The complexity of managing the HConnection is
+   * removed from the developer
+   *
+   * @param rdd       Original RDD with data to iterate over
+   * @param tableName The name of the table to delete from
+   * @param f         Function to convert a value in the RDD to a
+   *                  HBase Deletes
+   * @param batchSize       The number of delete to batch before sending to HBase
+   */
+  def bulkDelete[T](rdd: RDD[T], tableName: TableName,
+                    f: (T) => Delete, batchSize: Integer) {
+    bulkMutation(rdd, tableName, f, batchSize)
+  }
+
+  /**
+   * A simple abstraction over the HBaseContext.streamBulkMutation method.
+   *
+   * It allow addition support for a user to take a DStream and
+   * generate Delete and send them to HBase.
+   *
+   * The complexity of managing the HConnection is
+   * removed from the developer
+   *
+   * @param dstream    Original DStream with data to iterate over
+   * @param tableName  The name of the table to delete from
+   * @param f          function to convert a value in the DStream to a
+   *                   HBase Delete
+   * @param batchSize        The number of deletes to batch before sending to HBase
+   */
+  def streamBulkDelete[T](dstream: DStream[T],
+                          tableName: TableName,
+                          f: (T) => Delete,
+                          batchSize: Integer) = {
+    streamBulkMutation(dstream, tableName, f, batchSize)
+  }
+
+  /**
+   *  Under lining function to support all bulk mutations
+   *
+   *  May be opened up if requested
+   */
+  private def bulkMutation[T](rdd: RDD[T], tableName: TableName,
+                              f: (T) => Mutation, batchSize: Integer) {
+
+    val tName = tableName.getName
+    rdd.foreachPartition(
+      it => hbaseForeachPartition[T](
+        broadcastedConf,
+        it,
+        (iterator, connection) => {
+          val table = connection.getTable(TableName.valueOf(tName))
+          val mutationList = new java.util.ArrayList[Mutation]
+          iterator.foreach(T => {
+            mutationList.add(f(T))
+            if (mutationList.size >= batchSize) {
+              table.batch(mutationList, null)
+              mutationList.clear()
+            }
+          })
+          if (mutationList.size() > 0) {
+            table.batch(mutationList, null)
+            mutationList.clear()
+          }
+          table.close()
+        }))
+  }
+
+  /**
+   *  Under lining function to support all bulk streaming mutations
+   *
+   *  May be opened up if requested
+   */
+  private def streamBulkMutation[T](dstream: DStream[T],
+                                    tableName: TableName,
+                                    f: (T) => Mutation,
+                                    batchSize: Integer) = {
+    val tName = tableName.getName
+    dstream.foreachRDD((rdd, time) => {
+      bulkMutation(rdd, TableName.valueOf(tName), f, batchSize)
+    })
+  }
+
+  /**
+   * A simple abstraction over the HBaseContext.mapPartition method.
+   *
+   * It allow addition support for a user to take a RDD and generates a
+   * new RDD based on Gets and the results they bring back from HBase
+   *
+   * @param rdd     Original RDD with data to iterate over
+   * @param tableName        The name of the table to get from
+   * @param makeGet    function to convert a value in the RDD to a
+   *                   HBase Get
+   * @param convertResult This will convert the HBase Result object to
+   *                   what ever the user wants to put in the resulting
+   *                   RDD
+   * return            new RDD that is created by the Get to HBase
+   */
+  def bulkGet[T, U: ClassTag](tableName: TableName,
+                    batchSize: Integer,
+                    rdd: RDD[T],
+                    makeGet: (T) => Get,
+                    convertResult: (Result) => U): RDD[U] = {
+
+    val getMapPartition = new GetMapPartition(tableName,
+      batchSize,
+      makeGet,
+      convertResult)
+
+    rdd.mapPartitions[U](it =>
+      hbaseMapPartition[T, U](
+        broadcastedConf,
+        it,
+        getMapPartition.run))
+  }
+
+  /**
+   * A simple abstraction over the HBaseContext.streamMap method.
+   *
+   * It allow addition support for a user to take a DStream and
+   * generates a new DStream based on Gets and the results
+   * they bring back from HBase
+   *
+   * @param tableName     The name of the table to get from
+   * @param batchSize     The number of Gets to be sent in a single batch
+   * @param dStream       Original DStream with data to iterate over
+   * @param makeGet       Function to convert a value in the DStream to a
+   *                      HBase Get
+   * @param convertResult This will convert the HBase Result object to
+   *                      what ever the user wants to put in the resulting
+   *                      DStream
+   * @return              A new DStream that is created by the Get to HBase
+   */
+  def streamBulkGet[T, U: ClassTag](tableName: TableName,
+                                    batchSize: Integer,
+                                    dStream: DStream[T],
+                                    makeGet: (T) => Get,
+                                    convertResult: (Result) => U): DStream[U] = {
+
+    val getMapPartition = new GetMapPartition(tableName,
+      batchSize,
+      makeGet,
+      convertResult)
+
+    dStream.mapPartitions[U](it => hbaseMapPartition[T, U](
+      broadcastedConf,
+      it,
+      getMapPartition.run))
+  }
+
+  /**
+   * This function will use the native HBase TableInputFormat with the
+   * given scan object to generate a new RDD
+   *
+   *  @param tableName the name of the table to scan
+   *  @param scan      the HBase scan object to use to read data from HBase
+   *  @param f         function to convert a Result object from HBase into
+   *                   what the user wants in the final generated RDD
+   *  @return          new RDD with results from scan
+   */
+  def hbaseRDD[U: ClassTag](tableName: TableName, scan: Scan,
+                            f: ((ImmutableBytesWritable, Result)) => U): RDD[U] = {
+
+    val job: Job = Job.getInstance(getConf(broadcastedConf))
+
+    TableMapReduceUtil.initCredentials(job)
+    TableMapReduceUtil.initTableMapperJob(tableName, scan,
+      classOf[IdentityTableMapper], null, null, job)
+
+    sc.newAPIHadoopRDD(job.getConfiguration,
+      classOf[TableInputFormat],
+      classOf[ImmutableBytesWritable],
+      classOf[Result]).map(f)
+  }
+
+  /**
+   * A overloaded version of HBaseContext hbaseRDD that defines the
+   * type of the resulting RDD
+   *
+   *  @param tableName the name of the table to scan
+   *  @param scans     the HBase scan object to use to read data from HBase
+   *  @return          New RDD with results from scan
+   *
+   */
+  def hbaseRDD(tableName: TableName, scans: Scan):
+  RDD[(ImmutableBytesWritable, Result)] = {
+
+    hbaseRDD[(ImmutableBytesWritable, Result)](
+      tableName,
+      scans,
+      (r: (ImmutableBytesWritable, Result)) => r)
+  }
+
+  /**
+   *  underlining wrapper all foreach functions in HBaseContext
+   */
+  private def hbaseForeachPartition[T](configBroadcast:
+                                       Broadcast[SerializableWritable[Configuration]],
+                                        it: Iterator[T],
+                                        f: (Iterator[T], Connection) => Unit) = {
+
+    val config = getConf(configBroadcast)
+
+    applyCreds(configBroadcast)
+    // specify that this is a proxy user
+    val connection = ConnectionFactory.createConnection(config)
+    f(it, connection)
+    connection.close()
+  }
+
+  private def getConf(configBroadcast: Broadcast[SerializableWritable[Configuration]]):
+  Configuration = {
+
+    if (tmpHdfsConfiguration == null && tmpHdfsConfgFile != null) {
+      val fs = FileSystem.newInstance(SparkHadoopUtil.get.conf)
+      val inputStream = fs.open(new Path(tmpHdfsConfgFile))
+      tmpHdfsConfiguration = new Configuration(false)
+      tmpHdfsConfiguration.readFields(inputStream)
+      inputStream.close()
+    }
+
+    if (tmpHdfsConfiguration == null) {
+      try {
+        tmpHdfsConfiguration = configBroadcast.value.value
+      } catch {
+        case ex: Exception => logError("Unable to getConfig from broadcast", ex)
+      }
+    }
+    tmpHdfsConfiguration
+  }
+
+  /**
+   *  underlining wrapper all mapPartition functions in HBaseContext
+   *
+   */
+  private def hbaseMapPartition[K, U](
+                                       configBroadcast:
+                                       Broadcast[SerializableWritable[Configuration]],
+                                       it: Iterator[K],
+                                       mp: (Iterator[K], Connection) =>
+                                         Iterator[U]): Iterator[U] = {
+
+    val config = getConf(configBroadcast)
+    applyCreds(configBroadcast)
+
+    val connection = ConnectionFactory.createConnection(config)
+    val res = mp(it, connection)
+    connection.close()
+    res
+
+  }
+
+  /**
+   *  underlining wrapper all get mapPartition functions in HBaseContext
+   */
+  private class GetMapPartition[T, U](tableName: TableName,
+                                      batchSize: Integer,
+                                      makeGet: (T) => Get,
+                                      convertResult: (Result) => U)
+    extends Serializable {
+
+    val tName = tableName.getName
+
+    def run(iterator: Iterator[T], connection: Connection): Iterator[U] = {
+      val table = connection.getTable(TableName.valueOf(tName))
+
+      val gets = new java.util.ArrayList[Get]()
+      var res = List[U]()
+
+      while (iterator.hasNext) {
+        gets.add(makeGet(iterator.next()))
+
+        if (gets.size() == batchSize) {
+          val results = table.get(gets)
+          res = res ++ results.map(convertResult)
+          gets.clear()
+        }
+      }
+      if (gets.size() > 0) {
+        val results = table.get(gets)
+        res = res ++ results.map(convertResult)
+        gets.clear()
+      }
+      table.close()
+      res.iterator
+    }
+  }
+
+  /**
+   * Produces a ClassTag[T], which is actually just a casted ClassTag[AnyRef].
+   *
+   * This method is used to keep ClassTags out of the external Java API, as
+   * the Java compiler cannot produce them automatically. While this
+   * ClassTag-faking does please the compiler, it can cause problems at runtime
+   * if the Scala API relies on ClassTags for correctness.
+   *
+   * Often, though, a ClassTag[AnyRef] will not lead to incorrect behavior,
+   * just worse performance or security issues.
+   * For instance, an Array of AnyRef can hold any type T, but may lose primitive
+   * specialization.
+   */
+  private[spark]
+  def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]]
+
+  /**
+   * A Spark Implementation of HBase Bulk load
+   *
+   * This will take the content from an existing RDD then sort and shuffle
+   * it with respect to region splits.  The result of that sort and shuffle
+   * will be written to HFiles.
+   *
+   * After this function is executed the user will have to call
+   * LoadIncrementalHFiles.doBulkLoad(...) to move the files into HBase
+   *
+   * Also note this version of bulk load is different from past versions in
+   * that it includes the qualifier as part of the sort process. The
+   * reason for this is to be able to support rows will very large number
+   * of columns.
+   *
+   * @param rdd                            The RDD we are bulk loading from
+   * @param tableName                      The HBase table we are loading into
+   * @param flatMap                        A flapMap function that will make every
+   *                                       row in the RDD
+   *                                       into N cells for the bulk load
+   * @param stagingDir                     The location on the FileSystem to bulk load into
+   * @param familyHFileWriteOptionsMap     Options that will define how the HFile for a
+   *                                       column family is written
+   * @param compactionExclude              Compaction excluded for the HFiles
+   * @param maxSize                        Max size for the HFiles before they roll
+   * @tparam T                             The Type of values in the original RDD
+   */
+  def bulkLoad[T](rdd:RDD[T],
+                  tableName: TableName,
+                  flatMap: (T) => Iterator[(KeyFamilyQualifier, Array[Byte])],
+                  stagingDir:String,
+                  familyHFileWriteOptionsMap:
+                  util.Map[Array[Byte], FamilyHFileWriteOptions] =
+                  new util.HashMap[Array[Byte], FamilyHFileWriteOptions],
+                  compactionExclude: Boolean = false,
+                  maxSize:Long = HConstants.DEFAULT_MAX_FILE_SIZE):
+  Unit = {
+    val conn = ConnectionFactory.createConnection(config)
+    val regionLocator = conn.getRegionLocator(tableName)
+    val startKeys = regionLocator.getStartKeys
+    val defaultCompressionStr = config.get("hfile.compression",
+      Compression.Algorithm.NONE.getName)
+    val defaultCompression = Compression.getCompressionAlgorithmByName(defaultCompressionStr)
+//      HFileWriterImpl
+//      .compressionByName(defaultCompressionStr)
+    val now = System.currentTimeMillis()
+    val tableNameByteArray = tableName.getName
+
+    val familyHFileWriteOptionsMapInternal =
+      new util.HashMap[ByteArrayWrapper, FamilyHFileWriteOptions]
+
+    val entrySetIt = familyHFileWriteOptionsMap.entrySet().iterator()
+
+    while (entrySetIt.hasNext) {
+      val entry = entrySetIt.next()
+      familyHFileWriteOptionsMapInternal.put(new ByteArrayWrapper(entry.getKey), entry.getValue)
+    }
+
+    /**
+     *  This will return a new HFile writer when requested
+     *
+     * @param family       column family
+     * @param conf         configuration to connect to HBase
+     * @param favoredNodes nodes that we would like to write too
+     * @param fs           FileSystem object where we will be writing the HFiles to
+     * @return WriterLength object
+     */
+    def getNewWriter(family: Array[Byte], conf: Configuration,
+                     favoredNodes: Array[InetSocketAddress],
+                     fs:FileSystem,
+                     familydir:Path): WriterLength = {
+
+
+      var familyOptions = familyHFileWriteOptionsMapInternal.get(new ByteArrayWrapper(family))
+
+      if (familyOptions == null) {
+        familyOptions = new FamilyHFileWriteOptions(defaultCompression.toString,
+          BloomType.NONE.toString, HConstants.DEFAULT_BLOCKSIZE, DataBlockEncoding.NONE.toString)
+        familyHFileWriteOptionsMapInternal.put(new ByteArrayWrapper(family), familyOptions)
+      }
+
+      val tempConf = new Configuration(conf)
+      tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f)
+      val contextBuilder = new HFileContextBuilder()
+        .withCompression(Algorithm.valueOf(familyOptions.compression))
+        .withChecksumType(HStore.getChecksumType(conf))
+        .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
+        .withBlockSize(familyOptions.blockSize)
+      contextBuilder.withDataBlockEncoding(DataBlockEncoding.
+        valueOf(familyOptions.dataBlockEncoding))
+      val hFileContext = contextBuilder.build()
+
+      if (null == favoredNodes) {
+        new WriterLength(0, new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs)
+          .withOutputDir(familydir).withBloomType(BloomType.valueOf(familyOptions.bloomType))
+          .withComparator(KeyValue.COMPARATOR).withFileContext(hFileContext).build())
+//          .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext).build())
+      } else {
+        new WriterLength(0,
+          new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), new HFileSystem(fs))
+          .withOutputDir(familydir).withBloomType(BloomType.valueOf(familyOptions.bloomType))
+            .withComparator(KeyValue.COMPARATOR).withFileContext(hFileContext)
+//          .withComparator(CellComparator.COMPARATOR).withFileContext(hFileContext)
+          .withFavoredNodes(favoredNodes).build())
+      }
+    }
+
+    val regionSplitPartitioner =
+      new BulkLoadPartitioner(startKeys)
+
+    //This is where all the magic happens
+    //Here we are going to do the following things
+    // 1. FlatMap every row in the RDD into key column value tuples
+    // 2. Then we are going to repartition, sort and shuffle
+    // 3. Finally we are going to write out our HFiles
+    rdd.flatMap( r => flatMap(r)).
+      repartitionAndSortWithinPartitions(regionSplitPartitioner).
+      hbaseForeachPartition(this, (it, conn) => {
+
+      val conf = broadcastedConf.value.value
+      val fs = FileSystem.get(conf)
+      val writerMap = new mutable.HashMap[ByteArrayWrapper, WriterLength]
+      var previousRow:Array[Byte] = HConstants.EMPTY_BYTE_ARRAY
+      var rollOverRequested = false
+
+      /**
+       * This will roll all writers
+       */
+      def rollWriters(): Unit = {
+        writerMap.values.foreach( wl => {
+          if (wl.writer != null) {
+            logDebug("Writer=" + wl.writer.getPath +
+              (if (wl.written == 0) "" else ", wrote=" + wl.written))
+            close(wl.writer)
+          }
+        })
+        writerMap.clear()
+        rollOverRequested = false
+      }
+
+      /**
+       * This function will close a given HFile writer
+       * @param w The writer to close
+       */
+      def close(w:StoreFile.Writer): Unit = {
+        if (w != null) {
+          w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
+            Bytes.toBytes(System.currentTimeMillis()))
+          w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY,
+            Bytes.toBytes(regionSplitPartitioner.getPartition(previousRow)))
+          w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY,
+            Bytes.toBytes(true))
+          w.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY,
+            Bytes.toBytes(compactionExclude))
+          w.appendTrackedTimestampsToMetadata()
+          w.close()
+        }
+      }
+
+      //Here is where we finally iterate through the data in this partition of the
+      //RDD that has been sorted and partitioned
+      it.foreach{ case (keyFamilyQualifier, cellValue:Array[Byte]) =>
+
+        //This will get a writer for the column family
+        //If there is no writer for a given column family then
+        //it will get created here.
+        val wl = writerMap.getOrElseUpdate(new ByteArrayWrapper(keyFamilyQualifier.family), {
+
+          val familyDir = new Path(stagingDir, Bytes.toString(keyFamilyQualifier.family))
+
+          fs.mkdirs(familyDir)
+
+          val loc:HRegionLocation = {
+            try {
+              val locator =
+                conn.getRegionLocator(TableName.valueOf(tableNameByteArray))
+              locator.getRegionLocation(keyFamilyQualifier.rowKey)
+            } catch {
+              case e: Throwable =>
+                logWarning("there's something wrong when locating rowkey: " +
+                  Bytes.toString(keyFamilyQualifier.rowKey))
+                null
+            }
+          }
+          if (null == loc) {
+            if (log.isTraceEnabled) {
+              logTrace("failed to get region location, so use default writer: 
" +
+                Bytes.toString(keyFamilyQualifier.rowKey))
+            }
+            getNewWriter(family = keyFamilyQualifier.family, conf = conf, 
favoredNodes = null,
+              fs = fs, familydir = familyDir)
+          } else {
+            if (log.isDebugEnabled) {
+              logDebug("first rowkey: [" + 
Bytes.toString(keyFamilyQualifier.rowKey) + "]")
+            }
+            val initialIsa =
+              new InetSocketAddress(loc.getHostname, loc.getPort)
+            if (initialIsa.isUnresolved) {
+              if (log.isTraceEnabled) {
+                logTrace("failed to resolve bind address: " + loc.getHostname 
+ ":"
+                  + loc.getPort + ", so use default writer")
+              }
+              getNewWriter(keyFamilyQualifier.family, conf, null, fs, 
familyDir)
+            } else {
+              if (log.isDebugEnabled) {
+                logDebug("use favored nodes writer: " + initialIsa.getHostString)
+              }
+              getNewWriter(keyFamilyQualifier.family, conf,
+                Array[InetSocketAddress](initialIsa), fs, familyDir)
+            }
+          }
+        })
+
+        val keyValue = new KeyValue(keyFamilyQualifier.rowKey,
+          keyFamilyQualifier.family,
+          keyFamilyQualifier.qualifier,
+          now, cellValue)
+
+        wl.writer.append(keyValue)
+        wl.written += keyValue.getLength
+
+        rollOverRequested = rollOverRequested || wl.written > maxSize
+
+        //This will only roll if we have at least one column family file that is
+        //bigger than maxSize and we have finished a given row key
+        if (rollOverRequested && Bytes.compareTo(previousRow, keyFamilyQualifier.rowKey) != 0) {
+          rollWriters()
+        }
+
+        previousRow = keyFamilyQualifier.rowKey
+      }
+      //We have finished all the data so lets close up the writers
+      rollWriters()
+    })
+  }
+
+  /**
+   * This is a wrapper class around StoreFile.Writer.  The reason for the
+   * wrapper is to keep the length of the file alongside the writer
+   *
+   * @param written The number of bytes written to the writer
+   * @param writer  The writer to be wrapped
+   */
+  class WriterLength(var written:Long, val writer:StoreFile.Writer)
+
+  /**
+   * This is a wrapper over a byte array so it can work as
+   * a key in a hashMap
+   *
+   * @param o1 The Byte Array value
+   */
+  class ByteArrayWrapper (val o1:Array[Byte])
+    extends Comparable[ByteArrayWrapper] with Serializable {
+    override def compareTo(o2: ByteArrayWrapper): Int = {
+      Bytes.compareTo(o1,o2.o1)
+    }
+    override def equals(o2: Any): Boolean = {
+      o2 match {
+        case wrapper: ByteArrayWrapper =>
+          Bytes.equals(o1, wrapper.o1)
+        case _ =>
+          false
+      }
+    }
+    override def hashCode():Int = {
+      Bytes.hashCode(o1)
+    }
+  }
+}
+
+object LatestHBaseContextCache {
+  var latest:HBaseContext = null
+}
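
For orientation, here is a minimal driver sketch for the bulk load path implemented above. It is not part of this change: the SparkContext setup, the table name "test_table", the column family/qualifier, the staging directory, and the (rowKey, family, qualifier) argument order of KeyFamilyQualifier are illustrative assumptions.

  import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
  import org.apache.hadoop.hbase.util.Bytes
  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.s2graph.loader.spark.{HBaseContext, KeyFamilyQualifier}

  object BulkLoadExample {
    def main(args: Array[String]): Unit = {
      val sc = new SparkContext(new SparkConf().setAppName("bulkload-example"))
      // Assumes the (SparkContext, Configuration) constructor of HBaseContext.
      val hbaseContext = new HBaseContext(sc, HBaseConfiguration.create())

      // Toy input: (rowKey, value) pairs that will each become a single cell.
      val rdd = sc.parallelize(Seq(
        (Bytes.toBytes("row1"), Bytes.toBytes("v1")),
        (Bytes.toBytes("row2"), Bytes.toBytes("v2"))))

      // Every record is flat-mapped into (KeyFamilyQualifier, value) cells; bulkLoad
      // then repartitions and sorts them by region and writes HFiles under stagingDir.
      hbaseContext.bulkLoad(
        rdd,
        TableName.valueOf("test_table"),          // hypothetical table
        (kv: (Array[Byte], Array[Byte])) =>
          Iterator((new KeyFamilyQualifier(kv._1, Bytes.toBytes("e"), Bytes.toBytes("q")), kv._2)),
        "/tmp/bulkload-staging")                  // hypothetical staging dir

      // As the scaladoc notes, the resulting HFiles still have to be moved into
      // HBase afterwards, e.g. with LoadIncrementalHFiles.doBulkLoad(...).
      sc.stop()
    }
  }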

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/loader/src/main/scala/org/apache/s2graph/loader/spark/HBaseDStreamFunctions.scala
----------------------------------------------------------------------
diff --git a/loader/src/main/scala/org/apache/s2graph/loader/spark/HBaseDStreamFunctions.scala b/loader/src/main/scala/org/apache/s2graph/loader/spark/HBaseDStreamFunctions.scala
new file mode 100644
index 0000000..80f289b
--- /dev/null
+++ b/loader/src/main/scala/org/apache/s2graph/loader/spark/HBaseDStreamFunctions.scala
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.s2graph.loader.spark
+
+import org.apache.hadoop.hbase.TableName
+import org.apache.hadoop.hbase.client._
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.spark.streaming.dstream.DStream
+
+import scala.reflect.ClassTag
+
+/**
+ * HBaseDStreamFunctions contains a set of implicit functions that can be
+ * applied to a Spark DStream so that we can easily interact with HBase
+ */
+object HBaseDStreamFunctions {
+
+  /**
+   * These are implicit methods for a DStream that contains any type of
+   * data.
+   *
+   * @param dStream  This is for dStreams of any type
+   * @tparam T       Type T
+   */
+  implicit class GenericHBaseDStreamFunctions[T](val dStream: DStream[T]) {
+
+    /**
+     * Implicit method that gives easy access to HBaseContext's bulk
+     * put.  This will not return a new DStream.  Think of it like a foreach
+     *
+     * @param hc         The hbaseContext object to identify which
+     *                   HBase cluster connection to use
+     * @param tableName  The tableName that the put will be sent to
+     * @param f          The function that will turn the DStream values
+     *                   into HBase Put objects.
+     */
+    def hbaseBulkPut(hc: HBaseContext,
+                     tableName: TableName,
+                     f: (T) => Put): Unit = {
+      hc.streamBulkPut(dStream, tableName, f)
+    }
+
+    /**
+     * Implicit method that gives easy access to HBaseContext's bulk
+     * get.  This will return a new DStream.  Think about it as a DStream map
+     * function, in that every DStream value will get a new value out of
+     * HBase.  That new value will populate the newly generated DStream.
+     *
+     * @param hc             The hbaseContext object to identify which
+     *                       HBase cluster connection to use
+     * @param tableName      The tableName that the gets will be sent to
+     * @param batchSize      How many gets to execute in a single batch
+     * @param f              The function that will turn the DStream values
+     *                       into HBase Get objects
+     * @param convertResult  The function that will convert a HBase
+     *                       Result object into a value that will go
+     *                       into the resulting DStream
+     * @tparam R             The type of Object that will be coming
+     *                       out of the resulting DStream
+     * @return               A resulting DStream with type R objects
+     */
+    def hbaseBulkGet[R: ClassTag](hc: HBaseContext,
+                     tableName: TableName,
+                     batchSize:Int, f: (T) => Get, convertResult: (Result) => R):
+    DStream[R] = {
+      hc.streamBulkGet[T, R](tableName, batchSize, dStream, f, convertResult)
+    }
+
+    /**
+     * Implicit method that gives easy access to HBaseContext's bulk
+     * get.  This will return a new DStream.  Think about it as a DStream map
+     * function, in that every DStream value will get a new value out of
+     * HBase.  That new value will populate the newly generated DStream.
+     *
+     * @param hc             The hbaseContext object to identify which
+     *                       HBase cluster connection to use
+     * @param tableName      The tableName that the gets will be sent to
+     * @param batchSize      How many gets to execute in a single batch
+     * @param f              The function that will turn the DStream values
+     *                       into HBase Get objects
+     * @return               A resulting DStream of (ImmutableBytesWritable, Result) tuples
+     */
+    def hbaseBulkGet(hc: HBaseContext,
+                     tableName: TableName, batchSize:Int,
+                     f: (T) => Get): DStream[(ImmutableBytesWritable, Result)] = {
+        hc.streamBulkGet[T, (ImmutableBytesWritable, Result)](
+          tableName, batchSize, dStream, f,
+          result => (new ImmutableBytesWritable(result.getRow), result))
+    }
+
+    /**
+     * Implicit method that gives easy access to HBaseContext's bulk
+     * Delete.  This will not return a new DStream.
+     *
+     * @param hc         The hbaseContext object to identify which HBase
+     *                   cluster connection to use
+     * @param tableName  The tableName that the deletes will be sent to
+     * @param f          The function that will convert the DStream value into
+     *                   a HBase Delete Object
+     * @param batchSize  The number of Deletes to be sent in a single batch
+     */
+    def hbaseBulkDelete(hc: HBaseContext,
+                        tableName: TableName,
+                        f:(T) => Delete, batchSize:Int): Unit = {
+      hc.streamBulkDelete(dStream, tableName, f, batchSize)
+    }
+
+    /**
+     * Implicit method that gives easy access to HBaseContext's
+     * foreachPartition method.  This will act very much like a normal DStream
+     * foreach method but for the fact that you will now have a HBase connection
+     * while iterating through the values.
+     *
+     * @param hc  The hbaseContext object to identify which HBase
+     *            cluster connection to use
+     * @param f   This function will get an iterator for a Partition of a
+     *            DStream along with a connection object to HBase
+     */
+    def hbaseForeachPartition(hc: HBaseContext,
+                              f: (Iterator[T], Connection) => Unit): Unit = {
+      hc.streamForeachPartition(dStream, f)
+    }
+
+    /**
+     * Implicit method that gives easy access to HBaseContext's
+     * mapPartitions method.  This will act very much like a normal DStream
+     * map partitions method but for the fact that you will now have a
+     * HBase connection while iterating through the values
+     *
+     * @param hc  The hbaseContext object to identify which HBase
+     *            cluster connection to use
+     * @param f   This function will get an iterator for a Partition of a
+     *            DStream along with a connection object to HBase
+     * @tparam R  This is the type of objects that will go into the resulting
+     *            DStream
+     * @return    A resulting DStream of type R
+     */
+    def hbaseMapPartitions[R: ClassTag](hc: HBaseContext,
+                                        f: (Iterator[T], Connection) => Iterator[R]):
+    DStream[R] = {
+      hc.streamMapPartitions(dStream, f)
+    }
+  }
+}
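
The implicit class above becomes available by importing HBaseDStreamFunctions._. A minimal sketch of the bulk-put path, assuming an already constructed HBaseContext; the table name and column family/qualifier are illustrative assumptions:

  import org.apache.hadoop.hbase.TableName
  import org.apache.hadoop.hbase.client.Put
  import org.apache.hadoop.hbase.util.Bytes
  import org.apache.spark.streaming.dstream.DStream
  import org.apache.s2graph.loader.spark.HBaseContext
  import org.apache.s2graph.loader.spark.HBaseDStreamFunctions._

  object StreamingPutExample {
    // Hypothetical helper: writes a stream of (rowKey, value) pairs as single-column puts.
    def putStream(stream: DStream[(Array[Byte], Array[Byte])],
                  hbaseContext: HBaseContext): Unit = {
      stream.hbaseBulkPut(hbaseContext, TableName.valueOf("test_table"), {
        case (rowKey, value) =>
          new Put(rowKey).addColumn(Bytes.toBytes("e"), Bytes.toBytes("q"), value)
      })
    }
  }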

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b8a15217/loader/src/main/scala/org/apache/s2graph/loader/spark/HBaseRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/loader/src/main/scala/org/apache/s2graph/loader/spark/HBaseRDDFunctions.scala b/loader/src/main/scala/org/apache/s2graph/loader/spark/HBaseRDDFunctions.scala
new file mode 100644
index 0000000..b818a3c
--- /dev/null
+++ b/loader/src/main/scala/org/apache/s2graph/loader/spark/HBaseRDDFunctions.scala
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.s2graph.loader.spark
+
+import java.util
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hbase.{HConstants, TableName}
+import org.apache.hadoop.hbase.client._
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.spark.rdd.RDD
+
+import scala.collection.immutable.HashMap
+import scala.reflect.ClassTag
+
+/**
+ * HBaseRDDFunctions contains a set of implicit functions that can be
+ * applied to a Spark RDD so that we can easily interact with HBase
+ */
+object HBaseRDDFunctions
+{
+
+  /**
+   * These are implicit methods for a RDD that contains any type of
+   * data.
+   *
+   * @param rdd This is for rdd of any type
+   * @tparam T  This is any type
+   */
+  implicit class GenericHBaseRDDFunctions[T](val rdd: RDD[T]) {
+
+    /**
+     * Implicit method that gives easy access to HBaseContext's bulk
+     * put.  This will not return a new RDD.  Think of it like a foreach
+     *
+     * @param hc         The hbaseContext object to identify which
+     *                   HBase cluster connection to use
+     * @param tableName  The tableName that the put will be sent to
+     * @param f          The function that will turn the RDD values
+     *                   into HBase Put objects.
+     */
+    def hbaseBulkPut(hc: HBaseContext,
+                     tableName: TableName,
+                     f: (T) => Put): Unit = {
+      hc.bulkPut(rdd, tableName, f)
+    }
+
+    /**
+     * Implicit method that gives easy access to HBaseContext's bulk
+     * get.  This will return a new RDD.  Think about it as a RDD map
+     * function, in that every RDD value will get a new value out of
+     * HBase.  That new value will populate the newly generated RDD.
+     *
+     * @param hc             The hbaseContext object to identify which
+     *                       HBase cluster connection to use
+     * @param tableName      The tableName that the gets will be sent to
+     * @param batchSize      How many gets to execute in a single batch
+     * @param f              The function that will turn the RDD values
+     *                       into HBase Get objects
+     * @param convertResult  The function that will convert a HBase
+     *                       Result object into a value that will go
+     *                       into the resulting RDD
+     * @tparam R             The type of Object that will be coming
+     *                       out of the resulting RDD
+     * @return               A resulting RDD with type R objects
+     */
+    def hbaseBulkGet[R: ClassTag](hc: HBaseContext,
+                            tableName: TableName, batchSize:Int,
+                            f: (T) => Get, convertResult: (Result) => R): RDD[R] = {
+      hc.bulkGet[T, R](tableName, batchSize, rdd, f, convertResult)
+    }
+
+    /**
+     * Implicit method that gives easy access to HBaseContext's bulk
+     * get.  This will return a new RDD.  Think about it as a RDD map
+     * function, in that every RDD value will get a new value out of
+     * HBase.  That new value will populate the newly generated RDD.
+     *
+     * @param hc             The hbaseContext object to identify which
+     *                       HBase cluster connection to use
+     * @param tableName      The tableName that the gets will be sent to
+     * @param batchSize      How many gets to execute in a single batch
+     * @param f              The function that will turn the RDD values
+     *                       into HBase Get objects
+     * @return               A resulting RDD of (ImmutableBytesWritable, Result) tuples
+     */
+    def hbaseBulkGet(hc: HBaseContext,
+                                  tableName: TableName, batchSize:Int,
+                                  f: (T) => Get): RDD[(ImmutableBytesWritable, Result)] = {
+      hc.bulkGet[T, (ImmutableBytesWritable, Result)](tableName,
+        batchSize, rdd, f,
+        result => if (result != null && result.getRow != null) {
+          (new ImmutableBytesWritable(result.getRow), result)
+        } else {
+          null
+        })
+    }
+
+    /**
+     * Implicit method that gives easy access to HBaseContext's bulk
+     * Delete.  This will not return a new RDD.
+     *
+     * @param hc         The hbaseContext object to identify which HBase
+     *                   cluster connection to use
+     * @param tableName  The tableName that the deletes will be sent to
+     * @param f          The function that will convert the RDD value into
+     *                   a HBase Delete Object
+     * @param batchSize  The number of Deletes to be sent in a single batch
+     */
+    def hbaseBulkDelete(hc: HBaseContext,
+                        tableName: TableName, f:(T) => Delete, batchSize:Int): Unit = {
+      hc.bulkDelete(rdd, tableName, f, batchSize)
+    }
+
+    /**
+     * Implicit method that gives easy access to HBaseContext's
+     * foreachPartition method.  This will act very much like a normal RDD
+     * foreach method but for the fact that you will now have a HBase connection
+     * while iterating through the values.
+     *
+     * @param hc  The hbaseContext object to identify which HBase
+     *            cluster connection to use
+     * @param f   This function will get an iterator for a Partition of an
+     *            RDD along with a connection object to HBase
+     */
+    def hbaseForeachPartition(hc: HBaseContext,
+                              f: (Iterator[T], Connection) => Unit): Unit = {
+      hc.foreachPartition(rdd, f)
+    }
+
+    /**
+     * Implicit method that gives easy access to HBaseContext's
+     * mapPartitions method.  This will act very much like a normal RDD
+     * map partitions method but for the fact that you will now have a
+     * HBase connection while iterating through the values
+     *
+     * @param hc  The hbaseContext object to identify which HBase
+     *            cluster connection to use
+     * @param f   This function will get an iterator for a Partition of an
+     *            RDD along with a connection object to HBase
+     * @tparam R  This is the type of objects that will go into the resulting
+     *            RDD
+     * @return    A resulting RDD of type R
+     */
+    def hbaseMapPartitions[R: ClassTag](hc: HBaseContext,
+                                        f: (Iterator[T], Connection) => Iterator[R]):
+    RDD[R] = {
+      hc.mapPartitions[T,R](rdd, f)
+    }
+
+    /**
+     * Implicit method that gives easy access to HBaseContext's
+     * bulkLoad method.
+     *
+     * A Spark Implementation of HBase Bulk load
+     *
+     * This will take the content from an existing RDD then sort and shuffle
+     * it with respect to region splits.  The result of that sort and shuffle
+     * will be written to HFiles.
+     *
+     * After this function is executed the user will have to call
+     * LoadIncrementalHFiles.doBulkLoad(...) to move the files into HBase
+     *
+     * Also note this version of bulk load is different from past versions in
+     * that it includes the qualifier as part of the sort process. The
+     * reason for this is to be able to support rows with a very large number
+     * of columns.
+     *
+     * @param tableName                      The HBase table we are loading into
+     * @param flatMap                        A flatMap function that will make every row in the RDD
+     *                                       into N cells for the bulk load
+     * @param stagingDir                     The location on the FileSystem to bulk load into
+     * @param familyHFileWriteOptionsMap     Options that will define how the HFile for a
+     *                                       column family is written
+     * @param compactionExclude              Compaction excluded for the HFiles
+     * @param maxSize                        Max size for the HFiles before they roll
+     */
+    def hbaseBulkLoad(hc: HBaseContext,
+                         tableName: TableName,
+                         flatMap: (T) => Iterator[(KeyFamilyQualifier, Array[Byte])],
+                         stagingDir:String,
+                         familyHFileWriteOptionsMap:
+                         util.Map[Array[Byte], FamilyHFileWriteOptions] =
+                         new util.HashMap[Array[Byte], FamilyHFileWriteOptions](),
+                         compactionExclude: Boolean = false,
+                         maxSize:Long = HConstants.DEFAULT_MAX_FILE_SIZE):Unit = {
+      hc.bulkLoad(rdd, tableName,
+        flatMap, stagingDir, familyHFileWriteOptionsMap,
+        compactionExclude, maxSize)
+    }
+  }
+}
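
Likewise, importing HBaseRDDFunctions._ enables the RDD implicits above. A minimal sketch of the bulk-get path; the HBaseContext, table name, column family/qualifier and batch size are illustrative assumptions:

  import org.apache.hadoop.hbase.TableName
  import org.apache.hadoop.hbase.client.{Get, Result}
  import org.apache.hadoop.hbase.util.Bytes
  import org.apache.spark.rdd.RDD
  import org.apache.s2graph.loader.spark.HBaseContext
  import org.apache.s2graph.loader.spark.HBaseRDDFunctions._

  object BulkGetExample {
    // Hypothetical helper: looks up one column value for every row key in the RDD.
    // Resulting entries may be null when the row or column does not exist.
    def fetchColumn(rowKeys: RDD[Array[Byte]],
                    hbaseContext: HBaseContext): RDD[Array[Byte]] = {
      rowKeys.hbaseBulkGet[Array[Byte]](
        hbaseContext,
        TableName.valueOf("test_table"),
        100,                                     // gets per batch
        (rowKey: Array[Byte]) => new Get(rowKey),
        (result: Result) => result.getValue(Bytes.toBytes("e"), Bytes.toBytes("q")))
    }
  }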
